cluster.go 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533
  1. package costmodel
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/opencost/opencost/pkg/kubecost"
  7. "github.com/opencost/opencost/pkg/util/timeutil"
  8. "golang.org/x/exp/slices"
  9. "github.com/opencost/opencost/pkg/cloud"
  10. "github.com/opencost/opencost/pkg/env"
  11. "github.com/opencost/opencost/pkg/log"
  12. "github.com/opencost/opencost/pkg/prom"
  13. prometheus "github.com/prometheus/client_golang/api"
  14. )
const (
	// queryClusterCores estimates monthly (730 hr) compute cost per cluster:
	// node CPU capacity times hourly CPU cost, plus hourly GPU cost, summed
	// by the configured cluster label. The %s placeholders are filled in by
	// callers with a duration, an offset clause, and the cluster label.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryClusterRAM estimates monthly RAM cost per cluster: capacity in GiB
	// times the hourly RAM cost rate, scaled to 730 hours.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryStorage estimates monthly persistent volume cost per cluster:
	// hourly PV cost times capacity in GiB, scaled to 730 hours. The trailing
	// %s is an optional suffix clause appended by the caller.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryTotal estimates total monthly cost: node costs (from
	// node_total_hourly_cost) plus PV storage costs.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryNodes estimates total monthly node cost from the per-node hourly
	// cost metric alone.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
)

const maxLocalDiskSize = 200 // AWS limits root disks to 100 Gi, and occasional metric errors in filesystem size should not contribute to large costs.
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores, memory, and storage.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes is the length, in minutes, of the data window behind these
	// costs. Note: it has no json tag, so it marshals as "DataMinutes".
	DataMinutes float64
}
// ClusterCostsBreakdown provides percentage-based breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, and idle.
// Values are fractions of the total; Idle is typically derived as
// 1 - (System + Other + User).
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  63. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  64. // the associated monthly rate data, and returns the Costs.
  65. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  66. start, end := timeutil.ParseTimeRange(window, offset)
  67. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  68. if dataHours == 0 {
  69. dataHours = end.Sub(start).Hours()
  70. }
  71. // Do not allow zero-length windows to prevent divide-by-zero issues
  72. if dataHours == 0 {
  73. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  74. }
  75. cc := &ClusterCosts{
  76. Start: &start,
  77. End: &end,
  78. CPUCumulative: cpu,
  79. GPUCumulative: gpu,
  80. RAMCumulative: ram,
  81. StorageCumulative: storage,
  82. TotalCumulative: cpu + gpu + ram + storage,
  83. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  84. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  85. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  86. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  87. }
  88. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  89. return cc, nil
  90. }
// Disk describes a single persistent volume or node-local (root) disk: its
// identity, capacity, cost, utilization, and the window over which it was
// observed to be active.
type Disk struct {
	Cluster        string
	Name           string
	ProviderID     string
	StorageClass   string
	VolumeName     string
	ClaimName      string
	ClaimNamespace string
	Cost           float64
	Bytes          float64
	// These two fields may not be available at all times because they rely on
	// a new set of metrics that may or may not be available. Thus, they must
	// be nilable to represent the complete absence of the data.
	//
	// In other words, nilability here lets us distinguish between
	// "metric is not available" and "metric is available but is 0".
	//
	// They end in "Ptr" to distinguish from an earlier version in order to
	// ensure that all usages are checked for nil.
	BytesUsedAvgPtr *float64
	BytesUsedMaxPtr *float64
	// Local is true for node-local (root) storage rather than a PV.
	Local     bool
	Start     time.Time
	End       time.Time
	Minutes   float64
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier uniquely identifies a disk by its cluster and name; used as
// the key of the map returned by ClusterDisks.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  122. func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  123. // Query for the duration between start and end
  124. durStr := timeutil.DurationString(end.Sub(start))
  125. if durStr == "" {
  126. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  127. }
  128. // Start from the time "end", querying backwards
  129. t := end
  130. // minsPerResolution determines accuracy and resource use for the following
  131. // queries. Smaller values (higher resolution) result in better accuracy,
  132. // but more expensive queries, and vice-a-versa.
  133. resolution := env.GetETLResolution()
  134. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  135. var minsPerResolution int
  136. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  137. minsPerResolution = 1
  138. log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  139. }
  140. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  141. // value, converts it to a cumulative value; i.e.
  142. // [$/hr] * [min/res]*[hr/min] = [$/res]
  143. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  144. // TODO niko/assets how do we not hard-code this price?
  145. costPerGBHr := 0.04 / 730.0
  146. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  147. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
  148. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
  149. queryActiveMins := fmt.Sprintf(`avg(kube_persistentvolume_capacity_bytes) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  150. queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info[%s])) by (%s, persistentvolume, storageclass)`, durStr, env.GetPromClusterLabel())
  151. queryPVUsedAvg := fmt.Sprintf(`avg(avg_over_time(kubelet_volume_stats_used_bytes[%s])) by (%s, persistentvolumeclaim, namespace)`, durStr, env.GetPromClusterLabel())
  152. queryPVUsedMax := fmt.Sprintf(`max(max_over_time(kubelet_volume_stats_used_bytes[%s])) by (%s, persistentvolumeclaim, namespace)`, durStr, env.GetPromClusterLabel())
  153. queryPVCInfo := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_info[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`, durStr, env.GetPromClusterLabel())
  154. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  155. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  156. queryLocalStorageUsedAvg := fmt.Sprintf(`avg(avg_over_time(container_fs_usage_bytes{device!="tmpfs", id="/"}[%s])) by (instance, %s)`, durStr, env.GetPromClusterLabel())
  157. queryLocalStorageUsedMax := fmt.Sprintf(`max(max_over_time(container_fs_usage_bytes{device!="tmpfs", id="/"}[%s])) by (instance, %s)`, durStr, env.GetPromClusterLabel())
  158. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  159. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  160. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  161. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  162. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  163. resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
  164. resChPVUsedAvg := ctx.QueryAtTime(queryPVUsedAvg, t)
  165. resChPVUsedMax := ctx.QueryAtTime(queryPVUsedMax, t)
  166. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, t)
  167. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  168. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  169. resChLocalStoreageUsedAvg := ctx.QueryAtTime(queryLocalStorageUsedAvg, t)
  170. resChLocalStoreageUsedMax := ctx.QueryAtTime(queryLocalStorageUsedMax, t)
  171. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  172. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  173. resPVCost, _ := resChPVCost.Await()
  174. resPVSize, _ := resChPVSize.Await()
  175. resActiveMins, _ := resChActiveMins.Await()
  176. resPVStorageClass, _ := resChPVStorageClass.Await()
  177. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  178. resPVUsedMax, _ := resChPVUsedMax.Await()
  179. resPVCInfo, _ := resChPVCInfo.Await()
  180. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  181. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  182. resLocalStorageUsedAvg, _ := resChLocalStoreageUsedAvg.Await()
  183. resLocalStorageUsedMax, _ := resChLocalStoreageUsedMax.Await()
  184. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  185. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  186. if ctx.HasErrors() {
  187. return nil, ctx.ErrorCollection()
  188. }
  189. diskMap := map[DiskIdentifier]*Disk{}
  190. for _, result := range resPVCInfo {
  191. cluster, err := result.GetString(env.GetPromClusterLabel())
  192. if err != nil {
  193. cluster = env.GetClusterID()
  194. }
  195. volumeName, err := result.GetString("volumename")
  196. if err != nil {
  197. log.Debugf("ClusterDisks: pv claim data missing volumename")
  198. continue
  199. }
  200. claimName, err := result.GetString("persistentvolumeclaim")
  201. if err != nil {
  202. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  203. continue
  204. }
  205. claimNamespace, err := result.GetString("namespace")
  206. if err != nil {
  207. log.Debugf("ClusterDisks: pv claim data missing namespace")
  208. continue
  209. }
  210. key := DiskIdentifier{cluster, volumeName}
  211. if _, ok := diskMap[key]; !ok {
  212. diskMap[key] = &Disk{
  213. Cluster: cluster,
  214. Name: volumeName,
  215. Breakdown: &ClusterCostsBreakdown{},
  216. }
  217. }
  218. diskMap[key].VolumeName = volumeName
  219. diskMap[key].ClaimName = claimName
  220. diskMap[key].ClaimNamespace = claimNamespace
  221. }
  222. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, provider)
  223. for _, result := range resLocalStorageCost {
  224. cluster, err := result.GetString(env.GetPromClusterLabel())
  225. if err != nil {
  226. cluster = env.GetClusterID()
  227. }
  228. name, err := result.GetString("instance")
  229. if err != nil {
  230. log.Warnf("ClusterDisks: local storage data missing instance")
  231. continue
  232. }
  233. cost := result.Values[0].Value
  234. key := DiskIdentifier{cluster, name}
  235. if _, ok := diskMap[key]; !ok {
  236. diskMap[key] = &Disk{
  237. Cluster: cluster,
  238. Name: name,
  239. Breakdown: &ClusterCostsBreakdown{},
  240. Local: true,
  241. }
  242. }
  243. diskMap[key].Cost += cost
  244. //Assigning explicitly the storage class of local storage to local
  245. diskMap[key].StorageClass = kubecost.LocalStorageClass
  246. }
  247. for _, result := range resLocalStorageUsedCost {
  248. cluster, err := result.GetString(env.GetPromClusterLabel())
  249. if err != nil {
  250. cluster = env.GetClusterID()
  251. }
  252. name, err := result.GetString("instance")
  253. if err != nil {
  254. log.Warnf("ClusterDisks: local storage usage data missing instance")
  255. continue
  256. }
  257. cost := result.Values[0].Value
  258. key := DiskIdentifier{cluster, name}
  259. if _, ok := diskMap[key]; !ok {
  260. diskMap[key] = &Disk{
  261. Cluster: cluster,
  262. Name: name,
  263. Breakdown: &ClusterCostsBreakdown{},
  264. Local: true,
  265. }
  266. }
  267. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  268. }
  269. for _, result := range resLocalStorageUsedAvg {
  270. cluster, err := result.GetString(env.GetPromClusterLabel())
  271. if err != nil {
  272. cluster = env.GetClusterID()
  273. }
  274. name, err := result.GetString("instance")
  275. if err != nil {
  276. log.Warnf("ClusterDisks: local storage data missing instance")
  277. continue
  278. }
  279. bytesAvg := result.Values[0].Value
  280. key := DiskIdentifier{cluster, name}
  281. if _, ok := diskMap[key]; !ok {
  282. diskMap[key] = &Disk{
  283. Cluster: cluster,
  284. Name: name,
  285. Breakdown: &ClusterCostsBreakdown{},
  286. Local: true,
  287. }
  288. }
  289. diskMap[key].BytesUsedAvgPtr = &bytesAvg
  290. }
  291. for _, result := range resLocalStorageUsedMax {
  292. cluster, err := result.GetString(env.GetPromClusterLabel())
  293. if err != nil {
  294. cluster = env.GetClusterID()
  295. }
  296. name, err := result.GetString("instance")
  297. if err != nil {
  298. log.Warnf("ClusterDisks: local storage data missing instance")
  299. continue
  300. }
  301. bytesMax := result.Values[0].Value
  302. key := DiskIdentifier{cluster, name}
  303. if _, ok := diskMap[key]; !ok {
  304. diskMap[key] = &Disk{
  305. Cluster: cluster,
  306. Name: name,
  307. Breakdown: &ClusterCostsBreakdown{},
  308. Local: true,
  309. }
  310. }
  311. diskMap[key].BytesUsedMaxPtr = &bytesMax
  312. }
  313. for _, result := range resLocalStorageBytes {
  314. cluster, err := result.GetString(env.GetPromClusterLabel())
  315. if err != nil {
  316. cluster = env.GetClusterID()
  317. }
  318. name, err := result.GetString("instance")
  319. if err != nil {
  320. log.Warnf("ClusterDisks: local storage data missing instance")
  321. continue
  322. }
  323. bytes := result.Values[0].Value
  324. key := DiskIdentifier{cluster, name}
  325. if _, ok := diskMap[key]; !ok {
  326. diskMap[key] = &Disk{
  327. Cluster: cluster,
  328. Name: name,
  329. Breakdown: &ClusterCostsBreakdown{},
  330. Local: true,
  331. }
  332. }
  333. diskMap[key].Bytes = bytes
  334. if bytes/1024/1024/1024 > maxLocalDiskSize {
  335. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  336. delete(diskMap, key)
  337. }
  338. }
  339. for _, result := range resLocalActiveMins {
  340. cluster, err := result.GetString(env.GetPromClusterLabel())
  341. if err != nil {
  342. cluster = env.GetClusterID()
  343. }
  344. name, err := result.GetString("node")
  345. if err != nil {
  346. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  347. continue
  348. }
  349. key := DiskIdentifier{cluster, name}
  350. if _, ok := diskMap[key]; !ok {
  351. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  352. continue
  353. }
  354. if len(result.Values) == 0 {
  355. continue
  356. }
  357. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  358. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  359. mins := e.Sub(s).Minutes()
  360. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  361. diskMap[key].End = e
  362. diskMap[key].Start = s
  363. diskMap[key].Minutes = mins
  364. }
  365. var unTracedDiskLogData []DiskIdentifier
  366. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  367. for _, result := range resPVStorageClass {
  368. cluster, err := result.GetString(env.GetPromClusterLabel())
  369. if err != nil {
  370. cluster = env.GetClusterID()
  371. }
  372. name, _ := result.GetString("persistentvolume")
  373. key := DiskIdentifier{cluster, name}
  374. if _, ok := diskMap[key]; !ok {
  375. if !slices.Contains(unTracedDiskLogData, key) {
  376. unTracedDiskLogData = append(unTracedDiskLogData, key)
  377. }
  378. continue
  379. }
  380. if len(result.Values) == 0 {
  381. continue
  382. }
  383. storageClass, err := result.GetString("storageclass")
  384. if err != nil {
  385. diskMap[key].StorageClass = kubecost.UnknownStorageClass
  386. } else {
  387. diskMap[key].StorageClass = storageClass
  388. }
  389. }
  390. // Logging the unidentified disk information outside the loop
  391. for _, unIdentifiedDisk := range unTracedDiskLogData {
  392. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  393. }
  394. for _, disk := range diskMap {
  395. // Apply all remaining RAM to Idle
  396. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  397. // Set provider Id to the name for reconciliation
  398. if disk.ProviderID == "" {
  399. disk.ProviderID = disk.Name
  400. }
  401. }
  402. return diskMap, nil
  403. }
// Node describes a single node's observed capacity, hourly cost rates,
// cumulative resource costs, usage breakdowns, and activity window.
type Node struct {
	Cluster      string
	Name         string
	ProviderID   string
	NodeType     string
	CPUCost      float64
	CPUCores     float64
	GPUCost      float64
	GPUCount     float64
	RAMCost      float64
	RAMBytes     float64
	// Discount is the combined provider discount applied to this node's cost.
	Discount     float64
	Preemptible  bool
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// Per-unit hourly rates used to derive the cumulative costs above.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier uniquely identifies a node, including its provider ID so
// node costs can be reconciled against cloud billing data.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}

// nodeIdentifierNoProviderID identifies a node by cluster and name only; it
// keys maps built from metrics that do not carry a provider_id label (e.g.
// core and RAM capacity).
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  444. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  445. for k, v := range activeDataMap {
  446. keyNon := nodeIdentifierNoProviderID{
  447. Cluster: k.Cluster,
  448. Name: k.Name,
  449. }
  450. if cost, ok := costMap[k]; ok {
  451. minutes := v.minutes
  452. count := 1.0
  453. if c, ok := resourceCountMap[keyNon]; ok {
  454. count = c
  455. }
  456. costMap[k] = cost * (minutes / 60) * count
  457. }
  458. }
  459. }
  460. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  461. for k, v := range activeDataMap {
  462. if cost, ok := costMap[k]; ok {
  463. minutes := v.minutes
  464. costMap[k] = cost * (minutes / 60)
  465. }
  466. }
  467. }
  468. func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  469. // Query for the duration between start and end
  470. durStr := timeutil.DurationString(end.Sub(start))
  471. if durStr == "" {
  472. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  473. }
  474. // Start from the time "end", querying backwards
  475. t := end
  476. // minsPerResolution determines accuracy and resource use for the following
  477. // queries. Smaller values (higher resolution) result in better accuracy,
  478. // but more expensive queries, and vice-a-versa.
  479. resolution := env.GetETLResolution()
  480. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  481. var minsPerResolution int
  482. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  483. minsPerResolution = 1
  484. log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  485. }
  486. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  487. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  488. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  489. queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  490. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
  491. queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  492. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
  493. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  494. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
  495. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  496. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  497. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  498. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
  499. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)
  500. // Return errors if these fail
  501. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  502. resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
  503. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  504. resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
  505. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  506. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  507. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  508. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  509. // Do not return errors if these fail, but log warnings
  510. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  511. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  512. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  513. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  514. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  515. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  516. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  517. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  518. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  519. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  520. resIsSpot, _ := resChIsSpot.Await()
  521. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  522. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  523. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  524. resActiveMins, _ := resChActiveMins.Await()
  525. resLabels, _ := resChLabels.Await()
  526. if optionalCtx.HasErrors() {
  527. for _, err := range optionalCtx.Errors() {
  528. log.Warnf("ClusterNodes: %s", err)
  529. }
  530. }
  531. if requiredCtx.HasErrors() {
  532. for _, err := range requiredCtx.Errors() {
  533. log.Errorf("ClusterNodes: %s", err)
  534. }
  535. return nil, requiredCtx.ErrorCollection()
  536. }
  537. activeDataMap := buildActiveDataMap(resActiveMins, resolution)
  538. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  539. preemptibleMap := buildPreemptibleMap(resIsSpot)
  540. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  541. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  542. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  543. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  544. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  545. cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
  546. ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
  547. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  548. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  549. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  550. labelsMap := buildLabelsMap(resLabels)
  551. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
  552. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
  553. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  554. nodeMap := buildNodeMap(
  555. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  556. cpuCoresMap, ramBytesMap, ramUserPctMap,
  557. ramSystemPctMap,
  558. cpuBreakdownMap,
  559. activeDataMap,
  560. preemptibleMap,
  561. labelsMap,
  562. clusterAndNameToType,
  563. resolution,
  564. )
  565. c, err := cp.GetConfig()
  566. if err != nil {
  567. return nil, err
  568. }
  569. discount, err := ParsePercentString(c.Discount)
  570. if err != nil {
  571. return nil, err
  572. }
  573. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  574. if err != nil {
  575. return nil, err
  576. }
  577. for _, node := range nodeMap {
  578. // TODO take GKE Reserved Instances into account
  579. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  580. // Apply all remaining resources to Idle
  581. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  582. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  583. }
  584. return nodeMap, nil
  585. }
// LoadBalancerIdentifier uniquely identifies a load balancer by its cluster,
// namespace, and service name. Used as the map key in ClusterLoadBalancers.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
// LoadBalancer holds the accumulated cost and observed activity window for a
// single load balancer service, as computed by ClusterLoadBalancers.
type LoadBalancer struct {
	Cluster string
	Namespace string
	// Name is stored as "namespace/name" (see ClusterLoadBalancers), kept for
	// backwards-compatibility with existing consumers.
	Name string
	// ProviderID is parsed from the service's ingress IP label.
	ProviderID string
	// Cost is the cumulative cost over the queried window
	// (price-per-hour * active hours).
	Cost float64
	// Start and End bound the timestamps observed for this load balancer.
	Start time.Time
	End time.Time
	// Minutes is the span between Start and End.
	Minutes float64
}
  601. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  602. // Query for the duration between start and end
  603. durStr := timeutil.DurationString(end.Sub(start))
  604. if durStr == "" {
  605. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  606. }
  607. // Start from the time "end", querying backwards
  608. t := end
  609. // minsPerResolution determines accuracy and resource use for the following
  610. // queries. Smaller values (higher resolution) result in better accuracy,
  611. // but more expensive queries, and vice-a-versa.
  612. resolution := env.GetETLResolution()
  613. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  614. var minsPerResolution int
  615. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  616. minsPerResolution = 1
  617. log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  618. }
  619. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  620. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
  621. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  622. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  623. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  624. resLBCost, _ := resChLBCost.Await()
  625. resActiveMins, _ := resChActiveMins.Await()
  626. if ctx.HasErrors() {
  627. return nil, ctx.ErrorCollection()
  628. }
  629. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  630. for _, result := range resActiveMins {
  631. cluster, err := result.GetString(env.GetPromClusterLabel())
  632. if err != nil {
  633. cluster = env.GetClusterID()
  634. }
  635. namespace, err := result.GetString("namespace")
  636. if err != nil {
  637. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  638. continue
  639. }
  640. name, err := result.GetString("service_name")
  641. if err != nil {
  642. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  643. continue
  644. }
  645. providerID, err := result.GetString("ingress_ip")
  646. if err != nil {
  647. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  648. providerID = ""
  649. }
  650. key := LoadBalancerIdentifier{
  651. Cluster: cluster,
  652. Namespace: namespace,
  653. Name: name,
  654. }
  655. // Skip if there are no data
  656. if len(result.Values) == 0 {
  657. continue
  658. }
  659. // Add load balancer to the set of load balancers
  660. if _, ok := loadBalancerMap[key]; !ok {
  661. loadBalancerMap[key] = &LoadBalancer{
  662. Cluster: cluster,
  663. Namespace: namespace,
  664. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  665. ProviderID: cloud.ParseLBID(providerID),
  666. }
  667. }
  668. // Append start, end, and minutes. This should come before all other data.
  669. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  670. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  671. loadBalancerMap[key].Start = s
  672. loadBalancerMap[key].End = e
  673. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  674. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  675. // Prevents there from being a duplicate LoadBalancers on the same day
  676. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  677. loadBalancerMap[key].ProviderID = providerID
  678. }
  679. }
  680. for _, result := range resLBCost {
  681. cluster, err := result.GetString(env.GetPromClusterLabel())
  682. if err != nil {
  683. cluster = env.GetClusterID()
  684. }
  685. namespace, err := result.GetString("namespace")
  686. if err != nil {
  687. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  688. continue
  689. }
  690. name, err := result.GetString("service_name")
  691. if err != nil {
  692. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  693. continue
  694. }
  695. key := LoadBalancerIdentifier{
  696. Cluster: cluster,
  697. Namespace: namespace,
  698. Name: name,
  699. }
  700. // Apply cost as price-per-hour * hours
  701. if lb, ok := loadBalancerMap[key]; ok {
  702. lbPricePerHr := result.Values[0].Value
  703. hrs := lb.Minutes / 60.0
  704. lb.Cost += lbPricePerHr * hrs
  705. } else {
  706. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  707. }
  708. }
  709. return loadBalancerMap, nil
  710. }
  711. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  712. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  713. if window < 10*time.Minute {
  714. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  715. }
  716. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  717. start, end := timeutil.ParseTimeRange(window, offset)
  718. mins := end.Sub(start).Minutes()
  719. windowStr := timeutil.DurationString(window)
  720. // minsPerResolution determines accuracy and resource use for the following
  721. // queries. Smaller values (higher resolution) result in better accuracy,
  722. // but more expensive queries, and vice-a-versa.
  723. resolution := env.GetETLResolution()
  724. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  725. var minsPerResolution int
  726. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
  727. minsPerResolution = 1
  728. log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  729. }
  730. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  731. // value, converts it to a cumulative value; i.e.
  732. // [$/hr] * [min/res]*[hr/min] = [$/res]
  733. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  734. const fmtQueryDataCount = `
  735. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  736. `
  737. const fmtQueryTotalGPU = `
  738. sum(
  739. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  740. ) by (%s)
  741. `
  742. const fmtQueryTotalCPU = `
  743. sum(
  744. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  745. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  746. ) by (%s)
  747. `
  748. const fmtQueryTotalRAM = `
  749. sum(
  750. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  751. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  752. ) by (%s)
  753. `
  754. const fmtQueryTotalStorage = `
  755. sum(
  756. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  757. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  758. ) by (%s)
  759. `
  760. const fmtQueryCPUModePct = `
  761. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  762. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  763. `
  764. const fmtQueryRAMSystemPct = `
  765. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  766. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  767. `
  768. const fmtQueryRAMUserPct = `
  769. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  770. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  771. `
  772. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  773. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  774. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  775. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  776. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  777. if queryTotalLocalStorage != "" {
  778. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  779. }
  780. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  781. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
  782. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  783. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  784. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  785. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  786. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  787. resChs := ctx.QueryAll(
  788. queryDataCount,
  789. queryTotalGPU,
  790. queryTotalCPU,
  791. queryTotalRAM,
  792. queryTotalStorage,
  793. )
  794. // Only submit the local storage query if it is valid. Otherwise Prometheus
  795. // will return errors. Always append something to resChs, regardless, to
  796. // maintain indexing.
  797. if queryTotalLocalStorage != "" {
  798. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  799. } else {
  800. resChs = append(resChs, nil)
  801. }
  802. if withBreakdown {
  803. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, windowStr, fmtOffset, env.GetPromClusterLabel(), windowStr, fmtOffset, env.GetPromClusterLabel())
  804. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  805. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  806. bdResChs := ctx.QueryAll(
  807. queryCPUModePct,
  808. queryRAMSystemPct,
  809. queryRAMUserPct,
  810. )
  811. // Only submit the local storage query if it is valid. Otherwise Prometheus
  812. // will return errors. Always append something to resChs, regardless, to
  813. // maintain indexing.
  814. if queryUsedLocalStorage != "" {
  815. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  816. } else {
  817. bdResChs = append(bdResChs, nil)
  818. }
  819. resChs = append(resChs, bdResChs...)
  820. }
  821. resDataCount, _ := resChs[0].Await()
  822. resTotalGPU, _ := resChs[1].Await()
  823. resTotalCPU, _ := resChs[2].Await()
  824. resTotalRAM, _ := resChs[3].Await()
  825. resTotalStorage, _ := resChs[4].Await()
  826. if ctx.HasErrors() {
  827. return nil, ctx.ErrorCollection()
  828. }
  829. defaultClusterID := env.GetClusterID()
  830. dataMinsByCluster := map[string]float64{}
  831. for _, result := range resDataCount {
  832. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  833. if clusterID == "" {
  834. clusterID = defaultClusterID
  835. }
  836. dataMins := mins
  837. if len(result.Values) > 0 {
  838. dataMins = result.Values[0].Value
  839. } else {
  840. log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
  841. }
  842. dataMinsByCluster[clusterID] = dataMins
  843. }
  844. // Determine combined discount
  845. discount, customDiscount := 0.0, 0.0
  846. c, err := a.CloudProvider.GetConfig()
  847. if err == nil {
  848. discount, err = ParsePercentString(c.Discount)
  849. if err != nil {
  850. discount = 0.0
  851. }
  852. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  853. if err != nil {
  854. customDiscount = 0.0
  855. }
  856. }
  857. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  858. costData := make(map[string]map[string]float64)
  859. // Helper function to iterate over Prom query results, parsing the raw values into
  860. // the intermediate costData structure.
  861. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  862. for _, result := range results {
  863. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  864. if clusterID == "" {
  865. clusterID = defaultClusterID
  866. }
  867. if _, ok := costData[clusterID]; !ok {
  868. costData[clusterID] = map[string]float64{}
  869. }
  870. if len(result.Values) > 0 {
  871. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  872. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  873. }
  874. }
  875. }
  876. // Apply both sustained use and custom discounts to RAM and CPU
  877. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  878. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  879. // Apply only custom discount to GPU and storage
  880. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  881. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  882. if queryTotalLocalStorage != "" {
  883. resTotalLocalStorage, err := resChs[5].Await()
  884. if err != nil {
  885. return nil, err
  886. }
  887. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  888. }
  889. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  890. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  891. pvUsedCostMap := map[string]float64{}
  892. if withBreakdown {
  893. resCPUModePct, _ := resChs[6].Await()
  894. resRAMSystemPct, _ := resChs[7].Await()
  895. resRAMUserPct, _ := resChs[8].Await()
  896. if ctx.HasErrors() {
  897. return nil, ctx.ErrorCollection()
  898. }
  899. for _, result := range resCPUModePct {
  900. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  901. if clusterID == "" {
  902. clusterID = defaultClusterID
  903. }
  904. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  905. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  906. }
  907. cpuBD := cpuBreakdownMap[clusterID]
  908. mode, err := result.GetString("mode")
  909. if err != nil {
  910. log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
  911. mode = "other"
  912. }
  913. switch mode {
  914. case "idle":
  915. cpuBD.Idle += result.Values[0].Value
  916. case "system":
  917. cpuBD.System += result.Values[0].Value
  918. case "user":
  919. cpuBD.User += result.Values[0].Value
  920. default:
  921. cpuBD.Other += result.Values[0].Value
  922. }
  923. }
  924. for _, result := range resRAMSystemPct {
  925. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  926. if clusterID == "" {
  927. clusterID = defaultClusterID
  928. }
  929. if _, ok := ramBreakdownMap[clusterID]; !ok {
  930. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  931. }
  932. ramBD := ramBreakdownMap[clusterID]
  933. ramBD.System += result.Values[0].Value
  934. }
  935. for _, result := range resRAMUserPct {
  936. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  937. if clusterID == "" {
  938. clusterID = defaultClusterID
  939. }
  940. if _, ok := ramBreakdownMap[clusterID]; !ok {
  941. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  942. }
  943. ramBD := ramBreakdownMap[clusterID]
  944. ramBD.User += result.Values[0].Value
  945. }
  946. for _, ramBD := range ramBreakdownMap {
  947. remaining := 1.0
  948. remaining -= ramBD.Other
  949. remaining -= ramBD.System
  950. remaining -= ramBD.User
  951. ramBD.Idle = remaining
  952. }
  953. if queryUsedLocalStorage != "" {
  954. resUsedLocalStorage, err := resChs[9].Await()
  955. if err != nil {
  956. return nil, err
  957. }
  958. for _, result := range resUsedLocalStorage {
  959. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  960. if clusterID == "" {
  961. clusterID = defaultClusterID
  962. }
  963. pvUsedCostMap[clusterID] += result.Values[0].Value
  964. }
  965. }
  966. }
  967. if ctx.HasErrors() {
  968. for _, err := range ctx.Errors() {
  969. log.Errorf("ComputeClusterCosts: %s", err)
  970. }
  971. return nil, ctx.ErrorCollection()
  972. }
  973. // Convert intermediate structure to Costs instances
  974. costsByCluster := map[string]*ClusterCosts{}
  975. for id, cd := range costData {
  976. dataMins, ok := dataMinsByCluster[id]
  977. if !ok {
  978. dataMins = mins
  979. log.Warnf("Cluster cost data count not found for cluster %s", id)
  980. }
  981. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  982. if err != nil {
  983. log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  984. return nil, err
  985. }
  986. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  987. costs.CPUBreakdown = cpuBD
  988. }
  989. if ramBD, ok := ramBreakdownMap[id]; ok {
  990. costs.RAMBreakdown = ramBD
  991. }
  992. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  993. if pvUC, ok := pvUsedCostMap[id]; ok {
  994. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  995. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  996. }
  997. costs.DataMinutes = dataMins
  998. costsByCluster[id] = costs
  999. }
  1000. return costsByCluster, nil
  1001. }
// Totals holds time-series cluster cost data. Each entry is a
// [timestamp, value] pair of stringified floats, as produced by
// resultToTotals.
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"`
}
  1008. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  1009. if len(qrs) == 0 {
  1010. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  1011. }
  1012. result := qrs[0]
  1013. totals := [][]string{}
  1014. for _, value := range result.Values {
  1015. d0 := fmt.Sprintf("%f", value.Timestamp)
  1016. d1 := fmt.Sprintf("%f", value.Value)
  1017. toAppend := []string{
  1018. d0,
  1019. d1,
  1020. }
  1021. totals = append(totals, toAppend)
  1022. }
  1023. return totals, nil
  1024. }
// ClusterCostsOverTime gives the full cluster costs over time
// between startString and endString (RFC3339-with-milliseconds layout), at
// the given window resolution and query offset. It returns per-step
// [timestamp, value] pairs for total, CPU, RAM, and storage cost.
func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
	// Append the provider's local storage query (when one exists) to the
	// storage and total queries.
	localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
	if localStorageQuery != "" {
		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
	}

	layout := "2006-01-02T15:04:05.000Z"

	start, err := time.Parse(layout, startString)
	if err != nil {
		log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
		return nil, err
	}
	end, err := time.Parse(layout, endString)
	if err != nil {
		log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
		return nil, err
	}

	fmtWindow := timeutil.DurationString(window)
	if fmtWindow == "" {
		err := fmt.Errorf("window value invalid or missing")
		log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
		return nil, err
	}

	fmtOffset := timeutil.DurationToPromOffsetString(offset)

	// Interpolate the package-level query templates. The argument order must
	// match each template's format verbs exactly.
	qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
	qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)

	ctx := prom.NewNamedContext(cli, prom.ClusterContextName)

	// Issue all four range queries concurrently, then await each in turn.
	resChClusterCores := ctx.QueryRange(qCores, start, end, window)
	resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
	resChStorage := ctx.QueryRange(qStorage, start, end, window)
	resChTotal := ctx.QueryRange(qTotal, start, end, window)

	resultClusterCores, err := resChClusterCores.Await()
	if err != nil {
		return nil, err
	}
	resultClusterRAM, err := resChClusterRAM.Await()
	if err != nil {
		return nil, err
	}
	resultStorage, err := resChStorage.Await()
	if err != nil {
		return nil, err
	}
	resultTotal, err := resChTotal.Await()
	if err != nil {
		return nil, err
	}

	coreTotal, err := resultToTotals(resultClusterCores)
	if err != nil {
		log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
		return nil, err
	}

	ramTotal, err := resultToTotals(resultClusterRAM)
	if err != nil {
		log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
		return nil, err
	}

	// Missing storage data is tolerated: the error is only logged, and
	// storageTotal stays empty in the response.
	storageTotal, err := resultToTotals(resultStorage)
	if err != nil {
		log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
	}

	clusterTotal, err := resultToTotals(resultTotal)
	if err != nil {
		// If clusterTotal query failed, it's likely because there are no PVs, which
		// causes the qTotal query to return no data. Instead, query only node costs.
		// If that fails, return an error because something is actually wrong.
		qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
		resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
		for _, warning := range warnings {
			log.Warnf(warning)
		}
		if err != nil {
			return nil, err
		}
		clusterTotal, err = resultToTotals(resultNodes)
		if err != nil {
			log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
			return nil, err
		}
	}

	return &Totals{
		TotalCost:   clusterTotal,
		CPUCost:     coreTotal,
		MemCost:     ramTotal,
		StorageCost: storageTotal,
	}, nil
}
  1114. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo []*prom.QueryResult, cp cloud.Provider) {
  1115. for _, result := range resActiveMins {
  1116. cluster, err := result.GetString(env.GetPromClusterLabel())
  1117. if err != nil {
  1118. cluster = env.GetClusterID()
  1119. }
  1120. name, err := result.GetString("persistentvolume")
  1121. if err != nil {
  1122. log.Warnf("ClusterDisks: active mins missing pv name")
  1123. continue
  1124. }
  1125. if len(result.Values) == 0 {
  1126. continue
  1127. }
  1128. key := DiskIdentifier{cluster, name}
  1129. if _, ok := diskMap[key]; !ok {
  1130. diskMap[key] = &Disk{
  1131. Cluster: cluster,
  1132. Name: name,
  1133. Breakdown: &ClusterCostsBreakdown{},
  1134. }
  1135. }
  1136. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  1137. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  1138. mins := e.Sub(s).Minutes()
  1139. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  1140. diskMap[key].End = e
  1141. diskMap[key].Start = s
  1142. diskMap[key].Minutes = mins
  1143. }
  1144. for _, result := range resPVSize {
  1145. cluster, err := result.GetString(env.GetPromClusterLabel())
  1146. if err != nil {
  1147. cluster = env.GetClusterID()
  1148. }
  1149. name, err := result.GetString("persistentvolume")
  1150. if err != nil {
  1151. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1152. continue
  1153. }
  1154. // TODO niko/assets storage class
  1155. bytes := result.Values[0].Value
  1156. key := DiskIdentifier{cluster, name}
  1157. if _, ok := diskMap[key]; !ok {
  1158. diskMap[key] = &Disk{
  1159. Cluster: cluster,
  1160. Name: name,
  1161. Breakdown: &ClusterCostsBreakdown{},
  1162. }
  1163. }
  1164. diskMap[key].Bytes = bytes
  1165. }
  1166. customPricingEnabled := cloud.CustomPricesEnabled(cp)
  1167. customPricingConfig, err := cp.GetConfig()
  1168. if err != nil {
  1169. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1170. }
  1171. for _, result := range resPVCost {
  1172. cluster, err := result.GetString(env.GetPromClusterLabel())
  1173. if err != nil {
  1174. cluster = env.GetClusterID()
  1175. }
  1176. name, err := result.GetString("persistentvolume")
  1177. if err != nil {
  1178. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1179. continue
  1180. }
  1181. // TODO niko/assets storage class
  1182. var cost float64
  1183. if customPricingEnabled && customPricingConfig != nil {
  1184. customPVCostStr := customPricingConfig.Storage
  1185. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1186. if err != nil {
  1187. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1188. }
  1189. cost = customPVCost
  1190. } else {
  1191. cost = result.Values[0].Value
  1192. }
  1193. key := DiskIdentifier{cluster, name}
  1194. if _, ok := diskMap[key]; !ok {
  1195. diskMap[key] = &Disk{
  1196. Cluster: cluster,
  1197. Name: name,
  1198. Breakdown: &ClusterCostsBreakdown{},
  1199. }
  1200. }
  1201. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1202. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1203. if providerID != "" {
  1204. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  1205. }
  1206. }
  1207. for _, result := range resPVUsedAvg {
  1208. cluster, err := result.GetString(env.GetPromClusterLabel())
  1209. if err != nil {
  1210. cluster = env.GetClusterID()
  1211. }
  1212. claimName, err := result.GetString("persistentvolumeclaim")
  1213. if err != nil {
  1214. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1215. continue
  1216. }
  1217. claimNamespace, err := result.GetString("namespace")
  1218. if err != nil {
  1219. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1220. continue
  1221. }
  1222. var volumeName string
  1223. for _, thatRes := range resPVCInfo {
  1224. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1225. if err != nil {
  1226. thatCluster = env.GetClusterID()
  1227. }
  1228. thatVolumeName, err := thatRes.GetString("volumename")
  1229. if err != nil {
  1230. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1231. continue
  1232. }
  1233. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1234. if err != nil {
  1235. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1236. continue
  1237. }
  1238. thatClaimNamespace, err := thatRes.GetString("namespace")
  1239. if err != nil {
  1240. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1241. continue
  1242. }
  1243. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1244. volumeName = thatVolumeName
  1245. }
  1246. }
  1247. usage := result.Values[0].Value
  1248. key := DiskIdentifier{cluster, volumeName}
  1249. if _, ok := diskMap[key]; !ok {
  1250. diskMap[key] = &Disk{
  1251. Cluster: cluster,
  1252. Name: volumeName,
  1253. Breakdown: &ClusterCostsBreakdown{},
  1254. }
  1255. }
  1256. diskMap[key].BytesUsedAvgPtr = &usage
  1257. }
  1258. for _, result := range resPVUsedMax {
  1259. cluster, err := result.GetString(env.GetPromClusterLabel())
  1260. if err != nil {
  1261. cluster = env.GetClusterID()
  1262. }
  1263. claimName, err := result.GetString("persistentvolumeclaim")
  1264. if err != nil {
  1265. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1266. continue
  1267. }
  1268. claimNamespace, err := result.GetString("namespace")
  1269. if err != nil {
  1270. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1271. continue
  1272. }
  1273. var volumeName string
  1274. for _, thatRes := range resPVCInfo {
  1275. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1276. if err != nil {
  1277. thatCluster = env.GetClusterID()
  1278. }
  1279. thatVolumeName, err := thatRes.GetString("volumename")
  1280. if err != nil {
  1281. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1282. continue
  1283. }
  1284. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1285. if err != nil {
  1286. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1287. continue
  1288. }
  1289. thatClaimNamespace, err := thatRes.GetString("namespace")
  1290. if err != nil {
  1291. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1292. continue
  1293. }
  1294. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1295. volumeName = thatVolumeName
  1296. }
  1297. }
  1298. usage := result.Values[0].Value
  1299. key := DiskIdentifier{cluster, volumeName}
  1300. if _, ok := diskMap[key]; !ok {
  1301. diskMap[key] = &Disk{
  1302. Cluster: cluster,
  1303. Name: volumeName,
  1304. Breakdown: &ClusterCostsBreakdown{},
  1305. }
  1306. }
  1307. diskMap[key].BytesUsedMaxPtr = &usage
  1308. }
  1309. }