cluster.go 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/kubecost/cost-model/pkg/cloud"
  6. "github.com/kubecost/cost-model/pkg/env"
  7. "github.com/kubecost/cost-model/pkg/log"
  8. "github.com/kubecost/cost-model/pkg/prom"
  9. "github.com/kubecost/cost-model/pkg/util"
  10. prometheus "github.com/prometheus/client_golang/api"
  11. "k8s.io/klog"
  12. )
  13. const (
  14. queryClusterCores = `sum(
  15. avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
  16. avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
  17. ) by (cluster_id)`
  18. queryClusterRAM = `sum(
  19. avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
  20. ) by (cluster_id)`
  21. queryStorage = `sum(
  22. avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
  23. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
  24. ) by (cluster_id) %s`
  25. queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
  26. sum(
  27. avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
  28. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
  29. ) by (cluster_id) %s`
  30. queryNodes = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 %s`
  31. )
  32. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  33. // are broken down by cores, memory, and storage.
  34. type ClusterCosts struct {
  35. Start *time.Time `json:"startTime"`
  36. End *time.Time `json:"endTime"`
  37. CPUCumulative float64 `json:"cpuCumulativeCost"`
  38. CPUMonthly float64 `json:"cpuMonthlyCost"`
  39. CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`
  40. GPUCumulative float64 `json:"gpuCumulativeCost"`
  41. GPUMonthly float64 `json:"gpuMonthlyCost"`
  42. RAMCumulative float64 `json:"ramCumulativeCost"`
  43. RAMMonthly float64 `json:"ramMonthlyCost"`
  44. RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`
  45. StorageCumulative float64 `json:"storageCumulativeCost"`
  46. StorageMonthly float64 `json:"storageMonthlyCost"`
  47. StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
  48. TotalCumulative float64 `json:"totalCumulativeCost"`
  49. TotalMonthly float64 `json:"totalMonthlyCost"`
  50. DataMinutes float64
  51. }
  52. // ClusterCostsBreakdown provides percentage-based breakdown of a resource by
  53. // categories: user for user-space (i.e. non-system) usage, system, and idle.
  54. type ClusterCostsBreakdown struct {
  55. Idle float64 `json:"idle"`
  56. Other float64 `json:"other"`
  57. System float64 `json:"system"`
  58. User float64 `json:"user"`
  59. }
  60. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  61. // the associated monthly rate data, and returns the Costs.
  62. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
  63. start, end, err := util.ParseTimeRange(window, offset)
  64. if err != nil {
  65. return nil, err
  66. }
  67. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  68. if dataHours == 0 {
  69. dataHours = end.Sub(*start).Hours()
  70. }
  71. // Do not allow zero-length windows to prevent divide-by-zero issues
  72. if dataHours == 0 {
  73. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  74. }
  75. cc := &ClusterCosts{
  76. Start: start,
  77. End: end,
  78. CPUCumulative: cpu,
  79. GPUCumulative: gpu,
  80. RAMCumulative: ram,
  81. StorageCumulative: storage,
  82. TotalCumulative: cpu + gpu + ram + storage,
  83. CPUMonthly: cpu / dataHours * (util.HoursPerMonth),
  84. GPUMonthly: gpu / dataHours * (util.HoursPerMonth),
  85. RAMMonthly: ram / dataHours * (util.HoursPerMonth),
  86. StorageMonthly: storage / dataHours * (util.HoursPerMonth),
  87. }
  88. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  89. return cc, nil
  90. }
  91. type Disk struct {
  92. Cluster string
  93. Name string
  94. ProviderID string
  95. Cost float64
  96. Bytes float64
  97. Local bool
  98. Start time.Time
  99. End time.Time
  100. Minutes float64
  101. Breakdown *ClusterCostsBreakdown
  102. }
  103. func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, []error) {
  104. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  105. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  106. if offset < time.Minute {
  107. offsetStr = ""
  108. }
  109. // minsPerResolution determines accuracy and resource use for the following
  110. // queries. Smaller values (higher resolution) result in better accuracy,
  111. // but more expensive queries, and vice-a-versa.
  112. minsPerResolution := 1
  113. resolution := time.Duration(minsPerResolution) * time.Minute
  114. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  115. // value, converts it to a cumulative value; i.e.
  116. // [$/hr] * [min/res]*[hr/min] = [$/res]
  117. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  118. // TODO niko/assets how do we not hard-code this price?
  119. costPerGBHr := 0.04 / 730.0
  120. ctx := prom.NewContext(client)
  121. queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * avg(pv_hourly_cost) by (cluster_id, persistentvolume))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  122. queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  123. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (cluster_id, persistentvolume)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  124. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  125. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  126. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  127. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  128. resChPVCost := ctx.Query(queryPVCost)
  129. resChPVSize := ctx.Query(queryPVSize)
  130. resChActiveMins := ctx.Query(queryActiveMins)
  131. resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
  132. resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
  133. resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
  134. resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
  135. resPVCost, _ := resChPVCost.Await()
  136. resPVSize, _ := resChPVSize.Await()
  137. resActiveMins, _ := resChActiveMins.Await()
  138. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  139. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  140. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  141. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  142. if ctx.ErrorCollector.IsError() {
  143. return nil, ctx.Errors()
  144. }
  145. diskMap := map[string]*Disk{}
  146. for _, result := range resPVCost {
  147. cluster, err := result.GetString("cluster_id")
  148. if err != nil {
  149. cluster = env.GetClusterID()
  150. }
  151. name, err := result.GetString("persistentvolume")
  152. if err != nil {
  153. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  154. continue
  155. }
  156. // TODO niko/assets storage class
  157. cost := result.Values[0].Value
  158. key := fmt.Sprintf("%s/%s", cluster, name)
  159. if _, ok := diskMap[key]; !ok {
  160. diskMap[key] = &Disk{
  161. Cluster: cluster,
  162. Name: name,
  163. Breakdown: &ClusterCostsBreakdown{},
  164. }
  165. }
  166. diskMap[key].Cost += cost
  167. }
  168. for _, result := range resPVSize {
  169. cluster, err := result.GetString("cluster_id")
  170. if err != nil {
  171. cluster = env.GetClusterID()
  172. }
  173. name, err := result.GetString("persistentvolume")
  174. if err != nil {
  175. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  176. continue
  177. }
  178. // TODO niko/assets storage class
  179. bytes := result.Values[0].Value
  180. key := fmt.Sprintf("%s/%s", cluster, name)
  181. if _, ok := diskMap[key]; !ok {
  182. diskMap[key] = &Disk{
  183. Cluster: cluster,
  184. Name: name,
  185. Breakdown: &ClusterCostsBreakdown{},
  186. }
  187. }
  188. diskMap[key].Bytes = bytes
  189. }
  190. for _, result := range resLocalStorageCost {
  191. cluster, err := result.GetString("cluster_id")
  192. if err != nil {
  193. cluster = env.GetClusterID()
  194. }
  195. name, err := result.GetString("instance")
  196. if err != nil {
  197. log.Warningf("ClusterDisks: local storage data missing instance")
  198. continue
  199. }
  200. cost := result.Values[0].Value
  201. key := fmt.Sprintf("%s/%s", cluster, name)
  202. if _, ok := diskMap[key]; !ok {
  203. diskMap[key] = &Disk{
  204. Cluster: cluster,
  205. Name: name,
  206. Breakdown: &ClusterCostsBreakdown{},
  207. Local: true,
  208. }
  209. }
  210. diskMap[key].Cost += cost
  211. }
  212. for _, result := range resLocalStorageUsedCost {
  213. cluster, err := result.GetString("cluster_id")
  214. if err != nil {
  215. cluster = env.GetClusterID()
  216. }
  217. name, err := result.GetString("instance")
  218. if err != nil {
  219. log.Warningf("ClusterDisks: local storage usage data missing instance")
  220. continue
  221. }
  222. cost := result.Values[0].Value
  223. key := fmt.Sprintf("%s/%s", cluster, name)
  224. if _, ok := diskMap[key]; !ok {
  225. diskMap[key] = &Disk{
  226. Cluster: cluster,
  227. Name: name,
  228. Breakdown: &ClusterCostsBreakdown{},
  229. Local: true,
  230. }
  231. }
  232. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  233. }
  234. for _, result := range resLocalStorageBytes {
  235. cluster, err := result.GetString("cluster_id")
  236. if err != nil {
  237. cluster = env.GetClusterID()
  238. }
  239. name, err := result.GetString("instance")
  240. if err != nil {
  241. log.Warningf("ClusterDisks: local storage data missing instance")
  242. continue
  243. }
  244. bytes := result.Values[0].Value
  245. key := fmt.Sprintf("%s/%s", cluster, name)
  246. if _, ok := diskMap[key]; !ok {
  247. diskMap[key] = &Disk{
  248. Cluster: cluster,
  249. Name: name,
  250. Local: true,
  251. }
  252. }
  253. diskMap[key].Bytes = bytes
  254. }
  255. for _, result := range resActiveMins {
  256. cluster, err := result.GetString("cluster_id")
  257. if err != nil {
  258. cluster = env.GetClusterID()
  259. }
  260. name, err := result.GetString("persistentvolume")
  261. if err != nil {
  262. log.Warningf("ClusterDisks: active mins missing instance")
  263. continue
  264. }
  265. key := fmt.Sprintf("%s/%s", cluster, name)
  266. if _, ok := diskMap[key]; !ok {
  267. log.Warningf("ClusterDisks: active mins for unidentified disk")
  268. continue
  269. }
  270. if len(result.Values) == 0 {
  271. continue
  272. }
  273. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  274. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  275. mins := e.Sub(s).Minutes()
  276. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  277. diskMap[key].End = e
  278. diskMap[key].Start = s
  279. diskMap[key].Minutes = mins
  280. }
  281. for _, result := range resLocalActiveMins {
  282. cluster, err := result.GetString("cluster_id")
  283. if err != nil {
  284. cluster = env.GetClusterID()
  285. }
  286. name, err := result.GetString("node")
  287. if err != nil {
  288. log.Warningf("ClusterDisks: local active mins data missing instance")
  289. continue
  290. }
  291. key := fmt.Sprintf("%s/%s", cluster, name)
  292. if _, ok := diskMap[key]; !ok {
  293. log.Warningf("ClusterDisks: local active mins for unidentified disk")
  294. continue
  295. }
  296. if len(result.Values) == 0 {
  297. continue
  298. }
  299. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  300. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  301. mins := e.Sub(s).Minutes()
  302. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  303. diskMap[key].End = e
  304. diskMap[key].Start = s
  305. diskMap[key].Minutes = mins
  306. }
  307. for _, disk := range diskMap {
  308. // Apply all remaining RAM to Idle
  309. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  310. }
  311. return diskMap, nil
  312. }
  313. type Node struct {
  314. Cluster string
  315. Name string
  316. ProviderID string
  317. NodeType string
  318. CPUCost float64
  319. CPUCores float64
  320. GPUCost float64
  321. RAMCost float64
  322. RAMBytes float64
  323. Discount float64
  324. Preemptible bool
  325. CPUBreakdown *ClusterCostsBreakdown
  326. RAMBreakdown *ClusterCostsBreakdown
  327. Start time.Time
  328. End time.Time
  329. Minutes float64
  330. }
  331. var partialCPUMap = map[string]float64{
  332. "e2-micro": 0.25,
  333. "e2-small": 0.5,
  334. "e2-medium": 1.0,
  335. }
  336. func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, []error) {
  337. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  338. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  339. if offset < time.Minute {
  340. offsetStr = ""
  341. }
  342. // minsPerResolution determines accuracy and resource use for the following
  343. // queries. Smaller values (higher resolution) result in better accuracy,
  344. // but more expensive queries, and vice-a-versa.
  345. minsPerResolution := 1
  346. resolution := time.Duration(minsPerResolution) * time.Minute
  347. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  348. // value, converts it to a cumulative value; i.e.
  349. // [$/hr] * [min/res]*[hr/min] = [$/res]
  350. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  351. ctx := prom.NewContext(client)
  352. queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  353. queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  354. queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  355. queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  356. queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node, provider_id))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  357. queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  358. queryNodeCPUModePct := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
  359. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
  360. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
  361. queryActiveMins := fmt.Sprintf(`node_total_hourly_cost[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  362. resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
  363. resChNodeCPUCores := ctx.Query(queryNodeCPUCores)
  364. resChNodeRAMCost := ctx.Query(queryNodeRAMCost)
  365. resChNodeRAMBytes := ctx.Query(queryNodeRAMBytes)
  366. resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
  367. resChNodeLabels := ctx.Query(queryNodeLabels)
  368. resChNodeCPUModePct := ctx.Query(queryNodeCPUModePct)
  369. resChNodeRAMSystemPct := ctx.Query(queryNodeRAMSystemPct)
  370. resChNodeRAMUserPct := ctx.Query(queryNodeRAMUserPct)
  371. resChActiveMins := ctx.Query(queryActiveMins)
  372. resNodeCPUCost, _ := resChNodeCPUCost.Await()
  373. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  374. resNodeGPUCost, _ := resChNodeGPUCost.Await()
  375. resNodeRAMCost, _ := resChNodeRAMCost.Await()
  376. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  377. resNodeLabels, _ := resChNodeLabels.Await()
  378. resNodeCPUModePct, _ := resChNodeCPUModePct.Await()
  379. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  380. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  381. resActiveMins, _ := resChActiveMins.Await()
  382. if ctx.ErrorCollector.IsError() {
  383. return nil, ctx.Errors()
  384. }
  385. nodeMap := map[string]*Node{}
  386. for _, result := range resNodeCPUCost {
  387. cluster, err := result.GetString("cluster_id")
  388. if err != nil {
  389. cluster = env.GetClusterID()
  390. }
  391. name, err := result.GetString("node")
  392. if err != nil {
  393. log.Warningf("ClusterNodes: CPU cost data missing node")
  394. continue
  395. }
  396. nodeType, _ := result.GetString("instance_type")
  397. providerID, _ := result.GetString("provider_id")
  398. cpuCost := result.Values[0].Value
  399. key := fmt.Sprintf("%s/%s", cluster, name)
  400. if _, ok := nodeMap[key]; !ok {
  401. nodeMap[key] = &Node{
  402. Cluster: cluster,
  403. Name: name,
  404. NodeType: nodeType,
  405. ProviderID: cp.ParseID(providerID),
  406. CPUBreakdown: &ClusterCostsBreakdown{},
  407. RAMBreakdown: &ClusterCostsBreakdown{},
  408. }
  409. }
  410. nodeMap[key].CPUCost += cpuCost
  411. nodeMap[key].NodeType = nodeType
  412. if nodeMap[key].ProviderID == "" {
  413. nodeMap[key].ProviderID = cp.ParseID(providerID)
  414. }
  415. }
  416. for _, result := range resNodeCPUCores {
  417. cluster, err := result.GetString("cluster_id")
  418. if err != nil {
  419. cluster = env.GetClusterID()
  420. }
  421. name, err := result.GetString("node")
  422. if err != nil {
  423. log.Warningf("ClusterNodes: CPU cores data missing node")
  424. continue
  425. }
  426. cpuCores := result.Values[0].Value
  427. key := fmt.Sprintf("%s/%s", cluster, name)
  428. if _, ok := nodeMap[key]; !ok {
  429. nodeMap[key] = &Node{
  430. Cluster: cluster,
  431. Name: name,
  432. CPUBreakdown: &ClusterCostsBreakdown{},
  433. RAMBreakdown: &ClusterCostsBreakdown{},
  434. }
  435. }
  436. node := nodeMap[key]
  437. if v, ok := partialCPUMap[node.NodeType]; ok {
  438. node.CPUCores = v
  439. if cpuCores > 0 {
  440. adjustmentFactor := v / cpuCores
  441. node.CPUCost = node.CPUCost * adjustmentFactor
  442. }
  443. } else {
  444. nodeMap[key].CPUCores = cpuCores
  445. }
  446. }
  447. for _, result := range resNodeRAMCost {
  448. cluster, err := result.GetString("cluster_id")
  449. if err != nil {
  450. cluster = env.GetClusterID()
  451. }
  452. name, err := result.GetString("node")
  453. if err != nil {
  454. log.Warningf("ClusterNodes: RAM cost data missing node")
  455. continue
  456. }
  457. nodeType, _ := result.GetString("instance_type")
  458. providerID, _ := result.GetString("provider_id")
  459. ramCost := result.Values[0].Value
  460. key := fmt.Sprintf("%s/%s", cluster, name)
  461. if _, ok := nodeMap[key]; !ok {
  462. nodeMap[key] = &Node{
  463. Cluster: cluster,
  464. Name: name,
  465. NodeType: nodeType,
  466. ProviderID: cp.ParseID(providerID),
  467. CPUBreakdown: &ClusterCostsBreakdown{},
  468. RAMBreakdown: &ClusterCostsBreakdown{},
  469. }
  470. }
  471. nodeMap[key].RAMCost += ramCost
  472. nodeMap[key].NodeType = nodeType
  473. if nodeMap[key].ProviderID == "" {
  474. nodeMap[key].ProviderID = cp.ParseID(providerID)
  475. }
  476. }
  477. for _, result := range resNodeRAMBytes {
  478. cluster, err := result.GetString("cluster_id")
  479. if err != nil {
  480. cluster = env.GetClusterID()
  481. }
  482. name, err := result.GetString("node")
  483. if err != nil {
  484. log.Warningf("ClusterNodes: RAM bytes data missing node")
  485. continue
  486. }
  487. ramBytes := result.Values[0].Value
  488. key := fmt.Sprintf("%s/%s", cluster, name)
  489. if _, ok := nodeMap[key]; !ok {
  490. nodeMap[key] = &Node{
  491. Cluster: cluster,
  492. Name: name,
  493. CPUBreakdown: &ClusterCostsBreakdown{},
  494. RAMBreakdown: &ClusterCostsBreakdown{},
  495. }
  496. }
  497. nodeMap[key].RAMBytes = ramBytes
  498. }
  499. for _, result := range resNodeGPUCost {
  500. cluster, err := result.GetString("cluster_id")
  501. if err != nil {
  502. cluster = env.GetClusterID()
  503. }
  504. name, err := result.GetString("node")
  505. if err != nil {
  506. log.Warningf("ClusterNodes: GPU cost data missing node")
  507. continue
  508. }
  509. nodeType, _ := result.GetString("instance_type")
  510. providerID, _ := result.GetString("provider_id")
  511. gpuCost := result.Values[0].Value
  512. key := fmt.Sprintf("%s/%s", cluster, name)
  513. if _, ok := nodeMap[key]; !ok {
  514. nodeMap[key] = &Node{
  515. Cluster: cluster,
  516. Name: name,
  517. NodeType: nodeType,
  518. ProviderID: cp.ParseID(providerID),
  519. CPUBreakdown: &ClusterCostsBreakdown{},
  520. RAMBreakdown: &ClusterCostsBreakdown{},
  521. }
  522. }
  523. nodeMap[key].GPUCost += gpuCost
  524. if nodeMap[key].ProviderID == "" {
  525. nodeMap[key].ProviderID = cp.ParseID(providerID)
  526. }
  527. }
  528. for _, result := range resNodeCPUModePct {
  529. cluster, err := result.GetString("cluster_id")
  530. if err != nil {
  531. cluster = env.GetClusterID()
  532. }
  533. name, err := result.GetString("kubernetes_node")
  534. if err != nil {
  535. log.DedupedWarningf(5, "ClusterNodes: CPU mode data missing node")
  536. continue
  537. }
  538. mode, err := result.GetString("mode")
  539. if err != nil {
  540. log.Warningf("ClusterNodes: unable to read CPU mode: %s", err)
  541. mode = "other"
  542. }
  543. pct := result.Values[0].Value
  544. key := fmt.Sprintf("%s/%s", cluster, name)
  545. if _, ok := nodeMap[key]; !ok {
  546. log.Warningf("ClusterNodes: CPU mode data for unidentified node")
  547. continue
  548. }
  549. switch mode {
  550. case "idle":
  551. nodeMap[key].CPUBreakdown.Idle += pct
  552. case "system":
  553. nodeMap[key].CPUBreakdown.System += pct
  554. case "user":
  555. nodeMap[key].CPUBreakdown.User += pct
  556. default:
  557. nodeMap[key].CPUBreakdown.Other += pct
  558. }
  559. }
  560. for _, result := range resNodeRAMSystemPct {
  561. cluster, err := result.GetString("cluster_id")
  562. if err != nil {
  563. cluster = env.GetClusterID()
  564. }
  565. name, err := result.GetString("instance")
  566. if err != nil {
  567. log.Warningf("ClusterNodes: RAM system percent missing node")
  568. continue
  569. }
  570. pct := result.Values[0].Value
  571. key := fmt.Sprintf("%s/%s", cluster, name)
  572. if _, ok := nodeMap[key]; !ok {
  573. log.Warningf("ClusterNodes: RAM system percent for unidentified node")
  574. continue
  575. }
  576. nodeMap[key].RAMBreakdown.System += pct
  577. }
  578. for _, result := range resNodeRAMUserPct {
  579. cluster, err := result.GetString("cluster_id")
  580. if err != nil {
  581. cluster = env.GetClusterID()
  582. }
  583. name, err := result.GetString("instance")
  584. if err != nil {
  585. log.Warningf("ClusterNodes: RAM system percent missing node")
  586. continue
  587. }
  588. pct := result.Values[0].Value
  589. key := fmt.Sprintf("%s/%s", cluster, name)
  590. if _, ok := nodeMap[key]; !ok {
  591. log.Warningf("ClusterNodes: RAM system percent for unidentified node")
  592. continue
  593. }
  594. nodeMap[key].RAMBreakdown.User += pct
  595. }
  596. for _, result := range resActiveMins {
  597. cluster, err := result.GetString("cluster_id")
  598. if err != nil {
  599. cluster = env.GetClusterID()
  600. }
  601. name, err := result.GetString("node")
  602. if err != nil {
  603. log.Warningf("ClusterNodes: active mins missing node")
  604. continue
  605. }
  606. key := fmt.Sprintf("%s/%s", cluster, name)
  607. if _, ok := nodeMap[key]; !ok {
  608. log.Warningf("ClusterNodes: active mins for unidentified node")
  609. continue
  610. }
  611. if len(result.Values) == 0 {
  612. continue
  613. }
  614. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  615. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  616. mins := e.Sub(s).Minutes()
  617. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  618. nodeMap[key].End = e
  619. nodeMap[key].Start = s
  620. nodeMap[key].Minutes = mins
  621. }
  622. // Determine preemptibility with node labels
  623. for _, result := range resNodeLabels {
  624. nodeName, err := result.GetString("node")
  625. if err != nil {
  626. continue
  627. }
  628. // GCP preemptible label
  629. pre := result.Values[0].Value
  630. cluster, err := result.GetString("cluster_id")
  631. if err != nil {
  632. cluster = env.GetClusterID()
  633. }
  634. key := fmt.Sprintf("%s/%s", cluster, nodeName)
  635. if node, ok := nodeMap[key]; pre > 0.0 && ok {
  636. node.Preemptible = true
  637. }
  638. // TODO AWS preemptible
  639. // TODO Azure preemptible
  640. }
  641. c, err := cp.GetConfig()
  642. if err != nil {
  643. return nil, []error{err}
  644. }
  645. discount, err := ParsePercentString(c.Discount)
  646. if err != nil {
  647. return nil, []error{err}
  648. }
  649. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  650. if err != nil {
  651. return nil, []error{err}
  652. }
  653. for _, node := range nodeMap {
  654. // TODO take RI into account
  655. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  656. // Apply all remaining RAM to Idle
  657. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  658. }
  659. return nodeMap, nil
  660. }
  661. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  662. func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
  663. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  664. start, end, err := util.ParseTimeRange(window, offset)
  665. if err != nil {
  666. return nil, err
  667. }
  668. mins := end.Sub(*start).Minutes()
  669. // minsPerResolution determines accuracy and resource use for the following
  670. // queries. Smaller values (higher resolution) result in better accuracy,
  671. // but more expensive queries, and vice-a-versa.
  672. minsPerResolution := 5
  673. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  674. // value, converts it to a cumulative value; i.e.
  675. // [$/hr] * [min/res]*[hr/min] = [$/res]
  676. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  677. const fmtQueryDataCount = `
  678. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (cluster_id)[%s:%dm]%s) * %d
  679. `
  680. const fmtQueryTotalGPU = `
  681. sum(
  682. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  683. ) by (cluster_id)
  684. `
  685. const fmtQueryTotalCPU = `
  686. sum(
  687. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, cluster_id)[%s:%dm]%s) *
  688. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
  689. ) by (cluster_id)
  690. `
  691. const fmtQueryTotalRAM = `
  692. sum(
  693. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  694. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
  695. ) by (cluster_id)
  696. `
  697. const fmtQueryTotalStorage = `
  698. sum(
  699. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  700. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, cluster_id) * %f
  701. ) by (cluster_id)
  702. `
  703. const fmtQueryCPUModePct = `
  704. sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
  705. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)
  706. `
  707. const fmtQueryRAMSystemPct = `
  708. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (cluster_id)
  709. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
  710. `
  711. const fmtQueryRAMUserPct = `
  712. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (cluster_id)
  713. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
  714. `
  715. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  716. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  717. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  718. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  719. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  720. if queryTotalLocalStorage != "" {
  721. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  722. }
  723. fmtOffset := ""
  724. if offset != "" {
  725. fmtOffset = fmt.Sprintf("offset %s", offset)
  726. }
  727. queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, minsPerResolution, fmtOffset, minsPerResolution)
  728. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  729. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  730. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  731. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  732. ctx := prom.NewContext(client)
  733. resChs := ctx.QueryAll(
  734. queryDataCount,
  735. queryTotalGPU,
  736. queryTotalCPU,
  737. queryTotalRAM,
  738. queryTotalStorage,
  739. )
  740. // Only submit the local storage query if it is valid. Otherwise Prometheus
  741. // will return errors. Always append something to resChs, regardless, to
  742. // maintain indexing.
  743. if queryTotalLocalStorage != "" {
  744. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  745. } else {
  746. resChs = append(resChs, nil)
  747. }
  748. if withBreakdown {
  749. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
  750. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
  751. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
  752. bdResChs := ctx.QueryAll(
  753. queryCPUModePct,
  754. queryRAMSystemPct,
  755. queryRAMUserPct,
  756. )
  757. // Only submit the local storage query if it is valid. Otherwise Prometheus
  758. // will return errors. Always append something to resChs, regardless, to
  759. // maintain indexing.
  760. if queryUsedLocalStorage != "" {
  761. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  762. } else {
  763. bdResChs = append(bdResChs, nil)
  764. }
  765. resChs = append(resChs, bdResChs...)
  766. }
  767. resDataCount, _ := resChs[0].Await()
  768. resTotalGPU, _ := resChs[1].Await()
  769. resTotalCPU, _ := resChs[2].Await()
  770. resTotalRAM, _ := resChs[3].Await()
  771. resTotalStorage, _ := resChs[4].Await()
  772. if ctx.HasErrors() {
  773. return nil, ctx.Errors()[0]
  774. }
  775. defaultClusterID := env.GetClusterID()
  776. dataMinsByCluster := map[string]float64{}
  777. for _, result := range resDataCount {
  778. clusterID, _ := result.GetString("cluster_id")
  779. if clusterID == "" {
  780. clusterID = defaultClusterID
  781. }
  782. dataMins := mins
  783. if len(result.Values) > 0 {
  784. dataMins = result.Values[0].Value
  785. } else {
  786. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  787. }
  788. dataMinsByCluster[clusterID] = dataMins
  789. }
  790. // Determine combined discount
  791. discount, customDiscount := 0.0, 0.0
  792. c, err := A.Cloud.GetConfig()
  793. if err == nil {
  794. discount, err = ParsePercentString(c.Discount)
  795. if err != nil {
  796. discount = 0.0
  797. }
  798. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  799. if err != nil {
  800. customDiscount = 0.0
  801. }
  802. }
  803. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  804. costData := make(map[string]map[string]float64)
  805. // Helper function to iterate over Prom query results, parsing the raw values into
  806. // the intermediate costData structure.
  807. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  808. for _, result := range results {
  809. clusterID, _ := result.GetString("cluster_id")
  810. if clusterID == "" {
  811. clusterID = defaultClusterID
  812. }
  813. if _, ok := costData[clusterID]; !ok {
  814. costData[clusterID] = map[string]float64{}
  815. }
  816. if len(result.Values) > 0 {
  817. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  818. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  819. }
  820. }
  821. }
  822. // Apply both sustained use and custom discounts to RAM and CPU
  823. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  824. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  825. // Apply only custom discount to GPU and storage
  826. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  827. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  828. if queryTotalLocalStorage != "" {
  829. resTotalLocalStorage, err := resChs[5].Await()
  830. if err != nil {
  831. return nil, err
  832. }
  833. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  834. }
  835. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  836. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  837. pvUsedCostMap := map[string]float64{}
  838. if withBreakdown {
  839. resCPUModePct, _ := resChs[6].Await()
  840. resRAMSystemPct, _ := resChs[7].Await()
  841. resRAMUserPct, _ := resChs[8].Await()
  842. if ctx.HasErrors() {
  843. return nil, ctx.Errors()[0]
  844. }
  845. for _, result := range resCPUModePct {
  846. clusterID, _ := result.GetString("cluster_id")
  847. if clusterID == "" {
  848. clusterID = defaultClusterID
  849. }
  850. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  851. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  852. }
  853. cpuBD := cpuBreakdownMap[clusterID]
  854. mode, err := result.GetString("mode")
  855. if err != nil {
  856. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  857. mode = "other"
  858. }
  859. switch mode {
  860. case "idle":
  861. cpuBD.Idle += result.Values[0].Value
  862. case "system":
  863. cpuBD.System += result.Values[0].Value
  864. case "user":
  865. cpuBD.User += result.Values[0].Value
  866. default:
  867. cpuBD.Other += result.Values[0].Value
  868. }
  869. }
  870. for _, result := range resRAMSystemPct {
  871. clusterID, _ := result.GetString("cluster_id")
  872. if clusterID == "" {
  873. clusterID = defaultClusterID
  874. }
  875. if _, ok := ramBreakdownMap[clusterID]; !ok {
  876. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  877. }
  878. ramBD := ramBreakdownMap[clusterID]
  879. ramBD.System += result.Values[0].Value
  880. }
  881. for _, result := range resRAMUserPct {
  882. clusterID, _ := result.GetString("cluster_id")
  883. if clusterID == "" {
  884. clusterID = defaultClusterID
  885. }
  886. if _, ok := ramBreakdownMap[clusterID]; !ok {
  887. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  888. }
  889. ramBD := ramBreakdownMap[clusterID]
  890. ramBD.User += result.Values[0].Value
  891. }
  892. for _, ramBD := range ramBreakdownMap {
  893. remaining := 1.0
  894. remaining -= ramBD.Other
  895. remaining -= ramBD.System
  896. remaining -= ramBD.User
  897. ramBD.Idle = remaining
  898. }
  899. if queryUsedLocalStorage != "" {
  900. resUsedLocalStorage, err := resChs[9].Await()
  901. if err != nil {
  902. return nil, err
  903. }
  904. for _, result := range resUsedLocalStorage {
  905. clusterID, _ := result.GetString("cluster_id")
  906. if clusterID == "" {
  907. clusterID = defaultClusterID
  908. }
  909. pvUsedCostMap[clusterID] += result.Values[0].Value
  910. }
  911. }
  912. }
  913. if ctx.ErrorCollector.IsError() {
  914. for _, err := range ctx.Errors() {
  915. log.Errorf("ComputeClusterCosts: %s", err)
  916. }
  917. return nil, ctx.Errors()[0]
  918. }
  919. // Convert intermediate structure to Costs instances
  920. costsByCluster := map[string]*ClusterCosts{}
  921. for id, cd := range costData {
  922. dataMins, ok := dataMinsByCluster[id]
  923. if !ok {
  924. dataMins = mins
  925. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  926. }
  927. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/util.MinsPerHour)
  928. if err != nil {
  929. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  930. return nil, err
  931. }
  932. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  933. costs.CPUBreakdown = cpuBD
  934. }
  935. if ramBD, ok := ramBreakdownMap[id]; ok {
  936. costs.RAMBreakdown = ramBD
  937. }
  938. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  939. if pvUC, ok := pvUsedCostMap[id]; ok {
  940. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  941. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  942. }
  943. costs.DataMinutes = dataMins
  944. costsByCluster[id] = costs
  945. }
  946. return costsByCluster, nil
  947. }
  948. type Totals struct {
  949. TotalCost [][]string `json:"totalcost"`
  950. CPUCost [][]string `json:"cpucost"`
  951. MemCost [][]string `json:"memcost"`
  952. StorageCost [][]string `json:"storageCost"`
  953. }
  954. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  955. if len(qrs) == 0 {
  956. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  957. }
  958. result := qrs[0]
  959. totals := [][]string{}
  960. for _, value := range result.Values {
  961. d0 := fmt.Sprintf("%f", value.Timestamp)
  962. d1 := fmt.Sprintf("%f", value.Value)
  963. toAppend := []string{
  964. d0,
  965. d1,
  966. }
  967. totals = append(totals, toAppend)
  968. }
  969. return totals, nil
  970. }
  971. // ClusterCostsOverTime gives the full cluster costs over time
  972. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  973. localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true, false)
  974. if localStorageQuery != "" {
  975. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  976. }
  977. layout := "2006-01-02T15:04:05.000Z"
  978. start, err := time.Parse(layout, startString)
  979. if err != nil {
  980. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  981. return nil, err
  982. }
  983. end, err := time.Parse(layout, endString)
  984. if err != nil {
  985. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  986. return nil, err
  987. }
  988. window, err := time.ParseDuration(windowString)
  989. if err != nil {
  990. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  991. return nil, err
  992. }
  993. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  994. if offset != "" {
  995. offset = fmt.Sprintf("offset %s", offset)
  996. }
  997. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  998. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  999. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  1000. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  1001. ctx := prom.NewContext(cli)
  1002. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  1003. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  1004. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  1005. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  1006. resultClusterCores, err := resChClusterCores.Await()
  1007. if err != nil {
  1008. return nil, err
  1009. }
  1010. resultClusterRAM, err := resChClusterRAM.Await()
  1011. if err != nil {
  1012. return nil, err
  1013. }
  1014. resultStorage, err := resChStorage.Await()
  1015. if err != nil {
  1016. return nil, err
  1017. }
  1018. resultTotal, err := resChTotal.Await()
  1019. if err != nil {
  1020. return nil, err
  1021. }
  1022. coreTotal, err := resultToTotals(resultClusterCores)
  1023. if err != nil {
  1024. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  1025. return nil, err
  1026. }
  1027. ramTotal, err := resultToTotals(resultClusterRAM)
  1028. if err != nil {
  1029. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  1030. return nil, err
  1031. }
  1032. storageTotal, err := resultToTotals(resultStorage)
  1033. if err != nil {
  1034. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  1035. }
  1036. clusterTotal, err := resultToTotals(resultTotal)
  1037. if err != nil {
  1038. // If clusterTotal query failed, it's likely because there are no PVs, which
  1039. // causes the qTotal query to return no data. Instead, query only node costs.
  1040. // If that fails, return an error because something is actually wrong.
  1041. qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
  1042. resultNodes, err := ctx.QueryRangeSync(qNodes, start, end, window)
  1043. if err != nil {
  1044. return nil, err
  1045. }
  1046. clusterTotal, err = resultToTotals(resultNodes)
  1047. if err != nil {
  1048. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  1049. return nil, err
  1050. }
  1051. }
  1052. return &Totals{
  1053. TotalCost: clusterTotal,
  1054. CPUCost: coreTotal,
  1055. MemCost: ramTotal,
  1056. StorageCost: storageTotal,
  1057. }, nil
  1058. }