result.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package prom
  import (
  	"fmt"
  	"math"
  	"sort"
  	"strconv"
  	"strings"

  	"github.com/opencost/opencost/core/pkg/log"
  	"github.com/opencost/opencost/core/pkg/util"
  )
  10. var (
  11. // Static Warnings for data point parsing
  12. InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
  13. NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
  14. )
  // DataFieldFormatErr builds the error reported when the 'data' field of a
  // Prometheus response is improperly formatted. The message embeds the query
  // and a %+v rendering of promResponse.
  func DataFieldFormatErr(query string, promResponse interface{}) error {
  	const format = "Error parsing Prometheus response: 'data' field improperly formatted. Query: '%s'. Response: '%+v'"

  	return fmt.Errorf(format, query, promResponse)
  }
  // DataPointFormatErr builds the error reported when a single data point in a
  // Prometheus response is improperly formatted. The message embeds the query
  // and a %+v rendering of promResponse.
  func DataPointFormatErr(query string, promResponse interface{}) error {
  	const format = "Error parsing Prometheus response: improperly formatted datapoint. Query: '%s'. Response: '%+v'"

  	return fmt.Errorf(format, query, promResponse)
  }
  // MetricFieldDoesNotExistErr builds the error reported when an entry of the
  // data result vector has no 'metric' field. The message embeds the query and
  // a %+v rendering of promResponse.
  func MetricFieldDoesNotExistErr(query string, promResponse interface{}) error {
  	const format = "Error parsing Prometheus response: 'metric' field does not exist in data result vector. Query: '%s'. Response: '%+v'"

  	return fmt.Errorf(format, query, promResponse)
  }
  // MetricFieldFormatErr builds the error reported when the 'metric' field of a
  // result entry is improperly formatted. The message embeds the query and a
  // %+v rendering of promResponse.
  func MetricFieldFormatErr(query string, promResponse interface{}) error {
  	const format = "Error parsing Prometheus response: 'metric' field improperly formatted. Query: '%s'. Response: '%+v'"

  	return fmt.Errorf(format, query, promResponse)
  }
  27. func NoDataErr(query string) error {
  28. return NewNoDataError(query)
  29. }
  // PromUnexpectedResponseErr builds the error reported when a Prometheus
  // response has an unexpected shape. The message embeds the query and a %+v
  // rendering of promResponse.
  func PromUnexpectedResponseErr(query string, promResponse interface{}) error {
  	const format = "Error parsing Prometheus response: unexpected response. Query: '%s'. Response: '%+v'"

  	return fmt.Errorf(format, query, promResponse)
  }
  33. func QueryResultNilErr(query string) error {
  34. return NewCommError(query)
  35. }
  // ResultFieldDoesNotExistErr builds the error reported when the 'data' object
  // of a Prometheus response has no 'result' field. The message embeds the query
  // and a %+v rendering of promResponse.
  func ResultFieldDoesNotExistErr(query string, promResponse interface{}) error {
  	const format = "Error parsing Prometheus response: 'result' field does not exist. Query: '%s'. Response: '%+v'"

  	return fmt.Errorf(format, query, promResponse)
  }
  // ResultFieldFormatErr builds the error reported when the 'result' field of a
  // Prometheus response is improperly formatted. The message embeds the query
  // and a %+v rendering of promResponse.
  func ResultFieldFormatErr(query string, promResponse interface{}) error {
  	const format = "Error parsing Prometheus response: 'result' field improperly formatted. Query: '%s'. Response: '%+v'"

  	return fmt.Errorf(format, query, promResponse)
  }
  // ResultFormatErr builds the error reported when an individual entry of the
  // 'result' list is improperly formatted. Its message text is the same as the
  // one produced by ResultFieldFormatErr; it embeds the query and a %+v
  // rendering of promResponse.
  func ResultFormatErr(query string, promResponse interface{}) error {
  	const format = "Error parsing Prometheus response: 'result' field improperly formatted. Query: '%s'. Response: '%+v'"

  	return fmt.Errorf(format, query, promResponse)
  }
  // ValueFieldDoesNotExistErr builds the error reported when an entry of the
  // data result vector has no 'value' field. The message embeds the query and a
  // %+v rendering of promResponse.
  func ValueFieldDoesNotExistErr(query string, promResponse interface{}) error {
  	const format = "Error parsing Prometheus response: 'value' field does not exist in data result vector. Query: '%s'. Response: '%+v'"

  	return fmt.Errorf(format, query, promResponse)
  }
  48. // QueryResultsChan is a channel of query results
  49. type QueryResultsChan chan *QueryResults
  50. // Await returns query results, blocking until they are made available, and
  51. // deferring the closure of the underlying channel
  52. func (qrc QueryResultsChan) Await() ([]*QueryResult, error) {
  53. defer close(qrc)
  54. results := <-qrc
  55. if results.Error != nil {
  56. return nil, results.Error
  57. }
  58. return results.Results, nil
  59. }
  60. // QueryResults contains all of the query results and the source query string.
  61. type QueryResults struct {
  62. Query string
  63. Error error
  64. Results []*QueryResult
  65. }
  66. // QueryResult contains a single result from a prometheus query. It's common
  67. // to refer to query results as a slice of QueryResult
  68. type QueryResult struct {
  69. Metric map[string]interface{} `json:"metric"`
  70. Values []*util.Vector `json:"values"`
  71. }
  72. // NewQueryResults accepts the raw prometheus query result and returns an array of
  73. // QueryResult objects
  74. func NewQueryResults(query string, queryResult interface{}) *QueryResults {
  75. qrs := &QueryResults{Query: query}
  76. if queryResult == nil {
  77. qrs.Error = QueryResultNilErr(query)
  78. return qrs
  79. }
  80. data, ok := queryResult.(map[string]interface{})["data"]
  81. if !ok {
  82. e, err := wrapPrometheusError(query, queryResult)
  83. if err != nil {
  84. qrs.Error = err
  85. return qrs
  86. }
  87. qrs.Error = fmt.Errorf(e)
  88. return qrs
  89. }
  90. // Deep Check for proper formatting
  91. d, ok := data.(map[string]interface{})
  92. if !ok {
  93. qrs.Error = DataFieldFormatErr(query, data)
  94. return qrs
  95. }
  96. resultData, ok := d["result"]
  97. if !ok {
  98. qrs.Error = ResultFieldDoesNotExistErr(query, d)
  99. return qrs
  100. }
  101. resultsData, ok := resultData.([]interface{})
  102. if !ok {
  103. qrs.Error = ResultFieldFormatErr(query, resultData)
  104. return qrs
  105. }
  106. // Result vectors from the query
  107. var results []*QueryResult
  108. // Parse raw results and into QueryResults
  109. for _, val := range resultsData {
  110. resultInterface, ok := val.(map[string]interface{})
  111. if !ok {
  112. qrs.Error = ResultFormatErr(query, val)
  113. return qrs
  114. }
  115. metricInterface, ok := resultInterface["metric"]
  116. if !ok {
  117. qrs.Error = MetricFieldDoesNotExistErr(query, resultInterface)
  118. return qrs
  119. }
  120. metricMap, ok := metricInterface.(map[string]interface{})
  121. if !ok {
  122. qrs.Error = MetricFieldFormatErr(query, metricInterface)
  123. return qrs
  124. }
  125. // Define label string for values to ensure that we only run labelsForMetric once
  126. // if we receive multiple warnings.
  127. var labelString string = ""
  128. // Determine if the result is a ranged data set or single value
  129. _, isRange := resultInterface["values"]
  130. var vectors []*util.Vector
  131. if !isRange {
  132. dataPoint, ok := resultInterface["value"]
  133. if !ok {
  134. qrs.Error = ValueFieldDoesNotExistErr(query, resultInterface)
  135. return qrs
  136. }
  137. // Append new data point, log warnings
  138. v, warn, err := parseDataPoint(query, dataPoint)
  139. if err != nil {
  140. qrs.Error = err
  141. return qrs
  142. }
  143. if warn != nil {
  144. log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
  145. }
  146. vectors = append(vectors, v)
  147. } else {
  148. values, ok := resultInterface["values"].([]interface{})
  149. if !ok {
  150. qrs.Error = fmt.Errorf("Values field is improperly formatted")
  151. return qrs
  152. }
  153. // Append new data points, log warnings
  154. for _, value := range values {
  155. v, warn, err := parseDataPoint(query, value)
  156. if err != nil {
  157. qrs.Error = err
  158. return qrs
  159. }
  160. if warn != nil {
  161. if labelString == "" {
  162. labelString = labelsForMetric(metricMap)
  163. }
  164. log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
  165. }
  166. vectors = append(vectors, v)
  167. }
  168. }
  169. results = append(results, &QueryResult{
  170. Metric: metricMap,
  171. Values: vectors,
  172. })
  173. }
  174. qrs.Results = results
  175. return qrs
  176. }
  177. // GetString returns the requested field, or an error if it does not exist
  178. func (qr *QueryResult) GetString(field string) (string, error) {
  179. f, ok := qr.Metric[field]
  180. if !ok {
  181. return "", fmt.Errorf("'%s' field does not exist in data result vector", field)
  182. }
  183. strField, ok := f.(string)
  184. if !ok {
  185. return "", fmt.Errorf("'%s' field is improperly formatted and cannot be converted to string", field)
  186. }
  187. return strField, nil
  188. }
  189. // GetStrings returns the requested fields, or an error if it does not exist
  190. func (qr *QueryResult) GetStrings(fields ...string) (map[string]string, error) {
  191. values := map[string]string{}
  192. for _, field := range fields {
  193. f, ok := qr.Metric[field]
  194. if !ok {
  195. return nil, fmt.Errorf("'%s' field does not exist in data result vector", field)
  196. }
  197. value, ok := f.(string)
  198. if !ok {
  199. return nil, fmt.Errorf("'%s' field is improperly formatted and cannot be converted to string", field)
  200. }
  201. values[field] = value
  202. }
  203. return values, nil
  204. }
  205. // GetLabels returns all labels and their values from the query result
  206. func (qr *QueryResult) GetLabels() map[string]string {
  207. result := make(map[string]string)
  208. // Find All keys with prefix label_, remove prefix, add to labels
  209. for k, v := range qr.Metric {
  210. if !strings.HasPrefix(k, "label_") {
  211. continue
  212. }
  213. label := strings.TrimPrefix(k, "label_")
  214. value, ok := v.(string)
  215. if !ok {
  216. log.Warnf("Failed to parse label value for label: '%s'", label)
  217. continue
  218. }
  219. result[label] = value
  220. }
  221. return result
  222. }
  223. // GetAnnotations returns all annotations and their values from the query result
  224. func (qr *QueryResult) GetAnnotations() map[string]string {
  225. result := make(map[string]string)
  226. // Find All keys with prefix annotation_, remove prefix, add to annotations
  227. for k, v := range qr.Metric {
  228. if !strings.HasPrefix(k, "annotation_") {
  229. continue
  230. }
  231. annotations := strings.TrimPrefix(k, "annotation_")
  232. value, ok := v.(string)
  233. if !ok {
  234. log.Warnf("Failed to parse label value for label: '%s'", annotations)
  235. continue
  236. }
  237. result[annotations] = value
  238. }
  239. return result
  240. }
  241. // parseDataPoint parses a data point from raw prometheus query results and returns
  242. // a new Vector instance containing the parsed data along with any warnings or errors.
  243. func parseDataPoint(query string, dataPoint interface{}) (*util.Vector, warning, error) {
  244. var w warning = nil
  245. value, ok := dataPoint.([]interface{})
  246. if !ok || len(value) != 2 {
  247. return nil, w, DataPointFormatErr(query, dataPoint)
  248. }
  249. strVal := value[1].(string)
  250. v, err := strconv.ParseFloat(strVal, 64)
  251. if err != nil {
  252. return nil, w, err
  253. }
  254. // Test for +Inf and -Inf (sign: 0), Test for NaN
  255. if math.IsInf(v, 0) {
  256. w = InfWarning
  257. v = 0.0
  258. } else if math.IsNaN(v) {
  259. w = NaNWarning
  260. v = 0.0
  261. }
  262. return &util.Vector{
  263. Timestamp: math.Round(value[0].(float64)/10) * 10,
  264. Value: v,
  265. }, w, nil
  266. }
  // labelsForMetric renders metricMap as "{key: value, key: value}" for use in
  // log messages. Keys are emitted in sorted order so that the same map always
  // produces the same string; map iteration order alone would make the output
  // vary between calls. A nil or empty map renders as "{}".
  func labelsForMetric(metricMap map[string]interface{}) string {
  	keys := make([]string, 0, len(metricMap))
  	for k := range metricMap {
  		keys = append(keys, k)
  	}
  	sort.Strings(keys)

  	pairs := make([]string, 0, len(keys))
  	for _, k := range keys {
  		pairs = append(pairs, fmt.Sprintf("%s: %+v", k, metricMap[k]))
  	}

  	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
  }
  274. func wrapPrometheusError(query string, qr interface{}) (string, error) {
  275. e, ok := qr.(map[string]interface{})["error"]
  276. if !ok {
  277. return "", PromUnexpectedResponseErr(query, qr)
  278. }
  279. eStr, ok := e.(string)
  280. return fmt.Sprintf("'%s' parsing query '%s'", eStr, query), nil
  281. }