queryresult.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package source
  2. import (
  3. "fmt"
  4. "strings"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/util"
  7. )
  8. // QueryResultsChan is a channel of query results
  9. type QueryResultsChan chan *QueryResults
  10. // Await returns query results, blocking until they are made available, and
  11. // deferring the closure of the underlying channel
  12. func (qrc QueryResultsChan) Await() ([]*QueryResult, error) {
  13. defer close(qrc)
  14. results := <-qrc
  15. if results.Error != nil {
  16. return nil, results.Error
  17. }
  18. return results.Results, nil
  19. }
  // ResultKeys holds the names of the Metric entries that the QueryResult
  // accessors look up for each well-known dimension.
  type ResultKeys struct {
  	// ClusterKey is the Metric entry read by GetCluster.
  	ClusterKey string
  	// NamespaceKey is the Metric entry read by GetNamespace.
  	NamespaceKey string
  	// NodeKey is the Metric entry read by GetNode.
  	NodeKey string
  	// InstanceKey is the Metric entry read by GetInstance.
  	InstanceKey string
  	// InstanceTypeKey is the Metric entry read by GetInstanceType.
  	InstanceTypeKey string
  	// ContainerKey is the Metric entry read by GetContainer, which also
  	// tries the same name with a "_name" suffix.
  	ContainerKey string
  	// PodKey is the Metric entry read by GetPod, which also tries the same
  	// name with a "_name" suffix.
  	PodKey string
  	// ProviderIDKey is the Metric entry read by GetProviderID.
  	ProviderIDKey string
  	// DeviceKey is the Metric entry read by GetDevice.
  	DeviceKey string
  }
  31. func DefaultResultKeys() *ResultKeys {
  32. return &ResultKeys{
  33. ClusterKey: "cluster_id",
  34. NamespaceKey: "namespace",
  35. NodeKey: "node",
  36. InstanceKey: "instance",
  37. InstanceTypeKey: "instance_type",
  38. ContainerKey: "container",
  39. PodKey: "pod",
  40. ProviderIDKey: "provider_id",
  41. DeviceKey: "device",
  42. }
  43. }
  44. func ClusterKeyWithDefaults(clusterKey string) *ResultKeys {
  45. keys := DefaultResultKeys()
  46. keys.ClusterKey = clusterKey
  47. return keys
  48. }
  49. // QueryResults contains all of the query results and the source query string.
  50. type QueryResults struct {
  51. Query string
  52. Error error
  53. Results []*QueryResult
  54. }
  55. func NewQueryResults(query string) *QueryResults {
  56. return &QueryResults{
  57. Query: query,
  58. }
  59. }
  60. // QueryResult contains a single result from a prometheus query. It's common
  61. // to refer to query results as a slice of QueryResult
  62. type QueryResult struct {
  63. Metric map[string]interface{} `json:"metric"`
  64. Values []*util.Vector `json:"values"`
  65. keys *ResultKeys
  66. }
  67. func NewQueryResult(metrics map[string]any, values []*util.Vector, keys *ResultKeys) *QueryResult {
  68. if keys == nil {
  69. keys = DefaultResultKeys()
  70. }
  71. return &QueryResult{
  72. Metric: metrics,
  73. Values: values,
  74. keys: keys,
  75. }
  76. }
  77. func (qr *QueryResult) GetCluster() (string, error) {
  78. return qr.GetString(qr.keys.ClusterKey)
  79. }
  80. func (qr *QueryResult) GetNamespace() (string, error) {
  81. return qr.GetString(qr.keys.NamespaceKey)
  82. }
  83. func (qr *QueryResult) GetNode() (string, error) {
  84. return qr.GetString(qr.keys.NodeKey)
  85. }
  86. func (qr *QueryResult) GetInstance() (string, error) {
  87. return qr.GetString(qr.keys.InstanceKey)
  88. }
  89. func (qr *QueryResult) GetInstanceType() (string, error) {
  90. return qr.GetString(qr.keys.InstanceTypeKey)
  91. }
  92. func (qr *QueryResult) GetContainer() (string, error) {
  93. value, err := qr.GetString(qr.keys.ContainerKey)
  94. if value == "" || err != nil {
  95. alternate, e := qr.GetString(qr.keys.ContainerKey + "_name")
  96. if alternate == "" || e != nil {
  97. return "", fmt.Errorf("'%s' and '%s' fields do not exist in data result vector", qr.keys.ContainerKey, qr.keys.ContainerKey+"_name")
  98. }
  99. return alternate, nil
  100. }
  101. return value, nil
  102. }
  103. func (qr *QueryResult) GetPod() (string, error) {
  104. value, err := qr.GetString(qr.keys.PodKey)
  105. if value == "" || err != nil {
  106. alternate, e := qr.GetString(qr.keys.PodKey + "_name")
  107. if alternate == "" || e != nil {
  108. return "", fmt.Errorf("'%s' and '%s' fields do not exist in data result vector", qr.keys.PodKey, qr.keys.PodKey+"_name")
  109. }
  110. return alternate, nil
  111. }
  112. return value, nil
  113. }
  114. func (qr *QueryResult) GetProviderID() (string, error) {
  115. return qr.GetString(qr.keys.ProviderIDKey)
  116. }
  117. func (qr *QueryResult) GetDevice() (string, error) {
  118. return qr.GetString(qr.keys.DeviceKey)
  119. }
  120. // GetString returns the requested field, or an error if it does not exist
  121. func (qr *QueryResult) GetString(field string) (string, error) {
  122. f, ok := qr.Metric[field]
  123. if !ok {
  124. return "", fmt.Errorf("'%s' field does not exist in data result vector", field)
  125. }
  126. strField, ok := f.(string)
  127. if !ok {
  128. return "", fmt.Errorf("'%s' field is improperly formatted and cannot be converted to string", field)
  129. }
  130. return strField, nil
  131. }
  132. // GetStrings returns the requested fields, or an error if it does not exist
  133. func (qr *QueryResult) GetStrings(fields ...string) (map[string]string, error) {
  134. values := map[string]string{}
  135. for _, field := range fields {
  136. f, ok := qr.Metric[field]
  137. if !ok {
  138. return nil, fmt.Errorf("'%s' field does not exist in data result vector", field)
  139. }
  140. value, ok := f.(string)
  141. if !ok {
  142. return nil, fmt.Errorf("'%s' field is improperly formatted and cannot be converted to string", field)
  143. }
  144. values[field] = value
  145. }
  146. return values, nil
  147. }
  148. // GetLabels returns all labels and their values from the query result
  149. func (qr *QueryResult) GetLabels() map[string]string {
  150. result := make(map[string]string)
  151. // Find All keys with prefix label_, remove prefix, add to labels
  152. for k, v := range qr.Metric {
  153. if !strings.HasPrefix(k, "label_") {
  154. continue
  155. }
  156. label := strings.TrimPrefix(k, "label_")
  157. value, ok := v.(string)
  158. if !ok {
  159. log.Warnf("Failed to parse label value for label: '%s'", label)
  160. continue
  161. }
  162. result[label] = value
  163. }
  164. return result
  165. }
  166. // GetAnnotations returns all annotations and their values from the query result
  167. func (qr *QueryResult) GetAnnotations() map[string]string {
  168. result := make(map[string]string)
  169. // Find All keys with prefix annotation_, remove prefix, add to annotations
  170. for k, v := range qr.Metric {
  171. if !strings.HasPrefix(k, "annotation_") {
  172. continue
  173. }
  174. annotations := strings.TrimPrefix(k, "annotation_")
  175. value, ok := v.(string)
  176. if !ok {
  177. log.Warnf("Failed to parse label value for label: '%s'", annotations)
  178. continue
  179. }
  180. result[annotations] = value
  181. }
  182. return result
  183. }