result.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package prom
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "strconv"
  7. "strings"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. "github.com/kubecost/cost-model/pkg/util"
  10. )
  11. // QueryResultsChan is a channel of query results
  12. type QueryResultsChan chan *QueryResults
  13. // Await returns query results, blocking until they are made available, and
  14. // deferring the closure of the underlying channel
  15. func (qrc QueryResultsChan) Await() []*QueryResult {
  16. defer close(qrc)
  17. results := <-qrc
  18. // Possible that the returned results are nil
  19. if results == nil {
  20. return nil
  21. }
  22. return results.Results
  23. }
  24. // QueryResult contains a single result from a prometheus query. It's common
  25. // to refer to query results as a slice of QueryResult
  26. type QueryResult struct {
  27. Metric map[string]interface{}
  28. Values []*util.Vector
  29. }
  30. // QueryResults contains all of the query results and the source query string.
  31. type QueryResults struct {
  32. Query string
  33. Results []*QueryResult
  34. }
  35. var (
  36. // Static Warnings for data point parsing
  37. InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
  38. NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
  39. // Static Errors for query result parsing
  40. QueryResultNilErr error = NewCommError("nil queryResult")
  41. PromUnexpectedResponseErr error = errors.New("Unexpected response from Prometheus")
  42. DataFieldFormatErr error = errors.New("Data field improperly formatted in prometheus repsonse")
  43. ResultFieldDoesNotExistErr error = errors.New("Result field not does not exist in prometheus response")
  44. ResultFieldFormatErr error = errors.New("Result field improperly formatted in prometheus response")
  45. ResultFormatErr error = errors.New("Result is improperly formatted")
  46. MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
  47. MetricFieldFormatErr error = errors.New("Metric field is improperly formatted")
  48. ValueFieldDoesNotExistErr error = errors.New("Value field does not exist in data result vector")
  49. ValueFieldFormatErr error = errors.New("Values field is improperly formatted")
  50. DataPointFormatErr error = errors.New("Improperly formatted datapoint from Prometheus")
  51. )
  52. // NewQueryResults accepts the raw prometheus query result and returns an array of
  53. // QueryResult objects
  54. func NewQueryResults(query string, queryResult interface{}) (*QueryResults, error) {
  55. if queryResult == nil {
  56. return nil, QueryResultNilErr
  57. }
  58. data, ok := queryResult.(map[string]interface{})["data"]
  59. if !ok {
  60. e, err := wrapPrometheusError(queryResult)
  61. if err != nil {
  62. return nil, err
  63. }
  64. return nil, fmt.Errorf(e)
  65. }
  66. // Deep Check for proper formatting
  67. d, ok := data.(map[string]interface{})
  68. if !ok {
  69. return nil, DataFieldFormatErr
  70. }
  71. resultData, ok := d["result"]
  72. if !ok {
  73. return nil, ResultFieldDoesNotExistErr
  74. }
  75. resultsData, ok := resultData.([]interface{})
  76. if !ok {
  77. return nil, ResultFieldFormatErr
  78. }
  79. // Result vectors from the query
  80. var results []*QueryResult
  81. // Parse raw results and into QueryResults
  82. for _, val := range resultsData {
  83. resultInterface, ok := val.(map[string]interface{})
  84. if !ok {
  85. return nil, ResultFormatErr
  86. }
  87. metricInterface, ok := resultInterface["metric"]
  88. if !ok {
  89. return nil, MetricFieldDoesNotExistErr
  90. }
  91. metricMap, ok := metricInterface.(map[string]interface{})
  92. if !ok {
  93. return nil, MetricFieldFormatErr
  94. }
  95. // Define label string for values to ensure that we only run labelsForMetric once
  96. // if we receive multiple warnings.
  97. var labelString string = ""
  98. // Determine if the result is a ranged data set or single value
  99. _, isRange := resultInterface["values"]
  100. var vectors []*util.Vector
  101. if !isRange {
  102. dataPoint, ok := resultInterface["value"]
  103. if !ok {
  104. return nil, ValueFieldDoesNotExistErr
  105. }
  106. // Append new data point, log warnings
  107. v, warn, err := parseDataPoint(dataPoint)
  108. if err != nil {
  109. return nil, err
  110. }
  111. if warn != nil {
  112. log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
  113. }
  114. vectors = append(vectors, v)
  115. } else {
  116. values, ok := resultInterface["values"].([]interface{})
  117. if !ok {
  118. return nil, fmt.Errorf("Values field is improperly formatted")
  119. }
  120. // Append new data points, log warnings
  121. for _, value := range values {
  122. v, warn, err := parseDataPoint(value)
  123. if err != nil {
  124. return nil, err
  125. }
  126. if warn != nil {
  127. if labelString == "" {
  128. labelString = labelsForMetric(metricMap)
  129. }
  130. log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
  131. }
  132. vectors = append(vectors, v)
  133. }
  134. }
  135. results = append(results, &QueryResult{
  136. Metric: metricMap,
  137. Values: vectors,
  138. })
  139. }
  140. return &QueryResults{
  141. Query: query,
  142. Results: results,
  143. }, nil
  144. }
  145. // GetString returns the requested field, or an error if it does not exist
  146. func (qr *QueryResult) GetString(field string) (string, error) {
  147. f, ok := qr.Metric[field]
  148. if !ok {
  149. return "", fmt.Errorf("'%s' field does not exist in data result vector", field)
  150. }
  151. strField, ok := f.(string)
  152. if !ok {
  153. return "", fmt.Errorf("'%s' field is improperly formatted", field)
  154. }
  155. return strField, nil
  156. }
  157. // GetLabels returns all labels and their values from the query result
  158. func (qr *QueryResult) GetLabels() map[string]string {
  159. result := make(map[string]string)
  160. // Find All keys with prefix label_, remove prefix, add to labels
  161. for k, v := range qr.Metric {
  162. if !strings.HasPrefix(k, "label_") {
  163. continue
  164. }
  165. label := k[6:]
  166. value, ok := v.(string)
  167. if !ok {
  168. log.Warningf("Failed to parse label value for label: '%s'", label)
  169. continue
  170. }
  171. result[label] = value
  172. }
  173. return result
  174. }
  175. // parseDataPoint parses a data point from raw prometheus query results and returns
  176. // a new Vector instance containing the parsed data along with any warnings or errors.
  177. func parseDataPoint(dataPoint interface{}) (*util.Vector, warning, error) {
  178. var w warning = nil
  179. value, ok := dataPoint.([]interface{})
  180. if !ok || len(value) != 2 {
  181. return nil, w, DataPointFormatErr
  182. }
  183. strVal := value[1].(string)
  184. v, err := strconv.ParseFloat(strVal, 64)
  185. if err != nil {
  186. return nil, w, err
  187. }
  188. // Test for +Inf and -Inf (sign: 0), Test for NaN
  189. if math.IsInf(v, 0) {
  190. w = InfWarning
  191. v = 0.0
  192. } else if math.IsNaN(v) {
  193. w = NaNWarning
  194. v = 0.0
  195. }
  196. return &util.Vector{
  197. Timestamp: math.Round(value[0].(float64)/10) * 10,
  198. Value: v,
  199. }, w, nil
  200. }
  201. func labelsForMetric(metricMap map[string]interface{}) string {
  202. var pairs []string
  203. for k, v := range metricMap {
  204. pairs = append(pairs, fmt.Sprintf("%s: %+v", k, v))
  205. }
  206. return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
  207. }
  208. func wrapPrometheusError(qr interface{}) (string, error) {
  209. e, ok := qr.(map[string]interface{})["error"]
  210. if !ok {
  211. return "", PromUnexpectedResponseErr
  212. }
  213. eStr, ok := e.(string)
  214. return eStr, nil
  215. }