cluster.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/kubecost/cost-model/pkg/cloud"
  6. "github.com/kubecost/cost-model/pkg/env"
  7. "github.com/kubecost/cost-model/pkg/prom"
  8. "github.com/kubecost/cost-model/pkg/util"
  9. prometheus "github.com/prometheus/client_golang/api"
  10. "k8s.io/klog"
  11. )
  12. const (
  13. queryClusterCores = `sum(
  14. avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
  15. avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
  16. ) by (cluster_id)`
  17. queryClusterRAM = `sum(
  18. avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
  19. ) by (cluster_id)`
  20. queryStorage = `sum(
  21. avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
  22. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
  23. ) by (cluster_id) %s`
  24. queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
  25. sum(
  26. avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
  27. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
  28. ) by (cluster_id) %s`
  29. queryNodes = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 %s`
  30. )
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by CPU, GPU, RAM, and storage, with totals
// across all four.
//
// NewClusterCostsFromCumulative derives each *Monthly value as the matching
// *Cumulative value divided by the hours of data and multiplied by
// util.HoursPerMonth.
type ClusterCosts struct {
	// Start and End bound the time range parsed from the requested window and
	// offset.
	Start *time.Time `json:"startTime"`
	End   *time.Time `json:"endTime"`

	// CPU cost. CPUBreakdown is nil unless populated by the caller;
	// ComputeClusterCosts sets it only when a breakdown is requested.
	CPUCumulative float64                `json:"cpuCumulativeCost"`
	CPUMonthly    float64                `json:"cpuMonthlyCost"`
	CPUBreakdown  *ClusterCostsBreakdown `json:"cpuBreakdown"`

	// GPU cost; no breakdown is tracked for GPUs.
	GPUCumulative float64 `json:"gpuCumulativeCost"`
	GPUMonthly    float64 `json:"gpuMonthlyCost"`

	// RAM cost. RAMBreakdown is nil unless populated by the caller;
	// ComputeClusterCosts sets it only when a breakdown is requested.
	RAMCumulative float64                `json:"ramCumulativeCost"`
	RAMMonthly    float64                `json:"ramMonthlyCost"`
	RAMBreakdown  *ClusterCostsBreakdown `json:"ramBreakdown"`

	// Storage cost (persistent volumes plus any provider-specific local
	// storage, as summed by ComputeClusterCosts).
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`

	// Totals are the sums of the CPU, GPU, RAM, and storage values.
	TotalCumulative float64 `json:"totalCumulativeCost"`
	TotalMonthly    float64 `json:"totalMonthlyCost"`

	// DataMinutes is the number of minutes of data backing the cumulative
	// values. It has no json tag, so encoding/json emits it under the key
	// "DataMinutes".
	DataMinutes float64
}
// ClusterCostsBreakdown provides a ratio-based breakdown of a resource into
// four categories: user for user-space (i.e. non-system) usage, system, other,
// and idle. Values are ratios rather than 0-100 percentages (0.25 means 25%).
// For RAM, Idle is computed as whatever remains of 1.0 after the other
// categories, so it can be negative if the measured categories exceed 1.0.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  59. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  60. // the associated monthly rate data, and returns the Costs.
  61. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
  62. start, end, err := util.ParseTimeRange(window, offset)
  63. if err != nil {
  64. return nil, err
  65. }
  66. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  67. if dataHours == 0 {
  68. dataHours = end.Sub(*start).Hours()
  69. }
  70. // Do not allow zero-length windows to prevent divide-by-zero issues
  71. if dataHours == 0 {
  72. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  73. }
  74. cc := &ClusterCosts{
  75. Start: start,
  76. End: end,
  77. CPUCumulative: cpu,
  78. GPUCumulative: gpu,
  79. RAMCumulative: ram,
  80. StorageCumulative: storage,
  81. TotalCumulative: cpu + gpu + ram + storage,
  82. CPUMonthly: cpu / dataHours * (util.HoursPerMonth),
  83. GPUMonthly: gpu / dataHours * (util.HoursPerMonth),
  84. RAMMonthly: ram / dataHours * (util.HoursPerMonth),
  85. StorageMonthly: storage / dataHours * (util.HoursPerMonth),
  86. }
  87. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  88. return cc, nil
  89. }
  90. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  91. func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
  92. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  93. start, end, err := util.ParseTimeRange(window, offset)
  94. if err != nil {
  95. return nil, err
  96. }
  97. mins := end.Sub(*start).Minutes()
  98. // minsPerResolution determines accuracy and resource use for the following
  99. // queries. Smaller values (higher resolution) result in better accuracy,
  100. // but more expensive queries, and vice-a-versa.
  101. minsPerResolution := 5
  102. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  103. // value, converts it to a cumulative value; i.e.
  104. // [$/hr] * [min/res]*[hr/min] = [$/res]
  105. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  106. const fmtQueryDataCount = `
  107. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (cluster_id)[%s:%dm]%s) * %d
  108. `
  109. const fmtQueryTotalGPU = `
  110. sum(
  111. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  112. ) by (cluster_id)
  113. `
  114. const fmtQueryTotalCPU = `
  115. sum(
  116. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, cluster_id)[%s:%dm]%s) *
  117. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
  118. ) by (cluster_id)
  119. `
  120. const fmtQueryTotalRAM = `
  121. sum(
  122. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  123. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
  124. ) by (cluster_id)
  125. `
  126. const fmtQueryTotalStorage = `
  127. sum(
  128. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  129. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, cluster_id) * %f
  130. ) by (cluster_id)
  131. `
  132. const fmtQueryCPUModePct = `
  133. sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
  134. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)
  135. `
  136. const fmtQueryRAMSystemPct = `
  137. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (cluster_id)
  138. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
  139. `
  140. const fmtQueryRAMUserPct = `
  141. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (cluster_id)
  142. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
  143. `
  144. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  145. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  146. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  147. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  148. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  149. if queryTotalLocalStorage != "" {
  150. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  151. }
  152. fmtOffset := ""
  153. if offset != "" {
  154. fmtOffset = fmt.Sprintf("offset %s", offset)
  155. }
  156. queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, minsPerResolution, fmtOffset, minsPerResolution)
  157. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  158. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  159. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  160. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  161. ctx := prom.NewContext(client)
  162. resChs := ctx.QueryAll(
  163. queryDataCount,
  164. queryTotalGPU,
  165. queryTotalCPU,
  166. queryTotalRAM,
  167. queryTotalStorage,
  168. queryTotalLocalStorage,
  169. )
  170. if withBreakdown {
  171. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
  172. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
  173. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
  174. bdResChs := ctx.QueryAll(
  175. queryCPUModePct,
  176. queryRAMSystemPct,
  177. queryRAMUserPct,
  178. queryUsedLocalStorage,
  179. )
  180. resChs = append(resChs, bdResChs...)
  181. }
  182. defaultClusterID := env.GetClusterID()
  183. dataMinsByCluster := map[string]float64{}
  184. for _, result := range resChs[0].Await() {
  185. clusterID, _ := result.GetString("cluster_id")
  186. if clusterID == "" {
  187. clusterID = defaultClusterID
  188. }
  189. dataMins := mins
  190. if len(result.Values) > 0 {
  191. dataMins = result.Values[0].Value
  192. } else {
  193. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  194. }
  195. dataMinsByCluster[clusterID] = dataMins
  196. }
  197. // Determine combined discount
  198. discount, customDiscount := 0.0, 0.0
  199. c, err := A.Cloud.GetConfig()
  200. if err == nil {
  201. discount, err = ParsePercentString(c.Discount)
  202. if err != nil {
  203. discount = 0.0
  204. }
  205. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  206. if err != nil {
  207. customDiscount = 0.0
  208. }
  209. }
  210. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  211. costData := make(map[string]map[string]float64)
  212. // Helper function to iterate over Prom query results, parsing the raw values into
  213. // the intermediate costData structure.
  214. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  215. for _, result := range results {
  216. clusterID, _ := result.GetString("cluster_id")
  217. if clusterID == "" {
  218. clusterID = defaultClusterID
  219. }
  220. if _, ok := costData[clusterID]; !ok {
  221. costData[clusterID] = map[string]float64{}
  222. }
  223. if len(result.Values) > 0 {
  224. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  225. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  226. }
  227. }
  228. }
  229. // Apply both sustained use and custom discounts to RAM and CPU
  230. setCostsFromResults(costData, resChs[2].Await(), "cpu", discount, customDiscount)
  231. setCostsFromResults(costData, resChs[3].Await(), "ram", discount, customDiscount)
  232. // Apply only custom discount to GPU and storage
  233. setCostsFromResults(costData, resChs[1].Await(), "gpu", 0.0, customDiscount)
  234. setCostsFromResults(costData, resChs[4].Await(), "storage", 0.0, customDiscount)
  235. setCostsFromResults(costData, resChs[5].Await(), "localstorage", 0.0, customDiscount)
  236. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  237. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  238. pvUsedCostMap := map[string]float64{}
  239. if withBreakdown {
  240. for _, result := range resChs[6].Await() {
  241. clusterID, _ := result.GetString("cluster_id")
  242. if clusterID == "" {
  243. clusterID = defaultClusterID
  244. }
  245. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  246. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  247. }
  248. cpuBD := cpuBreakdownMap[clusterID]
  249. mode, err := result.GetString("mode")
  250. if err != nil {
  251. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  252. mode = "other"
  253. }
  254. switch mode {
  255. case "idle":
  256. cpuBD.Idle += result.Values[0].Value
  257. case "system":
  258. cpuBD.System += result.Values[0].Value
  259. case "user":
  260. cpuBD.User += result.Values[0].Value
  261. default:
  262. cpuBD.Other += result.Values[0].Value
  263. }
  264. }
  265. for _, result := range resChs[7].Await() {
  266. clusterID, _ := result.GetString("cluster_id")
  267. if clusterID == "" {
  268. clusterID = defaultClusterID
  269. }
  270. if _, ok := ramBreakdownMap[clusterID]; !ok {
  271. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  272. }
  273. ramBD := ramBreakdownMap[clusterID]
  274. ramBD.System += result.Values[0].Value
  275. }
  276. for _, result := range resChs[8].Await() {
  277. clusterID, _ := result.GetString("cluster_id")
  278. if clusterID == "" {
  279. clusterID = defaultClusterID
  280. }
  281. if _, ok := ramBreakdownMap[clusterID]; !ok {
  282. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  283. }
  284. ramBD := ramBreakdownMap[clusterID]
  285. ramBD.User += result.Values[0].Value
  286. }
  287. for _, ramBD := range ramBreakdownMap {
  288. remaining := 1.0
  289. remaining -= ramBD.Other
  290. remaining -= ramBD.System
  291. remaining -= ramBD.User
  292. ramBD.Idle = remaining
  293. }
  294. for _, result := range resChs[9].Await() {
  295. clusterID, _ := result.GetString("cluster_id")
  296. if clusterID == "" {
  297. clusterID = defaultClusterID
  298. }
  299. pvUsedCostMap[clusterID] += result.Values[0].Value
  300. }
  301. }
  302. // Convert intermediate structure to Costs instances
  303. costsByCluster := map[string]*ClusterCosts{}
  304. for id, cd := range costData {
  305. dataMins, ok := dataMinsByCluster[id]
  306. if !ok {
  307. dataMins = mins
  308. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  309. }
  310. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/util.MinsPerHour)
  311. if err != nil {
  312. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  313. return nil, err
  314. }
  315. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  316. costs.CPUBreakdown = cpuBD
  317. }
  318. if ramBD, ok := ramBreakdownMap[id]; ok {
  319. costs.RAMBreakdown = ramBD
  320. }
  321. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  322. if pvUC, ok := pvUsedCostMap[id]; ok {
  323. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  324. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  325. }
  326. costs.DataMinutes = dataMins
  327. costsByCluster[id] = costs
  328. }
  329. return costsByCluster, nil
  330. }
// Totals holds the cost time series returned by ClusterCostsOverTime. Each
// series is a list of [timestamp, value] pairs rendered as strings by
// resultToTotals.
//
// NOTE(review): StorageCost's JSON key is camelCase ("storageCost") while the
// others are all lowercase; clients presumably depend on these exact keys, so
// they are left unchanged.
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"`
}
  337. func resultToTotals(qr interface{}) ([][]string, error) {
  338. // TODO: Provide an actual query instead of resultToTotals
  339. qResults, err := prom.NewQueryResults("resultToTotals", qr)
  340. if err != nil {
  341. return nil, err
  342. }
  343. results := qResults.Results
  344. if len(results) == 0 {
  345. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  346. }
  347. result := results[0]
  348. totals := [][]string{}
  349. for _, value := range result.Values {
  350. d0 := fmt.Sprintf("%f", value.Timestamp)
  351. d1 := fmt.Sprintf("%f", value.Value)
  352. toAppend := []string{
  353. d0,
  354. d1,
  355. }
  356. totals = append(totals, toAppend)
  357. }
  358. return totals, nil
  359. }
  360. // ClusterCostsOverTime gives the full cluster costs over time
  361. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  362. localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true, false)
  363. if localStorageQuery != "" {
  364. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  365. }
  366. layout := "2006-01-02T15:04:05.000Z"
  367. start, err := time.Parse(layout, startString)
  368. if err != nil {
  369. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  370. return nil, err
  371. }
  372. end, err := time.Parse(layout, endString)
  373. if err != nil {
  374. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  375. return nil, err
  376. }
  377. window, err := time.ParseDuration(windowString)
  378. if err != nil {
  379. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  380. return nil, err
  381. }
  382. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  383. if offset != "" {
  384. offset = fmt.Sprintf("offset %s", offset)
  385. }
  386. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  387. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  388. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  389. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  390. resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
  391. if err != nil {
  392. return nil, err
  393. }
  394. resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
  395. if err != nil {
  396. return nil, err
  397. }
  398. resultStorage, err := QueryRange(cli, qStorage, start, end, window)
  399. if err != nil {
  400. return nil, err
  401. }
  402. resultTotal, err := QueryRange(cli, qTotal, start, end, window)
  403. if err != nil {
  404. return nil, err
  405. }
  406. coreTotal, err := resultToTotals(resultClusterCores)
  407. if err != nil {
  408. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  409. return nil, err
  410. }
  411. ramTotal, err := resultToTotals(resultClusterRAM)
  412. if err != nil {
  413. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  414. return nil, err
  415. }
  416. storageTotal, err := resultToTotals(resultStorage)
  417. if err != nil {
  418. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  419. }
  420. clusterTotal, err := resultToTotals(resultTotal)
  421. if err != nil {
  422. // If clusterTotal query failed, it's likely because there are no PVs, which
  423. // causes the qTotal query to return no data. Instead, query only node costs.
  424. // If that fails, return an error because something is actually wrong.
  425. qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
  426. resultNodes, err := QueryRange(cli, qNodes, start, end, window)
  427. if err != nil {
  428. return nil, err
  429. }
  430. clusterTotal, err = resultToTotals(resultNodes)
  431. if err != nil {
  432. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  433. return nil, err
  434. }
  435. }
  436. return &Totals{
  437. TotalCost: clusterTotal,
  438. CPUCost: coreTotal,
  439. MemCost: ramTotal,
  440. StorageCost: storageTotal,
  441. }, nil
  442. }