cluster.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
  1. package costmodel
  2. import (
  3. "fmt"
  4. "os"
  5. "sync"
  6. "time"
  7. "github.com/kubecost/cost-model/pkg/cloud"
  8. "github.com/kubecost/cost-model/pkg/util"
  9. prometheus "github.com/prometheus/client_golang/api"
  10. "k8s.io/klog"
  11. )
  // PromQL templates used by ClusterCostsOverTime. Each "%s" pair is filled with
// a range duration and an (optionally empty) "offset <dur>" modifier; the
// trailing "%s" on queryStorage, queryTotal and queryNodes receives an optional
// "+ <local storage query>" fragment. The constant 730 scales hourly costs to a
// monthly rate (presumably hours per month, matching util.HoursPerMonth).
const (
	// queryClusterCores: per-cluster monthly CPU cost (capacity cores x hourly
	// core price) plus monthly GPU cost. Args: (range, offset) x 3.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`

	// queryClusterRAM: per-cluster monthly RAM cost (capacity GiB x hourly
	// GiB price). Args: (range, offset) x 2.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`

	// queryStorage: per-cluster monthly PV cost (hourly PV price x capacity GiB).
	// Args: (range, offset) x 2, then the local-storage fragment.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`

	// queryTotal: monthly node cost plus monthly PV cost over a fixed 1h range.
	// Arg: the local-storage fragment.
	// NOTE(review): the left operand is a label-less sum() while the right is
	// grouped by cluster_id, so "+" only matches when cluster_id is absent;
	// ClusterCostsOverTime falls back to queryNodes when this yields nothing.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`

	// queryNodes: monthly node cost only. Arg: the local-storage fragment.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 %s`
)
  31. // TODO move this to a package-accessible helper
  32. type PromQueryContext struct {
  33. Client prometheus.Client
  34. ErrorCollector *util.ErrorCollector
  35. WaitGroup *sync.WaitGroup
  36. }
  37. // TODO move this to a package-accessible helper function once dependencies are able to
  38. // be extricated from costmodel package (PromQueryResult -> util.Vector). Otherwise, circular deps.
  39. func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
  40. if ctx.WaitGroup != nil {
  41. defer ctx.WaitGroup.Done()
  42. }
  43. raw, promErr := Query(ctx.Client, query)
  44. ctx.ErrorCollector.Report(promErr)
  45. results, parseErr := NewQueryResults(raw)
  46. ctx.ErrorCollector.Report(parseErr)
  47. resultCh <- results
  48. }
  49. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  50. // are broken down by cores, memory, and storage.
  51. type ClusterCosts struct {
  52. Start *time.Time `json:"startTime"`
  53. End *time.Time `json:"endTime"`
  54. CPUCumulative float64 `json:"cpuCumulativeCost"`
  55. CPUMonthly float64 `json:"cpuMonthlyCost"`
  56. CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`
  57. GPUCumulative float64 `json:"gpuCumulativeCost"`
  58. GPUMonthly float64 `json:"gpuMonthlyCost"`
  59. RAMCumulative float64 `json:"ramCumulativeCost"`
  60. RAMMonthly float64 `json:"ramMonthlyCost"`
  61. RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`
  62. StorageCumulative float64 `json:"storageCumulativeCost"`
  63. StorageMonthly float64 `json:"storageMonthlyCost"`
  64. StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
  65. TotalCumulative float64 `json:"totalCumulativeCost"`
  66. TotalMonthly float64 `json:"totalMonthlyCost"`
  67. }
  // ClusterCostsBreakdown provides a fractional breakdown of a resource across
// four categories: User for user-space (i.e. non-system) usage, System for
// system usage, Other for usage that fits neither category, and Idle for the
// unused remainder. Values are fractions of the whole (0.25 means 25%), not
// costs; when fully populated the four categories are intended to sum to 1.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  76. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  77. // the associated monthly rate data, and returns the Costs.
  78. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
  79. start, end, err := util.ParseTimeRange(window, offset)
  80. if err != nil {
  81. return nil, err
  82. }
  83. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  84. if dataHours == 0 {
  85. dataHours = end.Sub(*start).Hours()
  86. }
  87. // Do not allow zero-length windows to prevent divide-by-zero issues
  88. if dataHours == 0 {
  89. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  90. }
  91. cc := &ClusterCosts{
  92. Start: start,
  93. End: end,
  94. CPUCumulative: cpu,
  95. GPUCumulative: gpu,
  96. RAMCumulative: ram,
  97. StorageCumulative: storage,
  98. TotalCumulative: cpu + gpu + ram + storage,
  99. CPUMonthly: cpu / dataHours * (util.HoursPerMonth),
  100. GPUMonthly: gpu / dataHours * (util.HoursPerMonth),
  101. RAMMonthly: ram / dataHours * (util.HoursPerMonth),
  102. StorageMonthly: storage / dataHours * (util.HoursPerMonth),
  103. }
  104. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  105. return cc, nil
  106. }
  107. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  108. func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*ClusterCosts, error) {
  109. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  110. start, end, err := util.ParseTimeRange(window, offset)
  111. if err != nil {
  112. return nil, err
  113. }
  114. mins := end.Sub(*start).Minutes()
  115. const fmtQueryDataCount = `count_over_time(sum(kube_node_status_capacity_cpu_cores) by (cluster_id)[%s:1m]%s)`
  116. const fmtQueryTotalGPU = `sum(
  117. sum_over_time(node_gpu_hourly_cost[%s:1m]%s) / 60
  118. ) by (cluster_id)`
  119. const fmtQueryTotalCPU = `sum(
  120. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, cluster_id)[%s:1m]%s) *
  121. avg(avg_over_time(node_cpu_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
  122. ) by (cluster_id)`
  123. const fmtQueryTotalRAM = `sum(
  124. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, cluster_id)[%s:1m]%s) / 1024 / 1024 / 1024 *
  125. avg(avg_over_time(node_ram_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
  126. ) by (cluster_id)`
  127. const fmtQueryTotalStorage = `sum(
  128. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, cluster_id)[%s:1m]%s) / 1024 / 1024 / 1024 *
  129. avg(avg_over_time(pv_hourly_cost[%s:1m]%s)) by (persistentvolume, cluster_id) / 60
  130. ) by (cluster_id) %s`
  131. const fmtQueryCPUModePct = `sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
  132. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)`
  133. const fmtQueryRAMSystemPct = `sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:1m]%s)) by (cluster_id)
  134. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1m]%s)) by (cluster_id)`
  135. const fmtQueryRAMUserPct = `sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:1m]%s)) by (cluster_id)
  136. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1m]%s)) by (cluster_id)`
  137. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  138. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  139. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  140. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  141. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  142. if queryTotalLocalStorage != "" {
  143. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  144. }
  145. fmtOffset := ""
  146. if offset != "" {
  147. fmtOffset = fmt.Sprintf("offset %s", offset)
  148. }
  149. queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, fmtOffset)
  150. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, fmtOffset)
  151. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
  152. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
  153. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset, queryTotalLocalStorage)
  154. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
  155. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, fmtOffset, window, fmtOffset)
  156. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, fmtOffset, window, fmtOffset)
  157. numQueries := 9
  158. klog.V(4).Infof("[Debug] queryDataCount: %s", queryDataCount)
  159. klog.V(4).Infof("[Debug] queryTotalGPU: %s", queryTotalGPU)
  160. klog.V(4).Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
  161. klog.V(4).Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
  162. klog.V(4).Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
  163. klog.V(4).Infof("[Debug] queryCPUModePct: %s", queryCPUModePct)
  164. klog.V(4).Infof("[Debug] queryRAMSystemPct: %s", queryRAMSystemPct)
  165. klog.V(4).Infof("[Debug] queryRAMUserPct: %s", queryRAMUserPct)
  166. klog.V(4).Infof("[Debug] queryUsedLocalStorage: %s", queryUsedLocalStorage)
  167. // Submit queries to Prometheus asynchronously
  168. var ec util.ErrorCollector
  169. var wg sync.WaitGroup
  170. ctx := PromQueryContext{client, &ec, &wg}
  171. ctx.WaitGroup.Add(numQueries)
  172. chDataCount := make(chan []*PromQueryResult, 1)
  173. go AsyncPromQuery(queryDataCount, chDataCount, ctx)
  174. chTotalGPU := make(chan []*PromQueryResult, 1)
  175. go AsyncPromQuery(queryTotalGPU, chTotalGPU, ctx)
  176. chTotalCPU := make(chan []*PromQueryResult, 1)
  177. go AsyncPromQuery(queryTotalCPU, chTotalCPU, ctx)
  178. chTotalRAM := make(chan []*PromQueryResult, 1)
  179. go AsyncPromQuery(queryTotalRAM, chTotalRAM, ctx)
  180. chTotalStorage := make(chan []*PromQueryResult, 1)
  181. go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
  182. chCPUModePct := make(chan []*PromQueryResult, 1)
  183. go AsyncPromQuery(queryCPUModePct, chCPUModePct, ctx)
  184. chRAMSystemPct := make(chan []*PromQueryResult, 1)
  185. go AsyncPromQuery(queryRAMSystemPct, chRAMSystemPct, ctx)
  186. chRAMUserPct := make(chan []*PromQueryResult, 1)
  187. go AsyncPromQuery(queryRAMUserPct, chRAMUserPct, ctx)
  188. chUsedLocalStorage := make(chan []*PromQueryResult, 1)
  189. go AsyncPromQuery(queryUsedLocalStorage, chUsedLocalStorage, ctx)
  190. // After queries complete, retrieve results
  191. wg.Wait()
  192. resultsDataCount := <-chDataCount
  193. close(chDataCount)
  194. resultsTotalGPU := <-chTotalGPU
  195. close(chTotalGPU)
  196. resultsTotalCPU := <-chTotalCPU
  197. close(chTotalCPU)
  198. resultsTotalRAM := <-chTotalRAM
  199. close(chTotalRAM)
  200. resultsTotalStorage := <-chTotalStorage
  201. close(chTotalStorage)
  202. resultsCPUModePct := <-chCPUModePct
  203. close(chCPUModePct)
  204. resultsRAMSystemPct := <-chRAMSystemPct
  205. close(chRAMSystemPct)
  206. resultsRAMUserPct := <-chRAMUserPct
  207. close(chRAMUserPct)
  208. resultsUsedLocalStorage := <-chUsedLocalStorage
  209. close(chUsedLocalStorage)
  210. defaultClusterID := os.Getenv(clusterIDKey)
  211. dataMinsByCluster := map[string]float64{}
  212. for _, result := range resultsDataCount {
  213. clusterID, _ := result.GetString("cluster_id")
  214. if clusterID == "" {
  215. clusterID = defaultClusterID
  216. }
  217. dataMins := mins
  218. if len(result.Values) > 0 {
  219. dataMins = result.Values[0].Value
  220. } else {
  221. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  222. }
  223. dataMinsByCluster[clusterID] = dataMins
  224. }
  225. // Determine combined discount
  226. discount, customDiscount := 0.0, 0.0
  227. c, err := A.Cloud.GetConfig()
  228. if err == nil {
  229. discount, err = ParsePercentString(c.Discount)
  230. if err != nil {
  231. discount = 0.0
  232. }
  233. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  234. if err != nil {
  235. customDiscount = 0.0
  236. }
  237. }
  238. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  239. costData := make(map[string]map[string]float64)
  240. // Helper function to iterate over Prom query results, parsing the raw values into
  241. // the intermediate costData structure.
  242. setCostsFromResults := func(costData map[string]map[string]float64, results []*PromQueryResult, name string, discount float64, customDiscount float64) {
  243. for _, result := range results {
  244. clusterID, _ := result.GetString("cluster_id")
  245. if clusterID == "" {
  246. clusterID = defaultClusterID
  247. }
  248. if _, ok := costData[clusterID]; !ok {
  249. costData[clusterID] = map[string]float64{}
  250. }
  251. if len(result.Values) > 0 {
  252. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  253. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  254. }
  255. }
  256. }
  257. // Apply both sustained use and custom discounts to RAM and CPU
  258. setCostsFromResults(costData, resultsTotalCPU, "cpu", discount, customDiscount)
  259. setCostsFromResults(costData, resultsTotalRAM, "ram", discount, customDiscount)
  260. // Apply only custom discount to GPU and storage
  261. setCostsFromResults(costData, resultsTotalGPU, "gpu", 0.0, customDiscount)
  262. setCostsFromResults(costData, resultsTotalStorage, "storage", 0.0, customDiscount)
  263. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  264. for _, result := range resultsCPUModePct {
  265. clusterID, _ := result.GetString("cluster_id")
  266. if clusterID == "" {
  267. clusterID = defaultClusterID
  268. }
  269. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  270. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  271. }
  272. cpuBD := cpuBreakdownMap[clusterID]
  273. mode, err := result.GetString("mode")
  274. if err != nil {
  275. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  276. mode = "other"
  277. }
  278. switch mode {
  279. case "idle":
  280. cpuBD.Idle += result.Values[0].Value
  281. case "system":
  282. cpuBD.System += result.Values[0].Value
  283. case "user":
  284. cpuBD.User += result.Values[0].Value
  285. default:
  286. cpuBD.Other += result.Values[0].Value
  287. }
  288. }
  289. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  290. for _, result := range resultsRAMSystemPct {
  291. clusterID, _ := result.GetString("cluster_id")
  292. if clusterID == "" {
  293. clusterID = defaultClusterID
  294. }
  295. if _, ok := ramBreakdownMap[clusterID]; !ok {
  296. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  297. }
  298. ramBD := ramBreakdownMap[clusterID]
  299. ramBD.System += result.Values[0].Value
  300. }
  301. for _, result := range resultsRAMUserPct {
  302. clusterID, _ := result.GetString("cluster_id")
  303. if clusterID == "" {
  304. clusterID = defaultClusterID
  305. }
  306. if _, ok := ramBreakdownMap[clusterID]; !ok {
  307. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  308. }
  309. ramBD := ramBreakdownMap[clusterID]
  310. ramBD.User += result.Values[0].Value
  311. }
  312. for _, ramBD := range ramBreakdownMap {
  313. remaining := 1.0
  314. remaining -= ramBD.Other
  315. remaining -= ramBD.System
  316. remaining -= ramBD.User
  317. ramBD.Idle = remaining
  318. }
  319. pvUsedCostMap := map[string]float64{}
  320. for _, result := range resultsUsedLocalStorage {
  321. clusterID, _ := result.GetString("cluster_id")
  322. if clusterID == "" {
  323. clusterID = defaultClusterID
  324. }
  325. pvUsedCostMap[clusterID] += result.Values[0].Value
  326. }
  327. // Convert intermediate structure to Costs instances
  328. costsByCluster := map[string]*ClusterCosts{}
  329. for id, cd := range costData {
  330. dataMins, ok := dataMinsByCluster[id]
  331. if !ok {
  332. dataMins = mins
  333. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  334. }
  335. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"], window, offset, dataMins/util.MinsPerHour)
  336. if err != nil {
  337. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  338. return nil, err
  339. }
  340. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  341. costs.CPUBreakdown = cpuBD
  342. }
  343. if ramBD, ok := ramBreakdownMap[id]; ok {
  344. costs.RAMBreakdown = ramBD
  345. }
  346. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  347. if pvUC, ok := pvUsedCostMap[id]; ok {
  348. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  349. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  350. }
  351. costsByCluster[id] = costs
  352. }
  353. return costsByCluster, nil
  354. }
  // Totals holds per-resource cost time series as returned by
// ClusterCostsOverTime. Each series is a list of [timestamp, value] pairs
// rendered as strings with %f formatting.
//
// NOTE: the JSON keys are inconsistently cased ("storageCost" versus the
// all-lowercase others). They are part of the wire format and are kept as-is.
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"`
}
  361. func resultToTotals(qr interface{}) ([][]string, error) {
  362. results, err := NewQueryResults(qr)
  363. if err != nil {
  364. return nil, err
  365. }
  366. if len(results) == 0 {
  367. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  368. }
  369. result := results[0]
  370. totals := [][]string{}
  371. for _, value := range result.Values {
  372. d0 := fmt.Sprintf("%f", value.Timestamp)
  373. d1 := fmt.Sprintf("%f", value.Value)
  374. toAppend := []string{
  375. d0,
  376. d1,
  377. }
  378. totals = append(totals, toAppend)
  379. }
  380. return totals, nil
  381. }
  382. // ClusterCostsOverTime gives the full cluster costs over time
  383. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  384. localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true, false)
  385. if localStorageQuery != "" {
  386. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  387. }
  388. layout := "2006-01-02T15:04:05.000Z"
  389. start, err := time.Parse(layout, startString)
  390. if err != nil {
  391. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  392. return nil, err
  393. }
  394. end, err := time.Parse(layout, endString)
  395. if err != nil {
  396. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  397. return nil, err
  398. }
  399. window, err := time.ParseDuration(windowString)
  400. if err != nil {
  401. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  402. return nil, err
  403. }
  404. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  405. if offset != "" {
  406. offset = fmt.Sprintf("offset %s", offset)
  407. }
  408. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  409. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  410. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  411. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  412. resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
  413. if err != nil {
  414. return nil, err
  415. }
  416. resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
  417. if err != nil {
  418. return nil, err
  419. }
  420. resultStorage, err := QueryRange(cli, qStorage, start, end, window)
  421. if err != nil {
  422. return nil, err
  423. }
  424. resultTotal, err := QueryRange(cli, qTotal, start, end, window)
  425. if err != nil {
  426. return nil, err
  427. }
  428. coreTotal, err := resultToTotals(resultClusterCores)
  429. if err != nil {
  430. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  431. return nil, err
  432. }
  433. ramTotal, err := resultToTotals(resultClusterRAM)
  434. if err != nil {
  435. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  436. return nil, err
  437. }
  438. storageTotal, err := resultToTotals(resultStorage)
  439. if err != nil {
  440. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  441. }
  442. clusterTotal, err := resultToTotals(resultTotal)
  443. if err != nil {
  444. // If clusterTotal query failed, it's likely because there are no PVs, which
  445. // causes the qTotal query to return no data. Instead, query only node costs.
  446. // If that fails, return an error because something is actually wrong.
  447. qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
  448. resultNodes, err := QueryRange(cli, qNodes, start, end, window)
  449. if err != nil {
  450. return nil, err
  451. }
  452. clusterTotal, err = resultToTotals(resultNodes)
  453. if err != nil {
  454. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  455. return nil, err
  456. }
  457. }
  458. return &Totals{
  459. TotalCost: clusterTotal,
  460. CPUCost: coreTotal,
  461. MemCost: ramTotal,
  462. StorageCost: storageTotal,
  463. }, nil
  464. }