result.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package prom
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/source"
  9. "github.com/opencost/opencost/core/pkg/util"
  10. )
  11. var (
  12. // Static Warnings for data point parsing
  13. InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
  14. NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
  15. )
  16. func DataFieldFormatErr(query string, promResponse interface{}) error {
  17. return fmt.Errorf("Error parsing Prometheus response: 'data' field improperly formatted. Query: '%s'. Response: '%+v'", query, promResponse)
  18. }
  19. func DataPointFormatErr(query string, promResponse interface{}) error {
  20. return fmt.Errorf("Error parsing Prometheus response: improperly formatted datapoint. Query: '%s'. Response: '%+v'", query, promResponse)
  21. }
  22. func MetricFieldDoesNotExistErr(query string, promResponse interface{}) error {
  23. return fmt.Errorf("Error parsing Prometheus response: 'metric' field does not exist in data result vector. Query: '%s'. Response: '%+v'", query, promResponse)
  24. }
  25. func MetricFieldFormatErr(query string, promResponse interface{}) error {
  26. return fmt.Errorf("Error parsing Prometheus response: 'metric' field improperly formatted. Query: '%s'. Response: '%+v'", query, promResponse)
  27. }
  28. func NoDataErr(query string) error {
  29. return source.NewNoDataError(query)
  30. }
  31. func PromUnexpectedResponseErr(query string, promResponse interface{}) error {
  32. return fmt.Errorf("Error parsing Prometheus response: unexpected response. Query: '%s'. Response: '%+v'", query, promResponse)
  33. }
  34. func QueryResultNilErr(query string) error {
  35. return source.NewCommError(query)
  36. }
  37. func ResultFieldDoesNotExistErr(query string, promResponse interface{}) error {
  38. return fmt.Errorf("Error parsing Prometheus response: 'result' field does not exist. Query: '%s'. Response: '%+v'", query, promResponse)
  39. }
  40. func ResultFieldFormatErr(query string, promResponse interface{}) error {
  41. return fmt.Errorf("Error parsing Prometheus response: 'result' field improperly formatted. Query: '%s'. Response: '%+v'", query, promResponse)
  42. }
  43. func ResultFormatErr(query string, promResponse interface{}) error {
  44. return fmt.Errorf("Error parsing Prometheus response: 'result' field improperly formatted. Query: '%s'. Response: '%+v'", query, promResponse)
  45. }
  46. func ValueFieldDoesNotExistErr(query string, promResponse interface{}) error {
  47. return fmt.Errorf("Error parsing Prometheus response: 'value' field does not exist in data result vector. Query: '%s'. Response: '%+v'", query, promResponse)
  48. }
  49. // NewQueryResultError returns a QueryResults object with an error set and does not parse a result.
  50. func NewQueryResultError(query string, err error) *source.QueryResults {
  51. qrs := source.NewQueryResults(query)
  52. qrs.Error = err
  53. return qrs
  54. }
  55. // NewQueryResults accepts the raw prometheus query result and returns an array of
  56. // QueryResult objects
  57. func NewQueryResults(query string, queryResult interface{}, resultKeys *source.ResultKeys) *source.QueryResults {
  58. qrs := source.NewQueryResults(query)
  59. if queryResult == nil {
  60. qrs.Error = QueryResultNilErr(query)
  61. return qrs
  62. }
  63. data, ok := queryResult.(map[string]interface{})["data"]
  64. if !ok {
  65. e, err := wrapPrometheusError(query, queryResult)
  66. if err != nil {
  67. qrs.Error = err
  68. return qrs
  69. }
  70. qrs.Error = fmt.Errorf("%s", e)
  71. return qrs
  72. }
  73. // Deep Check for proper formatting
  74. d, ok := data.(map[string]interface{})
  75. if !ok {
  76. qrs.Error = DataFieldFormatErr(query, data)
  77. return qrs
  78. }
  79. resultData, ok := d["result"]
  80. if !ok {
  81. qrs.Error = ResultFieldDoesNotExistErr(query, d)
  82. return qrs
  83. }
  84. resultsData, ok := resultData.([]interface{})
  85. if !ok {
  86. qrs.Error = ResultFieldFormatErr(query, resultData)
  87. return qrs
  88. }
  89. // Result vectors from the query
  90. var results []*source.QueryResult
  91. // Parse raw results and into QueryResults
  92. for _, val := range resultsData {
  93. resultInterface, ok := val.(map[string]interface{})
  94. if !ok {
  95. qrs.Error = ResultFormatErr(query, val)
  96. return qrs
  97. }
  98. metricInterface, ok := resultInterface["metric"]
  99. if !ok {
  100. qrs.Error = MetricFieldDoesNotExistErr(query, resultInterface)
  101. return qrs
  102. }
  103. metricMap, ok := metricInterface.(map[string]interface{})
  104. if !ok {
  105. qrs.Error = MetricFieldFormatErr(query, metricInterface)
  106. return qrs
  107. }
  108. // Define label string for values to ensure that we only run labelsForMetric once
  109. // if we receive multiple warnings.
  110. var labelString string = ""
  111. // Determine if the result is a ranged data set or single value
  112. _, isRange := resultInterface["values"]
  113. var vectors []*util.Vector
  114. if !isRange {
  115. dataPoint, ok := resultInterface["value"]
  116. if !ok {
  117. qrs.Error = ValueFieldDoesNotExistErr(query, resultInterface)
  118. return qrs
  119. }
  120. // Append new data point, log warnings
  121. v, warn, err := parseDataPoint(query, dataPoint)
  122. if err != nil {
  123. qrs.Error = err
  124. return qrs
  125. }
  126. if warn != nil {
  127. log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
  128. }
  129. vectors = append(vectors, v)
  130. } else {
  131. values, ok := resultInterface["values"].([]interface{})
  132. if !ok {
  133. qrs.Error = fmt.Errorf("Values field is improperly formatted")
  134. return qrs
  135. }
  136. // Append new data points, log warnings
  137. for _, value := range values {
  138. v, warn, err := parseDataPoint(query, value)
  139. if err != nil {
  140. qrs.Error = err
  141. return qrs
  142. }
  143. if warn != nil {
  144. if labelString == "" {
  145. labelString = labelsForMetric(metricMap)
  146. }
  147. log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
  148. }
  149. vectors = append(vectors, v)
  150. }
  151. }
  152. results = append(results, source.NewQueryResult(metricMap, vectors, resultKeys))
  153. }
  154. qrs.Results = results
  155. return qrs
  156. }
  157. // parseDataPoint parses a data point from raw prometheus query results and returns
  158. // a new Vector instance containing the parsed data along with any warnings or errors.
  159. func parseDataPoint(query string, dataPoint interface{}) (*util.Vector, warning, error) {
  160. var w warning = nil
  161. value, ok := dataPoint.([]interface{})
  162. if !ok || len(value) != 2 {
  163. return nil, w, DataPointFormatErr(query, dataPoint)
  164. }
  165. strVal := value[1].(string)
  166. v, err := strconv.ParseFloat(strVal, 64)
  167. if err != nil {
  168. return nil, w, err
  169. }
  170. // Test for +Inf and -Inf (sign: 0), Test for NaN
  171. if math.IsInf(v, 0) {
  172. w = InfWarning
  173. v = 0.0
  174. } else if math.IsNaN(v) {
  175. w = NaNWarning
  176. v = 0.0
  177. }
  178. return &util.Vector{
  179. Timestamp: math.Round(value[0].(float64)/10) * 10,
  180. Value: v,
  181. }, w, nil
  182. }
  183. func labelsForMetric(metricMap map[string]interface{}) string {
  184. var pairs []string
  185. for k, v := range metricMap {
  186. pairs = append(pairs, fmt.Sprintf("%s: %+v", k, v))
  187. }
  188. return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
  189. }
  190. func wrapPrometheusError(query string, qr interface{}) (string, error) {
  191. e, ok := qr.(map[string]interface{})["error"]
  192. if !ok {
  193. return "", PromUnexpectedResponseErr(query, qr)
  194. }
  195. eStr, ok := e.(string)
  196. return fmt.Sprintf("'%s' parsing query '%s'", eStr, query), nil
  197. }