cluster.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. package costmodel
  2. import (
  3. "fmt"
  4. "os"
  5. "sync"
  6. "time"
  7. "github.com/kubecost/cost-model/pkg/cloud"
  8. "github.com/kubecost/cost-model/pkg/util"
  9. prometheus "github.com/prometheus/client_golang/api"
  10. "k8s.io/klog"
  11. )
  12. const (
  13. queryClusterCores = `sum(
  14. avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
  15. avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
  16. ) by (cluster_id)`
  17. queryClusterRAM = `sum(
  18. avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
  19. ) by (cluster_id)`
  20. queryStorage = `sum(
  21. avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
  22. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
  23. ) by (cluster_id) %s`
  24. queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
  25. sum(
  26. avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
  27. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
  28. ) by (cluster_id) %s`
  29. )
  30. // TODO move this to a package-accessible helper
  31. type PromQueryContext struct {
  32. Client prometheus.Client
  33. ErrorCollector *util.ErrorCollector
  34. WaitGroup *sync.WaitGroup
  35. }
  36. // TODO move this to a package-accessible helper function once dependencies are able to
  37. // be extricated from costmodel package (PromQueryResult -> util.Vector). Otherwise, circular deps.
  38. func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
  39. if ctx.WaitGroup != nil {
  40. defer ctx.WaitGroup.Done()
  41. }
  42. raw, promErr := Query(ctx.Client, query)
  43. ctx.ErrorCollector.Report(promErr)
  44. results, parseErr := NewQueryResults(raw)
  45. ctx.ErrorCollector.Report(parseErr)
  46. resultCh <- results
  47. }
  48. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  49. // are broken down by cores, memory, and storage.
  50. type ClusterCosts struct {
  51. Start *time.Time `json:"startTime"`
  52. End *time.Time `json:"endTime"`
  53. CPUCumulative float64 `json:"cpuCumulativeCost"`
  54. CPUMonthly float64 `json:"cpuMonthlyCost"`
  55. CPUMonthlyBreakdown float64 `json:"cpuMonthlyBreakdown"`
  56. GPUCumulative float64 `json:"gpuCumulativeCost"`
  57. GPUMonthly float64 `json:"gpuMonthlyCost"`
  58. RAMCumulative float64 `json:"ramCumulativeCost"`
  59. RAMMonthly float64 `json:"ramMonthlyCost"`
  60. RAMMonthlyBreakdown float64 `json:"ramMonthlyBreakdown"`
  61. StorageCumulative float64 `json:"storageCumulativeCost"`
  62. StorageMonthly float64 `json:"storageMonthlyCost"`
  63. StorageMonthlyBreakdown float64 `json:"storageMonthlyBreakdown"`
  64. TotalCumulative float64 `json:"totalCumulativeCost"`
  65. TotalMonthly float64 `json:"totalMonthlyCost"`
  66. }
  67. // ClusterCostsBreakdown provides percentage-based breakdown of a resource by
  68. // categories: user for user-space (i.e. non-system) usage, system, and idle.
  69. type ClusterCostsBreakdown struct {
  70. Idle float64 `json:"idle"`
  71. System float64 `json:"system"`
  72. User float64 `json:"user"`
  73. }
  74. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  75. // the associated monthly rate data, and returns the Costs.
  76. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
  77. start, end, err := util.ParseTimeRange(window, offset)
  78. if err != nil {
  79. return nil, err
  80. }
  81. klog.Infof("[Debug] ComputeClusterCosts: dataHours=%f; range dataHours=%f", dataHours, end.Sub(*start).Hours())
  82. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  83. if dataHours == 0 {
  84. dataHours = end.Sub(*start).Hours()
  85. }
  86. // Do not allow zero-length windows to prevent divide-by-zero issues
  87. if dataHours == 0 {
  88. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  89. }
  90. cc := &ClusterCosts{
  91. Start: start,
  92. End: end,
  93. CPUCumulative: cpu,
  94. GPUCumulative: gpu,
  95. RAMCumulative: ram,
  96. StorageCumulative: storage,
  97. TotalCumulative: cpu + gpu + ram + storage,
  98. CPUMonthly: cpu / dataHours * (util.HoursPerMonth),
  99. GPUMonthly: gpu / dataHours * (util.HoursPerMonth),
  100. RAMMonthly: ram / dataHours * (util.HoursPerMonth),
  101. StorageMonthly: storage / dataHours * (util.HoursPerMonth),
  102. }
  103. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  104. return cc, nil
  105. }
  106. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  107. func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*ClusterCosts, error) {
  108. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  109. start, end, err := util.ParseTimeRange(window, offset)
  110. if err != nil {
  111. return nil, err
  112. }
  113. mins := end.Sub(*start).Minutes()
  114. const fmtQueryDataCount = `max(sum(count_over_time(kube_node_status_capacity_cpu_cores[%s:1m]%s)) by (node, cluster_id))`
  115. const fmtQueryTotalGPU = `sum(
  116. sum_over_time(node_gpu_hourly_cost[%s:1m]%s) / 60
  117. ) by (cluster_id)`
  118. const fmtQueryTotalCPU = `sum(
  119. sum(sum_over_time(kube_node_status_capacity_cpu_cores[%s:1m]%s)) by (node, cluster_id) *
  120. avg(avg_over_time(node_cpu_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
  121. ) by (cluster_id)`
  122. const fmtQueryTotalRAM = `sum(
  123. sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1m]%s) / 1024 / 1024 / 1024) by (node, cluster_id) *
  124. avg(avg_over_time(node_ram_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
  125. ) by (cluster_id)`
  126. const fmtQueryTotalStorage = `sum(
  127. sum(sum_over_time(kube_persistentvolume_capacity_bytes[%s:1m]%s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024 *
  128. avg(avg_over_time(pv_hourly_cost[%s:1m]%s)) by (persistentvolume, cluster_id) / 60
  129. ) by (cluster_id) %s`
  130. const fmtQueryCPUModePct = `sum(rate(node_cpu_seconds_total[%s])) by (mode) / scalar(sum(rate(node_cpu_seconds_total[%s])))`
  131. const fmtQueryRAMSystemPct = `sum(avg_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s]))
  132. / sum(avg(kube_node_status_capacity_memory_bytes) by (node))`
  133. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false)
  134. if queryTotalLocalStorage != "" {
  135. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  136. }
  137. fmtOffset := ""
  138. if offset != "" {
  139. fmtOffset = fmt.Sprintf("offset %s", offset)
  140. }
  141. queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, fmtOffset)
  142. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, fmtOffset)
  143. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
  144. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
  145. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset, queryTotalLocalStorage)
  146. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, window)
  147. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window)
  148. numQueries := 7
  149. klog.V(4).Infof("[Debug] queryDataCount: %s", queryDataCount)
  150. klog.V(4).Infof("[Debug] queryTotalGPU: %s", queryTotalGPU)
  151. klog.V(4).Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
  152. klog.V(4).Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
  153. klog.V(4).Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
  154. klog.V(4).Infof("[Debug] queryCPUModePct: %s", queryCPUModePct)
  155. klog.V(4).Infof("[Debug] queryRAMSystemPct: %s", queryRAMSystemPct)
  156. // Submit queries to Prometheus asynchronously
  157. var ec util.ErrorCollector
  158. var wg sync.WaitGroup
  159. ctx := PromQueryContext{client, &ec, &wg}
  160. ctx.WaitGroup.Add(numQueries)
  161. chDataCount := make(chan []*PromQueryResult, 1)
  162. go AsyncPromQuery(queryDataCount, chDataCount, ctx)
  163. chTotalGPU := make(chan []*PromQueryResult, 1)
  164. go AsyncPromQuery(queryTotalGPU, chTotalGPU, ctx)
  165. chTotalCPU := make(chan []*PromQueryResult, 1)
  166. go AsyncPromQuery(queryTotalCPU, chTotalCPU, ctx)
  167. chTotalRAM := make(chan []*PromQueryResult, 1)
  168. go AsyncPromQuery(queryTotalRAM, chTotalRAM, ctx)
  169. chTotalStorage := make(chan []*PromQueryResult, 1)
  170. go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
  171. chCPUModePct := make(chan []*PromQueryResult, 1)
  172. go AsyncPromQuery(queryCPUModePct, chCPUModePct, ctx)
  173. chRAMSystemPct := make(chan []*PromQueryResult, 1)
  174. go AsyncPromQuery(queryRAMSystemPct, chRAMSystemPct, ctx)
  175. // After queries complete, retrieve results
  176. wg.Wait()
  177. resultsDataCount := <-chDataCount
  178. close(chDataCount)
  179. resultsTotalGPU := <-chTotalGPU
  180. close(chTotalGPU)
  181. resultsTotalCPU := <-chTotalCPU
  182. close(chTotalCPU)
  183. resultsTotalRAM := <-chTotalRAM
  184. close(chTotalRAM)
  185. resultsTotalStorage := <-chTotalStorage
  186. close(chTotalStorage)
  187. dataMins := mins
  188. if len(resultsDataCount) > 0 && len(resultsDataCount[0].Values) > 0 {
  189. dataMins = resultsDataCount[0].Values[0].Value
  190. } else {
  191. klog.V(3).Infof("[Warning] cluster cost data count returned no results")
  192. }
  193. // Determine combined discount
  194. discount, customDiscount := 0.0, 0.0
  195. c, err := A.Cloud.GetConfig()
  196. if err == nil {
  197. discount, err = ParsePercentString(c.Discount)
  198. if err != nil {
  199. discount = 0.0
  200. }
  201. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  202. if err != nil {
  203. customDiscount = 0.0
  204. }
  205. }
  206. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  207. costData := make(map[string]map[string]float64)
  208. defaultClusterID := os.Getenv(clusterIDKey)
  209. // Helper function to iterate over Prom query results, parsing the raw values into
  210. // the intermediate costData structure.
  211. setCostsFromResults := func(costData map[string]map[string]float64, results []*PromQueryResult, name string, discount float64, customDiscount float64) {
  212. for _, result := range results {
  213. clusterID, _ := result.GetString("cluster_id")
  214. if clusterID == "" {
  215. clusterID = defaultClusterID
  216. }
  217. if _, ok := costData[clusterID]; !ok {
  218. costData[clusterID] = map[string]float64{}
  219. }
  220. if len(result.Values) > 0 {
  221. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  222. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  223. }
  224. }
  225. }
  226. setCostsFromResults(costData, resultsTotalGPU, "gpu", 0.0, customDiscount)
  227. setCostsFromResults(costData, resultsTotalCPU, "cpu", discount, customDiscount)
  228. setCostsFromResults(costData, resultsTotalRAM, "ram", discount, customDiscount)
  229. setCostsFromResults(costData, resultsTotalStorage, "storage", 0.0, customDiscount)
  230. // Convert intermediate structure to Costs instances
  231. costsByCluster := map[string]*ClusterCosts{}
  232. for id, cd := range costData {
  233. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"], window, offset, dataMins/util.MinsPerHour)
  234. if err != nil {
  235. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  236. return nil, err
  237. }
  238. costsByCluster[id] = costs
  239. }
  240. return costsByCluster, nil
  241. }
  242. type Totals struct {
  243. TotalCost [][]string `json:"totalcost"`
  244. CPUCost [][]string `json:"cpucost"`
  245. MemCost [][]string `json:"memcost"`
  246. StorageCost [][]string `json:"storageCost"`
  247. }
  248. func resultToTotals(qr interface{}) ([][]string, error) {
  249. results, err := NewQueryResults(qr)
  250. if err != nil {
  251. return nil, err
  252. }
  253. if len(results) == 0 {
  254. return nil, fmt.Errorf("Not enough data available in the selected time range")
  255. }
  256. result := results[0]
  257. totals := [][]string{}
  258. for _, value := range result.Values {
  259. d0 := fmt.Sprintf("%f", value.Timestamp)
  260. d1 := fmt.Sprintf("%f", value.Value)
  261. toAppend := []string{
  262. d0,
  263. d1,
  264. }
  265. totals = append(totals, toAppend)
  266. }
  267. return totals, nil
  268. }
  269. func resultToTotal(qr interface{}) (map[string][][]string, error) {
  270. defaultClusterID := os.Getenv(clusterIDKey)
  271. results, err := NewQueryResults(qr)
  272. if err != nil {
  273. return nil, err
  274. }
  275. toReturn := make(map[string][][]string)
  276. for _, result := range results {
  277. clusterID, _ := result.GetString("cluster_id")
  278. if clusterID == "" {
  279. clusterID = defaultClusterID
  280. }
  281. // Expect a single value only
  282. if len(result.Values) == 0 {
  283. klog.V(1).Infof("[Warning] Metric values did not contain any valid data.")
  284. continue
  285. }
  286. value := result.Values[0]
  287. d0 := fmt.Sprintf("%f", value.Timestamp)
  288. d1 := fmt.Sprintf("%f", value.Value)
  289. toAppend := []string{
  290. d0,
  291. d1,
  292. }
  293. if t, ok := toReturn[clusterID]; ok {
  294. t = append(t, toAppend)
  295. } else {
  296. toReturn[clusterID] = [][]string{toAppend}
  297. }
  298. }
  299. return toReturn, nil
  300. }
  301. // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
  302. func ClusterCostsForAllClusters(cli prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*Totals, error) {
  303. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true)
  304. if localStorageQuery != "" {
  305. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  306. }
  307. fmtOffset := ""
  308. if offset != "" {
  309. fmtOffset = fmt.Sprintf("offset %s", offset)
  310. }
  311. qCores := fmt.Sprintf(queryClusterCores, window, fmtOffset, window, fmtOffset, window, fmtOffset)
  312. qRAM := fmt.Sprintf(queryClusterRAM, window, fmtOffset, window, fmtOffset)
  313. qStorage := fmt.Sprintf(queryStorage, window, fmtOffset, window, fmtOffset, localStorageQuery)
  314. klog.V(4).Infof("Running query %s", qCores)
  315. resultClusterCores, err := Query(cli, qCores)
  316. if err != nil {
  317. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  318. }
  319. klog.V(4).Infof("Running query %s", qRAM)
  320. resultClusterRAM, err := Query(cli, qRAM)
  321. if err != nil {
  322. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  323. }
  324. klog.V(4).Infof("Running query %s", qRAM)
  325. resultStorage, err := Query(cli, qStorage)
  326. if err != nil {
  327. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  328. }
  329. toReturn := make(map[string]*Totals)
  330. coreTotal, err := resultToTotal(resultClusterCores)
  331. if err != nil {
  332. return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
  333. }
  334. for clusterID, total := range coreTotal {
  335. if _, ok := toReturn[clusterID]; !ok {
  336. toReturn[clusterID] = &Totals{}
  337. }
  338. toReturn[clusterID].CPUCost = total
  339. }
  340. ramTotal, err := resultToTotal(resultClusterRAM)
  341. if err != nil {
  342. return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
  343. }
  344. for clusterID, total := range ramTotal {
  345. if _, ok := toReturn[clusterID]; !ok {
  346. toReturn[clusterID] = &Totals{}
  347. }
  348. toReturn[clusterID].MemCost = total
  349. }
  350. storageTotal, err := resultToTotal(resultStorage)
  351. if err != nil {
  352. return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
  353. }
  354. for clusterID, total := range storageTotal {
  355. if _, ok := toReturn[clusterID]; !ok {
  356. toReturn[clusterID] = &Totals{}
  357. }
  358. toReturn[clusterID].StorageCost = total
  359. }
  360. return toReturn, nil
  361. }
  362. // AverageClusterTotals gives the current full cluster costs averaged over a window of time.
  363. // Used to be ClutserCosts, but has been deprecated for that use.
  364. func AverageClusterTotals(cli prometheus.Client, provider cloud.Provider, windowString, offset string) (*Totals, error) {
  365. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  366. fmtOffset := ""
  367. if offset != "" {
  368. fmtOffset = fmt.Sprintf("offset %s", offset)
  369. }
  370. localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true)
  371. if localStorageQuery != "" {
  372. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  373. }
  374. qCores := fmt.Sprintf(queryClusterCores, windowString, fmtOffset, windowString, fmtOffset, windowString, fmtOffset)
  375. qRAM := fmt.Sprintf(queryClusterRAM, windowString, fmtOffset, windowString, fmtOffset)
  376. qStorage := fmt.Sprintf(queryStorage, windowString, fmtOffset, windowString, fmtOffset, localStorageQuery)
  377. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  378. resultClusterCores, err := Query(cli, qCores)
  379. if err != nil {
  380. return nil, err
  381. }
  382. resultClusterRAM, err := Query(cli, qRAM)
  383. if err != nil {
  384. return nil, err
  385. }
  386. resultStorage, err := Query(cli, qStorage)
  387. if err != nil {
  388. return nil, err
  389. }
  390. resultTotal, err := Query(cli, qTotal)
  391. if err != nil {
  392. return nil, err
  393. }
  394. coreTotal, err := resultToTotal(resultClusterCores)
  395. if err != nil {
  396. return nil, err
  397. }
  398. ramTotal, err := resultToTotal(resultClusterRAM)
  399. if err != nil {
  400. return nil, err
  401. }
  402. storageTotal, err := resultToTotal(resultStorage)
  403. if err != nil {
  404. return nil, err
  405. }
  406. clusterTotal, err := resultToTotal(resultTotal)
  407. if err != nil {
  408. return nil, err
  409. }
  410. defaultClusterID := os.Getenv(clusterIDKey)
  411. return &Totals{
  412. TotalCost: clusterTotal[defaultClusterID],
  413. CPUCost: coreTotal[defaultClusterID],
  414. MemCost: ramTotal[defaultClusterID],
  415. StorageCost: storageTotal[defaultClusterID],
  416. }, nil
  417. }
  418. // ClusterCostsOverTime gives the full cluster costs over time
  419. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  420. localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true)
  421. if localStorageQuery != "" {
  422. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  423. }
  424. layout := "2006-01-02T15:04:05.000Z"
  425. start, err := time.Parse(layout, startString)
  426. if err != nil {
  427. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  428. return nil, err
  429. }
  430. end, err := time.Parse(layout, endString)
  431. if err != nil {
  432. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  433. return nil, err
  434. }
  435. window, err := time.ParseDuration(windowString)
  436. if err != nil {
  437. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  438. return nil, err
  439. }
  440. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  441. if offset != "" {
  442. offset = fmt.Sprintf("offset %s", offset)
  443. }
  444. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  445. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  446. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  447. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  448. resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
  449. if err != nil {
  450. return nil, err
  451. }
  452. resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
  453. if err != nil {
  454. return nil, err
  455. }
  456. resultStorage, err := QueryRange(cli, qStorage, start, end, window)
  457. if err != nil {
  458. return nil, err
  459. }
  460. resultTotal, err := QueryRange(cli, qTotal, start, end, window)
  461. if err != nil {
  462. return nil, err
  463. }
  464. coreTotal, err := resultToTotals(resultClusterCores)
  465. if err != nil {
  466. return nil, err
  467. }
  468. ramTotal, err := resultToTotals(resultClusterRAM)
  469. if err != nil {
  470. return nil, err
  471. }
  472. storageTotal, err := resultToTotals(resultStorage)
  473. if err != nil {
  474. return nil, err
  475. }
  476. clusterTotal, err := resultToTotals(resultTotal)
  477. if err != nil {
  478. return nil, err
  479. }
  480. return &Totals{
  481. TotalCost: clusterTotal,
  482. CPUCost: coreTotal,
  483. MemCost: ramTotal,
  484. StorageCost: storageTotal,
  485. }, nil
  486. }