cluster.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/kubecost/cost-model/pkg/cloud"
  6. "github.com/kubecost/cost-model/pkg/env"
  7. "github.com/kubecost/cost-model/pkg/log"
  8. "github.com/kubecost/cost-model/pkg/prom"
  9. "github.com/kubecost/cost-model/pkg/util"
  10. prometheus "github.com/prometheus/client_golang/api"
  11. "k8s.io/klog"
  12. )
  // PromQL templates for per-cluster cost queries. Every template multiplies an
  // hourly cost by 730 (presumably hours per month, i.e. a monthly rate; confirm
  // against util.HoursPerMonth). The `[%s] %s` pairs look like a range window
  // followed by an offset clause, and the trailing `%s` on some templates is
  // appended verbatim after the aggregation. The callers that fill these in are
  // not visible in this section, so verify the argument order there.
  const (
      // queryClusterCores: per-node core capacity times CPU hourly cost, plus GPU
      // hourly cost, summed by cluster. Takes six %s arguments.
      queryClusterCores = `sum(
          avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
          avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
        ) by (cluster_id)`

      // queryClusterRAM: per-node memory capacity in GiB (bytes / 1024^3) times
      // RAM hourly cost, summed by cluster. Takes four %s arguments.
      queryClusterRAM = `sum(
          avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
        ) by (cluster_id)`

      // queryStorage: per-PV hourly cost times PV capacity in GiB, summed by
      // cluster. Takes five %s arguments; the last is appended after the sum.
      queryStorage = `sum(
          avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
          * avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
        ) by (cluster_id) %s`

      // queryTotal: node total hourly cost plus PV cost over a fixed 1h window.
      // Takes one %s argument, appended after the storage sum.
      queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
        sum(
          avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
          * avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
        ) by (cluster_id) %s`

      // queryNodes: node total hourly cost only. Takes one trailing %s argument.
      queryNodes = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 %s`
  )
  32. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  33. // are broken down by cores, memory, and storage.
  34. type ClusterCosts struct {
  35. Start *time.Time `json:"startTime"`
  36. End *time.Time `json:"endTime"`
  37. CPUCumulative float64 `json:"cpuCumulativeCost"`
  38. CPUMonthly float64 `json:"cpuMonthlyCost"`
  39. CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`
  40. GPUCumulative float64 `json:"gpuCumulativeCost"`
  41. GPUMonthly float64 `json:"gpuMonthlyCost"`
  42. RAMCumulative float64 `json:"ramCumulativeCost"`
  43. RAMMonthly float64 `json:"ramMonthlyCost"`
  44. RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`
  45. StorageCumulative float64 `json:"storageCumulativeCost"`
  46. StorageMonthly float64 `json:"storageMonthlyCost"`
  47. StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
  48. TotalCumulative float64 `json:"totalCumulativeCost"`
  49. TotalMonthly float64 `json:"totalMonthlyCost"`
  50. DataMinutes float64
  51. }
  // ClusterCostsBreakdown splits a resource into usage categories: idle, system,
  // user (user-space, i.e. non-system), and other for anything that fits none of
  // those. Values are stored as fractions of the whole rather than as 0-100
  // percentages (the RAM breakdown derives Idle as 1.0 minus the other fields).
  type ClusterCostsBreakdown struct {
      Idle   float64 `json:"idle"`
      Other  float64 `json:"other"`
      System float64 `json:"system"`
      User   float64 `json:"user"`
  }
  60. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  61. // the associated monthly rate data, and returns the Costs.
  62. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
  63. start, end, err := util.ParseTimeRange(window, offset)
  64. if err != nil {
  65. return nil, err
  66. }
  67. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  68. if dataHours == 0 {
  69. dataHours = end.Sub(*start).Hours()
  70. }
  71. // Do not allow zero-length windows to prevent divide-by-zero issues
  72. if dataHours == 0 {
  73. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  74. }
  75. cc := &ClusterCosts{
  76. Start: start,
  77. End: end,
  78. CPUCumulative: cpu,
  79. GPUCumulative: gpu,
  80. RAMCumulative: ram,
  81. StorageCumulative: storage,
  82. TotalCumulative: cpu + gpu + ram + storage,
  83. CPUMonthly: cpu / dataHours * (util.HoursPerMonth),
  84. GPUMonthly: gpu / dataHours * (util.HoursPerMonth),
  85. RAMMonthly: ram / dataHours * (util.HoursPerMonth),
  86. StorageMonthly: storage / dataHours * (util.HoursPerMonth),
  87. }
  88. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  89. return cc, nil
  90. }
  // Disk describes one storage asset as assembled by ClusterDisks: either a
  // persistent volume or a node's local storage.
  type Disk struct {
      // Cluster is the cluster ID and Name the PV name (or, for local storage,
      // the instance label); together they identify the disk.
      Cluster string
      Name    string
      // ProviderID is not populated by ClusterDisks in this file.
      ProviderID string
      // Cost is the cumulative cost over the queried range.
      Cost float64
      // Bytes is the average size over the queried range.
      Bytes float64
      // Local is true for node-local storage and false for persistent volumes.
      Local bool
  }
  99. func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, []error) {
  100. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  101. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  102. if offset < time.Minute {
  103. offsetStr = ""
  104. }
  105. // minsPerResolution determines accuracy and resource use for the following
  106. // queries. Smaller values (higher resolution) result in better accuracy,
  107. // but more expensive queries, and vice-a-versa.
  108. minsPerResolution := 5
  109. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  110. // value, converts it to a cumulative value; i.e.
  111. // [$/hr] * [min/res]*[hr/min] = [$/res]
  112. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  113. // TODO niko/assets how do we not hard-code this price?
  114. costPerGBHr := 0.04 / 730.0
  115. ctx := prom.NewContext(client)
  116. queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * avg(pv_hourly_cost) by (cluster_id, persistentvolume))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  117. queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  118. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  119. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  120. resChPVCost := ctx.Query(queryPVCost)
  121. resChPVSize := ctx.Query(queryPVSize)
  122. resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
  123. resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
  124. resPVCost, _ := resChPVCost.Await()
  125. resPVSize, _ := resChPVSize.Await()
  126. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  127. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  128. if ctx.ErrorCollector.IsError() {
  129. return nil, ctx.Errors()
  130. }
  131. diskMap := map[string]*Disk{}
  132. for _, result := range resPVCost {
  133. cluster, err := result.GetString("cluster_id")
  134. if err != nil {
  135. cluster = env.GetClusterID()
  136. }
  137. name, err := result.GetString("persistentvolume")
  138. if err != nil {
  139. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  140. continue
  141. }
  142. // TODO niko/assets storage class
  143. cost := result.Values[0].Value
  144. key := fmt.Sprintf("%s/%s", cluster, name)
  145. if _, ok := diskMap[key]; !ok {
  146. diskMap[key] = &Disk{
  147. Cluster: cluster,
  148. Name: name,
  149. }
  150. }
  151. diskMap[key].Cost += cost
  152. }
  153. for _, result := range resPVSize {
  154. cluster, err := result.GetString("cluster_id")
  155. if err != nil {
  156. cluster = env.GetClusterID()
  157. }
  158. name, err := result.GetString("persistentvolume")
  159. if err != nil {
  160. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  161. continue
  162. }
  163. // TODO niko/assets storage class
  164. bytes := result.Values[0].Value
  165. key := fmt.Sprintf("%s/%s", cluster, name)
  166. if _, ok := diskMap[key]; !ok {
  167. diskMap[key] = &Disk{
  168. Cluster: cluster,
  169. Name: name,
  170. }
  171. }
  172. diskMap[key].Bytes = bytes
  173. }
  174. for _, result := range resLocalStorageCost {
  175. cluster, err := result.GetString("cluster_id")
  176. if err != nil {
  177. cluster = env.GetClusterID()
  178. }
  179. name, err := result.GetString("instance")
  180. if err != nil {
  181. log.Warningf("ClusterDisks: local storage data missing instance")
  182. continue
  183. }
  184. // TODO niko/assets storage class?
  185. cost := result.Values[0].Value
  186. key := fmt.Sprintf("%s/%s", cluster, name)
  187. if _, ok := diskMap[key]; !ok {
  188. diskMap[key] = &Disk{
  189. Cluster: cluster,
  190. Name: name,
  191. Local: true,
  192. }
  193. }
  194. diskMap[key].Cost += cost
  195. }
  196. for _, result := range resLocalStorageBytes {
  197. cluster, err := result.GetString("cluster_id")
  198. if err != nil {
  199. cluster = env.GetClusterID()
  200. }
  201. name, err := result.GetString("instance")
  202. if err != nil {
  203. log.Warningf("ClusterDisks: local storage data missing instance")
  204. continue
  205. }
  206. // TODO niko/assets storage class
  207. bytes := result.Values[0].Value
  208. key := fmt.Sprintf("%s/%s", cluster, name)
  209. if _, ok := diskMap[key]; !ok {
  210. diskMap[key] = &Disk{
  211. Cluster: cluster,
  212. Name: name,
  213. Local: true,
  214. }
  215. }
  216. diskMap[key].Bytes = bytes
  217. }
  218. return diskMap, nil
  219. }
  // Node describes one cluster node as assembled by ClusterNodes.
  type Node struct {
      // Cluster is the cluster ID and Name the node name; together they
      // identify the node.
      Cluster string
      Name    string
      // ProviderID is the provider_id label after the cloud provider's ParseID.
      ProviderID string
      // NodeType is taken from the instance_type label.
      NodeType string

      // CPUCost, GPUCost and RAMCost are cumulative costs over the queried range.
      // CPUCores and RAMBytes are average capacities over the same range.
      CPUCost  float64
      CPUCores float64
      GPUCost  float64
      RAMCost  float64
      RAMBytes float64

      // Discount is the combined discount computed by the cloud provider.
      Discount float64
      // Preemptible is true when the kubecost_node_is_spot series for the node
      // averaged above zero.
      Preemptible bool
  }
  233. func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, []error) {
  234. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  235. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  236. if offset < time.Minute {
  237. offsetStr = ""
  238. }
  239. // minsPerResolution determines accuracy and resource use for the following
  240. // queries. Smaller values (higher resolution) result in better accuracy,
  241. // but more expensive queries, and vice-a-versa.
  242. minsPerResolution := 5
  243. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  244. // value, converts it to a cumulative value; i.e.
  245. // [$/hr] * [min/res]*[hr/min] = [$/res]
  246. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  247. ctx := prom.NewContext(client)
  248. queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  249. queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  250. queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  251. queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  252. queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node, provider_id))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  253. queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  254. resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
  255. resChNodeCPUCores := ctx.Query(queryNodeCPUCores)
  256. resChNodeRAMCost := ctx.Query(queryNodeRAMCost)
  257. resChNodeRAMBytes := ctx.Query(queryNodeRAMBytes)
  258. resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
  259. resChNodeLabels := ctx.Query(queryNodeLabels)
  260. resNodeCPUCost, _ := resChNodeCPUCost.Await()
  261. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  262. resNodeGPUCost, _ := resChNodeGPUCost.Await()
  263. resNodeRAMCost, _ := resChNodeRAMCost.Await()
  264. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  265. resNodeLabels, _ := resChNodeLabels.Await()
  266. if ctx.ErrorCollector.IsError() {
  267. return nil, ctx.Errors()
  268. }
  269. nodeMap := map[string]*Node{}
  270. for _, result := range resNodeCPUCost {
  271. cluster, err := result.GetString("cluster_id")
  272. if err != nil {
  273. cluster = env.GetClusterID()
  274. }
  275. name, err := result.GetString("node")
  276. if err != nil {
  277. log.Warningf("ClusterNodes: CPU cost data missing node")
  278. continue
  279. }
  280. nodeType, _ := result.GetString("instance_type")
  281. providerID, _ := result.GetString("provider_id")
  282. cpuCost := result.Values[0].Value
  283. key := fmt.Sprintf("%s/%s", cluster, name)
  284. if _, ok := nodeMap[key]; !ok {
  285. nodeMap[key] = &Node{
  286. Cluster: cluster,
  287. Name: name,
  288. NodeType: nodeType,
  289. ProviderID: cp.ParseID(providerID),
  290. }
  291. }
  292. nodeMap[key].CPUCost += cpuCost
  293. nodeMap[key].NodeType = nodeType
  294. }
  295. partialCPUMap := make(map[string]float64)
  296. partialCPUMap["e2-micro"] = 0.25
  297. partialCPUMap["e2-small"] = 0.5
  298. partialCPUMap["e2-medium"] = 1.0
  299. for _, result := range resNodeCPUCores {
  300. cluster, err := result.GetString("cluster_id")
  301. if err != nil {
  302. cluster = env.GetClusterID()
  303. }
  304. name, err := result.GetString("node")
  305. if err != nil {
  306. log.Warningf("ClusterNodes: CPU cores data missing node")
  307. continue
  308. }
  309. cpuCores := result.Values[0].Value
  310. key := fmt.Sprintf("%s/%s", cluster, name)
  311. if _, ok := nodeMap[key]; !ok {
  312. nodeMap[key] = &Node{
  313. Cluster: cluster,
  314. Name: name,
  315. }
  316. }
  317. node := nodeMap[key]
  318. if v, ok := partialCPUMap[node.NodeType]; ok {
  319. node.CPUCores = v
  320. if cpuCores > 0 {
  321. adjustmentFactor := v / cpuCores
  322. node.CPUCost = node.CPUCost * adjustmentFactor
  323. }
  324. } else {
  325. nodeMap[key].CPUCores = cpuCores
  326. }
  327. }
  328. for _, result := range resNodeRAMCost {
  329. cluster, err := result.GetString("cluster_id")
  330. if err != nil {
  331. cluster = env.GetClusterID()
  332. }
  333. name, err := result.GetString("node")
  334. if err != nil {
  335. log.Warningf("ClusterNodes: RAM cost data missing node")
  336. continue
  337. }
  338. nodeType, _ := result.GetString("instance_type")
  339. providerID, _ := result.GetString("provider_id")
  340. ramCost := result.Values[0].Value
  341. key := fmt.Sprintf("%s/%s", cluster, name)
  342. if _, ok := nodeMap[key]; !ok {
  343. nodeMap[key] = &Node{
  344. Cluster: cluster,
  345. Name: name,
  346. NodeType: nodeType,
  347. ProviderID: cp.ParseID(providerID),
  348. }
  349. }
  350. nodeMap[key].RAMCost += ramCost
  351. nodeMap[key].NodeType = nodeType
  352. }
  353. for _, result := range resNodeRAMBytes {
  354. cluster, err := result.GetString("cluster_id")
  355. if err != nil {
  356. cluster = env.GetClusterID()
  357. }
  358. name, err := result.GetString("node")
  359. if err != nil {
  360. log.Warningf("ClusterNodes: RAM bytes data missing node")
  361. continue
  362. }
  363. ramBytes := result.Values[0].Value
  364. key := fmt.Sprintf("%s/%s", cluster, name)
  365. if _, ok := nodeMap[key]; !ok {
  366. nodeMap[key] = &Node{
  367. Cluster: cluster,
  368. Name: name,
  369. }
  370. }
  371. nodeMap[key].RAMBytes = ramBytes
  372. }
  373. for _, result := range resNodeGPUCost {
  374. cluster, err := result.GetString("cluster_id")
  375. if err != nil {
  376. cluster = env.GetClusterID()
  377. }
  378. name, err := result.GetString("node")
  379. if err != nil {
  380. log.Warningf("ClusterNodes: GPU cost data missing node")
  381. continue
  382. }
  383. nodeType, _ := result.GetString("instance_type")
  384. providerID, _ := result.GetString("provider_id")
  385. gpuCost := result.Values[0].Value
  386. key := fmt.Sprintf("%s/%s", cluster, name)
  387. if _, ok := nodeMap[key]; !ok {
  388. nodeMap[key] = &Node{
  389. Cluster: cluster,
  390. Name: name,
  391. NodeType: nodeType,
  392. ProviderID: cp.ParseID(providerID),
  393. }
  394. }
  395. nodeMap[key].GPUCost += gpuCost
  396. }
  397. // Determine preemptibility with node labels
  398. for _, result := range resNodeLabels {
  399. nodeName, err := result.GetString("node")
  400. if err != nil {
  401. continue
  402. }
  403. // GCP preemptible label
  404. pre := result.Values[0].Value
  405. cluster, err := result.GetString("cluster_id")
  406. if err != nil {
  407. cluster = env.GetClusterID()
  408. }
  409. key := fmt.Sprintf("%s/%s", cluster, nodeName)
  410. if node, ok := nodeMap[key]; pre > 0.0 && ok {
  411. node.Preemptible = true
  412. }
  413. // TODO AWS preemptible
  414. // TODO Azure preemptible
  415. }
  416. c, err := cp.GetConfig()
  417. if err != nil {
  418. return nil, []error{err}
  419. }
  420. discount, err := ParsePercentString(c.Discount)
  421. if err != nil {
  422. return nil, []error{err}
  423. }
  424. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  425. if err != nil {
  426. return nil, []error{err}
  427. }
  428. for _, node := range nodeMap {
  429. // TODO take RI into account
  430. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  431. }
  432. return nodeMap, nil
  433. }
  434. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  435. func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
  436. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  437. start, end, err := util.ParseTimeRange(window, offset)
  438. if err != nil {
  439. return nil, err
  440. }
  441. mins := end.Sub(*start).Minutes()
  442. // minsPerResolution determines accuracy and resource use for the following
  443. // queries. Smaller values (higher resolution) result in better accuracy,
  444. // but more expensive queries, and vice-a-versa.
  445. minsPerResolution := 5
  446. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  447. // value, converts it to a cumulative value; i.e.
  448. // [$/hr] * [min/res]*[hr/min] = [$/res]
  449. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  450. const fmtQueryDataCount = `
  451. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (cluster_id)[%s:%dm]%s) * %d
  452. `
  453. const fmtQueryTotalGPU = `
  454. sum(
  455. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  456. ) by (cluster_id)
  457. `
  458. const fmtQueryTotalCPU = `
  459. sum(
  460. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, cluster_id)[%s:%dm]%s) *
  461. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
  462. ) by (cluster_id)
  463. `
  464. const fmtQueryTotalRAM = `
  465. sum(
  466. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  467. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
  468. ) by (cluster_id)
  469. `
  470. const fmtQueryTotalStorage = `
  471. sum(
  472. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  473. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, cluster_id) * %f
  474. ) by (cluster_id)
  475. `
  476. const fmtQueryCPUModePct = `
  477. sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
  478. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)
  479. `
  480. const fmtQueryRAMSystemPct = `
  481. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (cluster_id)
  482. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
  483. `
  484. const fmtQueryRAMUserPct = `
  485. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (cluster_id)
  486. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
  487. `
  488. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  489. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  490. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  491. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  492. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  493. if queryTotalLocalStorage != "" {
  494. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  495. }
  496. fmtOffset := ""
  497. if offset != "" {
  498. fmtOffset = fmt.Sprintf("offset %s", offset)
  499. }
  500. queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, minsPerResolution, fmtOffset, minsPerResolution)
  501. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  502. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  503. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  504. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  505. ctx := prom.NewContext(client)
  506. resChs := ctx.QueryAll(
  507. queryDataCount,
  508. queryTotalGPU,
  509. queryTotalCPU,
  510. queryTotalRAM,
  511. queryTotalStorage,
  512. )
  513. // Only submit the local storage query if it is valid. Otherwise Prometheus
  514. // will return errors. Always append something to resChs, regardless, to
  515. // maintain indexing.
  516. if queryTotalLocalStorage != "" {
  517. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  518. } else {
  519. resChs = append(resChs, nil)
  520. }
  521. if withBreakdown {
  522. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
  523. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
  524. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
  525. bdResChs := ctx.QueryAll(
  526. queryCPUModePct,
  527. queryRAMSystemPct,
  528. queryRAMUserPct,
  529. )
  530. // Only submit the local storage query if it is valid. Otherwise Prometheus
  531. // will return errors. Always append something to resChs, regardless, to
  532. // maintain indexing.
  533. if queryUsedLocalStorage != "" {
  534. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  535. } else {
  536. bdResChs = append(bdResChs, nil)
  537. }
  538. resChs = append(resChs, bdResChs...)
  539. }
  540. resDataCount, _ := resChs[0].Await()
  541. resTotalGPU, _ := resChs[1].Await()
  542. resTotalCPU, _ := resChs[2].Await()
  543. resTotalRAM, _ := resChs[3].Await()
  544. resTotalStorage, _ := resChs[4].Await()
  545. if ctx.HasErrors() {
  546. return nil, ctx.Errors()[0]
  547. }
  548. defaultClusterID := env.GetClusterID()
  549. dataMinsByCluster := map[string]float64{}
  550. for _, result := range resDataCount {
  551. clusterID, _ := result.GetString("cluster_id")
  552. if clusterID == "" {
  553. clusterID = defaultClusterID
  554. }
  555. dataMins := mins
  556. if len(result.Values) > 0 {
  557. dataMins = result.Values[0].Value
  558. } else {
  559. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  560. }
  561. dataMinsByCluster[clusterID] = dataMins
  562. }
  563. // Determine combined discount
  564. discount, customDiscount := 0.0, 0.0
  565. c, err := A.Cloud.GetConfig()
  566. if err == nil {
  567. discount, err = ParsePercentString(c.Discount)
  568. if err != nil {
  569. discount = 0.0
  570. }
  571. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  572. if err != nil {
  573. customDiscount = 0.0
  574. }
  575. }
  576. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  577. costData := make(map[string]map[string]float64)
  578. // Helper function to iterate over Prom query results, parsing the raw values into
  579. // the intermediate costData structure.
  580. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  581. for _, result := range results {
  582. clusterID, _ := result.GetString("cluster_id")
  583. if clusterID == "" {
  584. clusterID = defaultClusterID
  585. }
  586. if _, ok := costData[clusterID]; !ok {
  587. costData[clusterID] = map[string]float64{}
  588. }
  589. if len(result.Values) > 0 {
  590. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  591. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  592. }
  593. }
  594. }
  595. // Apply both sustained use and custom discounts to RAM and CPU
  596. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  597. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  598. // Apply only custom discount to GPU and storage
  599. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  600. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  601. if queryTotalLocalStorage != "" {
  602. resTotalLocalStorage, err := resChs[5].Await()
  603. if err != nil {
  604. return nil, err
  605. }
  606. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  607. }
  608. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  609. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  610. pvUsedCostMap := map[string]float64{}
  611. if withBreakdown {
  612. resCPUModePct, _ := resChs[6].Await()
  613. resRAMSystemPct, _ := resChs[7].Await()
  614. resRAMUserPct, _ := resChs[8].Await()
  615. if ctx.HasErrors() {
  616. return nil, ctx.Errors()[0]
  617. }
  618. for _, result := range resCPUModePct {
  619. clusterID, _ := result.GetString("cluster_id")
  620. if clusterID == "" {
  621. clusterID = defaultClusterID
  622. }
  623. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  624. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  625. }
  626. cpuBD := cpuBreakdownMap[clusterID]
  627. mode, err := result.GetString("mode")
  628. if err != nil {
  629. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  630. mode = "other"
  631. }
  632. switch mode {
  633. case "idle":
  634. cpuBD.Idle += result.Values[0].Value
  635. case "system":
  636. cpuBD.System += result.Values[0].Value
  637. case "user":
  638. cpuBD.User += result.Values[0].Value
  639. default:
  640. cpuBD.Other += result.Values[0].Value
  641. }
  642. }
  643. for _, result := range resRAMSystemPct {
  644. clusterID, _ := result.GetString("cluster_id")
  645. if clusterID == "" {
  646. clusterID = defaultClusterID
  647. }
  648. if _, ok := ramBreakdownMap[clusterID]; !ok {
  649. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  650. }
  651. ramBD := ramBreakdownMap[clusterID]
  652. ramBD.System += result.Values[0].Value
  653. }
  654. for _, result := range resRAMUserPct {
  655. clusterID, _ := result.GetString("cluster_id")
  656. if clusterID == "" {
  657. clusterID = defaultClusterID
  658. }
  659. if _, ok := ramBreakdownMap[clusterID]; !ok {
  660. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  661. }
  662. ramBD := ramBreakdownMap[clusterID]
  663. ramBD.User += result.Values[0].Value
  664. }
  665. for _, ramBD := range ramBreakdownMap {
  666. remaining := 1.0
  667. remaining -= ramBD.Other
  668. remaining -= ramBD.System
  669. remaining -= ramBD.User
  670. ramBD.Idle = remaining
  671. }
  672. if queryUsedLocalStorage != "" {
  673. resUsedLocalStorage, err := resChs[9].Await()
  674. if err != nil {
  675. return nil, err
  676. }
  677. for _, result := range resUsedLocalStorage {
  678. clusterID, _ := result.GetString("cluster_id")
  679. if clusterID == "" {
  680. clusterID = defaultClusterID
  681. }
  682. pvUsedCostMap[clusterID] += result.Values[0].Value
  683. }
  684. }
  685. }
  686. if ctx.ErrorCollector.IsError() {
  687. for _, err := range ctx.Errors() {
  688. log.Errorf("ComputeClusterCosts: %s", err)
  689. }
  690. return nil, ctx.Errors()[0]
  691. }
  692. // Convert intermediate structure to Costs instances
  693. costsByCluster := map[string]*ClusterCosts{}
  694. for id, cd := range costData {
  695. dataMins, ok := dataMinsByCluster[id]
  696. if !ok {
  697. dataMins = mins
  698. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  699. }
  700. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/util.MinsPerHour)
  701. if err != nil {
  702. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  703. return nil, err
  704. }
  705. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  706. costs.CPUBreakdown = cpuBD
  707. }
  708. if ramBD, ok := ramBreakdownMap[id]; ok {
  709. costs.RAMBreakdown = ramBD
  710. }
  711. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  712. if pvUC, ok := pvUsedCostMap[id]; ok {
  713. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  714. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  715. }
  716. costs.DataMinutes = dataMins
  717. costsByCluster[id] = costs
  718. }
  719. return costsByCluster, nil
  720. }
  // Totals groups the cost time series returned by ClusterCostsOverTime.
  //
  // Each field is a list of rows. ClusterCostsOverTime fills every field from
  // resultToTotals, which emits two-element rows of the form
  // [timestamp, value], both rendered as strings with the "%f" verb.
  type Totals struct {
  	// TotalCost is the series for the whole cluster; JSON key "totalcost".
  	TotalCost [][]string `json:"totalcost"`
  	// CPUCost is the series for CPU cores; JSON key "cpucost".
  	CPUCost [][]string `json:"cpucost"`
  	// MemCost is the series for RAM; JSON key "memcost".
  	MemCost [][]string `json:"memcost"`
  	// StorageCost is the series for storage. Its JSON key, "storageCost", is
  	// camel-cased unlike its siblings; it is part of the serialized format
  	// and is kept as is.
  	StorageCost [][]string `json:"storageCost"`
  }
  727. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  728. if len(qrs) == 0 {
  729. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  730. }
  731. result := qrs[0]
  732. totals := [][]string{}
  733. for _, value := range result.Values {
  734. d0 := fmt.Sprintf("%f", value.Timestamp)
  735. d1 := fmt.Sprintf("%f", value.Value)
  736. toAppend := []string{
  737. d0,
  738. d1,
  739. }
  740. totals = append(totals, toAppend)
  741. }
  742. return totals, nil
  743. }
  744. // ClusterCostsOverTime gives the full cluster costs over time
  745. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  746. localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true, false)
  747. if localStorageQuery != "" {
  748. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  749. }
  750. layout := "2006-01-02T15:04:05.000Z"
  751. start, err := time.Parse(layout, startString)
  752. if err != nil {
  753. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  754. return nil, err
  755. }
  756. end, err := time.Parse(layout, endString)
  757. if err != nil {
  758. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  759. return nil, err
  760. }
  761. window, err := time.ParseDuration(windowString)
  762. if err != nil {
  763. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  764. return nil, err
  765. }
  766. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  767. if offset != "" {
  768. offset = fmt.Sprintf("offset %s", offset)
  769. }
  770. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  771. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  772. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  773. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  774. ctx := prom.NewContext(cli)
  775. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  776. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  777. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  778. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  779. resultClusterCores, err := resChClusterCores.Await()
  780. if err != nil {
  781. return nil, err
  782. }
  783. resultClusterRAM, err := resChClusterRAM.Await()
  784. if err != nil {
  785. return nil, err
  786. }
  787. resultStorage, err := resChStorage.Await()
  788. if err != nil {
  789. return nil, err
  790. }
  791. resultTotal, err := resChTotal.Await()
  792. if err != nil {
  793. return nil, err
  794. }
  795. coreTotal, err := resultToTotals(resultClusterCores)
  796. if err != nil {
  797. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  798. return nil, err
  799. }
  800. ramTotal, err := resultToTotals(resultClusterRAM)
  801. if err != nil {
  802. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  803. return nil, err
  804. }
  805. storageTotal, err := resultToTotals(resultStorage)
  806. if err != nil {
  807. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  808. }
  809. clusterTotal, err := resultToTotals(resultTotal)
  810. if err != nil {
  811. // If clusterTotal query failed, it's likely because there are no PVs, which
  812. // causes the qTotal query to return no data. Instead, query only node costs.
  813. // If that fails, return an error because something is actually wrong.
  814. qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
  815. resultNodes, err := ctx.QueryRangeSync(qNodes, start, end, window)
  816. if err != nil {
  817. return nil, err
  818. }
  819. clusterTotal, err = resultToTotals(resultNodes)
  820. if err != nil {
  821. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  822. return nil, err
  823. }
  824. }
  825. return &Totals{
  826. TotalCost: clusterTotal,
  827. CPUCost: coreTotal,
  828. MemCost: ramTotal,
  829. StorageCost: storageTotal,
  830. }, nil
  831. }