result.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package prom
  2. import (
  3. "fmt"
  4. "math"
  5. "strconv"
  6. "strings"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/util"
  9. )
  10. var (
  11. // Static Warnings for data point parsing
  12. InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
  13. NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
  14. )
  15. func DataFieldFormatErr(query string) error {
  16. return fmt.Errorf("Data field improperly formatted in prometheus response fetching query '%s'", query)
  17. }
  18. func DataPointFormatErr(query string) error {
  19. return fmt.Errorf("Improperly formatted datapoint from Prometheus fetching query '%s'", query)
  20. }
  21. func MetricFieldDoesNotExistErr(query string) error {
  22. return fmt.Errorf("Metric field does not exist in data result vector fetching query '%s'", query)
  23. }
  24. func MetricFieldFormatErr(query string) error {
  25. return fmt.Errorf("Metric field is improperly formatted fetching query '%s'", query)
  26. }
  27. func NoDataErr(query string) error {
  28. return NewNoDataError(query)
  29. }
  30. func PromUnexpectedResponseErr(query string) error {
  31. return fmt.Errorf("Unexpected response from Prometheus fetching query '%s'", query)
  32. }
  33. func QueryResultNilErr(query string) error {
  34. return NewCommError(query)
  35. }
  36. func ResultFieldDoesNotExistErr(query string) error {
  37. return fmt.Errorf("Result field not does not exist in prometheus response fetching query '%s'", query)
  38. }
  39. func ResultFieldFormatErr(query string) error {
  40. return fmt.Errorf("Result field improperly formatted in prometheus response fetching query '%s'", query)
  41. }
  42. func ResultFormatErr(query string) error {
  43. return fmt.Errorf("Result is improperly formatted fetching query '%s'", query)
  44. }
  45. func ValueFieldDoesNotExistErr(query string) error {
  46. return fmt.Errorf("Value field does not exist in data result vector fetching query '%s'", query)
  47. }
  48. func ValueFieldFormatErr(query string) error {
  49. return fmt.Errorf("Values field is improperly formatted fetching query '%s'", query)
  50. }
  51. // QueryResultsChan is a channel of query results
  52. type QueryResultsChan chan *QueryResults
  53. // Await returns query results, blocking until they are made available, and
  54. // deferring the closure of the underlying channel
  55. func (qrc QueryResultsChan) Await() ([]*QueryResult, error) {
  56. defer close(qrc)
  57. results := <-qrc
  58. if results.Error != nil {
  59. return nil, results.Error
  60. }
  61. return results.Results, nil
  62. }
  63. // QueryResults contains all of the query results and the source query string.
  64. type QueryResults struct {
  65. Query string
  66. Error error
  67. Results []*QueryResult
  68. }
  69. // QueryResult contains a single result from a prometheus query. It's common
  70. // to refer to query results as a slice of QueryResult
  71. type QueryResult struct {
  72. Metric map[string]interface{} `json:"metric"`
  73. Values []*util.Vector `json:"values"`
  74. }
  75. // NewQueryResults accepts the raw prometheus query result and returns an array of
  76. // QueryResult objects
  77. func NewQueryResults(query string, queryResult interface{}) *QueryResults {
  78. qrs := &QueryResults{Query: query}
  79. if queryResult == nil {
  80. qrs.Error = QueryResultNilErr(query)
  81. return qrs
  82. }
  83. data, ok := queryResult.(map[string]interface{})["data"]
  84. if !ok {
  85. e, err := wrapPrometheusError(query, queryResult)
  86. if err != nil {
  87. qrs.Error = err
  88. return qrs
  89. }
  90. qrs.Error = fmt.Errorf(e)
  91. return qrs
  92. }
  93. // Deep Check for proper formatting
  94. d, ok := data.(map[string]interface{})
  95. if !ok {
  96. qrs.Error = DataFieldFormatErr(query)
  97. return qrs
  98. }
  99. resultData, ok := d["result"]
  100. if !ok {
  101. qrs.Error = ResultFieldDoesNotExistErr(query)
  102. return qrs
  103. }
  104. resultsData, ok := resultData.([]interface{})
  105. if !ok {
  106. qrs.Error = ResultFieldFormatErr(query)
  107. return qrs
  108. }
  109. // Result vectors from the query
  110. var results []*QueryResult
  111. // Parse raw results and into QueryResults
  112. for _, val := range resultsData {
  113. resultInterface, ok := val.(map[string]interface{})
  114. if !ok {
  115. qrs.Error = ResultFormatErr(query)
  116. return qrs
  117. }
  118. metricInterface, ok := resultInterface["metric"]
  119. if !ok {
  120. qrs.Error = MetricFieldDoesNotExistErr(query)
  121. return qrs
  122. }
  123. metricMap, ok := metricInterface.(map[string]interface{})
  124. if !ok {
  125. qrs.Error = MetricFieldFormatErr(query)
  126. return qrs
  127. }
  128. // Define label string for values to ensure that we only run labelsForMetric once
  129. // if we receive multiple warnings.
  130. var labelString string = ""
  131. // Determine if the result is a ranged data set or single value
  132. _, isRange := resultInterface["values"]
  133. var vectors []*util.Vector
  134. if !isRange {
  135. dataPoint, ok := resultInterface["value"]
  136. if !ok {
  137. qrs.Error = ValueFieldDoesNotExistErr(query)
  138. return qrs
  139. }
  140. // Append new data point, log warnings
  141. v, warn, err := parseDataPoint(query, dataPoint)
  142. if err != nil {
  143. qrs.Error = err
  144. return qrs
  145. }
  146. if warn != nil {
  147. log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
  148. }
  149. vectors = append(vectors, v)
  150. } else {
  151. values, ok := resultInterface["values"].([]interface{})
  152. if !ok {
  153. qrs.Error = fmt.Errorf("Values field is improperly formatted")
  154. return qrs
  155. }
  156. // Append new data points, log warnings
  157. for _, value := range values {
  158. v, warn, err := parseDataPoint(query, value)
  159. if err != nil {
  160. qrs.Error = err
  161. return qrs
  162. }
  163. if warn != nil {
  164. if labelString == "" {
  165. labelString = labelsForMetric(metricMap)
  166. }
  167. log.DedupedWarningf(5, "%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
  168. }
  169. vectors = append(vectors, v)
  170. }
  171. }
  172. results = append(results, &QueryResult{
  173. Metric: metricMap,
  174. Values: vectors,
  175. })
  176. }
  177. qrs.Results = results
  178. return qrs
  179. }
  180. // GetString returns the requested field, or an error if it does not exist
  181. func (qr *QueryResult) GetString(field string) (string, error) {
  182. f, ok := qr.Metric[field]
  183. if !ok {
  184. return "", fmt.Errorf("'%s' field does not exist in data result vector", field)
  185. }
  186. strField, ok := f.(string)
  187. if !ok {
  188. return "", fmt.Errorf("'%s' field is improperly formatted and cannot be converted to string", field)
  189. }
  190. return strField, nil
  191. }
  192. // GetStrings returns the requested fields, or an error if it does not exist
  193. func (qr *QueryResult) GetStrings(fields ...string) (map[string]string, error) {
  194. values := map[string]string{}
  195. for _, field := range fields {
  196. f, ok := qr.Metric[field]
  197. if !ok {
  198. return nil, fmt.Errorf("'%s' field does not exist in data result vector", field)
  199. }
  200. value, ok := f.(string)
  201. if !ok {
  202. return nil, fmt.Errorf("'%s' field is improperly formatted and cannot be converted to string", field)
  203. }
  204. values[field] = value
  205. }
  206. return values, nil
  207. }
  208. // GetLabels returns all labels and their values from the query result
  209. func (qr *QueryResult) GetLabels() map[string]string {
  210. result := make(map[string]string)
  211. // Find All keys with prefix label_, remove prefix, add to labels
  212. for k, v := range qr.Metric {
  213. if !strings.HasPrefix(k, "label_") {
  214. continue
  215. }
  216. label := strings.TrimPrefix(k, "label_")
  217. value, ok := v.(string)
  218. if !ok {
  219. log.Warnf("Failed to parse label value for label: '%s'", label)
  220. continue
  221. }
  222. result[label] = value
  223. }
  224. return result
  225. }
  226. // GetAnnotations returns all annotations and their values from the query result
  227. func (qr *QueryResult) GetAnnotations() map[string]string {
  228. result := make(map[string]string)
  229. // Find All keys with prefix annotation_, remove prefix, add to annotations
  230. for k, v := range qr.Metric {
  231. if !strings.HasPrefix(k, "annotation_") {
  232. continue
  233. }
  234. annotations := strings.TrimPrefix(k, "annotation_")
  235. value, ok := v.(string)
  236. if !ok {
  237. log.Warnf("Failed to parse label value for label: '%s'", annotations)
  238. continue
  239. }
  240. result[annotations] = value
  241. }
  242. return result
  243. }
  244. // parseDataPoint parses a data point from raw prometheus query results and returns
  245. // a new Vector instance containing the parsed data along with any warnings or errors.
  246. func parseDataPoint(query string, dataPoint interface{}) (*util.Vector, warning, error) {
  247. var w warning = nil
  248. value, ok := dataPoint.([]interface{})
  249. if !ok || len(value) != 2 {
  250. return nil, w, DataPointFormatErr(query)
  251. }
  252. strVal := value[1].(string)
  253. v, err := strconv.ParseFloat(strVal, 64)
  254. if err != nil {
  255. return nil, w, err
  256. }
  257. // Test for +Inf and -Inf (sign: 0), Test for NaN
  258. if math.IsInf(v, 0) {
  259. w = InfWarning
  260. v = 0.0
  261. } else if math.IsNaN(v) {
  262. w = NaNWarning
  263. v = 0.0
  264. }
  265. return &util.Vector{
  266. Timestamp: math.Round(value[0].(float64)/10) * 10,
  267. Value: v,
  268. }, w, nil
  269. }
  270. func labelsForMetric(metricMap map[string]interface{}) string {
  271. var pairs []string
  272. for k, v := range metricMap {
  273. pairs = append(pairs, fmt.Sprintf("%s: %+v", k, v))
  274. }
  275. return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
  276. }
  277. func wrapPrometheusError(query string, qr interface{}) (string, error) {
  278. e, ok := qr.(map[string]interface{})["error"]
  279. if !ok {
  280. return "", PromUnexpectedResponseErr(query)
  281. }
  282. eStr, ok := e.(string)
  283. return fmt.Sprintf("'%s' parsing query '%s'", eStr, query), nil
  284. }