cluster.go 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522
  1. package costmodel
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/opencost/opencost/pkg/kubecost"
  7. "github.com/opencost/opencost/pkg/util/timeutil"
  8. "golang.org/x/exp/slices"
  9. "github.com/opencost/opencost/pkg/cloud"
  10. "github.com/opencost/opencost/pkg/env"
  11. "github.com/opencost/opencost/pkg/log"
  12. "github.com/opencost/opencost/pkg/prom"
  13. prometheus "github.com/prometheus/client_golang/api"
  14. )
// PromQL templates for cluster-level monthly cost estimates. Every template
// multiplies hourly prices by 730 (hours per month). The %s verbs are filled
// in by callers outside this view; by their position they appear to take a
// range ("[%s]"), an offset modifier (the "%s" right after a range), the
// cluster label (inside "by (..., %s)") and, where a template ends in a bare
// "%s", a trailing modifier — TODO confirm against the callers.
const (
// queryClusterCores: per-node CPU capacity * CPU hourly price, plus the
// per-node GPU hourly price, both scaled to a month and summed by the final
// grouping label. Takes 10 %s verbs.
queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
// queryClusterRAM: per-node memory capacity (converted to GiB) * RAM hourly
// price, scaled to a month and summed by the final grouping label.
queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
// queryStorage: per-PV hourly price * capacity in GiB, scaled to a month and
// summed by the grouping label; the trailing %s is appended after the sum.
queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
// queryTotal: monthly node cost plus monthly PV cost (over a fixed 1h range).
// NOTE(review): the left operand sums away every label while the right
// operand keeps the grouping label; under PromQL one-to-one vector matching
// the "+" yields no series whenever that label is present — confirm intended.
queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
// queryNodes: monthly cost of all nodes, summed into a single series; the
// trailing %s is appended after the expression.
queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
)
// maxLocalDiskSize is a size threshold in GiB: ClusterDisks compares a local
// disk's bytes/1024/1024/1024 against it and drops any node-local root disk
// that is larger. Occasional metric errors in filesystem size should not
// contribute to large costs.
// NOTE(review): an earlier comment justified this with a 100 Gi AWS root-disk
// limit, which does not match the value 200 — confirm the rationale.
const maxLocalDiskSize = 200
// ClusterCosts represents cumulative and monthly cluster costs over a given
// time range. Costs are broken down by CPU, GPU, RAM, and storage; CPU, RAM
// and storage additionally carry a fractional breakdown into idle, other,
// system and user shares.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// NOTE(review): DataMinutes has no json tag, so encoding/json emits it
	// under the Go field name "DataMinutes", unlike every other field — confirm
	// this is intended before adding a tag (that would change the wire format).
	DataMinutes float64
}
// ClusterCostsBreakdown splits a resource into fractional shares: User for
// user-space (i.e. non-system) usage, System for system usage, Other for
// anything else, and Idle for the remainder. Within this file Idle is always
// computed as 1.0 minus the other three, so the four fields are fractions of
// the whole rather than absolute amounts.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  63. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  64. // the associated monthly rate data, and returns the Costs.
  65. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  66. start, end := timeutil.ParseTimeRange(window, offset)
  67. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  68. if dataHours == 0 {
  69. dataHours = end.Sub(start).Hours()
  70. }
  71. // Do not allow zero-length windows to prevent divide-by-zero issues
  72. if dataHours == 0 {
  73. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  74. }
  75. cc := &ClusterCosts{
  76. Start: &start,
  77. End: &end,
  78. CPUCumulative: cpu,
  79. GPUCumulative: gpu,
  80. RAMCumulative: ram,
  81. StorageCumulative: storage,
  82. TotalCumulative: cpu + gpu + ram + storage,
  83. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  84. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  85. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  86. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  87. }
  88. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  89. return cc, nil
  90. }
// Disk describes one storage asset found by ClusterDisks: either a persistent
// volume (keyed by volume name) or a node-local root disk (keyed by the
// "instance" label of container_fs_* metrics, with Local set).
type Disk struct {
	Cluster string
	Name    string
	// ProviderID falls back to Name when no provider ID is known, so that
	// reconciliation has something to match on.
	ProviderID   string
	StorageClass string
	// VolumeName, ClaimName and ClaimNamespace come from
	// kube_persistentvolumeclaim_info and are only set for persistent volumes.
	VolumeName     string
	ClaimName      string
	ClaimNamespace string
	// Cost is the cumulative cost over the queried window (for local disks it
	// is accumulated from the local-storage cost query; for persistent volumes
	// it is presumably filled in by pvCosts — TODO confirm).
	Cost float64
	// Bytes is capacity; BytesUsedAvg and BytesUsedMax are average and peak
	// used bytes over the window.
	Bytes        float64
	BytesUsedAvg float64
	BytesUsedMax float64
	// Local marks node-local root disks as opposed to persistent volumes.
	Local bool
	// Start, End and Minutes describe the period the disk was observed active.
	Start   time.Time
	End     time.Time
	Minutes float64
	// Breakdown holds fractional usage shares; Idle is the remainder.
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier is the map key ClusterDisks uses for disks: a cluster ID plus
// a disk name (volume name for persistent volumes, instance for local disks).
// It is constructed positionally (e.g. DiskIdentifier{cluster, name}), so the
// field order must not change.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  113. func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  114. // Query for the duration between start and end
  115. durStr := timeutil.DurationString(end.Sub(start))
  116. if durStr == "" {
  117. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  118. }
  119. // Start from the time "end", querying backwards
  120. t := end
  121. // minsPerResolution determines accuracy and resource use for the following
  122. // queries. Smaller values (higher resolution) result in better accuracy,
  123. // but more expensive queries, and vice-a-versa.
  124. resolution := env.GetETLResolution()
  125. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  126. var minsPerResolution int
  127. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  128. minsPerResolution = 1
  129. log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  130. }
  131. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  132. // value, converts it to a cumulative value; i.e.
  133. // [$/hr] * [min/res]*[hr/min] = [$/res]
  134. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  135. // TODO niko/assets how do we not hard-code this price?
  136. costPerGBHr := 0.04 / 730.0
  137. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  138. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
  139. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
  140. queryActiveMins := fmt.Sprintf(`avg(kube_persistentvolume_capacity_bytes) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  141. queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info[%s])) by (%s, persistentvolume, storageclass)`, durStr, env.GetPromClusterLabel())
  142. queryPVUsedAvg := fmt.Sprintf(`avg(avg_over_time(kubelet_volume_stats_used_bytes[%s])) by (%s, persistentvolumeclaim, namespace)`, durStr, env.GetPromClusterLabel())
  143. queryPVUsedMax := fmt.Sprintf(`max(max_over_time(kubelet_volume_stats_used_bytes[%s])) by (%s, persistentvolumeclaim, namespace)`, durStr, env.GetPromClusterLabel())
  144. queryPVCInfo := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_info[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`, durStr, env.GetPromClusterLabel())
  145. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  146. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  147. queryLocalStorageUsedAvg := fmt.Sprintf(`avg(avg_over_time(container_fs_usage_bytes{device!="tmpfs", id="/"}[%s])) by (instance, %s)`, durStr, env.GetPromClusterLabel())
  148. queryLocalStorageUsedMax := fmt.Sprintf(`max(max_over_time(container_fs_usage_bytes{device!="tmpfs", id="/"}[%s])) by (instance, %s)`, durStr, env.GetPromClusterLabel())
  149. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  150. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  151. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  152. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  153. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  154. resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
  155. resChPVUsedAvg := ctx.QueryAtTime(queryPVUsedAvg, t)
  156. resChPVUsedMax := ctx.QueryAtTime(queryPVUsedMax, t)
  157. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, t)
  158. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  159. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  160. resChLocalStoreageUsedAvg := ctx.QueryAtTime(queryLocalStorageUsedAvg, t)
  161. resChLocalStoreageUsedMax := ctx.QueryAtTime(queryLocalStorageUsedMax, t)
  162. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  163. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  164. resPVCost, _ := resChPVCost.Await()
  165. resPVSize, _ := resChPVSize.Await()
  166. resActiveMins, _ := resChActiveMins.Await()
  167. resPVStorageClass, _ := resChPVStorageClass.Await()
  168. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  169. resPVUsedMax, _ := resChPVUsedMax.Await()
  170. resPVCInfo, _ := resChPVCInfo.Await()
  171. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  172. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  173. resLocalStorageUsedAvg, _ := resChLocalStoreageUsedAvg.Await()
  174. resLocalStorageUsedMax, _ := resChLocalStoreageUsedMax.Await()
  175. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  176. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  177. if ctx.HasErrors() {
  178. return nil, ctx.ErrorCollection()
  179. }
  180. diskMap := map[DiskIdentifier]*Disk{}
  181. for _, result := range resPVCInfo {
  182. cluster, err := result.GetString(env.GetPromClusterLabel())
  183. if err != nil {
  184. cluster = env.GetClusterID()
  185. }
  186. volumeName, err := result.GetString("volumename")
  187. if err != nil {
  188. log.Warnf("ClusterDisks: pv claim data missing volumename")
  189. continue
  190. }
  191. claimName, err := result.GetString("persistentvolumeclaim")
  192. if err != nil {
  193. log.Warnf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  194. continue
  195. }
  196. claimNamespace, err := result.GetString("namespace")
  197. if err != nil {
  198. log.Warnf("ClusterDisks: pv claim data missing namespace")
  199. continue
  200. }
  201. key := DiskIdentifier{cluster, volumeName}
  202. if _, ok := diskMap[key]; !ok {
  203. diskMap[key] = &Disk{
  204. Cluster: cluster,
  205. Name: volumeName,
  206. Breakdown: &ClusterCostsBreakdown{},
  207. }
  208. }
  209. diskMap[key].VolumeName = volumeName
  210. diskMap[key].ClaimName = claimName
  211. diskMap[key].ClaimNamespace = claimNamespace
  212. }
  213. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, provider)
  214. for _, result := range resLocalStorageCost {
  215. cluster, err := result.GetString(env.GetPromClusterLabel())
  216. if err != nil {
  217. cluster = env.GetClusterID()
  218. }
  219. name, err := result.GetString("instance")
  220. if err != nil {
  221. log.Warnf("ClusterDisks: local storage data missing instance")
  222. continue
  223. }
  224. cost := result.Values[0].Value
  225. key := DiskIdentifier{cluster, name}
  226. if _, ok := diskMap[key]; !ok {
  227. diskMap[key] = &Disk{
  228. Cluster: cluster,
  229. Name: name,
  230. Breakdown: &ClusterCostsBreakdown{},
  231. Local: true,
  232. }
  233. }
  234. diskMap[key].Cost += cost
  235. //Assigning explicitly the storage class of local storage to local
  236. diskMap[key].StorageClass = kubecost.LocalStorageClass
  237. }
  238. for _, result := range resLocalStorageUsedCost {
  239. cluster, err := result.GetString(env.GetPromClusterLabel())
  240. if err != nil {
  241. cluster = env.GetClusterID()
  242. }
  243. name, err := result.GetString("instance")
  244. if err != nil {
  245. log.Warnf("ClusterDisks: local storage usage data missing instance")
  246. continue
  247. }
  248. cost := result.Values[0].Value
  249. key := DiskIdentifier{cluster, name}
  250. if _, ok := diskMap[key]; !ok {
  251. diskMap[key] = &Disk{
  252. Cluster: cluster,
  253. Name: name,
  254. Breakdown: &ClusterCostsBreakdown{},
  255. Local: true,
  256. }
  257. }
  258. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  259. }
  260. for _, result := range resLocalStorageUsedAvg {
  261. cluster, err := result.GetString(env.GetPromClusterLabel())
  262. if err != nil {
  263. cluster = env.GetClusterID()
  264. }
  265. name, err := result.GetString("instance")
  266. if err != nil {
  267. log.Warnf("ClusterDisks: local storage data missing instance")
  268. continue
  269. }
  270. bytesAvg := result.Values[0].Value
  271. key := DiskIdentifier{cluster, name}
  272. if _, ok := diskMap[key]; !ok {
  273. diskMap[key] = &Disk{
  274. Cluster: cluster,
  275. Name: name,
  276. Breakdown: &ClusterCostsBreakdown{},
  277. Local: true,
  278. }
  279. }
  280. diskMap[key].BytesUsedAvg = bytesAvg
  281. }
  282. for _, result := range resLocalStorageUsedMax {
  283. cluster, err := result.GetString(env.GetPromClusterLabel())
  284. if err != nil {
  285. cluster = env.GetClusterID()
  286. }
  287. name, err := result.GetString("instance")
  288. if err != nil {
  289. log.Warnf("ClusterDisks: local storage data missing instance")
  290. continue
  291. }
  292. bytesMax := result.Values[0].Value
  293. key := DiskIdentifier{cluster, name}
  294. if _, ok := diskMap[key]; !ok {
  295. diskMap[key] = &Disk{
  296. Cluster: cluster,
  297. Name: name,
  298. Breakdown: &ClusterCostsBreakdown{},
  299. Local: true,
  300. }
  301. }
  302. diskMap[key].BytesUsedMax = bytesMax
  303. }
  304. for _, result := range resLocalStorageBytes {
  305. cluster, err := result.GetString(env.GetPromClusterLabel())
  306. if err != nil {
  307. cluster = env.GetClusterID()
  308. }
  309. name, err := result.GetString("instance")
  310. if err != nil {
  311. log.Warnf("ClusterDisks: local storage data missing instance")
  312. continue
  313. }
  314. bytes := result.Values[0].Value
  315. key := DiskIdentifier{cluster, name}
  316. if _, ok := diskMap[key]; !ok {
  317. diskMap[key] = &Disk{
  318. Cluster: cluster,
  319. Name: name,
  320. Breakdown: &ClusterCostsBreakdown{},
  321. Local: true,
  322. }
  323. }
  324. diskMap[key].Bytes = bytes
  325. if bytes/1024/1024/1024 > maxLocalDiskSize {
  326. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  327. delete(diskMap, key)
  328. }
  329. }
  330. for _, result := range resLocalActiveMins {
  331. cluster, err := result.GetString(env.GetPromClusterLabel())
  332. if err != nil {
  333. cluster = env.GetClusterID()
  334. }
  335. name, err := result.GetString("node")
  336. if err != nil {
  337. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  338. continue
  339. }
  340. key := DiskIdentifier{cluster, name}
  341. if _, ok := diskMap[key]; !ok {
  342. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  343. continue
  344. }
  345. if len(result.Values) == 0 {
  346. continue
  347. }
  348. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  349. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  350. mins := e.Sub(s).Minutes()
  351. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  352. diskMap[key].End = e
  353. diskMap[key].Start = s
  354. diskMap[key].Minutes = mins
  355. }
  356. var unTracedDiskLogData []DiskIdentifier
  357. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  358. for _, result := range resPVStorageClass {
  359. cluster, err := result.GetString(env.GetPromClusterLabel())
  360. if err != nil {
  361. cluster = env.GetClusterID()
  362. }
  363. name, _ := result.GetString("persistentvolume")
  364. key := DiskIdentifier{cluster, name}
  365. if _, ok := diskMap[key]; !ok {
  366. if !slices.Contains(unTracedDiskLogData, key) {
  367. unTracedDiskLogData = append(unTracedDiskLogData, key)
  368. }
  369. continue
  370. }
  371. if len(result.Values) == 0 {
  372. continue
  373. }
  374. storageClass, err := result.GetString("storageclass")
  375. if err != nil {
  376. diskMap[key].StorageClass = kubecost.UnknownStorageClass
  377. } else {
  378. diskMap[key].StorageClass = storageClass
  379. }
  380. }
  381. // Logging the unidentified disk information outside the loop
  382. for _, unIdentifiedDisk := range unTracedDiskLogData {
  383. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  384. }
  385. for _, disk := range diskMap {
  386. // Apply all remaining RAM to Idle
  387. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  388. // Set provider Id to the name for reconciliation
  389. if disk.ProviderID == "" {
  390. disk.ProviderID = disk.Name
  391. }
  392. }
  393. return diskMap, nil
  394. }
// Node describes one cluster node as assembled by ClusterNodes.
type Node struct {
	Cluster    string
	Name       string
	ProviderID string
	NodeType   string
	// CPUCost, GPUCost and RAMCost start from hourly prices; ClusterNodes
	// scales the CPU/RAM maps by active minutes and resource counts and the GPU
	// map by active minutes before buildNodeMap consumes them (presumably
	// yielding cumulative costs for the window — TODO confirm in buildNodeMap).
	CPUCost  float64
	CPUCores float64
	GPUCost  float64
	GPUCount float64
	RAMCost  float64
	RAMBytes float64
	// Discount is set by ClusterNodes from cloud.Provider.CombinedDiscountForNode.
	Discount float64
	// Preemptible presumably reflects kubecost_node_is_spot — verify in
	// buildPreemptibleMap.
	Preemptible bool
	// CPUBreakdown and RAMBreakdown hold fractional usage shares; ClusterNodes
	// sets Idle to whatever System, Other and User leave over.
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// Per-unit hourly rates; not populated anywhere in this view — TODO
	// confirm where they are set.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
  418. // GKE lies about the number of cores e2 nodes have. This table
  419. // contains a mapping from node type -> actual CPU cores
  420. // for those cases.
  421. var partialCPUMap = map[string]float64{
  422. "e2-micro": 0.25,
  423. "e2-small": 0.5,
  424. "e2-medium": 1.0,
  425. }
// NodeIdentifier is the map key ClusterNodes uses for per-node data: cluster
// ID, node name and cloud provider ID.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID keys per-node data that carries no provider ID,
// such as CPU core and RAM byte capacity (those queries group only by cluster
// and node). costTimesMinuteAndCount uses it to look up such counts for a
// NodeIdentifier.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  435. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  436. for k, v := range activeDataMap {
  437. keyNon := nodeIdentifierNoProviderID{
  438. Cluster: k.Cluster,
  439. Name: k.Name,
  440. }
  441. if cost, ok := costMap[k]; ok {
  442. minutes := v.minutes
  443. count := 1.0
  444. if c, ok := resourceCountMap[keyNon]; ok {
  445. count = c
  446. }
  447. costMap[k] = cost * (minutes / 60) * count
  448. }
  449. }
  450. }
  451. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  452. for k, v := range activeDataMap {
  453. if cost, ok := costMap[k]; ok {
  454. minutes := v.minutes
  455. costMap[k] = cost * (minutes / 60)
  456. }
  457. }
  458. }
  459. func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  460. // Query for the duration between start and end
  461. durStr := timeutil.DurationString(end.Sub(start))
  462. if durStr == "" {
  463. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  464. }
  465. // Start from the time "end", querying backwards
  466. t := end
  467. // minsPerResolution determines accuracy and resource use for the following
  468. // queries. Smaller values (higher resolution) result in better accuracy,
  469. // but more expensive queries, and vice-a-versa.
  470. resolution := env.GetETLResolution()
  471. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  472. var minsPerResolution int
  473. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  474. minsPerResolution = 1
  475. log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  476. }
  477. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  478. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  479. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  480. queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  481. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
  482. queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  483. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
  484. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  485. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
  486. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  487. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  488. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  489. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
  490. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)
  491. // Return errors if these fail
  492. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  493. resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
  494. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  495. resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
  496. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  497. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  498. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  499. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  500. // Do not return errors if these fail, but log warnings
  501. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  502. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  503. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  504. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  505. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  506. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  507. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  508. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  509. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  510. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  511. resIsSpot, _ := resChIsSpot.Await()
  512. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  513. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  514. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  515. resActiveMins, _ := resChActiveMins.Await()
  516. resLabels, _ := resChLabels.Await()
  517. if optionalCtx.HasErrors() {
  518. for _, err := range optionalCtx.Errors() {
  519. log.Warnf("ClusterNodes: %s", err)
  520. }
  521. }
  522. if requiredCtx.HasErrors() {
  523. for _, err := range requiredCtx.Errors() {
  524. log.Errorf("ClusterNodes: %s", err)
  525. }
  526. return nil, requiredCtx.ErrorCollection()
  527. }
  528. activeDataMap := buildActiveDataMap(resActiveMins, resolution)
  529. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  530. preemptibleMap := buildPreemptibleMap(resIsSpot)
  531. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  532. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  533. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  534. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  535. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  536. cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
  537. ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
  538. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  539. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  540. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  541. labelsMap := buildLabelsMap(resLabels)
  542. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
  543. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
  544. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  545. nodeMap := buildNodeMap(
  546. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  547. cpuCoresMap, ramBytesMap, ramUserPctMap,
  548. ramSystemPctMap,
  549. cpuBreakdownMap,
  550. activeDataMap,
  551. preemptibleMap,
  552. labelsMap,
  553. clusterAndNameToType,
  554. resolution,
  555. )
  556. c, err := cp.GetConfig()
  557. if err != nil {
  558. return nil, err
  559. }
  560. discount, err := ParsePercentString(c.Discount)
  561. if err != nil {
  562. return nil, err
  563. }
  564. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  565. if err != nil {
  566. return nil, err
  567. }
  568. for _, node := range nodeMap {
  569. // TODO take GKE Reserved Instances into account
  570. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  571. // Apply all remaining resources to Idle
  572. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  573. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  574. }
  575. return nodeMap, nil
  576. }
  577. type LoadBalancerIdentifier struct {
  578. Cluster string
  579. Namespace string
  580. Name string
  581. }
  582. type LoadBalancer struct {
  583. Cluster string
  584. Namespace string
  585. Name string
  586. ProviderID string
  587. Cost float64
  588. Start time.Time
  589. End time.Time
  590. Minutes float64
  591. }
  592. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  593. // Query for the duration between start and end
  594. durStr := timeutil.DurationString(end.Sub(start))
  595. if durStr == "" {
  596. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  597. }
  598. // Start from the time "end", querying backwards
  599. t := end
  600. // minsPerResolution determines accuracy and resource use for the following
  601. // queries. Smaller values (higher resolution) result in better accuracy,
  602. // but more expensive queries, and vice-a-versa.
  603. resolution := env.GetETLResolution()
  604. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  605. var minsPerResolution int
  606. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  607. minsPerResolution = 1
  608. log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  609. }
  610. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  611. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
  612. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  613. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  614. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  615. resLBCost, _ := resChLBCost.Await()
  616. resActiveMins, _ := resChActiveMins.Await()
  617. if ctx.HasErrors() {
  618. return nil, ctx.ErrorCollection()
  619. }
  620. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  621. for _, result := range resActiveMins {
  622. cluster, err := result.GetString(env.GetPromClusterLabel())
  623. if err != nil {
  624. cluster = env.GetClusterID()
  625. }
  626. namespace, err := result.GetString("namespace")
  627. if err != nil {
  628. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  629. continue
  630. }
  631. name, err := result.GetString("service_name")
  632. if err != nil {
  633. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  634. continue
  635. }
  636. providerID, err := result.GetString("ingress_ip")
  637. if err != nil {
  638. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  639. providerID = ""
  640. }
  641. key := LoadBalancerIdentifier{
  642. Cluster: cluster,
  643. Namespace: namespace,
  644. Name: name,
  645. }
  646. // Skip if there are no data
  647. if len(result.Values) == 0 {
  648. continue
  649. }
  650. // Add load balancer to the set of load balancers
  651. if _, ok := loadBalancerMap[key]; !ok {
  652. loadBalancerMap[key] = &LoadBalancer{
  653. Cluster: cluster,
  654. Namespace: namespace,
  655. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  656. ProviderID: cloud.ParseLBID(providerID),
  657. }
  658. }
  659. // Append start, end, and minutes. This should come before all other data.
  660. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  661. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  662. loadBalancerMap[key].Start = s
  663. loadBalancerMap[key].End = e
  664. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  665. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  666. // Prevents there from being a duplicate LoadBalancers on the same day
  667. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  668. loadBalancerMap[key].ProviderID = providerID
  669. }
  670. }
  671. for _, result := range resLBCost {
  672. cluster, err := result.GetString(env.GetPromClusterLabel())
  673. if err != nil {
  674. cluster = env.GetClusterID()
  675. }
  676. namespace, err := result.GetString("namespace")
  677. if err != nil {
  678. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  679. continue
  680. }
  681. name, err := result.GetString("service_name")
  682. if err != nil {
  683. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  684. continue
  685. }
  686. key := LoadBalancerIdentifier{
  687. Cluster: cluster,
  688. Namespace: namespace,
  689. Name: name,
  690. }
  691. // Apply cost as price-per-hour * hours
  692. if lb, ok := loadBalancerMap[key]; ok {
  693. lbPricePerHr := result.Values[0].Value
  694. hrs := lb.Minutes / 60.0
  695. lb.Cost += lbPricePerHr * hrs
  696. } else {
  697. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  698. }
  699. }
  700. return loadBalancerMap, nil
  701. }
  702. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  703. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  704. if window < 10*time.Minute {
  705. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  706. }
  707. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  708. start, end := timeutil.ParseTimeRange(window, offset)
  709. mins := end.Sub(start).Minutes()
  710. windowStr := timeutil.DurationString(window)
  711. // minsPerResolution determines accuracy and resource use for the following
  712. // queries. Smaller values (higher resolution) result in better accuracy,
  713. // but more expensive queries, and vice-a-versa.
  714. resolution := env.GetETLResolution()
  715. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  716. var minsPerResolution int
  717. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
  718. minsPerResolution = 1
  719. log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  720. }
  721. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  722. // value, converts it to a cumulative value; i.e.
  723. // [$/hr] * [min/res]*[hr/min] = [$/res]
  724. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  725. const fmtQueryDataCount = `
  726. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  727. `
  728. const fmtQueryTotalGPU = `
  729. sum(
  730. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  731. ) by (%s)
  732. `
  733. const fmtQueryTotalCPU = `
  734. sum(
  735. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  736. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  737. ) by (%s)
  738. `
  739. const fmtQueryTotalRAM = `
  740. sum(
  741. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  742. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  743. ) by (%s)
  744. `
  745. const fmtQueryTotalStorage = `
  746. sum(
  747. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  748. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  749. ) by (%s)
  750. `
  751. const fmtQueryCPUModePct = `
  752. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  753. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  754. `
  755. const fmtQueryRAMSystemPct = `
  756. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  757. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  758. `
  759. const fmtQueryRAMUserPct = `
  760. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  761. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  762. `
  763. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  764. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  765. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  766. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  767. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  768. if queryTotalLocalStorage != "" {
  769. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  770. }
  771. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  772. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
  773. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  774. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  775. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  776. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  777. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  778. resChs := ctx.QueryAll(
  779. queryDataCount,
  780. queryTotalGPU,
  781. queryTotalCPU,
  782. queryTotalRAM,
  783. queryTotalStorage,
  784. )
  785. // Only submit the local storage query if it is valid. Otherwise Prometheus
  786. // will return errors. Always append something to resChs, regardless, to
  787. // maintain indexing.
  788. if queryTotalLocalStorage != "" {
  789. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  790. } else {
  791. resChs = append(resChs, nil)
  792. }
  793. if withBreakdown {
  794. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, windowStr, fmtOffset, env.GetPromClusterLabel(), windowStr, fmtOffset, env.GetPromClusterLabel())
  795. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  796. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  797. bdResChs := ctx.QueryAll(
  798. queryCPUModePct,
  799. queryRAMSystemPct,
  800. queryRAMUserPct,
  801. )
  802. // Only submit the local storage query if it is valid. Otherwise Prometheus
  803. // will return errors. Always append something to resChs, regardless, to
  804. // maintain indexing.
  805. if queryUsedLocalStorage != "" {
  806. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  807. } else {
  808. bdResChs = append(bdResChs, nil)
  809. }
  810. resChs = append(resChs, bdResChs...)
  811. }
  812. resDataCount, _ := resChs[0].Await()
  813. resTotalGPU, _ := resChs[1].Await()
  814. resTotalCPU, _ := resChs[2].Await()
  815. resTotalRAM, _ := resChs[3].Await()
  816. resTotalStorage, _ := resChs[4].Await()
  817. if ctx.HasErrors() {
  818. return nil, ctx.ErrorCollection()
  819. }
  820. defaultClusterID := env.GetClusterID()
  821. dataMinsByCluster := map[string]float64{}
  822. for _, result := range resDataCount {
  823. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  824. if clusterID == "" {
  825. clusterID = defaultClusterID
  826. }
  827. dataMins := mins
  828. if len(result.Values) > 0 {
  829. dataMins = result.Values[0].Value
  830. } else {
  831. log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
  832. }
  833. dataMinsByCluster[clusterID] = dataMins
  834. }
  835. // Determine combined discount
  836. discount, customDiscount := 0.0, 0.0
  837. c, err := a.CloudProvider.GetConfig()
  838. if err == nil {
  839. discount, err = ParsePercentString(c.Discount)
  840. if err != nil {
  841. discount = 0.0
  842. }
  843. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  844. if err != nil {
  845. customDiscount = 0.0
  846. }
  847. }
  848. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  849. costData := make(map[string]map[string]float64)
  850. // Helper function to iterate over Prom query results, parsing the raw values into
  851. // the intermediate costData structure.
  852. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  853. for _, result := range results {
  854. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  855. if clusterID == "" {
  856. clusterID = defaultClusterID
  857. }
  858. if _, ok := costData[clusterID]; !ok {
  859. costData[clusterID] = map[string]float64{}
  860. }
  861. if len(result.Values) > 0 {
  862. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  863. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  864. }
  865. }
  866. }
  867. // Apply both sustained use and custom discounts to RAM and CPU
  868. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  869. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  870. // Apply only custom discount to GPU and storage
  871. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  872. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  873. if queryTotalLocalStorage != "" {
  874. resTotalLocalStorage, err := resChs[5].Await()
  875. if err != nil {
  876. return nil, err
  877. }
  878. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  879. }
  880. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  881. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  882. pvUsedCostMap := map[string]float64{}
  883. if withBreakdown {
  884. resCPUModePct, _ := resChs[6].Await()
  885. resRAMSystemPct, _ := resChs[7].Await()
  886. resRAMUserPct, _ := resChs[8].Await()
  887. if ctx.HasErrors() {
  888. return nil, ctx.ErrorCollection()
  889. }
  890. for _, result := range resCPUModePct {
  891. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  892. if clusterID == "" {
  893. clusterID = defaultClusterID
  894. }
  895. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  896. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  897. }
  898. cpuBD := cpuBreakdownMap[clusterID]
  899. mode, err := result.GetString("mode")
  900. if err != nil {
  901. log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
  902. mode = "other"
  903. }
  904. switch mode {
  905. case "idle":
  906. cpuBD.Idle += result.Values[0].Value
  907. case "system":
  908. cpuBD.System += result.Values[0].Value
  909. case "user":
  910. cpuBD.User += result.Values[0].Value
  911. default:
  912. cpuBD.Other += result.Values[0].Value
  913. }
  914. }
  915. for _, result := range resRAMSystemPct {
  916. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  917. if clusterID == "" {
  918. clusterID = defaultClusterID
  919. }
  920. if _, ok := ramBreakdownMap[clusterID]; !ok {
  921. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  922. }
  923. ramBD := ramBreakdownMap[clusterID]
  924. ramBD.System += result.Values[0].Value
  925. }
  926. for _, result := range resRAMUserPct {
  927. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  928. if clusterID == "" {
  929. clusterID = defaultClusterID
  930. }
  931. if _, ok := ramBreakdownMap[clusterID]; !ok {
  932. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  933. }
  934. ramBD := ramBreakdownMap[clusterID]
  935. ramBD.User += result.Values[0].Value
  936. }
  937. for _, ramBD := range ramBreakdownMap {
  938. remaining := 1.0
  939. remaining -= ramBD.Other
  940. remaining -= ramBD.System
  941. remaining -= ramBD.User
  942. ramBD.Idle = remaining
  943. }
  944. if queryUsedLocalStorage != "" {
  945. resUsedLocalStorage, err := resChs[9].Await()
  946. if err != nil {
  947. return nil, err
  948. }
  949. for _, result := range resUsedLocalStorage {
  950. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  951. if clusterID == "" {
  952. clusterID = defaultClusterID
  953. }
  954. pvUsedCostMap[clusterID] += result.Values[0].Value
  955. }
  956. }
  957. }
  958. if ctx.HasErrors() {
  959. for _, err := range ctx.Errors() {
  960. log.Errorf("ComputeClusterCosts: %s", err)
  961. }
  962. return nil, ctx.ErrorCollection()
  963. }
  964. // Convert intermediate structure to Costs instances
  965. costsByCluster := map[string]*ClusterCosts{}
  966. for id, cd := range costData {
  967. dataMins, ok := dataMinsByCluster[id]
  968. if !ok {
  969. dataMins = mins
  970. log.Warnf("Cluster cost data count not found for cluster %s", id)
  971. }
  972. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  973. if err != nil {
  974. log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  975. return nil, err
  976. }
  977. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  978. costs.CPUBreakdown = cpuBD
  979. }
  980. if ramBD, ok := ramBreakdownMap[id]; ok {
  981. costs.RAMBreakdown = ramBD
  982. }
  983. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  984. if pvUC, ok := pvUsedCostMap[id]; ok {
  985. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  986. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  987. }
  988. costs.DataMinutes = dataMins
  989. costsByCluster[id] = costs
  990. }
  991. return costsByCluster, nil
  992. }
  993. type Totals struct {
  994. TotalCost [][]string `json:"totalcost"`
  995. CPUCost [][]string `json:"cpucost"`
  996. MemCost [][]string `json:"memcost"`
  997. StorageCost [][]string `json:"storageCost"`
  998. }
  999. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  1000. if len(qrs) == 0 {
  1001. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  1002. }
  1003. result := qrs[0]
  1004. totals := [][]string{}
  1005. for _, value := range result.Values {
  1006. d0 := fmt.Sprintf("%f", value.Timestamp)
  1007. d1 := fmt.Sprintf("%f", value.Value)
  1008. toAppend := []string{
  1009. d0,
  1010. d1,
  1011. }
  1012. totals = append(totals, toAppend)
  1013. }
  1014. return totals, nil
  1015. }
  1016. // ClusterCostsOverTime gives the full cluster costs over time
  1017. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  1018. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  1019. if localStorageQuery != "" {
  1020. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  1021. }
  1022. layout := "2006-01-02T15:04:05.000Z"
  1023. start, err := time.Parse(layout, startString)
  1024. if err != nil {
  1025. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  1026. return nil, err
  1027. }
  1028. end, err := time.Parse(layout, endString)
  1029. if err != nil {
  1030. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  1031. return nil, err
  1032. }
  1033. fmtWindow := timeutil.DurationString(window)
  1034. if fmtWindow == "" {
  1035. err := fmt.Errorf("window value invalid or missing")
  1036. log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
  1037. return nil, err
  1038. }
  1039. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  1040. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1041. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1042. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1043. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1044. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  1045. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  1046. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  1047. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  1048. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  1049. resultClusterCores, err := resChClusterCores.Await()
  1050. if err != nil {
  1051. return nil, err
  1052. }
  1053. resultClusterRAM, err := resChClusterRAM.Await()
  1054. if err != nil {
  1055. return nil, err
  1056. }
  1057. resultStorage, err := resChStorage.Await()
  1058. if err != nil {
  1059. return nil, err
  1060. }
  1061. resultTotal, err := resChTotal.Await()
  1062. if err != nil {
  1063. return nil, err
  1064. }
  1065. coreTotal, err := resultToTotals(resultClusterCores)
  1066. if err != nil {
  1067. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  1068. return nil, err
  1069. }
  1070. ramTotal, err := resultToTotals(resultClusterRAM)
  1071. if err != nil {
  1072. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  1073. return nil, err
  1074. }
  1075. storageTotal, err := resultToTotals(resultStorage)
  1076. if err != nil {
  1077. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  1078. }
  1079. clusterTotal, err := resultToTotals(resultTotal)
  1080. if err != nil {
  1081. // If clusterTotal query failed, it's likely because there are no PVs, which
  1082. // causes the qTotal query to return no data. Instead, query only node costs.
  1083. // If that fails, return an error because something is actually wrong.
  1084. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  1085. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  1086. for _, warning := range warnings {
  1087. log.Warnf(warning)
  1088. }
  1089. if err != nil {
  1090. return nil, err
  1091. }
  1092. clusterTotal, err = resultToTotals(resultNodes)
  1093. if err != nil {
  1094. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  1095. return nil, err
  1096. }
  1097. }
  1098. return &Totals{
  1099. TotalCost: clusterTotal,
  1100. CPUCost: coreTotal,
  1101. MemCost: ramTotal,
  1102. StorageCost: storageTotal,
  1103. }, nil
  1104. }
  1105. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo []*prom.QueryResult, cp cloud.Provider) {
  1106. for _, result := range resActiveMins {
  1107. cluster, err := result.GetString(env.GetPromClusterLabel())
  1108. if err != nil {
  1109. cluster = env.GetClusterID()
  1110. }
  1111. name, err := result.GetString("persistentvolume")
  1112. if err != nil {
  1113. log.Warnf("ClusterDisks: active mins missing pv name")
  1114. continue
  1115. }
  1116. if len(result.Values) == 0 {
  1117. continue
  1118. }
  1119. key := DiskIdentifier{cluster, name}
  1120. if _, ok := diskMap[key]; !ok {
  1121. diskMap[key] = &Disk{
  1122. Cluster: cluster,
  1123. Name: name,
  1124. Breakdown: &ClusterCostsBreakdown{},
  1125. }
  1126. }
  1127. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  1128. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  1129. mins := e.Sub(s).Minutes()
  1130. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  1131. diskMap[key].End = e
  1132. diskMap[key].Start = s
  1133. diskMap[key].Minutes = mins
  1134. }
  1135. for _, result := range resPVSize {
  1136. cluster, err := result.GetString(env.GetPromClusterLabel())
  1137. if err != nil {
  1138. cluster = env.GetClusterID()
  1139. }
  1140. name, err := result.GetString("persistentvolume")
  1141. if err != nil {
  1142. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1143. continue
  1144. }
  1145. // TODO niko/assets storage class
  1146. bytes := result.Values[0].Value
  1147. key := DiskIdentifier{cluster, name}
  1148. if _, ok := diskMap[key]; !ok {
  1149. diskMap[key] = &Disk{
  1150. Cluster: cluster,
  1151. Name: name,
  1152. Breakdown: &ClusterCostsBreakdown{},
  1153. }
  1154. }
  1155. diskMap[key].Bytes = bytes
  1156. }
  1157. customPricingEnabled := cloud.CustomPricesEnabled(cp)
  1158. customPricingConfig, err := cp.GetConfig()
  1159. if err != nil {
  1160. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1161. }
  1162. for _, result := range resPVCost {
  1163. cluster, err := result.GetString(env.GetPromClusterLabel())
  1164. if err != nil {
  1165. cluster = env.GetClusterID()
  1166. }
  1167. name, err := result.GetString("persistentvolume")
  1168. if err != nil {
  1169. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1170. continue
  1171. }
  1172. // TODO niko/assets storage class
  1173. var cost float64
  1174. if customPricingEnabled && customPricingConfig != nil {
  1175. customPVCostStr := customPricingConfig.Storage
  1176. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1177. if err != nil {
  1178. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1179. }
  1180. cost = customPVCost
  1181. } else {
  1182. cost = result.Values[0].Value
  1183. }
  1184. key := DiskIdentifier{cluster, name}
  1185. if _, ok := diskMap[key]; !ok {
  1186. diskMap[key] = &Disk{
  1187. Cluster: cluster,
  1188. Name: name,
  1189. Breakdown: &ClusterCostsBreakdown{},
  1190. }
  1191. }
  1192. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1193. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1194. if providerID != "" {
  1195. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  1196. }
  1197. }
  1198. for _, result := range resPVUsedAvg {
  1199. cluster, err := result.GetString(env.GetPromClusterLabel())
  1200. if err != nil {
  1201. cluster = env.GetClusterID()
  1202. }
  1203. claimName, err := result.GetString("persistentvolumeclaim")
  1204. if err != nil {
  1205. log.Warnf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1206. continue
  1207. }
  1208. claimNamespace, err := result.GetString("namespace")
  1209. if err != nil {
  1210. log.Warnf("ClusterDisks: pv usage data missing namespace")
  1211. continue
  1212. }
  1213. var volumeName string
  1214. for _, thatRes := range resPVCInfo {
  1215. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1216. if err != nil {
  1217. thatCluster = env.GetClusterID()
  1218. }
  1219. thatVolumeName, err := thatRes.GetString("volumename")
  1220. if err != nil {
  1221. log.Warnf("ClusterDisks: pv claim data missing volumename")
  1222. continue
  1223. }
  1224. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1225. if err != nil {
  1226. log.Warnf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1227. continue
  1228. }
  1229. thatClaimNamespace, err := thatRes.GetString("namespace")
  1230. if err != nil {
  1231. log.Warnf("ClusterDisks: pv claim data missing namespace")
  1232. continue
  1233. }
  1234. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1235. volumeName = thatVolumeName
  1236. }
  1237. }
  1238. usage := result.Values[0].Value
  1239. key := DiskIdentifier{cluster, volumeName}
  1240. if _, ok := diskMap[key]; !ok {
  1241. diskMap[key] = &Disk{
  1242. Cluster: cluster,
  1243. Name: volumeName,
  1244. Breakdown: &ClusterCostsBreakdown{},
  1245. }
  1246. }
  1247. diskMap[key].BytesUsedAvg = usage
  1248. }
  1249. for _, result := range resPVUsedMax {
  1250. cluster, err := result.GetString(env.GetPromClusterLabel())
  1251. if err != nil {
  1252. cluster = env.GetClusterID()
  1253. }
  1254. claimName, err := result.GetString("persistentvolumeclaim")
  1255. if err != nil {
  1256. log.Warnf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1257. continue
  1258. }
  1259. claimNamespace, err := result.GetString("namespace")
  1260. if err != nil {
  1261. log.Warnf("ClusterDisks: pv usage data missing namespace")
  1262. continue
  1263. }
  1264. var volumeName string
  1265. for _, thatRes := range resPVCInfo {
  1266. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1267. if err != nil {
  1268. thatCluster = env.GetClusterID()
  1269. }
  1270. thatVolumeName, err := thatRes.GetString("volumename")
  1271. if err != nil {
  1272. log.Warnf("ClusterDisks: pv claim data missing volumename")
  1273. continue
  1274. }
  1275. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1276. if err != nil {
  1277. log.Warnf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1278. continue
  1279. }
  1280. thatClaimNamespace, err := thatRes.GetString("namespace")
  1281. if err != nil {
  1282. log.Warnf("ClusterDisks: pv claim data missing namespace")
  1283. continue
  1284. }
  1285. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1286. volumeName = thatVolumeName
  1287. }
  1288. }
  1289. usage := result.Values[0].Value
  1290. key := DiskIdentifier{cluster, volumeName}
  1291. if _, ok := diskMap[key]; !ok {
  1292. diskMap[key] = &Disk{
  1293. Cluster: cluster,
  1294. Name: volumeName,
  1295. Breakdown: &ClusterCostsBreakdown{},
  1296. }
  1297. }
  1298. diskMap[key].BytesUsedMax = usage
  1299. }
  1300. }