cluster.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/kubecost/cost-model/pkg/cloud"
  6. "github.com/kubecost/cost-model/pkg/env"
  7. "github.com/kubecost/cost-model/pkg/log"
  8. "github.com/kubecost/cost-model/pkg/prom"
  9. "github.com/kubecost/cost-model/pkg/util"
  10. prometheus "github.com/prometheus/client_golang/api"
  11. "k8s.io/klog"
  12. )
  // Monthly cost PromQL templates. In the first three queries each "[%s] %s"
// pair is presumably filled with a range duration and an optional
// " offset ..." clause, and any trailing %s with an extra expression suffix;
// the callers are not visible here, so confirm against them. The factor 730
// presumably converts hourly prices to monthly ones (hours per month).
const (
	// queryClusterCores is, per cluster, the monthly CPU cost of every node
	// (capacity cores x node_cpu_hourly_cost) plus its monthly GPU cost.
	// NOTE(review): "+" uses one-to-one vector matching, so a node that has no
	// node_gpu_hourly_cost series drops out of the sum entirely; confirm the
	// GPU cost metric is emitted (e.g. as 0) for every node.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`

	// queryClusterRAM is, per cluster, the monthly RAM cost of every node
	// (capacity in GiB x node_ram_hourly_cost).
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
) by (cluster_id)`

	// queryStorage is, per cluster, the monthly cost of every persistent
	// volume (pv_hourly_cost x capacity in GiB).
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`

	// queryTotal is, per cluster, the monthly node cost plus the monthly
	// persistent-volume cost over the last hour of data. Both operands of "+"
	// are aggregated by cluster_id. PromQL matches the two sides on their full
	// label sets, so a label-less left side would never match the per-cluster
	// right side, and the query would return no data whenever cluster_id is
	// present.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) by (cluster_id) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
) by (cluster_id) %s`

	// queryNodes is the monthly cost of all nodes, summed across clusters.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 %s`
)
  // ClusterCosts represents cumulative and monthly cluster costs over a given
// time range. Costs are broken down by CPU, GPU, RAM, and storage. CPU, RAM,
// and storage can additionally carry a fractional usage breakdown.
//
// The *Cumulative fields hold the total cost over the range. The *Monthly
// fields hold that cost extrapolated to a monthly rate (see
// NewClusterCostsFromCumulative).
type ClusterCosts struct {
	// Start and End bound the time range the costs cover.
	Start *time.Time `json:"startTime"`
	End   *time.Time `json:"endTime"`

	CPUCumulative float64 `json:"cpuCumulativeCost"`
	CPUMonthly    float64 `json:"cpuMonthlyCost"`
	// CPUBreakdown is not set by NewClusterCostsFromCumulative and may be nil.
	CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`

	GPUCumulative float64 `json:"gpuCumulativeCost"`
	GPUMonthly    float64 `json:"gpuMonthlyCost"`

	RAMCumulative float64 `json:"ramCumulativeCost"`
	RAMMonthly    float64 `json:"ramMonthlyCost"`
	// RAMBreakdown is not set by NewClusterCostsFromCumulative and may be nil.
	RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`

	StorageCumulative float64 `json:"storageCumulativeCost"`
	StorageMonthly    float64 `json:"storageMonthlyCost"`
	// StorageBreakdown is not set by NewClusterCostsFromCumulative and may be nil.
	StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`

	// TotalCumulative and TotalMonthly are the sums of the per-resource values.
	TotalCumulative float64 `json:"totalCumulativeCost"`
	TotalMonthly    float64 `json:"totalMonthlyCost"`

	// DataMinutes has no json tag, so encoding/json emits it under the Go
	// field name "DataMinutes". NewClusterCostsFromCumulative does not set it.
	DataMinutes float64
}
  // ClusterCostsBreakdown splits a resource into usage categories: Idle, Other,
// System, and User. Values are fractions of the whole (ratios in [0, 1] when
// the inputs are consistent), not 0-100 percentages.
type ClusterCostsBreakdown struct {
	// Idle is the unused share. For disks it is computed as 1.0 minus the
	// other three categories; for node CPU it is the share of time in "idle"
	// mode.
	Idle float64 `json:"idle"`
	// Other is usage that does not fall into Idle, System, or User, e.g. CPU
	// modes other than idle/system/user.
	Other float64 `json:"other"`
	// System is system usage, e.g. kube-system workloads for node RAM, or
	// used space for local disks.
	System float64 `json:"system"`
	// User is user-space (i.e. non-system) usage.
	User float64 `json:"user"`
}
  60. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  61. // the associated monthly rate data, and returns the Costs.
  62. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
  63. start, end, err := util.ParseTimeRange(window, offset)
  64. if err != nil {
  65. return nil, err
  66. }
  67. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  68. if dataHours == 0 {
  69. dataHours = end.Sub(*start).Hours()
  70. }
  71. // Do not allow zero-length windows to prevent divide-by-zero issues
  72. if dataHours == 0 {
  73. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  74. }
  75. cc := &ClusterCosts{
  76. Start: start,
  77. End: end,
  78. CPUCumulative: cpu,
  79. GPUCumulative: gpu,
  80. RAMCumulative: ram,
  81. StorageCumulative: storage,
  82. TotalCumulative: cpu + gpu + ram + storage,
  83. CPUMonthly: cpu / dataHours * (util.HoursPerMonth),
  84. GPUMonthly: gpu / dataHours * (util.HoursPerMonth),
  85. RAMMonthly: ram / dataHours * (util.HoursPerMonth),
  86. StorageMonthly: storage / dataHours * (util.HoursPerMonth),
  87. }
  88. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  89. return cc, nil
  90. }
  // Disk describes the cost and capacity of a single persistent volume or
// node-local disk over a query window, as assembled by ClusterDisks.
type Disk struct {
	// Cluster is the cluster ID. It falls back to env.GetClusterID() when the
	// metric carries no cluster_id label.
	Cluster string
	// Name is the persistent volume name, or the "instance" label for local disks.
	Name string
	// ProviderID is the provider's volume identifier, parsed with
	// cloud.Provider.ParsePVID. It is empty when no provider_id label was seen.
	ProviderID string
	// Cost is the cost accumulated over the query window.
	Cost float64
	// Bytes is the average capacity, in bytes, over the query window.
	Bytes float64
	// Local is true for node-local storage (derived from container_fs_*
	// metrics) rather than persistent volumes.
	Local bool
	// Start and End bound the period the disk was observed active. Minutes is
	// the length of that period.
	Start   time.Time
	End     time.Time
	Minutes float64
	// Breakdown holds the used (System) and unused (Idle) shares; only local
	// disks get a non-zero System share.
	Breakdown *ClusterCostsBreakdown
}
  103. func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, error) {
  104. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  105. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  106. if offset < time.Minute {
  107. offsetStr = ""
  108. }
  109. // minsPerResolution determines accuracy and resource use for the following
  110. // queries. Smaller values (higher resolution) result in better accuracy,
  111. // but more expensive queries, and vice-a-versa.
  112. minsPerResolution := 1
  113. resolution := time.Duration(minsPerResolution) * time.Minute
  114. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  115. // value, converts it to a cumulative value; i.e.
  116. // [$/hr] * [min/res]*[hr/min] = [$/res]
  117. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  118. // TODO niko/assets how do we not hard-code this price?
  119. costPerGBHr := 0.04 / 730.0
  120. ctx := prom.NewContext(client)
  121. queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * on(cluster_id, persistentvolume) group_right avg(pv_hourly_cost) by (cluster_id, persistentvolume,provider_id))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  122. queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  123. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (cluster_id, persistentvolume)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  124. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  125. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  126. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  127. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  128. resChPVCost := ctx.Query(queryPVCost)
  129. resChPVSize := ctx.Query(queryPVSize)
  130. resChActiveMins := ctx.Query(queryActiveMins)
  131. resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
  132. resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
  133. resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
  134. resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
  135. resPVCost, _ := resChPVCost.Await()
  136. resPVSize, _ := resChPVSize.Await()
  137. resActiveMins, _ := resChActiveMins.Await()
  138. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  139. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  140. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  141. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  142. if ctx.HasErrors() {
  143. return nil, ctx.ErrorCollection()
  144. }
  145. diskMap := map[string]*Disk{}
  146. for _, result := range resPVCost {
  147. cluster, err := result.GetString("cluster_id")
  148. if err != nil {
  149. cluster = env.GetClusterID()
  150. }
  151. name, err := result.GetString("persistentvolume")
  152. if err != nil {
  153. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  154. continue
  155. }
  156. // TODO niko/assets storage class
  157. cost := result.Values[0].Value
  158. key := fmt.Sprintf("%s/%s", cluster, name)
  159. if _, ok := diskMap[key]; !ok {
  160. diskMap[key] = &Disk{
  161. Cluster: cluster,
  162. Name: name,
  163. Breakdown: &ClusterCostsBreakdown{},
  164. }
  165. }
  166. diskMap[key].Cost += cost
  167. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  168. if providerID != "" {
  169. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  170. }
  171. }
  172. for _, result := range resPVSize {
  173. cluster, err := result.GetString("cluster_id")
  174. if err != nil {
  175. cluster = env.GetClusterID()
  176. }
  177. name, err := result.GetString("persistentvolume")
  178. if err != nil {
  179. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  180. continue
  181. }
  182. // TODO niko/assets storage class
  183. bytes := result.Values[0].Value
  184. key := fmt.Sprintf("%s/%s", cluster, name)
  185. if _, ok := diskMap[key]; !ok {
  186. diskMap[key] = &Disk{
  187. Cluster: cluster,
  188. Name: name,
  189. Breakdown: &ClusterCostsBreakdown{},
  190. }
  191. }
  192. diskMap[key].Bytes = bytes
  193. }
  194. for _, result := range resLocalStorageCost {
  195. cluster, err := result.GetString("cluster_id")
  196. if err != nil {
  197. cluster = env.GetClusterID()
  198. }
  199. name, err := result.GetString("instance")
  200. if err != nil {
  201. log.Warningf("ClusterDisks: local storage data missing instance")
  202. continue
  203. }
  204. cost := result.Values[0].Value
  205. key := fmt.Sprintf("%s/%s", cluster, name)
  206. if _, ok := diskMap[key]; !ok {
  207. diskMap[key] = &Disk{
  208. Cluster: cluster,
  209. Name: name,
  210. Breakdown: &ClusterCostsBreakdown{},
  211. Local: true,
  212. }
  213. }
  214. diskMap[key].Cost += cost
  215. }
  216. for _, result := range resLocalStorageUsedCost {
  217. cluster, err := result.GetString("cluster_id")
  218. if err != nil {
  219. cluster = env.GetClusterID()
  220. }
  221. name, err := result.GetString("instance")
  222. if err != nil {
  223. log.Warningf("ClusterDisks: local storage usage data missing instance")
  224. continue
  225. }
  226. cost := result.Values[0].Value
  227. key := fmt.Sprintf("%s/%s", cluster, name)
  228. if _, ok := diskMap[key]; !ok {
  229. diskMap[key] = &Disk{
  230. Cluster: cluster,
  231. Name: name,
  232. Breakdown: &ClusterCostsBreakdown{},
  233. Local: true,
  234. }
  235. }
  236. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  237. }
  238. for _, result := range resLocalStorageBytes {
  239. cluster, err := result.GetString("cluster_id")
  240. if err != nil {
  241. cluster = env.GetClusterID()
  242. }
  243. name, err := result.GetString("instance")
  244. if err != nil {
  245. log.Warningf("ClusterDisks: local storage data missing instance")
  246. continue
  247. }
  248. bytes := result.Values[0].Value
  249. key := fmt.Sprintf("%s/%s", cluster, name)
  250. if _, ok := diskMap[key]; !ok {
  251. diskMap[key] = &Disk{
  252. Cluster: cluster,
  253. Name: name,
  254. Breakdown: &ClusterCostsBreakdown{},
  255. Local: true,
  256. }
  257. }
  258. diskMap[key].Bytes = bytes
  259. }
  260. for _, result := range resActiveMins {
  261. cluster, err := result.GetString("cluster_id")
  262. if err != nil {
  263. cluster = env.GetClusterID()
  264. }
  265. name, err := result.GetString("persistentvolume")
  266. if err != nil {
  267. log.Warningf("ClusterDisks: active mins missing instance")
  268. continue
  269. }
  270. key := fmt.Sprintf("%s/%s", cluster, name)
  271. if _, ok := diskMap[key]; !ok {
  272. log.Warningf("ClusterDisks: active mins for unidentified disk")
  273. continue
  274. }
  275. if len(result.Values) == 0 {
  276. continue
  277. }
  278. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  279. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  280. mins := e.Sub(s).Minutes()
  281. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  282. diskMap[key].End = e
  283. diskMap[key].Start = s
  284. diskMap[key].Minutes = mins
  285. }
  286. for _, result := range resLocalActiveMins {
  287. cluster, err := result.GetString("cluster_id")
  288. if err != nil {
  289. cluster = env.GetClusterID()
  290. }
  291. name, err := result.GetString("node")
  292. if err != nil {
  293. log.Warningf("ClusterDisks: local active mins data missing instance")
  294. continue
  295. }
  296. key := fmt.Sprintf("%s/%s", cluster, name)
  297. if _, ok := diskMap[key]; !ok {
  298. log.Warningf("ClusterDisks: local active mins for unidentified disk")
  299. continue
  300. }
  301. if len(result.Values) == 0 {
  302. continue
  303. }
  304. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  305. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  306. mins := e.Sub(s).Minutes()
  307. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  308. diskMap[key].End = e
  309. diskMap[key].Start = s
  310. diskMap[key].Minutes = mins
  311. }
  312. for _, disk := range diskMap {
  313. // Apply all remaining RAM to Idle
  314. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  315. }
  316. return diskMap, nil
  317. }
  // Node describes the cost, capacity, and utilization of a single cluster node
// over a query window, as assembled by ClusterNodes.
type Node struct {
	// Cluster is the cluster ID. It falls back to env.GetClusterID() when the
	// metric carries no cluster_id label.
	Cluster string
	// Name is the node name (the "node" label).
	Name string
	// ProviderID is the provider's node identifier, parsed with
	// cloud.Provider.ParseID.
	ProviderID string
	// NodeType is the instance type (the "instance_type" label).
	NodeType string
	// CPUCost is the CPU cost accumulated over the window.
	CPUCost float64
	// CPUCores is the average CPU capacity in cores. Instance types listed in
	// partialCPUMap use the mapped value instead, and CPUCost is scaled to match.
	CPUCores float64
	// GPUCost is the GPU cost accumulated over the window.
	GPUCost float64
	// RAMCost is the RAM cost accumulated over the window.
	RAMCost float64
	// RAMBytes is the average RAM capacity in bytes.
	RAMBytes float64
	// NOTE(review): Discount and Preemptible are not set in the visible part of
	// ClusterNodes. Preemptible presumably reflects spot/preemptible status
	// (see the node-label and kubecost_node_is_spot handling); confirm.
	Discount    float64
	Preemptible bool
	// CPUBreakdown is the fractional split of CPU time by mode.
	CPUBreakdown *ClusterCostsBreakdown
	// RAMBreakdown is the fractional split of RAM into system (kube-system)
	// and user usage.
	RAMBreakdown *ClusterCostsBreakdown
	// Start and End bound the period the node was observed active. Minutes is
	// the length of that period.
	Start   time.Time
	End     time.Time
	Minutes float64
	// Labels holds node labels. It is initialized empty; presumably it is
	// populated from kube_node_labels.
	Labels map[string]string
}
  // partialCPUMap maps instance types that expose only a fraction of a core
// (presumably GCP shared-core E2 machine types) to the number of cores they
// should be billed as. When a node's NodeType appears here, ClusterNodes uses
// this value as CPUCores and scales CPUCost by value / reported cores.
var partialCPUMap = map[string]float64{
	"e2-medium": 1.0,  // one full core
	"e2-small":  0.5,  // half a core
	"e2-micro":  0.25, // a quarter core
}
  342. func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*Node, error) {
  343. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  344. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  345. if offset < time.Minute {
  346. offsetStr = ""
  347. }
  348. // minsPerResolution determines accuracy and resource use for the following
  349. // queries. Smaller values (higher resolution) result in better accuracy,
  350. // but more expensive queries, and vice-a-versa.
  351. minsPerResolution := 1
  352. resolution := time.Duration(minsPerResolution) * time.Minute
  353. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  354. // value, converts it to a cumulative value; i.e.
  355. // [$/hr] * [min/res]*[hr/min] = [$/res]
  356. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  357. requiredCtx := prom.NewContext(client)
  358. optionalCtx := prom.NewContext(client)
  359. queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  360. queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  361. queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  362. queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  363. queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost * %d.0 / 60.0) by (cluster_id, node, provider_id))[%s:%dm]%s)`, minsPerResolution, durationStr, minsPerResolution, offsetStr)
  364. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
  365. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
  366. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
  367. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node,cluster_id)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  368. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  369. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  370. // Return errors if these fail
  371. resChNodeCPUCost := requiredCtx.Query(queryNodeCPUCost)
  372. resChNodeCPUCores := requiredCtx.Query(queryNodeCPUCores)
  373. resChNodeRAMCost := requiredCtx.Query(queryNodeRAMCost)
  374. resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
  375. resChNodeGPUCost := requiredCtx.Query(queryNodeGPUCost)
  376. resChActiveMins := requiredCtx.Query(queryActiveMins)
  377. resChIsSpot := requiredCtx.Query(queryIsSpot)
  378. // Do not return errors if these fail, but log warnings
  379. resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
  380. resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
  381. resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
  382. resChLabels := optionalCtx.Query(queryLabels)
  383. resNodeCPUCost, _ := resChNodeCPUCost.Await()
  384. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  385. resNodeGPUCost, _ := resChNodeGPUCost.Await()
  386. resNodeRAMCost, _ := resChNodeRAMCost.Await()
  387. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  388. resIsSpot, _ := resChIsSpot.Await()
  389. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  390. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  391. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  392. resActiveMins, _ := resChActiveMins.Await()
  393. resLabels, _ := resChLabels.Await()
  394. if optionalCtx.HasErrors() {
  395. for _, err := range optionalCtx.Errors() {
  396. log.Warningf("ClusterNodes: %s", err)
  397. }
  398. }
  399. if requiredCtx.HasErrors() {
  400. for _, err := range requiredCtx.Errors() {
  401. log.Errorf("ClusterNodes: %s", err)
  402. }
  403. return nil, requiredCtx.ErrorCollection()
  404. }
  405. nodeMap := map[string]*Node{}
  406. for _, result := range resNodeCPUCost {
  407. cluster, err := result.GetString("cluster_id")
  408. if err != nil {
  409. cluster = env.GetClusterID()
  410. }
  411. name, err := result.GetString("node")
  412. if err != nil {
  413. log.Warningf("ClusterNodes: CPU cost data missing node")
  414. continue
  415. }
  416. nodeType, _ := result.GetString("instance_type")
  417. providerID, _ := result.GetString("provider_id")
  418. cpuCost := result.Values[0].Value
  419. key := fmt.Sprintf("%s/%s", cluster, name)
  420. if _, ok := nodeMap[key]; !ok {
  421. nodeMap[key] = &Node{
  422. Cluster: cluster,
  423. Name: name,
  424. NodeType: nodeType,
  425. ProviderID: cp.ParseID(providerID),
  426. CPUBreakdown: &ClusterCostsBreakdown{},
  427. RAMBreakdown: &ClusterCostsBreakdown{},
  428. Labels: map[string]string{},
  429. }
  430. }
  431. nodeMap[key].CPUCost += cpuCost
  432. nodeMap[key].NodeType = nodeType
  433. if nodeMap[key].ProviderID == "" {
  434. nodeMap[key].ProviderID = cp.ParseID(providerID)
  435. }
  436. }
  437. for _, result := range resNodeCPUCores {
  438. cluster, err := result.GetString("cluster_id")
  439. if err != nil {
  440. cluster = env.GetClusterID()
  441. }
  442. name, err := result.GetString("node")
  443. if err != nil {
  444. log.Warningf("ClusterNodes: CPU cores data missing node")
  445. continue
  446. }
  447. cpuCores := result.Values[0].Value
  448. key := fmt.Sprintf("%s/%s", cluster, name)
  449. if _, ok := nodeMap[key]; !ok {
  450. nodeMap[key] = &Node{
  451. Cluster: cluster,
  452. Name: name,
  453. CPUBreakdown: &ClusterCostsBreakdown{},
  454. RAMBreakdown: &ClusterCostsBreakdown{},
  455. Labels: map[string]string{},
  456. }
  457. }
  458. node := nodeMap[key]
  459. if v, ok := partialCPUMap[node.NodeType]; ok {
  460. node.CPUCores = v
  461. if cpuCores > 0 {
  462. adjustmentFactor := v / cpuCores
  463. node.CPUCost = node.CPUCost * adjustmentFactor
  464. }
  465. } else {
  466. nodeMap[key].CPUCores = cpuCores
  467. }
  468. }
  469. for _, result := range resNodeRAMCost {
  470. cluster, err := result.GetString("cluster_id")
  471. if err != nil {
  472. cluster = env.GetClusterID()
  473. }
  474. name, err := result.GetString("node")
  475. if err != nil {
  476. log.Warningf("ClusterNodes: RAM cost data missing node")
  477. continue
  478. }
  479. nodeType, _ := result.GetString("instance_type")
  480. providerID, _ := result.GetString("provider_id")
  481. ramCost := result.Values[0].Value
  482. key := fmt.Sprintf("%s/%s", cluster, name)
  483. if _, ok := nodeMap[key]; !ok {
  484. nodeMap[key] = &Node{
  485. Cluster: cluster,
  486. Name: name,
  487. NodeType: nodeType,
  488. ProviderID: cp.ParseID(providerID),
  489. CPUBreakdown: &ClusterCostsBreakdown{},
  490. RAMBreakdown: &ClusterCostsBreakdown{},
  491. Labels: map[string]string{},
  492. }
  493. }
  494. nodeMap[key].RAMCost += ramCost
  495. nodeMap[key].NodeType = nodeType
  496. if nodeMap[key].ProviderID == "" {
  497. nodeMap[key].ProviderID = cp.ParseID(providerID)
  498. }
  499. }
  500. for _, result := range resNodeRAMBytes {
  501. cluster, err := result.GetString("cluster_id")
  502. if err != nil {
  503. cluster = env.GetClusterID()
  504. }
  505. name, err := result.GetString("node")
  506. if err != nil {
  507. log.Warningf("ClusterNodes: RAM bytes data missing node")
  508. continue
  509. }
  510. ramBytes := result.Values[0].Value
  511. key := fmt.Sprintf("%s/%s", cluster, name)
  512. if _, ok := nodeMap[key]; !ok {
  513. nodeMap[key] = &Node{
  514. Cluster: cluster,
  515. Name: name,
  516. CPUBreakdown: &ClusterCostsBreakdown{},
  517. RAMBreakdown: &ClusterCostsBreakdown{},
  518. Labels: map[string]string{},
  519. }
  520. }
  521. nodeMap[key].RAMBytes = ramBytes
  522. }
  523. for _, result := range resNodeGPUCost {
  524. cluster, err := result.GetString("cluster_id")
  525. if err != nil {
  526. cluster = env.GetClusterID()
  527. }
  528. name, err := result.GetString("node")
  529. if err != nil {
  530. log.Warningf("ClusterNodes: GPU cost data missing node")
  531. continue
  532. }
  533. nodeType, _ := result.GetString("instance_type")
  534. providerID, _ := result.GetString("provider_id")
  535. gpuCost := result.Values[0].Value
  536. key := fmt.Sprintf("%s/%s", cluster, name)
  537. if _, ok := nodeMap[key]; !ok {
  538. nodeMap[key] = &Node{
  539. Cluster: cluster,
  540. Name: name,
  541. NodeType: nodeType,
  542. ProviderID: cp.ParseID(providerID),
  543. CPUBreakdown: &ClusterCostsBreakdown{},
  544. RAMBreakdown: &ClusterCostsBreakdown{},
  545. Labels: map[string]string{},
  546. }
  547. }
  548. nodeMap[key].GPUCost += gpuCost
  549. if nodeMap[key].ProviderID == "" {
  550. nodeMap[key].ProviderID = cp.ParseID(providerID)
  551. }
  552. }
  553. // Mapping of cluster/node=cpu for computing resource efficiency
  554. clusterNodeCPUTotal := map[string]float64{}
  555. // Mapping of cluster/node:mode=cpu for computing resource efficiency
  556. clusterNodeModeCPUTotal := map[string]map[string]float64{}
  557. // Build intermediate structures for CPU usage by (cluster, node) and by
  558. // (cluster, node, mode) for computing resouce efficiency
  559. for _, result := range resNodeCPUModeTotal {
  560. cluster, err := result.GetString("cluster_id")
  561. if err != nil {
  562. cluster = env.GetClusterID()
  563. }
  564. node, err := result.GetString("kubernetes_node")
  565. if err != nil {
  566. log.DedupedWarningf(5, "ClusterNodes: CPU mode data missing node")
  567. continue
  568. }
  569. mode, err := result.GetString("mode")
  570. if err != nil {
  571. log.Warningf("ClusterNodes: unable to read CPU mode: %s", err)
  572. mode = "other"
  573. }
  574. key := fmt.Sprintf("%s/%s", cluster, node)
  575. total := result.Values[0].Value
  576. // Increment total
  577. clusterNodeCPUTotal[key] += total
  578. // Increment mode
  579. if _, ok := clusterNodeModeCPUTotal[key]; !ok {
  580. clusterNodeModeCPUTotal[key] = map[string]float64{}
  581. }
  582. clusterNodeModeCPUTotal[key][mode] += total
  583. }
  584. // Compute resource efficiency from intermediate structures
  585. for key, total := range clusterNodeCPUTotal {
  586. if modeTotals, ok := clusterNodeModeCPUTotal[key]; ok {
  587. for mode, subtotal := range modeTotals {
  588. // Compute percentage for the current cluster, node, mode
  589. pct := 0.0
  590. if total > 0 {
  591. pct = subtotal / total
  592. }
  593. if _, ok := nodeMap[key]; !ok {
  594. log.Warningf("ClusterNodes: CPU mode data for unidentified node")
  595. continue
  596. }
  597. switch mode {
  598. case "idle":
  599. nodeMap[key].CPUBreakdown.Idle += pct
  600. case "system":
  601. nodeMap[key].CPUBreakdown.System += pct
  602. case "user":
  603. nodeMap[key].CPUBreakdown.User += pct
  604. default:
  605. nodeMap[key].CPUBreakdown.Other += pct
  606. }
  607. }
  608. }
  609. }
  610. for _, result := range resNodeRAMSystemPct {
  611. cluster, err := result.GetString("cluster_id")
  612. if err != nil {
  613. cluster = env.GetClusterID()
  614. }
  615. name, err := result.GetString("instance")
  616. if err != nil {
  617. log.Warningf("ClusterNodes: RAM system percent missing node")
  618. continue
  619. }
  620. pct := result.Values[0].Value
  621. key := fmt.Sprintf("%s/%s", cluster, name)
  622. if _, ok := nodeMap[key]; !ok {
  623. log.Warningf("ClusterNodes: RAM system percent for unidentified node")
  624. continue
  625. }
  626. nodeMap[key].RAMBreakdown.System += pct
  627. }
  628. for _, result := range resNodeRAMUserPct {
  629. cluster, err := result.GetString("cluster_id")
  630. if err != nil {
  631. cluster = env.GetClusterID()
  632. }
  633. name, err := result.GetString("instance")
  634. if err != nil {
  635. log.Warningf("ClusterNodes: RAM system percent missing node")
  636. continue
  637. }
  638. pct := result.Values[0].Value
  639. key := fmt.Sprintf("%s/%s", cluster, name)
  640. if _, ok := nodeMap[key]; !ok {
  641. log.Warningf("ClusterNodes: RAM system percent for unidentified node")
  642. continue
  643. }
  644. nodeMap[key].RAMBreakdown.User += pct
  645. }
  646. for _, result := range resActiveMins {
  647. cluster, err := result.GetString("cluster_id")
  648. if err != nil {
  649. cluster = env.GetClusterID()
  650. }
  651. name, err := result.GetString("node")
  652. if err != nil {
  653. log.Warningf("ClusterNodes: active mins missing node")
  654. continue
  655. }
  656. key := fmt.Sprintf("%s/%s", cluster, name)
  657. if _, ok := nodeMap[key]; !ok {
  658. log.Warningf("ClusterNodes: active mins for unidentified node")
  659. continue
  660. }
  661. if len(result.Values) == 0 {
  662. continue
  663. }
  664. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  665. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  666. mins := e.Sub(s).Minutes()
  667. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  668. nodeMap[key].End = e
  669. nodeMap[key].Start = s
  670. nodeMap[key].Minutes = mins
  671. }
  672. // Determine preemptibility with node labels
  673. for _, result := range resIsSpot {
  674. nodeName, err := result.GetString("node")
  675. if err != nil {
  676. continue
  677. }
  678. // GCP preemptible label
  679. pre := result.Values[0].Value
  680. cluster, err := result.GetString("cluster_id")
  681. if err != nil {
  682. cluster = env.GetClusterID()
  683. }
  684. key := fmt.Sprintf("%s/%s", cluster, nodeName)
  685. if node, ok := nodeMap[key]; pre > 0.0 && ok {
  686. node.Preemptible = true
  687. }
  688. // TODO AWS preemptible
  689. // TODO Azure preemptible
  690. }
  691. // Copy labels into node
  692. for _, result := range resLabels {
  693. cluster, err := result.GetString("cluster_id")
  694. if err != nil {
  695. cluster = env.GetClusterID()
  696. }
  697. node, err := result.GetString("kubernetes_node")
  698. if err != nil {
  699. log.DedupedWarningf(5, "ClusterNodes: label data missing node")
  700. continue
  701. }
  702. key := fmt.Sprintf("%s/%s", cluster, node)
  703. if _, ok := nodeMap[key]; !ok {
  704. continue
  705. }
  706. for name, value := range result.Metric {
  707. if val, ok := value.(string); ok {
  708. nodeMap[key].Labels[name] = val
  709. }
  710. }
  711. }
  712. c, err := cp.GetConfig()
  713. if err != nil {
  714. return nil, err
  715. }
  716. discount, err := ParsePercentString(c.Discount)
  717. if err != nil {
  718. return nil, err
  719. }
  720. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  721. if err != nil {
  722. return nil, err
  723. }
  724. for _, node := range nodeMap {
  725. // TODO take RI into account
  726. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  727. // Apply all remaining resources to Idle
  728. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  729. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  730. }
  731. return nodeMap, nil
  732. }
  733. type LoadBalancer struct {
  734. Cluster string
  735. Name string
  736. ProviderID string
  737. Cost float64
  738. Start time.Time
  739. Minutes float64
  740. }
  // ClusterLoadBalancers queries Prometheus for the cumulative cost and active
// time of every load balancer service, keyed by "<cluster>/<namespace>/<service>".
//
// duration is the length of the query window and offset shifts the window
// back in time; offsets shorter than one minute are treated as no offset.
// Results missing the namespace or service_name label are skipped with a
// warning, and results missing cluster_id are attributed to the local cluster
// (env.GetClusterID()). Activity data for a load balancer that has no cost
// entry is ignored with a warning rather than dereferencing a nil entry.
func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
	if offset < time.Minute {
		offsetStr = ""
	}

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice versa.
	minsPerResolution := 5
	resolution := time.Duration(minsPerResolution) * time.Minute

	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
	// value, converts it to a cumulative value; i.e.
	// [$/hr] * [min/res]*[hr/min] = [$/res]
	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)

	ctx := prom.NewContext(client)

	queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
	queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)

	resChLBCost := ctx.Query(queryLBCost)
	resChActiveMins := ctx.Query(queryActiveMins)

	// Await errors are discarded here; query failures are detected through
	// ctx.HasErrors() immediately below.
	resLBCost, _ := resChLBCost.Await()
	resActiveMins, _ := resChActiveMins.Await()
	if ctx.HasErrors() {
		return nil, ctx.ErrorCollection()
	}

	loadBalancerMap := map[string]*LoadBalancer{}

	for _, result := range resLBCost {
		cluster, err := result.GetString("cluster_id")
		if err != nil {
			cluster = env.GetClusterID()
		}
		namespace, err := result.GetString("namespace")
		if err != nil {
			log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
			continue
		}
		serviceName, err := result.GetString("service_name")
		if err != nil {
			log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
			continue
		}
		// Guard against an empty series before reading Values[0].
		if len(result.Values) == 0 {
			continue
		}

		providerID := ""
		lbCost := result.Values[0].Value
		key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)

		lb, ok := loadBalancerMap[key]
		if !ok {
			lb = &LoadBalancer{
				Cluster:    cluster,
				Name:       namespace + "/" + serviceName,
				ProviderID: providerID, // cp.ParseID(providerID) if providerID does get recorded later
			}
			loadBalancerMap[key] = lb
		}
		lb.Cost += lbCost
	}

	for _, result := range resActiveMins {
		cluster, err := result.GetString("cluster_id")
		if err != nil {
			cluster = env.GetClusterID()
		}
		namespace, err := result.GetString("namespace")
		if err != nil {
			log.Warningf("ClusterLoadBalancers: LB active mins data missing namespace")
			continue
		}
		serviceName, err := result.GetString("service_name")
		if err != nil {
			log.Warningf("ClusterLoadBalancers: LB active mins data missing service_name")
			continue
		}
		if len(result.Values) == 0 {
			continue
		}

		key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
		lb, ok := loadBalancerMap[key]
		if !ok {
			// Without this check, assigning to a missing entry would
			// dereference a nil *LoadBalancer and panic.
			log.Warningf("ClusterLoadBalancers: active mins for unidentified load balancer: %s", key)
			continue
		}

		s := time.Unix(int64(result.Values[0].Timestamp), 0)
		// Each sample represents one resolution step, so the observed interval
		// extends one step past the final sample. This matches the node
		// active-minutes computation and avoids reporting 0 minutes for a
		// single-sample load balancer that has nonzero cost.
		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
		// TODO niko/assets if mins >= threshold, interpolate for missing data?
		lb.Start = s
		lb.Minutes = e.Sub(s).Minutes()
	}

	return loadBalancerMap, nil
}
  // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
//
// window and offset are substituted into PromQL range selectors and offset
// modifiers, and are also passed to util.ParseTimeRange to determine the total
// number of minutes in the interval. That total is used as a fallback when a
// cluster's data-count query returns nothing.
//
// When withBreakdown is true, CPU and RAM usage breakdowns are also queried.
// Storage usage is queried as well when the provider supplies a used
// local-storage query. The breakdowns are attached to each cluster's costs.
// Results lacking a cluster_id label are attributed to env.GetClusterID().
//
// CPU and RAM costs have both the configured Discount and NegotiatedDiscount
// (read from a.CloudProvider) applied. GPU and storage costs have only
// NegotiatedDiscount applied. If the configuration cannot be read, or a
// discount fails to parse, that discount is treated as zero.
func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
	// Compute number of minutes in the full interval, for use interpolating
	// missed scrapes or scaling missing data.
	start, end, err := util.ParseTimeRange(window, offset)
	if err != nil {
		return nil, err
	}
	mins := end.Sub(*start).Minutes()

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice versa.
	minsPerResolution := 5

	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
	// value, converts it to a cumulative value; i.e.
	// [$/hr] * [min/res]*[hr/min] = [$/res]
	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)

	const fmtQueryDataCount = `
		count_over_time(sum(kube_node_status_capacity_cpu_cores) by (cluster_id)[%s:%dm]%s) * %d
	`

	const fmtQueryTotalGPU = `
		sum(
			sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
		) by (cluster_id)
	`

	const fmtQueryTotalCPU = `
		sum(
			sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, cluster_id)[%s:%dm]%s) *
			avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
		) by (cluster_id)
	`

	const fmtQueryTotalRAM = `
		sum(
			sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
			avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
		) by (cluster_id)
	`

	const fmtQueryTotalStorage = `
		sum(
			sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
			avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, cluster_id) * %f
		) by (cluster_id)
	`

	const fmtQueryCPUModePct = `
		sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
		group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)
	`

	const fmtQueryRAMSystemPct = `
		sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (cluster_id)
		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
	`

	const fmtQueryRAMUserPct = `
		sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (cluster_id)
		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
	`

	// TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
	// const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
	// group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`

	queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
	// The total local storage query is submitted as a standalone query, so it
	// needs no " + " prefix (a leading unary plus would be a no-op anyway).
	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)

	fmtOffset := ""
	if offset != "" {
		fmtOffset = fmt.Sprintf("offset %s", offset)
	}

	queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, minsPerResolution, fmtOffset, minsPerResolution)
	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative)
	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)

	ctx := prom.NewContext(client)

	// Layout of resChs:
	//   0-4: data count, GPU, CPU, RAM, and storage totals
	//   5:   total local storage (nil when the provider has no query)
	//   6-8: CPU mode %, RAM system %, RAM user % (withBreakdown only)
	//   9:   used local storage (withBreakdown only; nil when no query)
	resChs := ctx.QueryAll(
		queryDataCount,
		queryTotalGPU,
		queryTotalCPU,
		queryTotalRAM,
		queryTotalStorage,
	)

	// Only submit the local storage query if it is valid. Otherwise Prometheus
	// will return errors. Always append something to resChs, regardless, to
	// maintain indexing.
	if queryTotalLocalStorage != "" {
		resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
	} else {
		resChs = append(resChs, nil)
	}

	if withBreakdown {
		queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
		queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
		queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)

		bdResChs := ctx.QueryAll(
			queryCPUModePct,
			queryRAMSystemPct,
			queryRAMUserPct,
		)

		// Only submit the local storage query if it is valid. Otherwise Prometheus
		// will return errors. Always append something to resChs, regardless, to
		// maintain indexing.
		if queryUsedLocalStorage != "" {
			bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
		} else {
			bdResChs = append(bdResChs, nil)
		}

		resChs = append(resChs, bdResChs...)
	}

	// Await errors are discarded here; query failures are detected through
	// ctx.HasErrors() immediately below.
	resDataCount, _ := resChs[0].Await()
	resTotalGPU, _ := resChs[1].Await()
	resTotalCPU, _ := resChs[2].Await()
	resTotalRAM, _ := resChs[3].Await()
	resTotalStorage, _ := resChs[4].Await()
	if ctx.HasErrors() {
		return nil, ctx.ErrorCollection()
	}

	defaultClusterID := env.GetClusterID()

	// clusterIDOf returns the result's cluster_id label, falling back to the
	// local cluster ID when the label is missing or empty.
	clusterIDOf := func(result *prom.QueryResult) string {
		if clusterID, _ := result.GetString("cluster_id"); clusterID != "" {
			return clusterID
		}
		return defaultClusterID
	}

	dataMinsByCluster := map[string]float64{}
	for _, result := range resDataCount {
		clusterID := clusterIDOf(result)
		dataMins := mins
		if len(result.Values) > 0 {
			dataMins = result.Values[0].Value
		} else {
			klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
		}
		dataMinsByCluster[clusterID] = dataMins
	}

	// Determine combined discount
	discount, customDiscount := 0.0, 0.0
	c, err := a.CloudProvider.GetConfig()
	if err == nil {
		discount, err = ParsePercentString(c.Discount)
		if err != nil {
			discount = 0.0
		}

		customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
		if err != nil {
			customDiscount = 0.0
		}
	}

	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
	costData := make(map[string]map[string]float64)

	// Helper function to iterate over Prom query results, parsing the raw values into
	// the intermediate costData structure.
	setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
		for _, result := range results {
			clusterID := clusterIDOf(result)
			if _, ok := costData[clusterID]; !ok {
				costData[clusterID] = map[string]float64{}
			}
			if len(result.Values) > 0 {
				cost := result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
				costData[clusterID][name] += cost
				costData[clusterID]["total"] += cost
			}
		}
	}

	// Apply both sustained use and custom discounts to RAM and CPU
	setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
	setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)

	// Apply only custom discount to GPU and storage
	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
	if queryTotalLocalStorage != "" {
		resTotalLocalStorage, err := resChs[5].Await()
		if err != nil {
			return nil, err
		}
		setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
	}

	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
	pvUsedCostMap := map[string]float64{}
	if withBreakdown {
		resCPUModePct, _ := resChs[6].Await()
		resRAMSystemPct, _ := resChs[7].Await()
		resRAMUserPct, _ := resChs[8].Await()
		if ctx.HasErrors() {
			return nil, ctx.ErrorCollection()
		}

		for _, result := range resCPUModePct {
			// Skip empty series rather than panicking on Values[0].
			if len(result.Values) == 0 {
				continue
			}
			clusterID := clusterIDOf(result)
			cpuBD, ok := cpuBreakdownMap[clusterID]
			if !ok {
				cpuBD = &ClusterCostsBreakdown{}
				cpuBreakdownMap[clusterID] = cpuBD
			}

			mode, err := result.GetString("mode")
			if err != nil {
				klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
				mode = "other"
			}

			pct := result.Values[0].Value
			switch mode {
			case "idle":
				cpuBD.Idle += pct
			case "system":
				cpuBD.System += pct
			case "user":
				cpuBD.User += pct
			default:
				cpuBD.Other += pct
			}
		}

		// ramBreakdownFor returns the RAM breakdown for clusterID, creating it
		// on first use.
		ramBreakdownFor := func(clusterID string) *ClusterCostsBreakdown {
			ramBD, ok := ramBreakdownMap[clusterID]
			if !ok {
				ramBD = &ClusterCostsBreakdown{}
				ramBreakdownMap[clusterID] = ramBD
			}
			return ramBD
		}

		for _, result := range resRAMSystemPct {
			if len(result.Values) == 0 {
				continue
			}
			ramBreakdownFor(clusterIDOf(result)).System += result.Values[0].Value
		}

		for _, result := range resRAMUserPct {
			if len(result.Values) == 0 {
				continue
			}
			ramBreakdownFor(clusterIDOf(result)).User += result.Values[0].Value
		}

		// Whatever RAM is not attributed to a known category is idle.
		for _, ramBD := range ramBreakdownMap {
			ramBD.Idle = 1.0 - ramBD.Other - ramBD.System - ramBD.User
		}

		if queryUsedLocalStorage != "" {
			resUsedLocalStorage, err := resChs[9].Await()
			if err != nil {
				return nil, err
			}
			for _, result := range resUsedLocalStorage {
				if len(result.Values) == 0 {
					continue
				}
				pvUsedCostMap[clusterIDOf(result)] += result.Values[0].Value
			}
		}
	}

	if ctx.HasErrors() {
		for _, err := range ctx.Errors() {
			log.Errorf("ComputeClusterCosts: %s", err)
		}
		return nil, ctx.ErrorCollection()
	}

	// Convert intermediate structure to Costs instances
	costsByCluster := map[string]*ClusterCosts{}
	for id, cd := range costData {
		dataMins, ok := dataMinsByCluster[id]
		if !ok {
			dataMins = mins
			klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
		}

		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/util.MinsPerHour)
		if err != nil {
			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
			return nil, err
		}

		if cpuBD, ok := cpuBreakdownMap[id]; ok {
			costs.CPUBreakdown = cpuBD
		}

		if ramBD, ok := ramBreakdownMap[id]; ok {
			costs.RAMBreakdown = ramBD
		}

		costs.StorageBreakdown = &ClusterCostsBreakdown{}
		// Skip when there is no storage cost: dividing by zero would yield
		// NaN or ±Inf, which encoding/json refuses to marshal.
		if pvUC, ok := pvUsedCostMap[id]; ok && costs.StorageCumulative > 0 {
			costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
			costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
		}

		costs.DataMinutes = dataMins
		costsByCluster[id] = costs
	}

	return costsByCluster, nil
}
  // Totals holds one cost time series per resource category. Each series is a
// list of [timestamp, value] pairs rendered as strings, as assembled by
// ClusterCostsOverTime.
type (
	Totals struct {
		TotalCost   [][]string `json:"totalcost"`   // overall cluster cost
		CPUCost     [][]string `json:"cpucost"`     // CPU cost
		MemCost     [][]string `json:"memcost"`     // RAM cost
		StorageCost [][]string `json:"storageCost"` // storage cost
	}
)
  // resultToTotals converts the first query result in qrs into a list of
// [timestamp, value] pairs, each number formatted with "%f". Any results after
// qrs[0] are ignored. An error is returned when qrs is empty.
func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
	if len(qrs) == 0 {
		return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
	}

	samples := qrs[0].Values
	pairs := make([][]string, 0, len(samples))
	for i := range samples {
		ts := fmt.Sprintf("%f", samples[i].Timestamp)
		val := fmt.Sprintf("%f", samples[i].Value)
		pairs = append(pairs, []string{ts, val})
	}
	return pairs, nil
}
  // ClusterCostsOverTime gives the full cluster costs over time.
//
// startString and endString must match the layout "2006-01-02T15:04:05.000Z".
// windowString is parsed with time.ParseDuration and passed to each range
// query (presumably as the step). It is also substituted into the query
// templates. offset, when non-empty, is applied to the queries as
// "offset <offset>".
//
// CPU and RAM data are required; missing storage data only produces a warning
// and an empty StorageCost series. If the total-cost query yields no data
// (e.g. there are no PVs), node-only costs are used for TotalCost instead.
func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
	localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true, false)
	if localStorageQuery != "" {
		// Prefixed with "+" because it is appended to the storage, total and
		// node query templates below.
		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
	}

	layout := "2006-01-02T15:04:05.000Z"

	start, err := time.Parse(layout, startString)
	if err != nil {
		// Constant format string: user input containing '%' must not be
		// interpreted as formatting verbs.
		klog.V(1).Infof("Error parsing time %s. Error: %s", startString, err)
		return nil, err
	}
	end, err := time.Parse(layout, endString)
	if err != nil {
		klog.V(1).Infof("Error parsing time %s. Error: %s", endString, err)
		return nil, err
	}
	window, err := time.ParseDuration(windowString)
	if err != nil {
		klog.V(1).Infof("Error parsing time %s. Error: %s", windowString, err)
		return nil, err
	}

	// Turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h"
	// for use in query templates.
	if offset != "" {
		offset = fmt.Sprintf("offset %s", offset)
	}

	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)

	ctx := prom.NewContext(cli)
	resChClusterCores := ctx.QueryRange(qCores, start, end, window)
	resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
	resChStorage := ctx.QueryRange(qStorage, start, end, window)
	resChTotal := ctx.QueryRange(qTotal, start, end, window)

	resultClusterCores, err := resChClusterCores.Await()
	if err != nil {
		return nil, err
	}
	resultClusterRAM, err := resChClusterRAM.Await()
	if err != nil {
		return nil, err
	}
	resultStorage, err := resChStorage.Await()
	if err != nil {
		return nil, err
	}
	resultTotal, err := resChTotal.Await()
	if err != nil {
		return nil, err
	}

	coreTotal, err := resultToTotals(resultClusterCores)
	if err != nil {
		klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
		return nil, err
	}

	ramTotal, err := resultToTotals(resultClusterRAM)
	if err != nil {
		klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
		return nil, err
	}

	// Storage data is optional; on error storageTotal is an empty series.
	storageTotal, err := resultToTotals(resultStorage)
	if err != nil {
		klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
	}

	clusterTotal, err := resultToTotals(resultTotal)
	if err != nil {
		// If clusterTotal query failed, it's likely because there are no PVs, which
		// causes the qTotal query to return no data. Instead, query only node costs.
		// If that fails, return an error because something is actually wrong.
		qNodes := fmt.Sprintf(queryNodes, localStorageQuery)

		resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
		for _, warning := range warnings {
			// Constant format string so '%' in a warning is printed verbatim.
			log.Warningf("%s", warning)
		}
		if err != nil {
			return nil, err
		}

		clusterTotal, err = resultToTotals(resultNodes)
		if err != nil {
			klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
			return nil, err
		}
	}

	return &Totals{
		TotalCost:   clusterTotal,
		CPUCost:     coreTotal,
		MemCost:     ramTotal,
		StorageCost: storageTotal,
	}, nil
}