cluster.go 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533
  1. package costmodel
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. prometheus "github.com/prometheus/client_golang/api"
  7. "golang.org/x/exp/slices"
  8. "github.com/opencost/opencost/pkg/cloud"
  9. "github.com/opencost/opencost/pkg/cloud/models"
  10. "github.com/opencost/opencost/pkg/env"
  11. "github.com/opencost/opencost/pkg/kubecost"
  12. "github.com/opencost/opencost/pkg/log"
  13. "github.com/opencost/opencost/pkg/prom"
  14. "github.com/opencost/opencost/pkg/util/timeutil"
  15. )
// PromQL templates used for cluster-level cost queries. Each one is a
// fmt-style format string: the number and order of its %s verbs must match the
// arguments supplied wherever it is formatted.
//
// NOTE(review): the "[%s] %s" pairs look like a range-vector window followed
// by an optional offset clause, and the verbs inside "by (...)" look like the
// cluster label — confirm against the call sites. The constant 730 is
// presumably hours per month (hourly cost -> monthly cost).
const (
	// queryClusterCores (10 verbs) sums, per group, each node's CPU core
	// capacity times its hourly CPU cost times 730, plus the node's hourly GPU
	// cost times 730.
	queryClusterCores = `sum(
		avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
		avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
	) by (%s)`

	// queryClusterRAM (7 verbs) sums, per group, each node's memory capacity
	// converted from bytes to GiB times its hourly RAM cost times 730.
	queryClusterRAM = `sum(
		avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
	) by (%s)`

	// queryStorage (8 verbs) sums, per group, each persistent volume's hourly
	// cost times 730 times its capacity converted from bytes to GiB. The final
	// verb is appended after the aggregation.
	queryStorage = `sum(
		avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
	) by (%s) %s`

	// queryTotal (5 verbs) adds the summed node_total_hourly_cost times 730 to
	// the persistent-volume cost computed over a fixed 1h window.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
	sum(
		avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
	) by (%s) %s`

	// queryNodes (2 verbs) is the node-only part of queryTotal.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
)
// maxLocalDiskSize is the largest local (root) disk size, in GiB, that
// ClusterDisks keeps in its result; local disks whose reported size exceeds it
// are deleted from the analysis.
const maxLocalDiskSize = 200 // AWS limits root disks to 100 Gi, and occasional metric errors in filesystem size should not contribute to large costs.
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by CPU, GPU, RAM, and storage, with totals
// across those resources.
//
// Start and End are pointers to the bounds of the covered time range. The
// *Breakdown fields are optional pointers to a per-resource usage breakdown.
// DataMinutes carries no json tag, so encoding/json emits it under its Go
// field name.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	DataMinutes       float64
}
// ClusterCostsBreakdown provides a percentage-based breakdown of a resource by
// category: user for user-space (i.e. non-system) usage, system, idle, and
// other.
//
// Within this file the shares are treated as fractions of 1.0: ClusterDisks
// derives Idle as 1.0 minus the sum of the other three fields.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  64. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  65. // the associated monthly rate data, and returns the Costs.
  66. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  67. start, end := timeutil.ParseTimeRange(window, offset)
  68. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  69. if dataHours == 0 {
  70. dataHours = end.Sub(start).Hours()
  71. }
  72. // Do not allow zero-length windows to prevent divide-by-zero issues
  73. if dataHours == 0 {
  74. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  75. }
  76. cc := &ClusterCosts{
  77. Start: &start,
  78. End: &end,
  79. CPUCumulative: cpu,
  80. GPUCumulative: gpu,
  81. RAMCumulative: ram,
  82. StorageCumulative: storage,
  83. TotalCumulative: cpu + gpu + ram + storage,
  84. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  85. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  86. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  87. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  88. }
  89. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  90. return cc, nil
  91. }
// Disk describes a single disk as assembled by ClusterDisks: either a
// persistent volume or a node's local storage (Local is true for disks that
// ClusterDisks creates from local-storage metrics).
//
// VolumeName, ClaimName and ClaimNamespace are filled from persistent volume
// claim info. Start, End and Minutes describe the period for which the disk
// was observed. ProviderID defaults to Name when nothing else set it.
type Disk struct {
	Cluster        string
	Name           string
	ProviderID     string
	StorageClass   string
	VolumeName     string
	ClaimName      string
	ClaimNamespace string
	Cost           float64
	Bytes          float64

	// These two fields may not be available at all times because they rely on
	// a new set of metrics that may or may not be available. Thus, they must
	// be nilable to represent the complete absence of the data.
	//
	// In other words, nilability here lets us distinguish between
	// "metric is not available" and "metric is available but is 0".
	//
	// They end in "Ptr" to distinguish from an earlier version in order to
	// ensure that all usages are checked for nil.
	BytesUsedAvgPtr *float64
	BytesUsedMaxPtr *float64

	Local     bool
	Start     time.Time
	End       time.Time
	Minutes   float64
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier is the key type of the map returned by ClusterDisks: a disk
// is identified by the cluster it belongs to plus its name.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  123. func ClusterDisks(client prometheus.Client, provider models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  124. // Query for the duration between start and end
  125. durStr := timeutil.DurationString(end.Sub(start))
  126. if durStr == "" {
  127. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  128. }
  129. // Start from the time "end", querying backwards
  130. t := end
  131. // minsPerResolution determines accuracy and resource use for the following
  132. // queries. Smaller values (higher resolution) result in better accuracy,
  133. // but more expensive queries, and vice-a-versa.
  134. resolution := env.GetETLResolution()
  135. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  136. var minsPerResolution int
  137. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  138. minsPerResolution = 1
  139. log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  140. }
  141. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  142. // value, converts it to a cumulative value; i.e.
  143. // [$/hr] * [min/res]*[hr/min] = [$/res]
  144. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  145. // TODO niko/assets how do we not hard-code this price?
  146. costPerGBHr := 0.04 / 730.0
  147. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  148. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
  149. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
  150. queryActiveMins := fmt.Sprintf(`avg(kube_persistentvolume_capacity_bytes) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  151. queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info[%s])) by (%s, persistentvolume, storageclass)`, durStr, env.GetPromClusterLabel())
  152. queryPVUsedAvg := fmt.Sprintf(`avg(avg_over_time(kubelet_volume_stats_used_bytes[%s])) by (%s, persistentvolumeclaim, namespace)`, durStr, env.GetPromClusterLabel())
  153. queryPVUsedMax := fmt.Sprintf(`max(max_over_time(kubelet_volume_stats_used_bytes[%s])) by (%s, persistentvolumeclaim, namespace)`, durStr, env.GetPromClusterLabel())
  154. queryPVCInfo := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolumeclaim_info[%s])) by (%s, volumename, persistentvolumeclaim, namespace)`, durStr, env.GetPromClusterLabel())
  155. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  156. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  157. queryLocalStorageUsedAvg := fmt.Sprintf(`avg(avg_over_time(container_fs_usage_bytes{device!="tmpfs", id="/"}[%s])) by (instance, %s)`, durStr, env.GetPromClusterLabel())
  158. queryLocalStorageUsedMax := fmt.Sprintf(`max(max_over_time(container_fs_usage_bytes{device!="tmpfs", id="/"}[%s])) by (instance, %s)`, durStr, env.GetPromClusterLabel())
  159. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  160. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  161. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  162. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  163. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  164. resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
  165. resChPVUsedAvg := ctx.QueryAtTime(queryPVUsedAvg, t)
  166. resChPVUsedMax := ctx.QueryAtTime(queryPVUsedMax, t)
  167. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, t)
  168. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  169. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  170. resChLocalStoreageUsedAvg := ctx.QueryAtTime(queryLocalStorageUsedAvg, t)
  171. resChLocalStoreageUsedMax := ctx.QueryAtTime(queryLocalStorageUsedMax, t)
  172. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  173. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  174. resPVCost, _ := resChPVCost.Await()
  175. resPVSize, _ := resChPVSize.Await()
  176. resActiveMins, _ := resChActiveMins.Await()
  177. resPVStorageClass, _ := resChPVStorageClass.Await()
  178. resPVUsedAvg, _ := resChPVUsedAvg.Await()
  179. resPVUsedMax, _ := resChPVUsedMax.Await()
  180. resPVCInfo, _ := resChPVCInfo.Await()
  181. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  182. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  183. resLocalStorageUsedAvg, _ := resChLocalStoreageUsedAvg.Await()
  184. resLocalStorageUsedMax, _ := resChLocalStoreageUsedMax.Await()
  185. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  186. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  187. if ctx.HasErrors() {
  188. return nil, ctx.ErrorCollection()
  189. }
  190. diskMap := map[DiskIdentifier]*Disk{}
  191. for _, result := range resPVCInfo {
  192. cluster, err := result.GetString(env.GetPromClusterLabel())
  193. if err != nil {
  194. cluster = env.GetClusterID()
  195. }
  196. volumeName, err := result.GetString("volumename")
  197. if err != nil {
  198. log.Debugf("ClusterDisks: pv claim data missing volumename")
  199. continue
  200. }
  201. claimName, err := result.GetString("persistentvolumeclaim")
  202. if err != nil {
  203. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  204. continue
  205. }
  206. claimNamespace, err := result.GetString("namespace")
  207. if err != nil {
  208. log.Debugf("ClusterDisks: pv claim data missing namespace")
  209. continue
  210. }
  211. key := DiskIdentifier{cluster, volumeName}
  212. if _, ok := diskMap[key]; !ok {
  213. diskMap[key] = &Disk{
  214. Cluster: cluster,
  215. Name: volumeName,
  216. Breakdown: &ClusterCostsBreakdown{},
  217. }
  218. }
  219. diskMap[key].VolumeName = volumeName
  220. diskMap[key].ClaimName = claimName
  221. diskMap[key].ClaimNamespace = claimNamespace
  222. }
  223. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo, provider)
  224. for _, result := range resLocalStorageCost {
  225. cluster, err := result.GetString(env.GetPromClusterLabel())
  226. if err != nil {
  227. cluster = env.GetClusterID()
  228. }
  229. name, err := result.GetString("instance")
  230. if err != nil {
  231. log.Warnf("ClusterDisks: local storage data missing instance")
  232. continue
  233. }
  234. cost := result.Values[0].Value
  235. key := DiskIdentifier{cluster, name}
  236. if _, ok := diskMap[key]; !ok {
  237. diskMap[key] = &Disk{
  238. Cluster: cluster,
  239. Name: name,
  240. Breakdown: &ClusterCostsBreakdown{},
  241. Local: true,
  242. }
  243. }
  244. diskMap[key].Cost += cost
  245. //Assigning explicitly the storage class of local storage to local
  246. diskMap[key].StorageClass = kubecost.LocalStorageClass
  247. }
  248. for _, result := range resLocalStorageUsedCost {
  249. cluster, err := result.GetString(env.GetPromClusterLabel())
  250. if err != nil {
  251. cluster = env.GetClusterID()
  252. }
  253. name, err := result.GetString("instance")
  254. if err != nil {
  255. log.Warnf("ClusterDisks: local storage usage data missing instance")
  256. continue
  257. }
  258. cost := result.Values[0].Value
  259. key := DiskIdentifier{cluster, name}
  260. if _, ok := diskMap[key]; !ok {
  261. diskMap[key] = &Disk{
  262. Cluster: cluster,
  263. Name: name,
  264. Breakdown: &ClusterCostsBreakdown{},
  265. Local: true,
  266. }
  267. }
  268. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  269. }
  270. for _, result := range resLocalStorageUsedAvg {
  271. cluster, err := result.GetString(env.GetPromClusterLabel())
  272. if err != nil {
  273. cluster = env.GetClusterID()
  274. }
  275. name, err := result.GetString("instance")
  276. if err != nil {
  277. log.Warnf("ClusterDisks: local storage data missing instance")
  278. continue
  279. }
  280. bytesAvg := result.Values[0].Value
  281. key := DiskIdentifier{cluster, name}
  282. if _, ok := diskMap[key]; !ok {
  283. diskMap[key] = &Disk{
  284. Cluster: cluster,
  285. Name: name,
  286. Breakdown: &ClusterCostsBreakdown{},
  287. Local: true,
  288. }
  289. }
  290. diskMap[key].BytesUsedAvgPtr = &bytesAvg
  291. }
  292. for _, result := range resLocalStorageUsedMax {
  293. cluster, err := result.GetString(env.GetPromClusterLabel())
  294. if err != nil {
  295. cluster = env.GetClusterID()
  296. }
  297. name, err := result.GetString("instance")
  298. if err != nil {
  299. log.Warnf("ClusterDisks: local storage data missing instance")
  300. continue
  301. }
  302. bytesMax := result.Values[0].Value
  303. key := DiskIdentifier{cluster, name}
  304. if _, ok := diskMap[key]; !ok {
  305. diskMap[key] = &Disk{
  306. Cluster: cluster,
  307. Name: name,
  308. Breakdown: &ClusterCostsBreakdown{},
  309. Local: true,
  310. }
  311. }
  312. diskMap[key].BytesUsedMaxPtr = &bytesMax
  313. }
  314. for _, result := range resLocalStorageBytes {
  315. cluster, err := result.GetString(env.GetPromClusterLabel())
  316. if err != nil {
  317. cluster = env.GetClusterID()
  318. }
  319. name, err := result.GetString("instance")
  320. if err != nil {
  321. log.Warnf("ClusterDisks: local storage data missing instance")
  322. continue
  323. }
  324. bytes := result.Values[0].Value
  325. key := DiskIdentifier{cluster, name}
  326. if _, ok := diskMap[key]; !ok {
  327. diskMap[key] = &Disk{
  328. Cluster: cluster,
  329. Name: name,
  330. Breakdown: &ClusterCostsBreakdown{},
  331. Local: true,
  332. }
  333. }
  334. diskMap[key].Bytes = bytes
  335. if bytes/1024/1024/1024 > maxLocalDiskSize {
  336. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  337. delete(diskMap, key)
  338. }
  339. }
  340. for _, result := range resLocalActiveMins {
  341. cluster, err := result.GetString(env.GetPromClusterLabel())
  342. if err != nil {
  343. cluster = env.GetClusterID()
  344. }
  345. name, err := result.GetString("node")
  346. if err != nil {
  347. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  348. continue
  349. }
  350. key := DiskIdentifier{cluster, name}
  351. if _, ok := diskMap[key]; !ok {
  352. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  353. continue
  354. }
  355. if len(result.Values) == 0 {
  356. continue
  357. }
  358. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  359. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  360. mins := e.Sub(s).Minutes()
  361. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  362. diskMap[key].End = e
  363. diskMap[key].Start = s
  364. diskMap[key].Minutes = mins
  365. }
  366. var unTracedDiskLogData []DiskIdentifier
  367. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  368. for _, result := range resPVStorageClass {
  369. cluster, err := result.GetString(env.GetPromClusterLabel())
  370. if err != nil {
  371. cluster = env.GetClusterID()
  372. }
  373. name, _ := result.GetString("persistentvolume")
  374. key := DiskIdentifier{cluster, name}
  375. if _, ok := diskMap[key]; !ok {
  376. if !slices.Contains(unTracedDiskLogData, key) {
  377. unTracedDiskLogData = append(unTracedDiskLogData, key)
  378. }
  379. continue
  380. }
  381. if len(result.Values) == 0 {
  382. continue
  383. }
  384. storageClass, err := result.GetString("storageclass")
  385. if err != nil {
  386. diskMap[key].StorageClass = kubecost.UnknownStorageClass
  387. } else {
  388. diskMap[key].StorageClass = storageClass
  389. }
  390. }
  391. // Logging the unidentified disk information outside the loop
  392. for _, unIdentifiedDisk := range unTracedDiskLogData {
  393. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  394. }
  395. for _, disk := range diskMap {
  396. // Apply all remaining RAM to Idle
  397. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  398. // Set provider Id to the name for reconciliation
  399. if disk.ProviderID == "" {
  400. disk.ProviderID = disk.Name
  401. }
  402. }
  403. return diskMap, nil
  404. }
// Node holds cost and capacity data for a single cluster node. ClusterNodes
// returns these keyed by NodeIdentifier.
//
// CPUBreakdown and RAMBreakdown carry the per-resource usage breakdown, and
// Start, End and Minutes describe the period for which the node was observed.
// Discount is assigned by ClusterNodes from the provider's combined discount
// for the node's type and preemptibility.
type Node struct {
	Cluster         string
	Name            string
	ProviderID      string
	NodeType        string
	CPUCost         float64
	CPUCores        float64
	GPUCost         float64
	GPUCount        float64
	RAMCost         float64
	RAMBytes        float64
	Discount        float64
	Preemptible     bool
	CPUBreakdown    *ClusterCostsBreakdown
	RAMBreakdown    *ClusterCostsBreakdown
	Start           time.Time
	End             time.Time
	Minutes         float64
	Labels          map[string]string
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
// partialCPUMap maps a node type to its actual number of CPU cores for the
// cases where the reported core count is wrong: GKE misreports the number of
// cores that e2 shared-core nodes have.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier identifies a node by cluster, name and provider ID. It is
// the key type of the map returned by ClusterNodes and of the cost and
// active-data maps handled by costTimesMinute and costTimesMinuteAndCount.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID identifies a node by cluster and name only. It
// keys data that has no provider ID attached, such as the resource-count map
// consulted by costTimesMinuteAndCount.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  445. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  446. for k, v := range activeDataMap {
  447. keyNon := nodeIdentifierNoProviderID{
  448. Cluster: k.Cluster,
  449. Name: k.Name,
  450. }
  451. if cost, ok := costMap[k]; ok {
  452. minutes := v.minutes
  453. count := 1.0
  454. if c, ok := resourceCountMap[keyNon]; ok {
  455. count = c
  456. }
  457. costMap[k] = cost * (minutes / 60) * count
  458. }
  459. }
  460. }
  461. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  462. for k, v := range activeDataMap {
  463. if cost, ok := costMap[k]; ok {
  464. minutes := v.minutes
  465. costMap[k] = cost * (minutes / 60)
  466. }
  467. }
  468. }
  469. func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  470. // Query for the duration between start and end
  471. durStr := timeutil.DurationString(end.Sub(start))
  472. if durStr == "" {
  473. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  474. }
  475. // Start from the time "end", querying backwards
  476. t := end
  477. // minsPerResolution determines accuracy and resource use for the following
  478. // queries. Smaller values (higher resolution) result in better accuracy,
  479. // but more expensive queries, and vice-a-versa.
  480. resolution := env.GetETLResolution()
  481. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  482. var minsPerResolution int
  483. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  484. minsPerResolution = 1
  485. log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  486. }
  487. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  488. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  489. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  490. queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  491. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
  492. queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  493. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
  494. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  495. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
  496. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  497. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  498. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  499. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
  500. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)
  501. // Return errors if these fail
  502. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  503. resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
  504. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  505. resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
  506. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  507. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  508. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  509. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  510. // Do not return errors if these fail, but log warnings
  511. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  512. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  513. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  514. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  515. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  516. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  517. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  518. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  519. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  520. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  521. resIsSpot, _ := resChIsSpot.Await()
  522. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  523. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  524. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  525. resActiveMins, _ := resChActiveMins.Await()
  526. resLabels, _ := resChLabels.Await()
  527. if optionalCtx.HasErrors() {
  528. for _, err := range optionalCtx.Errors() {
  529. log.Warnf("ClusterNodes: %s", err)
  530. }
  531. }
  532. if requiredCtx.HasErrors() {
  533. for _, err := range requiredCtx.Errors() {
  534. log.Errorf("ClusterNodes: %s", err)
  535. }
  536. return nil, requiredCtx.ErrorCollection()
  537. }
  538. activeDataMap := buildActiveDataMap(resActiveMins, resolution)
  539. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  540. preemptibleMap := buildPreemptibleMap(resIsSpot)
  541. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  542. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  543. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  544. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  545. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  546. cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
  547. ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
  548. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  549. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  550. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  551. labelsMap := buildLabelsMap(resLabels)
  552. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
  553. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
  554. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  555. nodeMap := buildNodeMap(
  556. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  557. cpuCoresMap, ramBytesMap, ramUserPctMap,
  558. ramSystemPctMap,
  559. cpuBreakdownMap,
  560. activeDataMap,
  561. preemptibleMap,
  562. labelsMap,
  563. clusterAndNameToType,
  564. resolution,
  565. )
  566. c, err := cp.GetConfig()
  567. if err != nil {
  568. return nil, err
  569. }
  570. discount, err := ParsePercentString(c.Discount)
  571. if err != nil {
  572. return nil, err
  573. }
  574. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  575. if err != nil {
  576. return nil, err
  577. }
  578. for _, node := range nodeMap {
  579. // TODO take GKE Reserved Instances into account
  580. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  581. // Apply all remaining resources to Idle
  582. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  583. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  584. }
  585. return nodeMap, nil
  586. }
  587. type LoadBalancerIdentifier struct {
  588. Cluster string
  589. Namespace string
  590. Name string
  591. }
  592. type LoadBalancer struct {
  593. Cluster string
  594. Namespace string
  595. Name string
  596. ProviderID string
  597. Cost float64
  598. Start time.Time
  599. End time.Time
  600. Minutes float64
  601. }
  602. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  603. // Query for the duration between start and end
  604. durStr := timeutil.DurationString(end.Sub(start))
  605. if durStr == "" {
  606. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  607. }
  608. // Start from the time "end", querying backwards
  609. t := end
  610. // minsPerResolution determines accuracy and resource use for the following
  611. // queries. Smaller values (higher resolution) result in better accuracy,
  612. // but more expensive queries, and vice-a-versa.
  613. resolution := env.GetETLResolution()
  614. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  615. var minsPerResolution int
  616. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  617. minsPerResolution = 1
  618. log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  619. }
  620. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  621. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
  622. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  623. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  624. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  625. resLBCost, _ := resChLBCost.Await()
  626. resActiveMins, _ := resChActiveMins.Await()
  627. if ctx.HasErrors() {
  628. return nil, ctx.ErrorCollection()
  629. }
  630. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  631. for _, result := range resActiveMins {
  632. cluster, err := result.GetString(env.GetPromClusterLabel())
  633. if err != nil {
  634. cluster = env.GetClusterID()
  635. }
  636. namespace, err := result.GetString("namespace")
  637. if err != nil {
  638. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  639. continue
  640. }
  641. name, err := result.GetString("service_name")
  642. if err != nil {
  643. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  644. continue
  645. }
  646. providerID, err := result.GetString("ingress_ip")
  647. if err != nil {
  648. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  649. providerID = ""
  650. }
  651. key := LoadBalancerIdentifier{
  652. Cluster: cluster,
  653. Namespace: namespace,
  654. Name: name,
  655. }
  656. // Skip if there are no data
  657. if len(result.Values) == 0 {
  658. continue
  659. }
  660. // Add load balancer to the set of load balancers
  661. if _, ok := loadBalancerMap[key]; !ok {
  662. loadBalancerMap[key] = &LoadBalancer{
  663. Cluster: cluster,
  664. Namespace: namespace,
  665. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  666. ProviderID: cloud.ParseLBID(providerID),
  667. }
  668. }
  669. // Append start, end, and minutes. This should come before all other data.
  670. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  671. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  672. loadBalancerMap[key].Start = s
  673. loadBalancerMap[key].End = e
  674. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  675. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  676. // Prevents there from being a duplicate LoadBalancers on the same day
  677. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  678. loadBalancerMap[key].ProviderID = providerID
  679. }
  680. }
  681. for _, result := range resLBCost {
  682. cluster, err := result.GetString(env.GetPromClusterLabel())
  683. if err != nil {
  684. cluster = env.GetClusterID()
  685. }
  686. namespace, err := result.GetString("namespace")
  687. if err != nil {
  688. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  689. continue
  690. }
  691. name, err := result.GetString("service_name")
  692. if err != nil {
  693. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  694. continue
  695. }
  696. key := LoadBalancerIdentifier{
  697. Cluster: cluster,
  698. Namespace: namespace,
  699. Name: name,
  700. }
  701. // Apply cost as price-per-hour * hours
  702. if lb, ok := loadBalancerMap[key]; ok {
  703. lbPricePerHr := result.Values[0].Value
  704. hrs := lb.Minutes / 60.0
  705. lb.Cost += lbPricePerHr * hrs
  706. } else {
  707. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  708. }
  709. }
  710. return loadBalancerMap, nil
  711. }
  712. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  713. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider models.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  714. if window < 10*time.Minute {
  715. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  716. }
  717. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  718. start, end := timeutil.ParseTimeRange(window, offset)
  719. mins := end.Sub(start).Minutes()
  720. windowStr := timeutil.DurationString(window)
  721. // minsPerResolution determines accuracy and resource use for the following
  722. // queries. Smaller values (higher resolution) result in better accuracy,
  723. // but more expensive queries, and vice-a-versa.
  724. resolution := env.GetETLResolution()
  725. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  726. var minsPerResolution int
  727. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
  728. minsPerResolution = 1
  729. log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  730. }
  731. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  732. // value, converts it to a cumulative value; i.e.
  733. // [$/hr] * [min/res]*[hr/min] = [$/res]
  734. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  735. const fmtQueryDataCount = `
  736. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  737. `
  738. const fmtQueryTotalGPU = `
  739. sum(
  740. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  741. ) by (%s)
  742. `
  743. const fmtQueryTotalCPU = `
  744. sum(
  745. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  746. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  747. ) by (%s)
  748. `
  749. const fmtQueryTotalRAM = `
  750. sum(
  751. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  752. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  753. ) by (%s)
  754. `
  755. const fmtQueryTotalStorage = `
  756. sum(
  757. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  758. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  759. ) by (%s)
  760. `
  761. const fmtQueryCPUModePct = `
  762. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  763. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  764. `
  765. const fmtQueryRAMSystemPct = `
  766. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  767. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  768. `
  769. const fmtQueryRAMUserPct = `
  770. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  771. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  772. `
  773. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  774. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  775. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  776. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  777. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  778. if queryTotalLocalStorage != "" {
  779. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  780. }
  781. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  782. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
  783. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  784. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  785. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  786. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  787. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  788. resChs := ctx.QueryAll(
  789. queryDataCount,
  790. queryTotalGPU,
  791. queryTotalCPU,
  792. queryTotalRAM,
  793. queryTotalStorage,
  794. )
  795. // Only submit the local storage query if it is valid. Otherwise Prometheus
  796. // will return errors. Always append something to resChs, regardless, to
  797. // maintain indexing.
  798. if queryTotalLocalStorage != "" {
  799. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  800. } else {
  801. resChs = append(resChs, nil)
  802. }
  803. if withBreakdown {
  804. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, windowStr, fmtOffset, env.GetPromClusterLabel(), windowStr, fmtOffset, env.GetPromClusterLabel())
  805. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  806. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  807. bdResChs := ctx.QueryAll(
  808. queryCPUModePct,
  809. queryRAMSystemPct,
  810. queryRAMUserPct,
  811. )
  812. // Only submit the local storage query if it is valid. Otherwise Prometheus
  813. // will return errors. Always append something to resChs, regardless, to
  814. // maintain indexing.
  815. if queryUsedLocalStorage != "" {
  816. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  817. } else {
  818. bdResChs = append(bdResChs, nil)
  819. }
  820. resChs = append(resChs, bdResChs...)
  821. }
  822. resDataCount, _ := resChs[0].Await()
  823. resTotalGPU, _ := resChs[1].Await()
  824. resTotalCPU, _ := resChs[2].Await()
  825. resTotalRAM, _ := resChs[3].Await()
  826. resTotalStorage, _ := resChs[4].Await()
  827. if ctx.HasErrors() {
  828. return nil, ctx.ErrorCollection()
  829. }
  830. defaultClusterID := env.GetClusterID()
  831. dataMinsByCluster := map[string]float64{}
  832. for _, result := range resDataCount {
  833. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  834. if clusterID == "" {
  835. clusterID = defaultClusterID
  836. }
  837. dataMins := mins
  838. if len(result.Values) > 0 {
  839. dataMins = result.Values[0].Value
  840. } else {
  841. log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
  842. }
  843. dataMinsByCluster[clusterID] = dataMins
  844. }
  845. // Determine combined discount
  846. discount, customDiscount := 0.0, 0.0
  847. c, err := a.CloudProvider.GetConfig()
  848. if err == nil {
  849. discount, err = ParsePercentString(c.Discount)
  850. if err != nil {
  851. discount = 0.0
  852. }
  853. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  854. if err != nil {
  855. customDiscount = 0.0
  856. }
  857. }
  858. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  859. costData := make(map[string]map[string]float64)
  860. // Helper function to iterate over Prom query results, parsing the raw values into
  861. // the intermediate costData structure.
  862. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  863. for _, result := range results {
  864. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  865. if clusterID == "" {
  866. clusterID = defaultClusterID
  867. }
  868. if _, ok := costData[clusterID]; !ok {
  869. costData[clusterID] = map[string]float64{}
  870. }
  871. if len(result.Values) > 0 {
  872. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  873. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  874. }
  875. }
  876. }
  877. // Apply both sustained use and custom discounts to RAM and CPU
  878. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  879. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  880. // Apply only custom discount to GPU and storage
  881. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  882. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  883. if queryTotalLocalStorage != "" {
  884. resTotalLocalStorage, err := resChs[5].Await()
  885. if err != nil {
  886. return nil, err
  887. }
  888. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  889. }
  890. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  891. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  892. pvUsedCostMap := map[string]float64{}
  893. if withBreakdown {
  894. resCPUModePct, _ := resChs[6].Await()
  895. resRAMSystemPct, _ := resChs[7].Await()
  896. resRAMUserPct, _ := resChs[8].Await()
  897. if ctx.HasErrors() {
  898. return nil, ctx.ErrorCollection()
  899. }
  900. for _, result := range resCPUModePct {
  901. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  902. if clusterID == "" {
  903. clusterID = defaultClusterID
  904. }
  905. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  906. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  907. }
  908. cpuBD := cpuBreakdownMap[clusterID]
  909. mode, err := result.GetString("mode")
  910. if err != nil {
  911. log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
  912. mode = "other"
  913. }
  914. switch mode {
  915. case "idle":
  916. cpuBD.Idle += result.Values[0].Value
  917. case "system":
  918. cpuBD.System += result.Values[0].Value
  919. case "user":
  920. cpuBD.User += result.Values[0].Value
  921. default:
  922. cpuBD.Other += result.Values[0].Value
  923. }
  924. }
  925. for _, result := range resRAMSystemPct {
  926. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  927. if clusterID == "" {
  928. clusterID = defaultClusterID
  929. }
  930. if _, ok := ramBreakdownMap[clusterID]; !ok {
  931. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  932. }
  933. ramBD := ramBreakdownMap[clusterID]
  934. ramBD.System += result.Values[0].Value
  935. }
  936. for _, result := range resRAMUserPct {
  937. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  938. if clusterID == "" {
  939. clusterID = defaultClusterID
  940. }
  941. if _, ok := ramBreakdownMap[clusterID]; !ok {
  942. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  943. }
  944. ramBD := ramBreakdownMap[clusterID]
  945. ramBD.User += result.Values[0].Value
  946. }
  947. for _, ramBD := range ramBreakdownMap {
  948. remaining := 1.0
  949. remaining -= ramBD.Other
  950. remaining -= ramBD.System
  951. remaining -= ramBD.User
  952. ramBD.Idle = remaining
  953. }
  954. if queryUsedLocalStorage != "" {
  955. resUsedLocalStorage, err := resChs[9].Await()
  956. if err != nil {
  957. return nil, err
  958. }
  959. for _, result := range resUsedLocalStorage {
  960. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  961. if clusterID == "" {
  962. clusterID = defaultClusterID
  963. }
  964. pvUsedCostMap[clusterID] += result.Values[0].Value
  965. }
  966. }
  967. }
  968. if ctx.HasErrors() {
  969. for _, err := range ctx.Errors() {
  970. log.Errorf("ComputeClusterCosts: %s", err)
  971. }
  972. return nil, ctx.ErrorCollection()
  973. }
  974. // Convert intermediate structure to Costs instances
  975. costsByCluster := map[string]*ClusterCosts{}
  976. for id, cd := range costData {
  977. dataMins, ok := dataMinsByCluster[id]
  978. if !ok {
  979. dataMins = mins
  980. log.Warnf("Cluster cost data count not found for cluster %s", id)
  981. }
  982. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  983. if err != nil {
  984. log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  985. return nil, err
  986. }
  987. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  988. costs.CPUBreakdown = cpuBD
  989. }
  990. if ramBD, ok := ramBreakdownMap[id]; ok {
  991. costs.RAMBreakdown = ramBD
  992. }
  993. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  994. if pvUC, ok := pvUsedCostMap[id]; ok {
  995. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  996. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  997. }
  998. costs.DataMinutes = dataMins
  999. costsByCluster[id] = costs
  1000. }
  1001. return costsByCluster, nil
  1002. }
// Totals is the JSON-serializable result of ClusterCostsOverTime. Each field
// holds one time series as a list of two-element string slices, in the form
// produced by resultToTotals: [timestamp, value], both formatted with "%f".
type Totals struct {
	// TotalCost is the overall cluster cost series (or the node-only series
	// when the total query yields no data).
	TotalCost [][]string `json:"totalcost"`
	// CPUCost is the series built from the cluster cores query.
	CPUCost [][]string `json:"cpucost"`
	// MemCost is the series built from the cluster RAM query.
	MemCost [][]string `json:"memcost"`
	// StorageCost is the series built from the storage query; it is empty
	// when that query returns no data.
	StorageCost [][]string `json:"storageCost"`
}
  1009. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  1010. if len(qrs) == 0 {
  1011. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  1012. }
  1013. result := qrs[0]
  1014. totals := [][]string{}
  1015. for _, value := range result.Values {
  1016. d0 := fmt.Sprintf("%f", value.Timestamp)
  1017. d1 := fmt.Sprintf("%f", value.Value)
  1018. toAppend := []string{
  1019. d0,
  1020. d1,
  1021. }
  1022. totals = append(totals, toAppend)
  1023. }
  1024. return totals, nil
  1025. }
  1026. // ClusterCostsOverTime gives the full cluster costs over time
  1027. func ClusterCostsOverTime(cli prometheus.Client, provider models.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  1028. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  1029. if localStorageQuery != "" {
  1030. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  1031. }
  1032. layout := "2006-01-02T15:04:05.000Z"
  1033. start, err := time.Parse(layout, startString)
  1034. if err != nil {
  1035. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  1036. return nil, err
  1037. }
  1038. end, err := time.Parse(layout, endString)
  1039. if err != nil {
  1040. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  1041. return nil, err
  1042. }
  1043. fmtWindow := timeutil.DurationString(window)
  1044. if fmtWindow == "" {
  1045. err := fmt.Errorf("window value invalid or missing")
  1046. log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
  1047. return nil, err
  1048. }
  1049. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  1050. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1051. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  1052. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1053. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  1054. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  1055. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  1056. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  1057. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  1058. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  1059. resultClusterCores, err := resChClusterCores.Await()
  1060. if err != nil {
  1061. return nil, err
  1062. }
  1063. resultClusterRAM, err := resChClusterRAM.Await()
  1064. if err != nil {
  1065. return nil, err
  1066. }
  1067. resultStorage, err := resChStorage.Await()
  1068. if err != nil {
  1069. return nil, err
  1070. }
  1071. resultTotal, err := resChTotal.Await()
  1072. if err != nil {
  1073. return nil, err
  1074. }
  1075. coreTotal, err := resultToTotals(resultClusterCores)
  1076. if err != nil {
  1077. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  1078. return nil, err
  1079. }
  1080. ramTotal, err := resultToTotals(resultClusterRAM)
  1081. if err != nil {
  1082. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  1083. return nil, err
  1084. }
  1085. storageTotal, err := resultToTotals(resultStorage)
  1086. if err != nil {
  1087. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  1088. }
  1089. clusterTotal, err := resultToTotals(resultTotal)
  1090. if err != nil {
  1091. // If clusterTotal query failed, it's likely because there are no PVs, which
  1092. // causes the qTotal query to return no data. Instead, query only node costs.
  1093. // If that fails, return an error because something is actually wrong.
  1094. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  1095. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  1096. for _, warning := range warnings {
  1097. log.Warnf(warning)
  1098. }
  1099. if err != nil {
  1100. return nil, err
  1101. }
  1102. clusterTotal, err = resultToTotals(resultNodes)
  1103. if err != nil {
  1104. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  1105. return nil, err
  1106. }
  1107. }
  1108. return &Totals{
  1109. TotalCost: clusterTotal,
  1110. CPUCost: coreTotal,
  1111. MemCost: ramTotal,
  1112. StorageCost: storageTotal,
  1113. }, nil
  1114. }
  1115. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost, resPVUsedAvg, resPVUsedMax, resPVCInfo []*prom.QueryResult, cp models.Provider) {
  1116. for _, result := range resActiveMins {
  1117. cluster, err := result.GetString(env.GetPromClusterLabel())
  1118. if err != nil {
  1119. cluster = env.GetClusterID()
  1120. }
  1121. name, err := result.GetString("persistentvolume")
  1122. if err != nil {
  1123. log.Warnf("ClusterDisks: active mins missing pv name")
  1124. continue
  1125. }
  1126. if len(result.Values) == 0 {
  1127. continue
  1128. }
  1129. key := DiskIdentifier{cluster, name}
  1130. if _, ok := diskMap[key]; !ok {
  1131. diskMap[key] = &Disk{
  1132. Cluster: cluster,
  1133. Name: name,
  1134. Breakdown: &ClusterCostsBreakdown{},
  1135. }
  1136. }
  1137. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  1138. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  1139. mins := e.Sub(s).Minutes()
  1140. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  1141. diskMap[key].End = e
  1142. diskMap[key].Start = s
  1143. diskMap[key].Minutes = mins
  1144. }
  1145. for _, result := range resPVSize {
  1146. cluster, err := result.GetString(env.GetPromClusterLabel())
  1147. if err != nil {
  1148. cluster = env.GetClusterID()
  1149. }
  1150. name, err := result.GetString("persistentvolume")
  1151. if err != nil {
  1152. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1153. continue
  1154. }
  1155. // TODO niko/assets storage class
  1156. bytes := result.Values[0].Value
  1157. key := DiskIdentifier{cluster, name}
  1158. if _, ok := diskMap[key]; !ok {
  1159. diskMap[key] = &Disk{
  1160. Cluster: cluster,
  1161. Name: name,
  1162. Breakdown: &ClusterCostsBreakdown{},
  1163. }
  1164. }
  1165. diskMap[key].Bytes = bytes
  1166. }
  1167. customPricingEnabled := cloud.CustomPricesEnabled(cp)
  1168. customPricingConfig, err := cp.GetConfig()
  1169. if err != nil {
  1170. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1171. }
  1172. for _, result := range resPVCost {
  1173. cluster, err := result.GetString(env.GetPromClusterLabel())
  1174. if err != nil {
  1175. cluster = env.GetClusterID()
  1176. }
  1177. name, err := result.GetString("persistentvolume")
  1178. if err != nil {
  1179. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1180. continue
  1181. }
  1182. // TODO niko/assets storage class
  1183. var cost float64
  1184. if customPricingEnabled && customPricingConfig != nil {
  1185. customPVCostStr := customPricingConfig.Storage
  1186. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1187. if err != nil {
  1188. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1189. }
  1190. cost = customPVCost
  1191. } else {
  1192. cost = result.Values[0].Value
  1193. }
  1194. key := DiskIdentifier{cluster, name}
  1195. if _, ok := diskMap[key]; !ok {
  1196. diskMap[key] = &Disk{
  1197. Cluster: cluster,
  1198. Name: name,
  1199. Breakdown: &ClusterCostsBreakdown{},
  1200. }
  1201. }
  1202. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1203. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1204. if providerID != "" {
  1205. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  1206. }
  1207. }
  1208. for _, result := range resPVUsedAvg {
  1209. cluster, err := result.GetString(env.GetPromClusterLabel())
  1210. if err != nil {
  1211. cluster = env.GetClusterID()
  1212. }
  1213. claimName, err := result.GetString("persistentvolumeclaim")
  1214. if err != nil {
  1215. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1216. continue
  1217. }
  1218. claimNamespace, err := result.GetString("namespace")
  1219. if err != nil {
  1220. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1221. continue
  1222. }
  1223. var volumeName string
  1224. for _, thatRes := range resPVCInfo {
  1225. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1226. if err != nil {
  1227. thatCluster = env.GetClusterID()
  1228. }
  1229. thatVolumeName, err := thatRes.GetString("volumename")
  1230. if err != nil {
  1231. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1232. continue
  1233. }
  1234. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1235. if err != nil {
  1236. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1237. continue
  1238. }
  1239. thatClaimNamespace, err := thatRes.GetString("namespace")
  1240. if err != nil {
  1241. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1242. continue
  1243. }
  1244. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1245. volumeName = thatVolumeName
  1246. }
  1247. }
  1248. usage := result.Values[0].Value
  1249. key := DiskIdentifier{cluster, volumeName}
  1250. if _, ok := diskMap[key]; !ok {
  1251. diskMap[key] = &Disk{
  1252. Cluster: cluster,
  1253. Name: volumeName,
  1254. Breakdown: &ClusterCostsBreakdown{},
  1255. }
  1256. }
  1257. diskMap[key].BytesUsedAvgPtr = &usage
  1258. }
  1259. for _, result := range resPVUsedMax {
  1260. cluster, err := result.GetString(env.GetPromClusterLabel())
  1261. if err != nil {
  1262. cluster = env.GetClusterID()
  1263. }
  1264. claimName, err := result.GetString("persistentvolumeclaim")
  1265. if err != nil {
  1266. log.Debugf("ClusterDisks: pv usage data missing persistentvolumeclaim")
  1267. continue
  1268. }
  1269. claimNamespace, err := result.GetString("namespace")
  1270. if err != nil {
  1271. log.Debugf("ClusterDisks: pv usage data missing namespace")
  1272. continue
  1273. }
  1274. var volumeName string
  1275. for _, thatRes := range resPVCInfo {
  1276. thatCluster, err := thatRes.GetString(env.GetPromClusterLabel())
  1277. if err != nil {
  1278. thatCluster = env.GetClusterID()
  1279. }
  1280. thatVolumeName, err := thatRes.GetString("volumename")
  1281. if err != nil {
  1282. log.Debugf("ClusterDisks: pv claim data missing volumename")
  1283. continue
  1284. }
  1285. thatClaimName, err := thatRes.GetString("persistentvolumeclaim")
  1286. if err != nil {
  1287. log.Debugf("ClusterDisks: pv claim data missing persistentvolumeclaim")
  1288. continue
  1289. }
  1290. thatClaimNamespace, err := thatRes.GetString("namespace")
  1291. if err != nil {
  1292. log.Debugf("ClusterDisks: pv claim data missing namespace")
  1293. continue
  1294. }
  1295. if cluster == thatCluster && claimName == thatClaimName && claimNamespace == thatClaimNamespace {
  1296. volumeName = thatVolumeName
  1297. }
  1298. }
  1299. usage := result.Values[0].Value
  1300. key := DiskIdentifier{cluster, volumeName}
  1301. if _, ok := diskMap[key]; !ok {
  1302. diskMap[key] = &Disk{
  1303. Cluster: cluster,
  1304. Name: volumeName,
  1305. Breakdown: &ClusterCostsBreakdown{},
  1306. }
  1307. }
  1308. diskMap[key].BytesUsedMaxPtr = &usage
  1309. }
  1310. }