result.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package prom
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/source"
  9. "github.com/opencost/opencost/core/pkg/util"
  10. )
  11. var (
  12. // Static Warnings for data point parsing
  13. InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
  14. NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
  15. )
  16. func DataFieldFormatErr(query string) error {
  17. return fmt.Errorf("Data field improperly formatted in prometheus response fetching query '%s'", query)
  18. }
  19. func DataPointFormatErr(query string) error {
  20. return fmt.Errorf("Improperly formatted datapoint from Prometheus fetching query '%s'", query)
  21. }
  22. func MetricFieldDoesNotExistErr(query string) error {
  23. return fmt.Errorf("Metric field does not exist in data result vector fetching query '%s'", query)
  24. }
  25. func MetricFieldFormatErr(query string) error {
  26. return fmt.Errorf("Metric field is improperly formatted fetching query '%s'", query)
  27. }
  28. func NoDataErr(query string) error {
  29. return source.NewNoDataError(query)
  30. }
  31. func PromUnexpectedResponseErr(query string) error {
  32. return fmt.Errorf("Unexpected response from Prometheus fetching query '%s'", query)
  33. }
  34. func QueryResultNilErr(query string) error {
  35. return source.NewCommError(query)
  36. }
  37. func ResultFieldDoesNotExistErr(query string) error {
  38. return fmt.Errorf("Result field not does not exist in prometheus response fetching query '%s'", query)
  39. }
  40. func ResultFieldFormatErr(query string) error {
  41. return fmt.Errorf("Result field improperly formatted in prometheus response fetching query '%s'", query)
  42. }
  43. func ResultFormatErr(query string) error {
  44. return fmt.Errorf("Result is improperly formatted fetching query '%s'", query)
  45. }
  46. func ValueFieldDoesNotExistErr(query string) error {
  47. return fmt.Errorf("Value field does not exist in data result vector fetching query '%s'", query)
  48. }
  49. func ValueFieldFormatErr(query string) error {
  50. return fmt.Errorf("Values field is improperly formatted fetching query '%s'", query)
  51. }
  52. // NewQueryResultError returns a QueryResults object with an error set and does not parse a result.
  53. func NewQueryResultError(query string, err error) *source.QueryResults {
  54. qrs := source.NewQueryResults(query)
  55. qrs.Error = err
  56. return qrs
  57. }
  58. // NewQueryResults accepts the raw prometheus query result and returns an array of
  59. // QueryResult objects
  60. func NewQueryResults(query string, queryResult interface{}, resultKeys *source.ResultKeys) *source.QueryResults {
  61. qrs := source.NewQueryResults(query)
  62. if queryResult == nil {
  63. qrs.Error = QueryResultNilErr(query)
  64. return qrs
  65. }
  66. data, ok := queryResult.(map[string]interface{})["data"]
  67. if !ok {
  68. e, err := wrapPrometheusError(query, queryResult)
  69. if err != nil {
  70. qrs.Error = err
  71. return qrs
  72. }
  73. qrs.Error = fmt.Errorf("%s", e)
  74. return qrs
  75. }
  76. // Deep Check for proper formatting
  77. d, ok := data.(map[string]interface{})
  78. if !ok {
  79. qrs.Error = DataFieldFormatErr(query)
  80. return qrs
  81. }
  82. resultData, ok := d["result"]
  83. if !ok {
  84. qrs.Error = ResultFieldDoesNotExistErr(query)
  85. return qrs
  86. }
  87. resultsData, ok := resultData.([]interface{})
  88. if !ok {
  89. qrs.Error = ResultFieldFormatErr(query)
  90. return qrs
  91. }
  92. // Result vectors from the query
  93. var results []*source.QueryResult
  94. // Parse raw results and into QueryResults
  95. for _, val := range resultsData {
  96. resultInterface, ok := val.(map[string]interface{})
  97. if !ok {
  98. qrs.Error = ResultFormatErr(query)
  99. return qrs
  100. }
  101. metricInterface, ok := resultInterface["metric"]
  102. if !ok {
  103. qrs.Error = MetricFieldDoesNotExistErr(query)
  104. return qrs
  105. }
  106. metricMap, ok := metricInterface.(map[string]interface{})
  107. if !ok {
  108. qrs.Error = MetricFieldFormatErr(query)
  109. return qrs
  110. }
  111. // Define label string for values to ensure that we only run labelsForMetric once
  112. // if we receive multiple warnings.
  113. var labelString string = ""
  114. // Determine if the result is a ranged data set or single value
  115. _, isRange := resultInterface["values"]
  116. var vectors []*util.Vector
  117. if !isRange {
  118. dataPoint, ok := resultInterface["value"]
  119. if !ok {
  120. qrs.Error = ValueFieldDoesNotExistErr(query)
  121. return qrs
  122. }
  123. // Append new data point, log warnings
  124. v, warn, err := parseDataPoint(query, dataPoint)
  125. if err != nil {
  126. qrs.Error = err
  127. return qrs
  128. }
  129. if warn != nil {
  130. log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
  131. }
  132. vectors = append(vectors, v)
  133. } else {
  134. values, ok := resultInterface["values"].([]interface{})
  135. if !ok {
  136. qrs.Error = fmt.Errorf("Values field is improperly formatted")
  137. return qrs
  138. }
  139. // Append new data points, log warnings
  140. for _, value := range values {
  141. v, warn, err := parseDataPoint(query, value)
  142. if err != nil {
  143. qrs.Error = err
  144. return qrs
  145. }
  146. if warn != nil {
  147. if labelString == "" {
  148. labelString = labelsForMetric(metricMap)
  149. }
  150. log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
  151. }
  152. vectors = append(vectors, v)
  153. }
  154. }
  155. results = append(results, source.NewQueryResult(metricMap, vectors, resultKeys))
  156. }
  157. qrs.Results = results
  158. return qrs
  159. }
  160. // parseDataPoint parses a data point from raw prometheus query results and returns
  161. // a new Vector instance containing the parsed data along with any warnings or errors.
  162. func parseDataPoint(query string, dataPoint interface{}) (*util.Vector, warning, error) {
  163. var w warning = nil
  164. value, ok := dataPoint.([]interface{})
  165. if !ok || len(value) != 2 {
  166. return nil, w, DataPointFormatErr(query)
  167. }
  168. strVal := value[1].(string)
  169. v, err := strconv.ParseFloat(strVal, 64)
  170. if err != nil {
  171. return nil, w, err
  172. }
  173. // Test for +Inf and -Inf (sign: 0), Test for NaN
  174. if math.IsInf(v, 0) {
  175. w = InfWarning
  176. v = 0.0
  177. } else if math.IsNaN(v) {
  178. w = NaNWarning
  179. v = 0.0
  180. }
  181. return &util.Vector{
  182. Timestamp: math.Round(value[0].(float64)/10) * 10,
  183. Value: v,
  184. }, w, nil
  185. }
  186. func labelsForMetric(metricMap map[string]interface{}) string {
  187. var pairs []string
  188. for k, v := range metricMap {
  189. pairs = append(pairs, fmt.Sprintf("%s: %+v", k, v))
  190. }
  191. return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
  192. }
  193. func wrapPrometheusError(query string, qr interface{}) (string, error) {
  194. e, ok := qr.(map[string]interface{})["error"]
  195. if !ok {
  196. return "", PromUnexpectedResponseErr(query)
  197. }
  198. eStr, ok := e.(string)
  199. return fmt.Sprintf("'%s' parsing query '%s'", eStr, query), nil
  200. }