result.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package prom
  import (
  	"fmt"
  	"math"
  	"sort"
  	"strconv"
  	"strings"

  	"github.com/kubecost/cost-model/pkg/util"
  	"k8s.io/klog"
  )
  10. // QueryResultsChan is a channel of query results
  11. type QueryResultsChan chan []*QueryResult
  12. // Await returns query results, blocking until they are made available, and
  13. // deferring the closure of the underlying channel
  14. func (qrc QueryResultsChan) Await() []*QueryResult {
  15. defer close(qrc)
  16. return <-qrc
  17. }
  18. // QueryResult contains a single result from a prometheus query. It's common
  19. // to refer to query results as a slice of QueryResult
  20. type QueryResult struct {
  21. Metric map[string]interface{}
  22. Values []*util.Vector
  23. }
  24. // NewQueryResults accepts the raw prometheus query result and returns an array of
  25. // QueryResult objects
  26. func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
  27. var result []*QueryResult
  28. if queryResult == nil {
  29. return nil, fmt.Errorf("[Error] nil result from prometheus, has it gone down?")
  30. }
  31. data, ok := queryResult.(map[string]interface{})["data"]
  32. if !ok {
  33. e, err := wrapPrometheusError(queryResult)
  34. if err != nil {
  35. return nil, err
  36. }
  37. return nil, fmt.Errorf(e)
  38. }
  39. // Deep Check for proper formatting
  40. d, ok := data.(map[string]interface{})
  41. if !ok {
  42. return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
  43. }
  44. resultData, ok := d["result"]
  45. if !ok {
  46. return nil, fmt.Errorf("Result field not present in prometheus response")
  47. }
  48. resultsData, ok := resultData.([]interface{})
  49. if !ok {
  50. return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
  51. }
  52. // Scan Results
  53. for _, val := range resultsData {
  54. resultInterface, ok := val.(map[string]interface{})
  55. if !ok {
  56. return nil, fmt.Errorf("Result is improperly formatted")
  57. }
  58. metricInterface, ok := resultInterface["metric"]
  59. if !ok {
  60. return nil, fmt.Errorf("Metric field does not exist in data result vector")
  61. }
  62. metricMap, ok := metricInterface.(map[string]interface{})
  63. if !ok {
  64. return nil, fmt.Errorf("Metric field is improperly formatted")
  65. }
  66. // Wrap execution of this lazily in case the data is not used
  67. labels := func() string { return labelsForMetric(metricMap) }
  68. // Determine if the result is a ranged data set or single value
  69. _, isRange := resultInterface["values"]
  70. var vectors []*util.Vector
  71. if !isRange {
  72. dataPoint, ok := resultInterface["value"]
  73. if !ok {
  74. return nil, fmt.Errorf("Value field does not exist in data result vector")
  75. }
  76. v, err := parseDataPoint(dataPoint, labels)
  77. if err != nil {
  78. return nil, err
  79. }
  80. vectors = append(vectors, v)
  81. } else {
  82. values, ok := resultInterface["values"].([]interface{})
  83. if !ok {
  84. return nil, fmt.Errorf("Values field is improperly formatted")
  85. }
  86. for _, value := range values {
  87. v, err := parseDataPoint(value, labels)
  88. if err != nil {
  89. return nil, err
  90. }
  91. vectors = append(vectors, v)
  92. }
  93. }
  94. result = append(result, &QueryResult{
  95. Metric: metricMap,
  96. Values: vectors,
  97. })
  98. }
  99. return result, nil
  100. }
  101. // GetString returns the requested field, or an error if it does not exist
  102. func (qr *QueryResult) GetString(field string) (string, error) {
  103. f, ok := qr.Metric[field]
  104. if !ok {
  105. return "", fmt.Errorf("%s field does not exist in data result vector", field)
  106. }
  107. strField, ok := f.(string)
  108. if !ok {
  109. return "", fmt.Errorf("%s field is improperly formatted", field)
  110. }
  111. return strField, nil
  112. }
  113. // GetLabels returns all labels and their values from the query result
  114. func (qr *QueryResult) GetLabels() map[string]string {
  115. result := make(map[string]string)
  116. // Find All keys with prefix label_, remove prefix, add to labels
  117. for k, v := range qr.Metric {
  118. if !strings.HasPrefix(k, "label_") {
  119. continue
  120. }
  121. label := k[6:]
  122. value, ok := v.(string)
  123. if !ok {
  124. klog.V(3).Infof("Failed to parse label value for label: %s", label)
  125. continue
  126. }
  127. result[label] = value
  128. }
  129. return result
  130. }
  131. func parseDataPoint(dataPoint interface{}, labels func() string) (*util.Vector, error) {
  132. value, ok := dataPoint.([]interface{})
  133. if !ok || len(value) != 2 {
  134. return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
  135. }
  136. strVal := value[1].(string)
  137. v, err := strconv.ParseFloat(strVal, 64)
  138. if err != nil {
  139. return nil, err
  140. }
  141. // Test for +Inf and -Inf (sign: 0), Test for NaN
  142. if math.IsInf(v, 0) {
  143. klog.V(1).Infof("[Warning] Found Inf value parsing vector data point for metric: %s", labels())
  144. v = 0.0
  145. } else if math.IsNaN(v) {
  146. klog.V(1).Infof("[Warning] Found NaN value parsing vector data point for metric: %s", labels())
  147. v = 0.0
  148. }
  149. return &util.Vector{
  150. Timestamp: math.Round(value[0].(float64)/10) * 10,
  151. Value: v,
  152. }, nil
  153. }
  // labelsForMetric renders metricMap as "{key: value, key: value}" for use in
  // log messages. Values are formatted with %+v. Keys are emitted in sorted
  // order so the same metric always renders to the same string, independent of
  // Go's randomized map iteration order. A nil or empty map renders as "{}".
  func labelsForMetric(metricMap map[string]interface{}) string {
  	keys := make([]string, 0, len(metricMap))
  	for k := range metricMap {
  		keys = append(keys, k)
  	}
  	sort.Strings(keys)

  	pairs := make([]string, 0, len(keys))
  	for _, k := range keys {
  		pairs = append(pairs, fmt.Sprintf("%s: %+v", k, metricMap[k]))
  	}

  	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
  }
  // wrapPrometheusError extracts the "error" message from a raw prometheus
  // response.
  //
  // It returns the message and a nil error when qr is a map whose "error"
  // entry is a string. It returns an empty message and a non-nil error when qr
  // is not a map, has no "error" entry, or the entry is not a string, so callers
  // never receive an empty message paired with a nil error for a malformed
  // response.
  func wrapPrometheusError(qr interface{}) (string, error) {
  	// Checked assertion: a response that is not a map is reported, not
  	// allowed to panic.
  	response, ok := qr.(map[string]interface{})
  	if !ok {
  		return "", fmt.Errorf("Unexpected response from Prometheus")
  	}

  	e, ok := response["error"]
  	if !ok {
  		return "", fmt.Errorf("Unexpected response from Prometheus")
  	}

  	eStr, ok := e.(string)
  	if !ok {
  		return "", fmt.Errorf("Error field is improperly formatted in prometheus response")
  	}

  	return eStr, nil
  }