allocation_running_pods_test.go

package count

// Description: checks that the aggregate count of pods per namespace computed
// from a Prometheus query and from an Allocation API request are the same.
// Both Prometheus and /allocation appear to return duplicate results. Does
// this mean we might be double-counting costs?
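
// Test outline:
//   1. Query /allocation aggregated by pod over the window.
//   2. Query Prometheus for containers with a non-zero running average in the window.
//   3. Restrict the Prometheus pod set to pods alive at the window end.
//   4. Compare the per-namespace pod sets from both sources.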

import (
	// "fmt"
	"slices"
	"sort"
	"strings"
	"testing"
	"time"

	"github.com/opencost/opencost-integration-tests/pkg/api"
	"github.com/opencost/opencost-integration-tests/pkg/prometheus"
)

func TestQueryAllocation(t *testing.T) {
	apiObj := api.NewAPI()

	testCases := []struct {
		name       string
		window     string
		aggregate  string
		accumulate string
	}{
		{
			name:       "Yesterday",
			window:     "24h",
			aggregate:  "pod",
			accumulate: "false",
		},
	}
	t.Logf("testCases: %v", testCases)

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Allocation API client.
			apiResponse, err := apiObj.GetAllocation(api.AllocationRequest{
				Window:     tc.window,
				Aggregate:  tc.aggregate,
				Accumulate: tc.accumulate,
			})
			if err != nil {
				t.Fatalf("Error while calling Allocation API %v", err)
			}
			if apiResponse.Code != 200 {
				t.Errorf("API returned non-200 code %d", apiResponse.Code)
			}
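
			// Rounding the current time up to the next whole hour
			// presumably aligns the instant-query evaluation time with
			// the end of the window /allocation was asked about (an
			// assumption; the helper packages do not document this).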
			queryEnd := time.Now().UTC().Truncate(time.Hour).Add(time.Hour)
			endTime := queryEnd.Unix()

			// Prometheus client.
			// Want to run: avg(avg_over_time(kube_pod_container_status_running[24h]) != 0) by (container, pod, namespace)
			// Running:     avg(avg_over_time(kube_pod_container_status_running[24h])) by (container, pod, namespace)
			client := prometheus.NewClient()
			promInput := prometheus.PrometheusInput{
				Metric: "kube_pod_container_status_running",
				// MetricNotEqualTo: "0",
				Function:    []string{"avg_over_time", "avg"},
				QueryWindow: tc.window,
				AggregateBy: []string{"container", "pod", "namespace"},
				Time:        &endTime,
			}
			promResponse, err := client.RunPromQLQuery(promInput, t)
			if err != nil {
				t.Fatalf("Error while calling Prometheus API %v", err)
			}

			// Narrow the Prometheus pod set to pods alive at the query
			// endTime using a 1m-resolution subquery. Without this,
			// pods that were only very briefly running inside the 24h
			// window show up in Prometheus (as their avg_over_time is
			// non-zero) but are absent from /allocation, which only
			// reports pods with coincident usage samples. That is a
			// window-boundary race, not a pod-count bug.
			promAliveInput := prometheus.PrometheusInput{
				Metric:              "kube_pod_container_status_running",
				MetricNotEqualTo:    "0",
				Function:            []string{"avg"},
				AggregateBy:         []string{"container", "pod", "namespace", "node"},
				AggregateWindow:     tc.window,
				AggregateResolution: "1m",
				Time:                &endTime,
			}
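			// Assuming the helper renders this input roughly as
			//   avg(kube_pod_container_status_running[24h:1m])
			//       by (container, pod, namespace, node) != 0
			// i.e. a 1m-resolution subquery with zero-valued series
			// filtered out (the exact rendering is an assumption about
			// pkg/prometheus, not confirmed here).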
			promAliveResponse, err := client.RunPromQLQuery(promAliveInput, t)
			if err != nil {
				t.Fatalf("Error while calling Prometheus API %v", err)
			}
			alivePods := make(map[string]bool)
			for _, metric := range promAliveResponse.Data.Result {
				alivePods[metric.Metric.Pod] = true
			}
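			// Note: alivePods is keyed by pod name alone, so identically
			// named pods in different namespaces are conflated; keying by
			// namespace/pod would be stricter.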

			// Calculate the number of pods per aggregate (namespace) for each source.
			type podAggregation struct {
				Pods []string
			}
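			// apiResponse.Data[0] appears to hold the single queried
			// window; with aggregate=pod its keys are pod names (an
			// assumption about the response shape, inferred from usage).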
			// Namespace-based calculation from the Allocation API response.
			apiAggregateCount := make(map[string]*podAggregation)
			for pod, allocationResponseItem := range apiResponse.Data[0] {
				// Synthetic items generated by /allocation that have no
				// Prometheus counterpart.
				if slices.Contains([]string{"prometheus-system-unmounted-pvcs", "network-load-gen-unmounted-pvcs"}, pod) {
					continue
				}
				// Skip items without a pod property before they can
				// create a namespace entry.
				if allocationResponseItem.Properties.Pod == "" {
					continue
				}
				podNamespace := allocationResponseItem.Properties.Namespace
				apiAggregateItem, namespacePresent := apiAggregateCount[podNamespace]
				if !namespacePresent {
					apiAggregateCount[podNamespace] = &podAggregation{
						Pods: []string{pod},
					}
					continue
				}
				if !slices.Contains(apiAggregateItem.Pods, pod) {
					apiAggregateItem.Pods = append(apiAggregateItem.Pods, pod)
				}
			}

			// Namespace-based calculation from the Prometheus response.
			promAggregateCount := make(map[string]*podAggregation)
			for _, metric := range promResponse.Data.Result {
				podNamespace := metric.Metric.Namespace
				pod := metric.Metric.Pod
				// This pod was down for the entire window; the != 0
				// filter could not be expressed in the query above, so
				// filter here instead.
				if metric.Value.Value == 0 {
					continue
				}
				// Skip pods that are not alive at the query end time.
				// /allocation only returns pods with usage data in the
				// window, so short-lived pods that were up earlier in
				// the 24h window but not at endTime would otherwise
				// produce spurious mismatches.
				if !alivePods[pod] {
					continue
				}
				promAggregateItem, namespacePresent := promAggregateCount[podNamespace]
				if !namespacePresent {
					promAggregateCount[podNamespace] = &podAggregation{
						Pods: []string{pod},
					}
					continue
				}
				if !slices.Contains(promAggregateItem.Pods, pod) {
					promAggregateItem.Pods = append(promAggregateItem.Pods, pod)
				}
			}

			if len(promAggregateCount) != len(apiAggregateCount) {
				t.Logf("Namespace Count Allocation %d != Prometheus %d", len(apiAggregateCount), len(promAggregateCount))
			}
			// Compare the union of namespaces so that a namespace present
			// in only one source is reported; ranging over just
			// promAggregateCount would silently skip namespaces returned
			// only by the Allocation API.
			allNamespaces := make(map[string]bool)
			for namespace := range promAggregateCount {
				allNamespaces[namespace] = true
			}
			for namespace := range apiAggregateCount {
				allNamespaces[namespace] = true
			}
			for namespace := range allNamespaces {
				apiNamespaceCount, apiNamespacePresent := apiAggregateCount[namespace]
				promNamespaceCount, promNamespacePresent := promAggregateCount[namespace]
				if apiNamespacePresent && promNamespacePresent {
					t.Logf("Namespace: %s", namespace)
					sort.Strings(apiNamespaceCount.Pods)
					sort.Strings(promNamespaceCount.Pods)
					if len(apiNamespaceCount.Pods) != len(promNamespaceCount.Pods) {
						t.Errorf("[Fail]: /allocation (%d) != Prometheus (%d)", len(apiNamespaceCount.Pods), len(promNamespaceCount.Pods))
						t.Errorf("API Pods:\n - %v\nPrometheus Pods:\n - %v", strings.Join(apiNamespaceCount.Pods, "\n - "), strings.Join(promNamespaceCount.Pods, "\n - "))
					} else {
						t.Logf("[Pass]: Pod Count %d", len(apiNamespaceCount.Pods))
					}
				} else {
					t.Errorf("Namespace %s missing: allocation API present=%v, Prometheus present=%v", namespace, apiNamespacePresent, promNamespacePresent)
				}
			}
		})
	}
}