cluster.go 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249
  1. package costmodel
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/opencost/opencost/pkg/kubecost"
  7. "github.com/opencost/opencost/pkg/util/timeutil"
  8. "github.com/opencost/opencost/pkg/cloud"
  9. "github.com/opencost/opencost/pkg/env"
  10. "github.com/opencost/opencost/pkg/log"
  11. "github.com/opencost/opencost/pkg/prom"
  12. prometheus "github.com/prometheus/client_golang/api"
  13. )
// Prometheus query templates for cluster-level monthly-rate costs. Each
// template is filled via fmt.Sprintf with a query window, an optional offset
// expression, and the configured cluster label; the factor 730 converts an
// hourly rate to an approximate monthly rate (hours per month).
const (
	// queryClusterCores prices node CPU capacity (cores * $/core-hr * 730)
	// plus GPU hourly cost, summed per cluster label.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`

	// queryClusterRAM prices node memory capacity (GiB * $/GiB-hr * 730),
	// summed per cluster label.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`

	// queryStorage prices persistent volume capacity (GiB * $/GiB-hr * 730),
	// summed per cluster label.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`

	// queryTotal combines total node hourly cost and PV storage cost into a
	// single monthly figure per cluster label.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`

	// queryNodes is the monthly total node cost per cluster label.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
)

// maxLocalDiskSize caps, in GiB, the size of a local disk included in cost
// analysis; larger disks are dropped (see ClusterDisks).
const maxLocalDiskSize = 200 // AWS limits root disks to 100 Gi, and occasional metric errors in filesystem size should not contribute to large costs.
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores, memory, and storage.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes is not serialized to JSON; presumably the minutes of data
	// backing the costs — TODO confirm against callers.
	DataMinutes float64
}
// ClusterCostsBreakdown provides percentage-based breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, and idle.
// Fractions are expected to sum to 1.0 across the four categories.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  62. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  63. // the associated monthly rate data, and returns the Costs.
  64. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  65. start, end := timeutil.ParseTimeRange(window, offset)
  66. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  67. if dataHours == 0 {
  68. dataHours = end.Sub(start).Hours()
  69. }
  70. // Do not allow zero-length windows to prevent divide-by-zero issues
  71. if dataHours == 0 {
  72. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  73. }
  74. cc := &ClusterCosts{
  75. Start: &start,
  76. End: &end,
  77. CPUCumulative: cpu,
  78. GPUCumulative: gpu,
  79. RAMCumulative: ram,
  80. StorageCumulative: storage,
  81. TotalCumulative: cpu + gpu + ram + storage,
  82. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  83. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  84. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  85. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  86. }
  87. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  88. return cc, nil
  89. }
// Disk describes one persistent volume or local (root) disk observed over a
// queried window, with its cumulative cost and activity span.
type Disk struct {
	Cluster    string
	Name       string
	ProviderID string
	Cost       float64
	Bytes      float64
	// Local is true for node-local root disks (derived from container_fs
	// metrics) rather than persistent volumes.
	Local bool
	Start time.Time
	End   time.Time
	// Minutes is the active duration, derived from the first and last
	// observed samples.
	Minutes   float64
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier uniquely identifies a disk by cluster and name; used as the
// key of the map returned by ClusterDisks.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  106. func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  107. // Query for the duration between start and end
  108. durStr := timeutil.DurationString(end.Sub(start))
  109. if durStr == "" {
  110. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  111. }
  112. // Start from the time "end", querying backwards
  113. t := end
  114. // minsPerResolution determines accuracy and resource use for the following
  115. // queries. Smaller values (higher resolution) result in better accuracy,
  116. // but more expensive queries, and vice-a-versa.
  117. resolution := env.GetETLResolution()
  118. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  119. var minsPerResolution int
  120. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  121. minsPerResolution = 1
  122. log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  123. }
  124. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  125. // value, converts it to a cumulative value; i.e.
  126. // [$/hr] * [min/res]*[hr/min] = [$/res]
  127. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  128. // TODO niko/assets how do we not hard-code this price?
  129. costPerGBHr := 0.04 / 730.0
  130. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  131. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
  132. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
  133. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  134. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  135. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  136. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  137. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  138. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  139. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  140. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  141. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  142. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  143. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  144. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  145. resPVCost, _ := resChPVCost.Await()
  146. resPVSize, _ := resChPVSize.Await()
  147. resActiveMins, _ := resChActiveMins.Await()
  148. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  149. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  150. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  151. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  152. if ctx.HasErrors() {
  153. return nil, ctx.ErrorCollection()
  154. }
  155. diskMap := map[DiskIdentifier]*Disk{}
  156. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, provider)
  157. for _, result := range resLocalStorageCost {
  158. cluster, err := result.GetString(env.GetPromClusterLabel())
  159. if err != nil {
  160. cluster = env.GetClusterID()
  161. }
  162. name, err := result.GetString("instance")
  163. if err != nil {
  164. log.Warnf("ClusterDisks: local storage data missing instance")
  165. continue
  166. }
  167. cost := result.Values[0].Value
  168. key := DiskIdentifier{cluster, name}
  169. if _, ok := diskMap[key]; !ok {
  170. diskMap[key] = &Disk{
  171. Cluster: cluster,
  172. Name: name,
  173. Breakdown: &ClusterCostsBreakdown{},
  174. Local: true,
  175. }
  176. }
  177. diskMap[key].Cost += cost
  178. }
  179. for _, result := range resLocalStorageUsedCost {
  180. cluster, err := result.GetString(env.GetPromClusterLabel())
  181. if err != nil {
  182. cluster = env.GetClusterID()
  183. }
  184. name, err := result.GetString("instance")
  185. if err != nil {
  186. log.Warnf("ClusterDisks: local storage usage data missing instance")
  187. continue
  188. }
  189. cost := result.Values[0].Value
  190. key := DiskIdentifier{cluster, name}
  191. if _, ok := diskMap[key]; !ok {
  192. diskMap[key] = &Disk{
  193. Cluster: cluster,
  194. Name: name,
  195. Breakdown: &ClusterCostsBreakdown{},
  196. Local: true,
  197. }
  198. }
  199. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  200. }
  201. for _, result := range resLocalStorageBytes {
  202. cluster, err := result.GetString(env.GetPromClusterLabel())
  203. if err != nil {
  204. cluster = env.GetClusterID()
  205. }
  206. name, err := result.GetString("instance")
  207. if err != nil {
  208. log.Warnf("ClusterDisks: local storage data missing instance")
  209. continue
  210. }
  211. bytes := result.Values[0].Value
  212. key := DiskIdentifier{cluster, name}
  213. if _, ok := diskMap[key]; !ok {
  214. diskMap[key] = &Disk{
  215. Cluster: cluster,
  216. Name: name,
  217. Breakdown: &ClusterCostsBreakdown{},
  218. Local: true,
  219. }
  220. }
  221. diskMap[key].Bytes = bytes
  222. if bytes/1024/1024/1024 > maxLocalDiskSize {
  223. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  224. delete(diskMap, key)
  225. }
  226. }
  227. for _, result := range resLocalActiveMins {
  228. cluster, err := result.GetString(env.GetPromClusterLabel())
  229. if err != nil {
  230. cluster = env.GetClusterID()
  231. }
  232. name, err := result.GetString("node")
  233. if err != nil {
  234. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  235. continue
  236. }
  237. key := DiskIdentifier{cluster, name}
  238. if _, ok := diskMap[key]; !ok {
  239. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  240. continue
  241. }
  242. if len(result.Values) == 0 {
  243. continue
  244. }
  245. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  246. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  247. mins := e.Sub(s).Minutes()
  248. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  249. diskMap[key].End = e
  250. diskMap[key].Start = s
  251. diskMap[key].Minutes = mins
  252. }
  253. for _, disk := range diskMap {
  254. // Apply all remaining RAM to Idle
  255. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  256. // Set provider Id to the name for reconciliation
  257. if disk.ProviderID == "" {
  258. disk.ProviderID = disk.Name
  259. }
  260. }
  261. return diskMap, nil
  262. }
// Node describes an individual cluster node over a queried window: its
// cumulative resource costs, capacities, usage breakdowns, activity span,
// labels, and unit rates.
type Node struct {
	Cluster     string
	Name        string
	ProviderID  string
	NodeType    string
	CPUCost     float64
	CPUCores    float64
	GPUCost     float64
	GPUCount    float64
	RAMCost     float64
	RAMBytes    float64
	Discount    float64
	Preemptible bool
	// CPUBreakdown and RAMBreakdown split usage into user/system/other/idle
	// fractions.
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// Per-unit hourly rates; presumably populated by buildNodeMap — the
	// assignment is outside this file's visible scope.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier uniquely identifies a node by cluster, name, and cloud
// provider ID; used as the key of the map returned by ClusterNodes.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID identifies a node by cluster and name only, for
// joining metric series that lack a provider_id label.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  303. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  304. for k, v := range activeDataMap {
  305. keyNon := nodeIdentifierNoProviderID{
  306. Cluster: k.Cluster,
  307. Name: k.Name,
  308. }
  309. if cost, ok := costMap[k]; ok {
  310. minutes := v.minutes
  311. count := 1.0
  312. if c, ok := resourceCountMap[keyNon]; ok {
  313. count = c
  314. }
  315. costMap[k] = cost * (minutes / 60) * count
  316. }
  317. }
  318. }
  319. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  320. for k, v := range activeDataMap {
  321. if cost, ok := costMap[k]; ok {
  322. minutes := v.minutes
  323. costMap[k] = cost * (minutes / 60)
  324. }
  325. }
  326. }
// ClusterNodes returns one Node per (cluster, name, provider ID) active
// between start and end. Required queries (rates, capacities, active
// minutes, spot status) fail the call; optional queries (CPU mode, RAM
// user/system splits, labels) only log warnings. Costs are cumulative over
// the window, with the provider discount applied per node.
func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
	// Query for the duration between start and end
	durStr := timeutil.DurationString(end.Sub(start))
	if durStr == "" {
		return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
	}

	// Start from the time "end", querying backwards
	t := end

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	resolution := env.GetETLResolution()

	//Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
	// NOTE(review): ComputeClusterCosts performs this same check with "< 1"
	// rather than "== 0" — confirm whether they should match.
	var minsPerResolution int
	if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
		minsPerResolution = 1
		log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
	}

	// Two contexts: failures on requiredCtx abort; optionalCtx only warns.
	requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
	optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)

	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)

	// Return errors if these fail
	resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
	resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
	resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
	resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
	resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
	resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
	resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
	resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)

	// Do not return errors if these fail, but log warnings
	resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
	resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
	resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
	resChLabels := optionalCtx.QueryAtTime(queryLabels, t)

	resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
	resNodeCPUCores, _ := resChNodeCPUCores.Await()
	resNodeGPUCount, _ := resChNodeGPUCount.Await()
	resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
	resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
	resIsSpot, _ := resChIsSpot.Await()
	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
	resActiveMins, _ := resChActiveMins.Await()
	resLabels, _ := resChLabels.Await()

	// Optional failures: warn and continue with partial data.
	if optionalCtx.HasErrors() {
		for _, err := range optionalCtx.Errors() {
			log.Warnf("ClusterNodes: %s", err)
		}
	}
	// Required failures: log each and abort.
	if requiredCtx.HasErrors() {
		for _, err := range requiredCtx.Errors() {
			log.Errorf("ClusterNodes: %s", err)
		}
		return nil, requiredCtx.ErrorCollection()
	}

	// Assemble intermediate lookup maps from the raw query results.
	activeDataMap := buildActiveDataMap(resActiveMins, resolution)
	gpuCountMap := buildGPUCountMap(resNodeGPUCount)
	preemptibleMap := buildPreemptibleMap(resIsSpot)

	// Each cost-map builder also reports node types; merge them into one map.
	cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
	ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
	gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)

	clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
	clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)

	cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
	ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
	ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
	ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
	cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
	labelsMap := buildLabelsMap(resLabels)

	// Convert hourly rates to cumulative costs over each node's active time.
	costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
	costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
	costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID

	nodeMap := buildNodeMap(
		cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
		cpuCoresMap, ramBytesMap, ramUserPctMap,
		ramSystemPctMap,
		cpuBreakdownMap,
		activeDataMap,
		preemptibleMap,
		labelsMap,
		clusterAndNameToType,
		resolution,
	)

	// Apply provider-configured discounts to each node.
	c, err := cp.GetConfig()
	if err != nil {
		return nil, err
	}

	discount, err := ParsePercentString(c.Discount)
	if err != nil {
		return nil, err
	}

	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
	if err != nil {
		return nil, err
	}

	for _, node := range nodeMap {
		// TODO take GKE Reserved Instances into account
		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)

		// Apply all remaining resources to Idle
		node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
		node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
	}

	return nodeMap, nil
}
// LoadBalancerIdentifier uniquely identifies a load balancer by cluster,
// namespace, and service name; used as the key of the map returned by
// ClusterLoadBalancers.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
// LoadBalancer describes one service load balancer observed over a queried
// window, with its cumulative cost and activity span.
type LoadBalancer struct {
	Cluster   string
	Namespace string
	// Name is stored as "namespace/service" for backwards compatibility
	// (see ClusterLoadBalancers).
	Name string
	// ProviderID is derived from the ingress IP when available.
	ProviderID string
	Cost       float64
	Start      time.Time
	End        time.Time
	Minutes    float64
}
// ClusterLoadBalancers returns the cumulative cost and activity span of each
// service load balancer observed between start and end, keyed by (cluster,
// namespace, service name). Active minutes are established first; costs are
// then applied as hourly price * active hours.
func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
	// Query for the duration between start and end
	durStr := timeutil.DurationString(end.Sub(start))
	if durStr == "" {
		return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
	}

	// Start from the time "end", querying backwards
	t := end

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	resolution := env.GetETLResolution()

	//Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
	var minsPerResolution int
	if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
		minsPerResolution = 1
		log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
	}

	ctx := prom.NewNamedContext(client, prom.ClusterContextName)

	queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
	queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)

	resChLBCost := ctx.QueryAtTime(queryLBCost, t)
	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)

	resLBCost, _ := resChLBCost.Await()
	resActiveMins, _ := resChActiveMins.Await()

	if ctx.HasErrors() {
		return nil, ctx.ErrorCollection()
	}

	loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))

	// First pass: register each load balancer and its active window.
	for _, result := range resActiveMins {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			// Fall back to the locally configured cluster ID when the label
			// is absent from the series.
			cluster = env.GetClusterID()
		}
		namespace, err := result.GetString("namespace")
		if err != nil {
			log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
			continue
		}
		name, err := result.GetString("service_name")
		if err != nil {
			log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
			continue
		}
		providerID, err := result.GetString("ingress_ip")
		if err != nil {
			log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
			providerID = ""
		}
		key := LoadBalancerIdentifier{
			Cluster:   cluster,
			Namespace: namespace,
			Name:      name,
		}

		// Skip if there are no data
		if len(result.Values) == 0 {
			continue
		}

		// Add load balancer to the set of load balancers
		if _, ok := loadBalancerMap[key]; !ok {
			loadBalancerMap[key] = &LoadBalancer{
				Cluster:    cluster,
				Namespace:  namespace,
				Name:       fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
				ProviderID: cloud.ParseLBID(providerID),
			}
		}

		// Append start, end, and minutes. This should come before all other data.
		s := time.Unix(int64(result.Values[0].Timestamp), 0)
		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
		loadBalancerMap[key].Start = s
		loadBalancerMap[key].End = e
		loadBalancerMap[key].Minutes = e.Sub(s).Minutes()

		// Fill in Provider ID if it is available and missing in the loadBalancerMap
		// Prevents there from being a duplicate LoadBalancers on the same day
		if providerID != "" && loadBalancerMap[key].ProviderID == "" {
			loadBalancerMap[key].ProviderID = providerID
		}
	}

	// Second pass: apply costs to the load balancers registered above.
	for _, result := range resLBCost {
		cluster, err := result.GetString(env.GetPromClusterLabel())
		if err != nil {
			cluster = env.GetClusterID()
		}
		namespace, err := result.GetString("namespace")
		if err != nil {
			log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
			continue
		}
		name, err := result.GetString("service_name")
		if err != nil {
			log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
			continue
		}
		key := LoadBalancerIdentifier{
			Cluster:   cluster,
			Namespace: namespace,
			Name:      name,
		}

		// Apply cost as price-per-hour * hours
		// NOTE(review): result.Values[0] is accessed unchecked here —
		// presumably the avg query always yields one sample; confirm.
		if lb, ok := loadBalancerMap[key]; ok {
			lbPricePerHr := result.Values[0].Value
			hrs := lb.Minutes / 60.0
			lb.Cost += lbPricePerHr * hrs
		} else {
			log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
		}
	}

	return loadBalancerMap, nil
}
// ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
//
// Costs are computed per cluster (keyed by the Prometheus cluster label, falling
// back to the configured cluster ID) for CPU, GPU, RAM, storage, and — when the
// provider supplies a query — local storage. When withBreakdown is true, CPU and
// RAM costs are additionally split into idle/system/user/other fractions and PV
// storage into idle/user fractions. A minimum window of 10m is enforced.
func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
	if window < 10*time.Minute {
		return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
	}
	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
	start, end := timeutil.ParseTimeRange(window, offset)
	mins := end.Sub(start).Minutes()
	windowStr := timeutil.DurationString(window)
	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	resolution := env.GetETLResolution()
	// Ensure that if ETL_RESOLUTION_SECONDS is less than 60s we default it to 1m,
	// since the subqueries below use a whole-minute resolution step ([%s:%dm]).
	var minsPerResolution int
	if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
		minsPerResolution = 1
		log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
	}
	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
	// value, converts it to a cumulative value; i.e.
	// [$/hr] * [min/res]*[hr/min] = [$/res]
	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
	// Counts the number of resolution-sized steps with data, scaled back to
	// minutes; used to detect clusters with partial data coverage.
	const fmtQueryDataCount = `
count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
`
	const fmtQueryTotalGPU = `
sum(
sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
) by (%s)
`
	const fmtQueryTotalCPU = `
sum(
sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
) by (%s)
`
	const fmtQueryTotalRAM = `
sum(
sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
) by (%s)
`
	const fmtQueryTotalStorage = `
sum(
sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
) by (%s)
`
	// Fraction of CPU time spent in each mode (idle/system/user/...), per cluster.
	const fmtQueryCPUModePct = `
sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
`
	// RAM used by kube-system as a fraction of total capacity, per cluster.
	const fmtQueryRAMSystemPct = `
sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
`
	// Working-set RAM as a fraction of total capacity, per cluster.
	const fmtQueryRAMUserPct = `
sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
`
	// TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
	// const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
	// group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
	// Provider-specific local (ephemeral) storage queries; either may be empty
	// if the provider has no local storage pricing.
	queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
	if queryTotalLocalStorage != "" {
		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
	}
	fmtOffset := timeutil.DurationToPromOffsetString(offset)
	queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	ctx := prom.NewNamedContext(client, prom.ClusterContextName)
	// resChs index layout (positions are relied on by the Await calls below):
	//   [0] data count, [1] GPU, [2] CPU, [3] RAM, [4] storage,
	//   [5] total local storage (nil if no query),
	//   [6] CPU mode pct, [7] RAM system pct, [8] RAM user pct,
	//   [9] used local storage (nil if no query) — [6..9] only when withBreakdown.
	resChs := ctx.QueryAll(
		queryDataCount,
		queryTotalGPU,
		queryTotalCPU,
		queryTotalRAM,
		queryTotalStorage,
	)
	// Only submit the local storage query if it is valid. Otherwise Prometheus
	// will return errors. Always append something to resChs, regardless, to
	// maintain indexing.
	if queryTotalLocalStorage != "" {
		resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
	} else {
		resChs = append(resChs, nil)
	}
	if withBreakdown {
		queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, windowStr, fmtOffset, env.GetPromClusterLabel(), windowStr, fmtOffset, env.GetPromClusterLabel())
		queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
		queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
		bdResChs := ctx.QueryAll(
			queryCPUModePct,
			queryRAMSystemPct,
			queryRAMUserPct,
		)
		// Only submit the local storage query if it is valid. Otherwise Prometheus
		// will return errors. Always append something to resChs, regardless, to
		// maintain indexing.
		if queryUsedLocalStorage != "" {
			bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
		} else {
			bdResChs = append(bdResChs, nil)
		}
		resChs = append(resChs, bdResChs...)
	}
	// Per-channel errors are intentionally discarded here; the named context
	// collects them and they are surfaced via ctx.HasErrors() below.
	resDataCount, _ := resChs[0].Await()
	resTotalGPU, _ := resChs[1].Await()
	resTotalCPU, _ := resChs[2].Await()
	resTotalRAM, _ := resChs[3].Await()
	resTotalStorage, _ := resChs[4].Await()
	if ctx.HasErrors() {
		return nil, ctx.ErrorCollection()
	}
	defaultClusterID := env.GetClusterID()
	// dataMinsByCluster records how many minutes of data each cluster actually
	// has, defaulting to the full window when the count query returns nothing.
	dataMinsByCluster := map[string]float64{}
	for _, result := range resDataCount {
		clusterID, _ := result.GetString(env.GetPromClusterLabel())
		if clusterID == "" {
			clusterID = defaultClusterID
		}
		dataMins := mins
		if len(result.Values) > 0 {
			dataMins = result.Values[0].Value
		} else {
			log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
		}
		dataMinsByCluster[clusterID] = dataMins
	}
	// Determine combined discount. Note: discount config is read from
	// a.CloudProvider, not the provider parameter used for the queries above.
	discount, customDiscount := 0.0, 0.0
	c, err := a.CloudProvider.GetConfig()
	if err == nil {
		discount, err = ParsePercentString(c.Discount)
		if err != nil {
			discount = 0.0
		}
		customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
		if err != nil {
			customDiscount = 0.0
		}
	}
	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
	costData := make(map[string]map[string]float64)
	// Helper function to iterate over Prom query results, parsing the raw values into
	// the intermediate costData structure. Both discounts are applied
	// multiplicatively, and each value is also accumulated into "total".
	setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
		for _, result := range results {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := costData[clusterID]; !ok {
				costData[clusterID] = map[string]float64{}
			}
			if len(result.Values) > 0 {
				costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
				costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
			}
		}
	}
	// Apply both sustained use and custom discounts to RAM and CPU
	setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
	setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
	// Apply only custom discount to GPU and storage
	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
	if queryTotalLocalStorage != "" {
		// Index 5 is the total-local-storage channel (non-nil only in this branch).
		resTotalLocalStorage, err := resChs[5].Await()
		if err != nil {
			return nil, err
		}
		setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
	}
	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
	pvUsedCostMap := map[string]float64{}
	if withBreakdown {
		// Indices 6-8 exist only when withBreakdown queries were submitted above.
		resCPUModePct, _ := resChs[6].Await()
		resRAMSystemPct, _ := resChs[7].Await()
		resRAMUserPct, _ := resChs[8].Await()
		if ctx.HasErrors() {
			return nil, ctx.ErrorCollection()
		}
		// NOTE(review): the loops below read result.Values[0] without a length
		// check; an empty vector here would panic — confirm upstream guarantees.
		for _, result := range resCPUModePct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := cpuBreakdownMap[clusterID]; !ok {
				cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			cpuBD := cpuBreakdownMap[clusterID]
			mode, err := result.GetString("mode")
			if err != nil {
				log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
				mode = "other"
			}
			switch mode {
			case "idle":
				cpuBD.Idle += result.Values[0].Value
			case "system":
				cpuBD.System += result.Values[0].Value
			case "user":
				cpuBD.User += result.Values[0].Value
			default:
				cpuBD.Other += result.Values[0].Value
			}
		}
		for _, result := range resRAMSystemPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.System += result.Values[0].Value
		}
		for _, result := range resRAMUserPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.User += result.Values[0].Value
		}
		// RAM idle is the remainder after system/user/other fractions.
		for _, ramBD := range ramBreakdownMap {
			remaining := 1.0
			remaining -= ramBD.Other
			remaining -= ramBD.System
			remaining -= ramBD.User
			ramBD.Idle = remaining
		}
		if queryUsedLocalStorage != "" {
			// Index 9 is the used-local-storage channel (non-nil only in this branch).
			resUsedLocalStorage, err := resChs[9].Await()
			if err != nil {
				return nil, err
			}
			for _, result := range resUsedLocalStorage {
				clusterID, _ := result.GetString(env.GetPromClusterLabel())
				if clusterID == "" {
					clusterID = defaultClusterID
				}
				pvUsedCostMap[clusterID] += result.Values[0].Value
			}
		}
	}
	if ctx.HasErrors() {
		for _, err := range ctx.Errors() {
			log.Errorf("ComputeClusterCosts: %s", err)
		}
		return nil, ctx.ErrorCollection()
	}
	// Convert intermediate structure to Costs instances
	costsByCluster := map[string]*ClusterCosts{}
	for id, cd := range costData {
		dataMins, ok := dataMinsByCluster[id]
		if !ok {
			dataMins = mins
			log.Warnf("Cluster cost data count not found for cluster %s", id)
		}
		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
		if err != nil {
			log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
			return nil, err
		}
		if cpuBD, ok := cpuBreakdownMap[id]; ok {
			costs.CPUBreakdown = cpuBD
		}
		if ramBD, ok := ramBreakdownMap[id]; ok {
			costs.RAMBreakdown = ramBD
		}
		costs.StorageBreakdown = &ClusterCostsBreakdown{}
		if pvUC, ok := pvUsedCostMap[id]; ok {
			// NOTE(review): if StorageCumulative is 0 these divisions produce
			// NaN/Inf — confirm the used-storage result implies nonzero capacity.
			costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
			costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
		}
		costs.DataMinutes = dataMins
		costsByCluster[id] = costs
	}
	return costsByCluster, nil
}
// Totals holds cluster cost time series, each entry a [timestamp, value]
// pair of stringified floats as produced by resultToTotals.
type Totals struct {
	// NOTE(review): tag casing is inconsistent ("storageCost" vs "totalcost"),
	// but changing the tags would break existing JSON consumers.
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"`
}
  867. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  868. if len(qrs) == 0 {
  869. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  870. }
  871. result := qrs[0]
  872. totals := [][]string{}
  873. for _, value := range result.Values {
  874. d0 := fmt.Sprintf("%f", value.Timestamp)
  875. d1 := fmt.Sprintf("%f", value.Value)
  876. toAppend := []string{
  877. d0,
  878. d1,
  879. }
  880. totals = append(totals, toAppend)
  881. }
  882. return totals, nil
  883. }
  884. // ClusterCostsOverTime gives the full cluster costs over time
  885. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  886. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  887. if localStorageQuery != "" {
  888. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  889. }
  890. layout := "2006-01-02T15:04:05.000Z"
  891. start, err := time.Parse(layout, startString)
  892. if err != nil {
  893. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  894. return nil, err
  895. }
  896. end, err := time.Parse(layout, endString)
  897. if err != nil {
  898. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  899. return nil, err
  900. }
  901. fmtWindow := timeutil.DurationString(window)
  902. if fmtWindow == "" {
  903. err := fmt.Errorf("window value invalid or missing")
  904. log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
  905. return nil, err
  906. }
  907. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  908. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  909. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  910. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  911. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  912. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  913. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  914. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  915. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  916. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  917. resultClusterCores, err := resChClusterCores.Await()
  918. if err != nil {
  919. return nil, err
  920. }
  921. resultClusterRAM, err := resChClusterRAM.Await()
  922. if err != nil {
  923. return nil, err
  924. }
  925. resultStorage, err := resChStorage.Await()
  926. if err != nil {
  927. return nil, err
  928. }
  929. resultTotal, err := resChTotal.Await()
  930. if err != nil {
  931. return nil, err
  932. }
  933. coreTotal, err := resultToTotals(resultClusterCores)
  934. if err != nil {
  935. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  936. return nil, err
  937. }
  938. ramTotal, err := resultToTotals(resultClusterRAM)
  939. if err != nil {
  940. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  941. return nil, err
  942. }
  943. storageTotal, err := resultToTotals(resultStorage)
  944. if err != nil {
  945. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  946. }
  947. clusterTotal, err := resultToTotals(resultTotal)
  948. if err != nil {
  949. // If clusterTotal query failed, it's likely because there are no PVs, which
  950. // causes the qTotal query to return no data. Instead, query only node costs.
  951. // If that fails, return an error because something is actually wrong.
  952. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  953. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  954. for _, warning := range warnings {
  955. log.Warnf(warning)
  956. }
  957. if err != nil {
  958. return nil, err
  959. }
  960. clusterTotal, err = resultToTotals(resultNodes)
  961. if err != nil {
  962. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  963. return nil, err
  964. }
  965. }
  966. return &Totals{
  967. TotalCost: clusterTotal,
  968. CPUCost: coreTotal,
  969. MemCost: ramTotal,
  970. StorageCost: storageTotal,
  971. }, nil
  972. }
  973. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
  974. for _, result := range resActiveMins {
  975. cluster, err := result.GetString(env.GetPromClusterLabel())
  976. if err != nil {
  977. cluster = env.GetClusterID()
  978. }
  979. name, err := result.GetString("persistentvolume")
  980. if err != nil {
  981. log.Warnf("ClusterDisks: active mins missing pv name")
  982. continue
  983. }
  984. if len(result.Values) == 0 {
  985. continue
  986. }
  987. key := DiskIdentifier{cluster, name}
  988. if _, ok := diskMap[key]; !ok {
  989. diskMap[key] = &Disk{
  990. Cluster: cluster,
  991. Name: name,
  992. Breakdown: &ClusterCostsBreakdown{},
  993. }
  994. }
  995. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  996. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  997. mins := e.Sub(s).Minutes()
  998. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  999. diskMap[key].End = e
  1000. diskMap[key].Start = s
  1001. diskMap[key].Minutes = mins
  1002. }
  1003. for _, result := range resPVSize {
  1004. cluster, err := result.GetString(env.GetPromClusterLabel())
  1005. if err != nil {
  1006. cluster = env.GetClusterID()
  1007. }
  1008. name, err := result.GetString("persistentvolume")
  1009. if err != nil {
  1010. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1011. continue
  1012. }
  1013. // TODO niko/assets storage class
  1014. bytes := result.Values[0].Value
  1015. key := DiskIdentifier{cluster, name}
  1016. if _, ok := diskMap[key]; !ok {
  1017. diskMap[key] = &Disk{
  1018. Cluster: cluster,
  1019. Name: name,
  1020. Breakdown: &ClusterCostsBreakdown{},
  1021. }
  1022. }
  1023. diskMap[key].Bytes = bytes
  1024. }
  1025. customPricingEnabled := cloud.CustomPricesEnabled(cp)
  1026. customPricingConfig, err := cp.GetConfig()
  1027. if err != nil {
  1028. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1029. }
  1030. for _, result := range resPVCost {
  1031. cluster, err := result.GetString(env.GetPromClusterLabel())
  1032. if err != nil {
  1033. cluster = env.GetClusterID()
  1034. }
  1035. name, err := result.GetString("persistentvolume")
  1036. if err != nil {
  1037. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1038. continue
  1039. }
  1040. // TODO niko/assets storage class
  1041. var cost float64
  1042. if customPricingEnabled && customPricingConfig != nil {
  1043. customPVCostStr := customPricingConfig.Storage
  1044. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1045. if err != nil {
  1046. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1047. }
  1048. cost = customPVCost
  1049. } else {
  1050. cost = result.Values[0].Value
  1051. }
  1052. key := DiskIdentifier{cluster, name}
  1053. if _, ok := diskMap[key]; !ok {
  1054. diskMap[key] = &Disk{
  1055. Cluster: cluster,
  1056. Name: name,
  1057. Breakdown: &ClusterCostsBreakdown{},
  1058. }
  1059. }
  1060. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1061. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1062. if providerID != "" {
  1063. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  1064. }
  1065. }
  1066. }