cluster.go 41 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153
  1. package costmodel
  2. import (
  3. "fmt"
  4. "github.com/kubecost/cost-model/pkg/util/timeutil"
  5. "time"
  6. "github.com/kubecost/cost-model/pkg/cloud"
  7. "github.com/kubecost/cost-model/pkg/env"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. "github.com/kubecost/cost-model/pkg/prom"
  10. prometheus "github.com/prometheus/client_golang/api"
  11. "k8s.io/klog"
  12. )
  13. const (
  14. queryClusterCores = `sum(
  15. avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
  16. avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
  17. ) by (%s)`
  18. queryClusterRAM = `sum(
  19. avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
  20. ) by (%s)`
  21. queryStorage = `sum(
  22. avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
  23. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
  24. ) by (%s) %s`
  25. queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
  26. sum(
  27. avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
  28. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
  29. ) by (%s) %s`
  30. queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
  31. )
  32. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  33. // are broken down by cores, memory, and storage.
  34. type ClusterCosts struct {
  35. Start *time.Time `json:"startTime"`
  36. End *time.Time `json:"endTime"`
  37. CPUCumulative float64 `json:"cpuCumulativeCost"`
  38. CPUMonthly float64 `json:"cpuMonthlyCost"`
  39. CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`
  40. GPUCumulative float64 `json:"gpuCumulativeCost"`
  41. GPUMonthly float64 `json:"gpuMonthlyCost"`
  42. RAMCumulative float64 `json:"ramCumulativeCost"`
  43. RAMMonthly float64 `json:"ramMonthlyCost"`
  44. RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`
  45. StorageCumulative float64 `json:"storageCumulativeCost"`
  46. StorageMonthly float64 `json:"storageMonthlyCost"`
  47. StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
  48. TotalCumulative float64 `json:"totalCumulativeCost"`
  49. TotalMonthly float64 `json:"totalMonthlyCost"`
  50. DataMinutes float64
  51. }
  52. // ClusterCostsBreakdown provides percentage-based breakdown of a resource by
  53. // categories: user for user-space (i.e. non-system) usage, system, and idle.
  54. type ClusterCostsBreakdown struct {
  55. Idle float64 `json:"idle"`
  56. Other float64 `json:"other"`
  57. System float64 `json:"system"`
  58. User float64 `json:"user"`
  59. }
  60. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  61. // the associated monthly rate data, and returns the Costs.
  62. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  63. start, end := timeutil.ParseTimeRange(window, offset)
  64. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  65. if dataHours == 0 {
  66. dataHours = end.Sub(start).Hours()
  67. }
  68. // Do not allow zero-length windows to prevent divide-by-zero issues
  69. if dataHours == 0 {
  70. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  71. }
  72. cc := &ClusterCosts{
  73. Start: &start,
  74. End: &end,
  75. CPUCumulative: cpu,
  76. GPUCumulative: gpu,
  77. RAMCumulative: ram,
  78. StorageCumulative: storage,
  79. TotalCumulative: cpu + gpu + ram + storage,
  80. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  81. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  82. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  83. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  84. }
  85. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  86. return cc, nil
  87. }
  88. type Disk struct {
  89. Cluster string
  90. Name string
  91. ProviderID string
  92. Cost float64
  93. Bytes float64
  94. Local bool
  95. Start time.Time
  96. End time.Time
  97. Minutes float64
  98. Breakdown *ClusterCostsBreakdown
  99. }
  100. func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, error) {
  101. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  102. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  103. if offset < time.Minute {
  104. offsetStr = ""
  105. }
  106. // minsPerResolution determines accuracy and resource use for the following
  107. // queries. Smaller values (higher resolution) result in better accuracy,
  108. // but more expensive queries, and vice-a-versa.
  109. minsPerResolution := 1
  110. resolution := time.Duration(minsPerResolution) * time.Minute
  111. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  112. // value, converts it to a cumulative value; i.e.
  113. // [$/hr] * [min/res]*[hr/min] = [$/res]
  114. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  115. // TODO niko/assets how do we not hard-code this price?
  116. costPerGBHr := 0.04 / 730.0
  117. ctx := prom.NewContext(client)
  118. queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (%s, persistentvolume) * on(%s, persistentvolume) group_right avg(pv_hourly_cost) by (%s, persistentvolume,provider_id))[%s:%dm]%s)/1024/1024/1024 * %f`, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  119. queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  120. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  121. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  122. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  123. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s)`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  124. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  125. resChPVCost := ctx.Query(queryPVCost)
  126. resChPVSize := ctx.Query(queryPVSize)
  127. resChActiveMins := ctx.Query(queryActiveMins)
  128. resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
  129. resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
  130. resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
  131. resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
  132. resPVCost, _ := resChPVCost.Await()
  133. resPVSize, _ := resChPVSize.Await()
  134. resActiveMins, _ := resChActiveMins.Await()
  135. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  136. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  137. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  138. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  139. if ctx.HasErrors() {
  140. return nil, ctx.ErrorCollection()
  141. }
  142. diskMap := map[string]*Disk{}
  143. for _, result := range resPVCost {
  144. cluster, err := result.GetString(env.GetPromClusterLabel())
  145. if err != nil {
  146. cluster = env.GetClusterID()
  147. }
  148. name, err := result.GetString("persistentvolume")
  149. if err != nil {
  150. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  151. continue
  152. }
  153. // TODO niko/assets storage class
  154. cost := result.Values[0].Value
  155. key := fmt.Sprintf("%s/%s", cluster, name)
  156. if _, ok := diskMap[key]; !ok {
  157. diskMap[key] = &Disk{
  158. Cluster: cluster,
  159. Name: name,
  160. Breakdown: &ClusterCostsBreakdown{},
  161. }
  162. }
  163. diskMap[key].Cost += cost
  164. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  165. if providerID != "" {
  166. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  167. }
  168. }
  169. for _, result := range resPVSize {
  170. cluster, err := result.GetString(env.GetPromClusterLabel())
  171. if err != nil {
  172. cluster = env.GetClusterID()
  173. }
  174. name, err := result.GetString("persistentvolume")
  175. if err != nil {
  176. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  177. continue
  178. }
  179. // TODO niko/assets storage class
  180. bytes := result.Values[0].Value
  181. key := fmt.Sprintf("%s/%s", cluster, name)
  182. if _, ok := diskMap[key]; !ok {
  183. diskMap[key] = &Disk{
  184. Cluster: cluster,
  185. Name: name,
  186. Breakdown: &ClusterCostsBreakdown{},
  187. }
  188. }
  189. diskMap[key].Bytes = bytes
  190. }
  191. for _, result := range resLocalStorageCost {
  192. cluster, err := result.GetString(env.GetPromClusterLabel())
  193. if err != nil {
  194. cluster = env.GetClusterID()
  195. }
  196. name, err := result.GetString("instance")
  197. if err != nil {
  198. log.Warningf("ClusterDisks: local storage data missing instance")
  199. continue
  200. }
  201. cost := result.Values[0].Value
  202. key := fmt.Sprintf("%s/%s", cluster, name)
  203. if _, ok := diskMap[key]; !ok {
  204. diskMap[key] = &Disk{
  205. Cluster: cluster,
  206. Name: name,
  207. Breakdown: &ClusterCostsBreakdown{},
  208. Local: true,
  209. }
  210. }
  211. diskMap[key].Cost += cost
  212. }
  213. for _, result := range resLocalStorageUsedCost {
  214. cluster, err := result.GetString(env.GetPromClusterLabel())
  215. if err != nil {
  216. cluster = env.GetClusterID()
  217. }
  218. name, err := result.GetString("instance")
  219. if err != nil {
  220. log.Warningf("ClusterDisks: local storage usage data missing instance")
  221. continue
  222. }
  223. cost := result.Values[0].Value
  224. key := fmt.Sprintf("%s/%s", cluster, name)
  225. if _, ok := diskMap[key]; !ok {
  226. diskMap[key] = &Disk{
  227. Cluster: cluster,
  228. Name: name,
  229. Breakdown: &ClusterCostsBreakdown{},
  230. Local: true,
  231. }
  232. }
  233. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  234. }
  235. for _, result := range resLocalStorageBytes {
  236. cluster, err := result.GetString(env.GetPromClusterLabel())
  237. if err != nil {
  238. cluster = env.GetClusterID()
  239. }
  240. name, err := result.GetString("instance")
  241. if err != nil {
  242. log.Warningf("ClusterDisks: local storage data missing instance")
  243. continue
  244. }
  245. bytes := result.Values[0].Value
  246. key := fmt.Sprintf("%s/%s", cluster, name)
  247. if _, ok := diskMap[key]; !ok {
  248. diskMap[key] = &Disk{
  249. Cluster: cluster,
  250. Name: name,
  251. Breakdown: &ClusterCostsBreakdown{},
  252. Local: true,
  253. }
  254. }
  255. diskMap[key].Bytes = bytes
  256. }
  257. for _, result := range resActiveMins {
  258. cluster, err := result.GetString(env.GetPromClusterLabel())
  259. if err != nil {
  260. cluster = env.GetClusterID()
  261. }
  262. name, err := result.GetString("persistentvolume")
  263. if err != nil {
  264. log.Warningf("ClusterDisks: active mins missing instance")
  265. continue
  266. }
  267. key := fmt.Sprintf("%s/%s", cluster, name)
  268. if _, ok := diskMap[key]; !ok {
  269. log.DedupedWarningf(5, "ClusterDisks: active mins for unidentified disk")
  270. continue
  271. }
  272. if len(result.Values) == 0 {
  273. continue
  274. }
  275. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  276. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  277. mins := e.Sub(s).Minutes()
  278. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  279. diskMap[key].End = e
  280. diskMap[key].Start = s
  281. diskMap[key].Minutes = mins
  282. }
  283. for _, result := range resLocalActiveMins {
  284. cluster, err := result.GetString(env.GetPromClusterLabel())
  285. if err != nil {
  286. cluster = env.GetClusterID()
  287. }
  288. name, err := result.GetString("node")
  289. if err != nil {
  290. log.Warningf("ClusterDisks: local active mins data missing instance")
  291. continue
  292. }
  293. key := fmt.Sprintf("%s/%s", cluster, name)
  294. if _, ok := diskMap[key]; !ok {
  295. log.Warningf("ClusterDisks: local active mins for unidentified disk")
  296. continue
  297. }
  298. if len(result.Values) == 0 {
  299. continue
  300. }
  301. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  302. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  303. mins := e.Sub(s).Minutes()
  304. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  305. diskMap[key].End = e
  306. diskMap[key].Start = s
  307. diskMap[key].Minutes = mins
  308. }
  309. for _, disk := range diskMap {
  310. // Apply all remaining RAM to Idle
  311. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  312. // Set provider Id to the name for reconciliation on Azure
  313. if fmt.Sprintf("%T", provider) == "*provider.Azure" {
  314. if disk.ProviderID == "" {
  315. disk.ProviderID = disk.Name
  316. }
  317. }
  318. }
  319. return diskMap, nil
  320. }
  321. type Node struct {
  322. Cluster string
  323. Name string
  324. ProviderID string
  325. NodeType string
  326. CPUCost float64
  327. CPUCores float64
  328. GPUCost float64
  329. GPUCount float64
  330. RAMCost float64
  331. RAMBytes float64
  332. Discount float64
  333. Preemptible bool
  334. CPUBreakdown *ClusterCostsBreakdown
  335. RAMBreakdown *ClusterCostsBreakdown
  336. Start time.Time
  337. End time.Time
  338. Minutes float64
  339. Labels map[string]string
  340. CostPerCPUHr float64
  341. CostPerRAMGiBHr float64
  342. CostPerGPUHr float64
  343. }
  344. // GKE lies about the number of cores e2 nodes have. This table
  345. // contains a mapping from node type -> actual CPU cores
  346. // for those cases.
  347. var partialCPUMap = map[string]float64{
  348. "e2-micro": 0.25,
  349. "e2-small": 0.5,
  350. "e2-medium": 1.0,
  351. }
  352. type NodeIdentifier struct {
  353. Cluster string
  354. Name string
  355. ProviderID string
  356. }
  357. type nodeIdentifierNoProviderID struct {
  358. Cluster string
  359. Name string
  360. }
  361. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  362. for k, v := range activeDataMap {
  363. keyNon := nodeIdentifierNoProviderID{
  364. Cluster: k.Cluster,
  365. Name: k.Name,
  366. }
  367. if cost, ok := costMap[k]; ok {
  368. minutes := v.minutes
  369. count := 1.0
  370. if c, ok := resourceCountMap[keyNon]; ok {
  371. count = c
  372. }
  373. costMap[k] = cost * (minutes / 60) * count
  374. }
  375. }
  376. }
  377. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  378. for k, v := range activeDataMap {
  379. if cost, ok := costMap[k]; ok {
  380. minutes := v.minutes
  381. costMap[k] = cost * (minutes / 60)
  382. }
  383. }
  384. }
  385. func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[NodeIdentifier]*Node, error) {
  386. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  387. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  388. if offset < time.Minute {
  389. offsetStr = ""
  390. }
  391. // minsPerResolution determines accuracy and resource use for the following
  392. // queries. Smaller values (higher resolution) result in better accuracy,
  393. // but more expensive queries, and vice-a-versa.
  394. minsPerResolution := 1
  395. resolution := time.Duration(minsPerResolution) * time.Minute
  396. requiredCtx := prom.NewContext(client)
  397. optionalCtx := prom.NewContext(client)
  398. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
  399. queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
  400. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durationStr, offsetStr, env.GetPromClusterLabel())
  401. queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
  402. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s]%s)) by (%s, node, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
  403. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
  404. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, %s, mode)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel())
  405. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  406. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  407. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  408. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  409. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  410. // Return errors if these fail
  411. resChNodeCPUHourlyCost := requiredCtx.Query(queryNodeCPUHourlyCost)
  412. resChNodeCPUCores := requiredCtx.Query(queryNodeCPUCores)
  413. resChNodeRAMHourlyCost := requiredCtx.Query(queryNodeRAMHourlyCost)
  414. resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
  415. resChNodeGPUCount := requiredCtx.Query(queryNodeGPUCount)
  416. resChNodeGPUHourlyCost := requiredCtx.Query(queryNodeGPUHourlyCost)
  417. resChActiveMins := requiredCtx.Query(queryActiveMins)
  418. resChIsSpot := requiredCtx.Query(queryIsSpot)
  419. // Do not return errors if these fail, but log warnings
  420. resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
  421. resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
  422. resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
  423. resChLabels := optionalCtx.Query(queryLabels)
  424. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  425. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  426. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  427. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  428. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  429. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  430. resIsSpot, _ := resChIsSpot.Await()
  431. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  432. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  433. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  434. resActiveMins, _ := resChActiveMins.Await()
  435. resLabels, _ := resChLabels.Await()
  436. if optionalCtx.HasErrors() {
  437. for _, err := range optionalCtx.Errors() {
  438. log.Warningf("ClusterNodes: %s", err)
  439. }
  440. }
  441. if requiredCtx.HasErrors() {
  442. for _, err := range requiredCtx.Errors() {
  443. log.Errorf("ClusterNodes: %s", err)
  444. }
  445. return nil, requiredCtx.ErrorCollection()
  446. }
  447. activeDataMap := buildActiveDataMap(resActiveMins, resolution)
  448. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  449. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost)
  450. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost)
  451. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap)
  452. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  453. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  454. cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
  455. ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
  456. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  457. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  458. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  459. preemptibleMap := buildPreemptibleMap(resIsSpot)
  460. labelsMap := buildLabelsMap(resLabels)
  461. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
  462. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
  463. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  464. nodeMap := buildNodeMap(
  465. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  466. cpuCoresMap, ramBytesMap, ramUserPctMap,
  467. ramSystemPctMap,
  468. cpuBreakdownMap,
  469. activeDataMap,
  470. preemptibleMap,
  471. labelsMap,
  472. clusterAndNameToType,
  473. )
  474. c, err := cp.GetConfig()
  475. if err != nil {
  476. return nil, err
  477. }
  478. discount, err := ParsePercentString(c.Discount)
  479. if err != nil {
  480. return nil, err
  481. }
  482. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  483. if err != nil {
  484. return nil, err
  485. }
  486. for _, node := range nodeMap {
  487. // TODO take GKE Reserved Instances into account
  488. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  489. // Apply all remaining resources to Idle
  490. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  491. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  492. }
  493. return nodeMap, nil
  494. }
  495. type LoadBalancer struct {
  496. Cluster string
  497. Name string
  498. ProviderID string
  499. Cost float64
  500. Start time.Time
  501. Minutes float64
  502. }
  503. func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
  504. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  505. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  506. if offset < time.Minute {
  507. offsetStr = ""
  508. }
  509. // minsPerResolution determines accuracy and resource use for the following
  510. // queries. Smaller values (higher resolution) result in better accuracy,
  511. // but more expensive queries, and vice-a-versa.
  512. minsPerResolution := 5
  513. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  514. // value, converts it to a cumulative value; i.e.
  515. // [$/hr] * [min/res]*[hr/min] = [$/res]
  516. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  517. ctx := prom.NewContext(client)
  518. queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip))[%s:%dm]%s) * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  519. queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  520. resChLBCost := ctx.Query(queryLBCost)
  521. resChActiveMins := ctx.Query(queryActiveMins)
  522. resLBCost, _ := resChLBCost.Await()
  523. resActiveMins, _ := resChActiveMins.Await()
  524. if ctx.HasErrors() {
  525. return nil, ctx.ErrorCollection()
  526. }
  527. loadBalancerMap := map[string]*LoadBalancer{}
  528. for _, result := range resLBCost {
  529. cluster, err := result.GetString(env.GetPromClusterLabel())
  530. if err != nil {
  531. cluster = env.GetClusterID()
  532. }
  533. namespace, err := result.GetString("namespace")
  534. if err != nil {
  535. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  536. continue
  537. }
  538. serviceName, err := result.GetString("service_name")
  539. if err != nil {
  540. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  541. continue
  542. }
  543. providerID, err := result.GetString("ingress_ip")
  544. if err != nil {
  545. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  546. providerID = ""
  547. }
  548. lbCost := result.Values[0].Value
  549. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  550. if _, ok := loadBalancerMap[key]; !ok {
  551. loadBalancerMap[key] = &LoadBalancer{
  552. Cluster: cluster,
  553. Name: namespace + "/" + serviceName,
  554. ProviderID: cloud.ParseLBID(providerID),
  555. }
  556. }
  557. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  558. // Prevents there from being a duplicate LoadBalancers on the same day
  559. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  560. loadBalancerMap[key].ProviderID = providerID
  561. }
  562. loadBalancerMap[key].Cost += lbCost
  563. }
  564. for _, result := range resActiveMins {
  565. cluster, err := result.GetString(env.GetPromClusterLabel())
  566. if err != nil {
  567. cluster = env.GetClusterID()
  568. }
  569. namespace, err := result.GetString("namespace")
  570. if err != nil {
  571. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  572. continue
  573. }
  574. serviceName, err := result.GetString("service_name")
  575. if err != nil {
  576. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  577. continue
  578. }
  579. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  580. if len(result.Values) == 0 {
  581. continue
  582. }
  583. if lb, ok := loadBalancerMap[key]; ok {
  584. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  585. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  586. mins := e.Sub(s).Minutes()
  587. lb.Start = s
  588. lb.Minutes = mins
  589. } else {
  590. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  591. }
  592. }
  593. return loadBalancerMap, nil
  594. }
  595. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  596. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  597. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  598. start, end := timeutil.ParseTimeRange(window, offset)
  599. mins := end.Sub(start).Minutes()
  600. // minsPerResolution determines accuracy and resource use for the following
  601. // queries. Smaller values (higher resolution) result in better accuracy,
  602. // but more expensive queries, and vice-a-versa.
  603. minsPerResolution := 5
  604. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  605. // value, converts it to a cumulative value; i.e.
  606. // [$/hr] * [min/res]*[hr/min] = [$/res]
  607. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  608. const fmtQueryDataCount = `
  609. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  610. `
  611. const fmtQueryTotalGPU = `
  612. sum(
  613. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  614. ) by (%s)
  615. `
  616. const fmtQueryTotalCPU = `
  617. sum(
  618. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  619. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  620. ) by (%s)
  621. `
  622. const fmtQueryTotalRAM = `
  623. sum(
  624. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  625. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  626. ) by (%s)
  627. `
  628. const fmtQueryTotalStorage = `
  629. sum(
  630. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  631. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  632. ) by (%s)
  633. `
  634. const fmtQueryCPUModePct = `
  635. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  636. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  637. `
  638. const fmtQueryRAMSystemPct = `
  639. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  640. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  641. `
  642. const fmtQueryRAMUserPct = `
  643. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  644. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  645. `
  646. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  647. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  648. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  649. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  650. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  651. if queryTotalLocalStorage != "" {
  652. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  653. }
  654. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  655. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, minsPerResolution)
  656. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  657. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  658. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  659. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  660. ctx := prom.NewContext(client)
  661. resChs := ctx.QueryAll(
  662. queryDataCount,
  663. queryTotalGPU,
  664. queryTotalCPU,
  665. queryTotalRAM,
  666. queryTotalStorage,
  667. )
  668. // Only submit the local storage query if it is valid. Otherwise Prometheus
  669. // will return errors. Always append something to resChs, regardless, to
  670. // maintain indexing.
  671. if queryTotalLocalStorage != "" {
  672. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  673. } else {
  674. resChs = append(resChs, nil)
  675. }
  676. if withBreakdown {
  677. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, env.GetPromClusterLabel(), window, fmtOffset, env.GetPromClusterLabel())
  678. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  679. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  680. bdResChs := ctx.QueryAll(
  681. queryCPUModePct,
  682. queryRAMSystemPct,
  683. queryRAMUserPct,
  684. )
  685. // Only submit the local storage query if it is valid. Otherwise Prometheus
  686. // will return errors. Always append something to resChs, regardless, to
  687. // maintain indexing.
  688. if queryUsedLocalStorage != "" {
  689. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  690. } else {
  691. bdResChs = append(bdResChs, nil)
  692. }
  693. resChs = append(resChs, bdResChs...)
  694. }
  695. resDataCount, _ := resChs[0].Await()
  696. resTotalGPU, _ := resChs[1].Await()
  697. resTotalCPU, _ := resChs[2].Await()
  698. resTotalRAM, _ := resChs[3].Await()
  699. resTotalStorage, _ := resChs[4].Await()
  700. if ctx.HasErrors() {
  701. return nil, ctx.ErrorCollection()
  702. }
  703. defaultClusterID := env.GetClusterID()
  704. dataMinsByCluster := map[string]float64{}
  705. for _, result := range resDataCount {
  706. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  707. if clusterID == "" {
  708. clusterID = defaultClusterID
  709. }
  710. dataMins := mins
  711. if len(result.Values) > 0 {
  712. dataMins = result.Values[0].Value
  713. } else {
  714. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  715. }
  716. dataMinsByCluster[clusterID] = dataMins
  717. }
  718. // Determine combined discount
  719. discount, customDiscount := 0.0, 0.0
  720. c, err := a.CloudProvider.GetConfig()
  721. if err == nil {
  722. discount, err = ParsePercentString(c.Discount)
  723. if err != nil {
  724. discount = 0.0
  725. }
  726. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  727. if err != nil {
  728. customDiscount = 0.0
  729. }
  730. }
  731. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  732. costData := make(map[string]map[string]float64)
  733. // Helper function to iterate over Prom query results, parsing the raw values into
  734. // the intermediate costData structure.
  735. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  736. for _, result := range results {
  737. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  738. if clusterID == "" {
  739. clusterID = defaultClusterID
  740. }
  741. if _, ok := costData[clusterID]; !ok {
  742. costData[clusterID] = map[string]float64{}
  743. }
  744. if len(result.Values) > 0 {
  745. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  746. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  747. }
  748. }
  749. }
  750. // Apply both sustained use and custom discounts to RAM and CPU
  751. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  752. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  753. // Apply only custom discount to GPU and storage
  754. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  755. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  756. if queryTotalLocalStorage != "" {
  757. resTotalLocalStorage, err := resChs[5].Await()
  758. if err != nil {
  759. return nil, err
  760. }
  761. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  762. }
  763. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  764. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  765. pvUsedCostMap := map[string]float64{}
  766. if withBreakdown {
  767. resCPUModePct, _ := resChs[6].Await()
  768. resRAMSystemPct, _ := resChs[7].Await()
  769. resRAMUserPct, _ := resChs[8].Await()
  770. if ctx.HasErrors() {
  771. return nil, ctx.ErrorCollection()
  772. }
  773. for _, result := range resCPUModePct {
  774. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  775. if clusterID == "" {
  776. clusterID = defaultClusterID
  777. }
  778. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  779. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  780. }
  781. cpuBD := cpuBreakdownMap[clusterID]
  782. mode, err := result.GetString("mode")
  783. if err != nil {
  784. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  785. mode = "other"
  786. }
  787. switch mode {
  788. case "idle":
  789. cpuBD.Idle += result.Values[0].Value
  790. case "system":
  791. cpuBD.System += result.Values[0].Value
  792. case "user":
  793. cpuBD.User += result.Values[0].Value
  794. default:
  795. cpuBD.Other += result.Values[0].Value
  796. }
  797. }
  798. for _, result := range resRAMSystemPct {
  799. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  800. if clusterID == "" {
  801. clusterID = defaultClusterID
  802. }
  803. if _, ok := ramBreakdownMap[clusterID]; !ok {
  804. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  805. }
  806. ramBD := ramBreakdownMap[clusterID]
  807. ramBD.System += result.Values[0].Value
  808. }
  809. for _, result := range resRAMUserPct {
  810. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  811. if clusterID == "" {
  812. clusterID = defaultClusterID
  813. }
  814. if _, ok := ramBreakdownMap[clusterID]; !ok {
  815. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  816. }
  817. ramBD := ramBreakdownMap[clusterID]
  818. ramBD.User += result.Values[0].Value
  819. }
  820. for _, ramBD := range ramBreakdownMap {
  821. remaining := 1.0
  822. remaining -= ramBD.Other
  823. remaining -= ramBD.System
  824. remaining -= ramBD.User
  825. ramBD.Idle = remaining
  826. }
  827. if queryUsedLocalStorage != "" {
  828. resUsedLocalStorage, err := resChs[9].Await()
  829. if err != nil {
  830. return nil, err
  831. }
  832. for _, result := range resUsedLocalStorage {
  833. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  834. if clusterID == "" {
  835. clusterID = defaultClusterID
  836. }
  837. pvUsedCostMap[clusterID] += result.Values[0].Value
  838. }
  839. }
  840. }
  841. if ctx.HasErrors() {
  842. for _, err := range ctx.Errors() {
  843. log.Errorf("ComputeClusterCosts: %s", err)
  844. }
  845. return nil, ctx.ErrorCollection()
  846. }
  847. // Convert intermediate structure to Costs instances
  848. costsByCluster := map[string]*ClusterCosts{}
  849. for id, cd := range costData {
  850. dataMins, ok := dataMinsByCluster[id]
  851. if !ok {
  852. dataMins = mins
  853. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  854. }
  855. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  856. if err != nil {
  857. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  858. return nil, err
  859. }
  860. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  861. costs.CPUBreakdown = cpuBD
  862. }
  863. if ramBD, ok := ramBreakdownMap[id]; ok {
  864. costs.RAMBreakdown = ramBD
  865. }
  866. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  867. if pvUC, ok := pvUsedCostMap[id]; ok {
  868. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  869. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  870. }
  871. costs.DataMinutes = dataMins
  872. costsByCluster[id] = costs
  873. }
  874. return costsByCluster, nil
  875. }
  876. type Totals struct {
  877. TotalCost [][]string `json:"totalcost"`
  878. CPUCost [][]string `json:"cpucost"`
  879. MemCost [][]string `json:"memcost"`
  880. StorageCost [][]string `json:"storageCost"`
  881. }
  882. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  883. if len(qrs) == 0 {
  884. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  885. }
  886. result := qrs[0]
  887. totals := [][]string{}
  888. for _, value := range result.Values {
  889. d0 := fmt.Sprintf("%f", value.Timestamp)
  890. d1 := fmt.Sprintf("%f", value.Value)
  891. toAppend := []string{
  892. d0,
  893. d1,
  894. }
  895. totals = append(totals, toAppend)
  896. }
  897. return totals, nil
  898. }
  899. // ClusterCostsOverTime gives the full cluster costs over time
  900. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  901. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  902. if localStorageQuery != "" {
  903. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  904. }
  905. layout := "2006-01-02T15:04:05.000Z"
  906. start, err := time.Parse(layout, startString)
  907. if err != nil {
  908. klog.V(1).Infof("Error parsing time %s. Error: %s", startString, err.Error())
  909. return nil, err
  910. }
  911. end, err := time.Parse(layout, endString)
  912. if err != nil {
  913. klog.V(1).Infof("Error parsing time %s. Error: %s", endString, err.Error())
  914. return nil, err
  915. }
  916. fmtWindow := timeutil.DurationString(window)
  917. if fmtWindow == "" {
  918. err := fmt.Errorf("window value invalid or missing")
  919. klog.V(1).Infof("Error parsing time %v. Error: %s", window, err.Error())
  920. return nil, err
  921. }
  922. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  923. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  924. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  925. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  926. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  927. ctx := prom.NewContext(cli)
  928. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  929. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  930. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  931. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  932. resultClusterCores, err := resChClusterCores.Await()
  933. if err != nil {
  934. return nil, err
  935. }
  936. resultClusterRAM, err := resChClusterRAM.Await()
  937. if err != nil {
  938. return nil, err
  939. }
  940. resultStorage, err := resChStorage.Await()
  941. if err != nil {
  942. return nil, err
  943. }
  944. resultTotal, err := resChTotal.Await()
  945. if err != nil {
  946. return nil, err
  947. }
  948. coreTotal, err := resultToTotals(resultClusterCores)
  949. if err != nil {
  950. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  951. return nil, err
  952. }
  953. ramTotal, err := resultToTotals(resultClusterRAM)
  954. if err != nil {
  955. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  956. return nil, err
  957. }
  958. storageTotal, err := resultToTotals(resultStorage)
  959. if err != nil {
  960. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  961. }
  962. clusterTotal, err := resultToTotals(resultTotal)
  963. if err != nil {
  964. // If clusterTotal query failed, it's likely because there are no PVs, which
  965. // causes the qTotal query to return no data. Instead, query only node costs.
  966. // If that fails, return an error because something is actually wrong.
  967. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  968. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  969. for _, warning := range warnings {
  970. log.Warningf(warning)
  971. }
  972. if err != nil {
  973. return nil, err
  974. }
  975. clusterTotal, err = resultToTotals(resultNodes)
  976. if err != nil {
  977. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  978. return nil, err
  979. }
  980. }
  981. return &Totals{
  982. TotalCost: clusterTotal,
  983. CPUCost: coreTotal,
  984. MemCost: ramTotal,
  985. StorageCost: storageTotal,
  986. }, nil
  987. }