cluster.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/kubecost/cost-model/pkg/util/timeutil"
  6. "github.com/kubecost/cost-model/pkg/cloud"
  7. "github.com/kubecost/cost-model/pkg/env"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. "github.com/kubecost/cost-model/pkg/prom"
  10. prometheus "github.com/prometheus/client_golang/api"
  11. "k8s.io/klog"
  12. )
  // PromQL query templates for cluster-level cost figures. Every %s is a
// fmt.Sprintf placeholder supplied by the caller. Hourly cost metrics are
// multiplied by 730 (presumably hours per month, giving a monthly rate - confirm
// against timeutil.HoursPerMonth) and byte capacities are divided by 1024^3.
const (
	// queryClusterCores multiplies per-node CPU core capacity by per-node hourly
	// CPU cost, adds per-node hourly GPU cost, scales both by 730 and sums the
	// result by the final label placeholder.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryClusterRAM multiplies per-node memory capacity (bytes / 1024^3) by
	// per-node hourly RAM cost, scales by 730 and sums by the final label
	// placeholder.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryStorage multiplies per-volume hourly cost (scaled by 730) by
	// per-volume capacity (bytes / 1024^3) and sums by a label placeholder. The
	// trailing %s lets the caller append further query text after the sum.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryTotal adds the summed node_total_hourly_cost (scaled by 730) to the
	// same persistent-volume expression as queryStorage, but with the range
	// fixed at 1h instead of a placeholder. The trailing %s lets the caller
	// append further query text.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryNodes is the node-only part of queryTotal: summed
	// node_total_hourly_cost scaled by 730, followed by caller-supplied text.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
)
  // ClusterCosts represents cumulative and monthly cluster costs over a given duration. Costs
// are broken down by CPU, GPU, memory, and storage; CPU, memory and storage
// may additionally carry a usage breakdown.
type ClusterCosts struct {
	// Start and End bound the time range the costs cover.
	Start *time.Time `json:"startTime"`
	End   *time.Time `json:"endTime"`
	// For each resource, the Cumulative field is the cost accrued over the
	// range and the Monthly field is the corresponding monthly rate.
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	// TotalCumulative and TotalMonthly are the sums across all resources.
	TotalCumulative float64 `json:"totalCumulativeCost"`
	TotalMonthly    float64 `json:"totalMonthlyCost"`
	// DataMinutes has no json tag, so encoding/json emits it under the key
	// "DataMinutes". Presumably the number of minutes of data backing the
	// costs - confirm against the code that populates it.
	DataMinutes float64
}
  // ClusterCostsBreakdown provides a fractional breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, other, and
// idle. ClusterDisks and ClusterNodes set Idle to 1.0 minus the sum of the
// other three fields, so the four values are fractions of 1.0 rather than
// values out of 100.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  60. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  61. // the associated monthly rate data, and returns the Costs.
  62. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  63. start, end := timeutil.ParseTimeRange(window, offset)
  64. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  65. if dataHours == 0 {
  66. dataHours = end.Sub(start).Hours()
  67. }
  68. // Do not allow zero-length windows to prevent divide-by-zero issues
  69. if dataHours == 0 {
  70. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  71. }
  72. cc := &ClusterCosts{
  73. Start: &start,
  74. End: &end,
  75. CPUCumulative: cpu,
  76. GPUCumulative: gpu,
  77. RAMCumulative: ram,
  78. StorageCumulative: storage,
  79. TotalCumulative: cpu + gpu + ram + storage,
  80. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  81. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  82. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  83. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  84. }
  85. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  86. return cc, nil
  87. }
  // Disk describes one storage asset returned by ClusterDisks: either a
// persistent volume or a node's local storage. Entries are keyed by
// "<cluster>/<name>" in the map ClusterDisks returns.
type Disk struct {
	Cluster string
	// Name is the "instance" label value for local disks.
	Name string
	// ProviderID may be empty; ClusterDisks falls back to Name when the
	// provider type check for Azure matches.
	ProviderID string
	// Cost is the cumulative cost accumulated from the query results.
	Cost float64
	// Bytes is the average capacity over the queried range.
	Bytes float64
	// Local is true for entries created from the local storage queries.
	Local bool
	// Start, End and Minutes describe the span in which the disk was observed.
	Start   time.Time
	End     time.Time
	Minutes float64
	// Breakdown holds the fractional usage split; ClusterDisks fills in System
	// and Idle for local disks.
	Breakdown *ClusterCostsBreakdown
}
  100. func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, error) {
  101. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  102. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  103. if offset < time.Minute {
  104. offsetStr = ""
  105. }
  106. // minsPerResolution determines accuracy and resource use for the following
  107. // queries. Smaller values (higher resolution) result in better accuracy,
  108. // but more expensive queries, and vice-a-versa.
  109. minsPerResolution := 1
  110. resolution := time.Duration(minsPerResolution) * time.Minute
  111. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  112. // value, converts it to a cumulative value; i.e.
  113. // [$/hr] * [min/res]*[hr/min] = [$/res]
  114. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  115. // TODO niko/assets how do we not hard-code this price?
  116. costPerGBHr := 0.04 / 730.0
  117. ctx := prom.NewContext(client)
  118. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s]%s)) by (%s, persistentvolume,provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
  119. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (%s, persistentvolume)`, durationStr, offsetStr, env.GetPromClusterLabel())
  120. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  121. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  122. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  123. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s)`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  124. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  125. resChPVCost := ctx.Query(queryPVCost)
  126. resChPVSize := ctx.Query(queryPVSize)
  127. resChActiveMins := ctx.Query(queryActiveMins)
  128. resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
  129. resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
  130. resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
  131. resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
  132. resPVCost, _ := resChPVCost.Await()
  133. resPVSize, _ := resChPVSize.Await()
  134. resActiveMins, _ := resChActiveMins.Await()
  135. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  136. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  137. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  138. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  139. if ctx.HasErrors() {
  140. return nil, ctx.ErrorCollection()
  141. }
  142. diskMap := map[string]*Disk{}
  143. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost)
  144. for _, result := range resLocalStorageCost {
  145. cluster, err := result.GetString(env.GetPromClusterLabel())
  146. if err != nil {
  147. cluster = env.GetClusterID()
  148. }
  149. name, err := result.GetString("instance")
  150. if err != nil {
  151. log.Warningf("ClusterDisks: local storage data missing instance")
  152. continue
  153. }
  154. cost := result.Values[0].Value
  155. key := fmt.Sprintf("%s/%s", cluster, name)
  156. if _, ok := diskMap[key]; !ok {
  157. diskMap[key] = &Disk{
  158. Cluster: cluster,
  159. Name: name,
  160. Breakdown: &ClusterCostsBreakdown{},
  161. Local: true,
  162. }
  163. }
  164. diskMap[key].Cost += cost
  165. }
  166. for _, result := range resLocalStorageUsedCost {
  167. cluster, err := result.GetString(env.GetPromClusterLabel())
  168. if err != nil {
  169. cluster = env.GetClusterID()
  170. }
  171. name, err := result.GetString("instance")
  172. if err != nil {
  173. log.Warningf("ClusterDisks: local storage usage data missing instance")
  174. continue
  175. }
  176. cost := result.Values[0].Value
  177. key := fmt.Sprintf("%s/%s", cluster, name)
  178. if _, ok := diskMap[key]; !ok {
  179. diskMap[key] = &Disk{
  180. Cluster: cluster,
  181. Name: name,
  182. Breakdown: &ClusterCostsBreakdown{},
  183. Local: true,
  184. }
  185. }
  186. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  187. }
  188. for _, result := range resLocalStorageBytes {
  189. cluster, err := result.GetString(env.GetPromClusterLabel())
  190. if err != nil {
  191. cluster = env.GetClusterID()
  192. }
  193. name, err := result.GetString("instance")
  194. if err != nil {
  195. log.Warningf("ClusterDisks: local storage data missing instance")
  196. continue
  197. }
  198. bytes := result.Values[0].Value
  199. key := fmt.Sprintf("%s/%s", cluster, name)
  200. if _, ok := diskMap[key]; !ok {
  201. diskMap[key] = &Disk{
  202. Cluster: cluster,
  203. Name: name,
  204. Breakdown: &ClusterCostsBreakdown{},
  205. Local: true,
  206. }
  207. }
  208. diskMap[key].Bytes = bytes
  209. }
  210. for _, result := range resLocalActiveMins {
  211. cluster, err := result.GetString(env.GetPromClusterLabel())
  212. if err != nil {
  213. cluster = env.GetClusterID()
  214. }
  215. name, err := result.GetString("node")
  216. if err != nil {
  217. log.Warningf("ClusterDisks: local active mins data missing instance")
  218. continue
  219. }
  220. key := fmt.Sprintf("%s/%s", cluster, name)
  221. if _, ok := diskMap[key]; !ok {
  222. log.Warningf("ClusterDisks: local active mins for unidentified disk")
  223. continue
  224. }
  225. if len(result.Values) == 0 {
  226. continue
  227. }
  228. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  229. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  230. mins := e.Sub(s).Minutes()
  231. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  232. diskMap[key].End = e
  233. diskMap[key].Start = s
  234. diskMap[key].Minutes = mins
  235. }
  236. for _, disk := range diskMap {
  237. // Apply all remaining RAM to Idle
  238. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  239. // Set provider Id to the name for reconciliation on Azure
  240. if fmt.Sprintf("%T", provider) == "*provider.Azure" {
  241. if disk.ProviderID == "" {
  242. disk.ProviderID = disk.Name
  243. }
  244. }
  245. }
  246. return diskMap, nil
  247. }
  // Node describes one cluster node as assembled by ClusterNodes, keyed in the
// returned map by NodeIdentifier.
type Node struct {
	Cluster    string
	Name       string
	ProviderID string
	// NodeType is passed to the provider's CombinedDiscountForNode; presumably
	// the instance type reported by the cost metrics - confirm in buildNodeMap.
	NodeType string
	// Per-resource cost and quantity.
	CPUCost  float64
	CPUCores float64
	GPUCost  float64
	GPUCount float64
	RAMCost  float64
	RAMBytes float64
	// Discount is the value returned by the provider's CombinedDiscountForNode.
	Discount float64
	// Preemptible is passed to CombinedDiscountForNode alongside NodeType.
	Preemptible bool
	// CPUBreakdown and RAMBreakdown hold the fractional usage split;
	// ClusterNodes sets Idle to the remainder after System, Other and User.
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	// Start, End and Minutes describe the span in which the node was observed.
	Start   time.Time
	End     time.Time
	Minutes float64
	Labels  map[string]string
	// Hourly unit prices per resource; populated outside the code shown here.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
  // partialCPUMap maps a node (instance) type to its actual CPU core count for
// the cases where the reported count is wrong: GKE lies about the number of
// cores e2 shared-core nodes have.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
  // NodeIdentifier identifies a node by cluster, name and provider ID. It is the
// key type of the map returned by ClusterNodes and of the per-node cost and
// active-data maps.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
  // nodeIdentifierNoProviderID identifies a node by cluster and name only. It
// keys maps built from queries that do not group by provider_id, such as the
// resource-count map consumed by costTimesMinuteAndCount.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  288. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  289. for k, v := range activeDataMap {
  290. keyNon := nodeIdentifierNoProviderID{
  291. Cluster: k.Cluster,
  292. Name: k.Name,
  293. }
  294. if cost, ok := costMap[k]; ok {
  295. minutes := v.minutes
  296. count := 1.0
  297. if c, ok := resourceCountMap[keyNon]; ok {
  298. count = c
  299. }
  300. costMap[k] = cost * (minutes / 60) * count
  301. }
  302. }
  303. }
  304. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  305. for k, v := range activeDataMap {
  306. if cost, ok := costMap[k]; ok {
  307. minutes := v.minutes
  308. costMap[k] = cost * (minutes / 60)
  309. }
  310. }
  311. }
  // ClusterNodes queries Prometheus for per-node cost, capacity, usage, spot
// status and label data over the given duration (shifted back by offset) and
// returns one Node per NodeIdentifier.
//
// Queries issued on requiredCtx make the function return an error if any of
// them fails; queries issued on optionalCtx only produce logged warnings.
// After the node map is assembled by buildNodeMap, the provider's configured
// discounts are parsed and applied to every node, and the Idle share of each
// CPU and RAM breakdown is set to what remains after System, Other and User.
// An error is also returned when the provider config cannot be read or either
// discount string cannot be parsed.
func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[NodeIdentifier]*Node, error) {
	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
	// Offsets below one minute would format as "offset 0m"; omit the clause.
	if offset < time.Minute {
		offsetStr = ""
	}
	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice versa.
	minsPerResolution := 1
	resolution := time.Duration(minsPerResolution) * time.Minute
	requiredCtx := prom.NewContext(client)
	optionalCtx := prom.NewContext(client)
	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
	// The RAM hourly cost is divided by 1024^3 in the query itself.
	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s]%s)) by (%s, node, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, %s, mode)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel())
	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
	// Return errors if these fail
	resChNodeCPUHourlyCost := requiredCtx.Query(queryNodeCPUHourlyCost)
	resChNodeCPUCores := requiredCtx.Query(queryNodeCPUCores)
	resChNodeRAMHourlyCost := requiredCtx.Query(queryNodeRAMHourlyCost)
	resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
	resChNodeGPUCount := requiredCtx.Query(queryNodeGPUCount)
	resChNodeGPUHourlyCost := requiredCtx.Query(queryNodeGPUHourlyCost)
	resChActiveMins := requiredCtx.Query(queryActiveMins)
	resChIsSpot := requiredCtx.Query(queryIsSpot)
	// Do not return errors if these fail, but log warnings
	resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
	resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
	resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
	resChLabels := optionalCtx.Query(queryLabels)
	// Per-query errors are discarded here; the HasErrors checks on the two
	// contexts below are relied on to report them.
	resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
	resNodeCPUCores, _ := resChNodeCPUCores.Await()
	resNodeGPUCount, _ := resChNodeGPUCount.Await()
	resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
	resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
	resIsSpot, _ := resChIsSpot.Await()
	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
	resActiveMins, _ := resChActiveMins.Await()
	resLabels, _ := resChLabels.Await()
	if optionalCtx.HasErrors() {
		for _, err := range optionalCtx.Errors() {
			log.Warningf("ClusterNodes: %s", err)
		}
	}
	if requiredCtx.HasErrors() {
		for _, err := range requiredCtx.Errors() {
			log.Errorf("ClusterNodes: %s", err)
		}
		return nil, requiredCtx.ErrorCollection()
	}
	activeDataMap := buildActiveDataMap(resActiveMins, resolution)
	gpuCountMap := buildGPUCountMap(resNodeGPUCount)
	// Each cost map builder also returns a cluster/name -> node type map; the
	// three are merged into one below.
	cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost)
	ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost)
	gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap)
	clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
	clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
	cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
	ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
	ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
	ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
	cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
	preemptibleMap := buildPreemptibleMap(resIsSpot)
	labelsMap := buildLabelsMap(resLabels)
	// Convert the hourly costs into cumulative costs: hourly cost times active
	// hours, and for CPU and RAM also times the resource quantity.
	costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
	costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
	costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
	nodeMap := buildNodeMap(
		cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
		cpuCoresMap, ramBytesMap, ramUserPctMap,
		ramSystemPctMap,
		cpuBreakdownMap,
		activeDataMap,
		preemptibleMap,
		labelsMap,
		clusterAndNameToType,
	)
	c, err := cp.GetConfig()
	if err != nil {
		return nil, err
	}
	discount, err := ParsePercentString(c.Discount)
	if err != nil {
		return nil, err
	}
	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
	if err != nil {
		return nil, err
	}
	for _, node := range nodeMap {
		// TODO take GKE Reserved Instances into account
		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
		// Apply all remaining resources to Idle
		// NOTE(review): assumes buildNodeMap always sets non-nil CPUBreakdown
		// and RAMBreakdown; a nil pointer here would panic - confirm.
		node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
		node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
	}
	return nodeMap, nil
}
  // LoadBalancer describes one load balancer service as assembled by
// ClusterLoadBalancers, keyed in the returned map by
// "<cluster>/<namespace>/<service_name>".
type LoadBalancer struct {
	Cluster string
	// Name is "<namespace>/<service_name>".
	Name string
	// ProviderID is derived from the "ingress_ip" label and may be empty.
	ProviderID string
	// Cost is the cumulative cost summed over the queried range.
	Cost float64
	// Start is the timestamp of the first sample in which the service appears.
	Start time.Time
	// Minutes is the time between the first and last sample.
	Minutes float64
}
  430. func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
  431. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  432. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  433. if offset < time.Minute {
  434. offsetStr = ""
  435. }
  436. // minsPerResolution determines accuracy and resource use for the following
  437. // queries. Smaller values (higher resolution) result in better accuracy,
  438. // but more expensive queries, and vice-a-versa.
  439. minsPerResolution := 5
  440. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  441. // value, converts it to a cumulative value; i.e.
  442. // [$/hr] * [min/res]*[hr/min] = [$/res]
  443. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  444. ctx := prom.NewContext(client)
  445. queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip))[%s:%dm]%s) * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  446. queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  447. resChLBCost := ctx.Query(queryLBCost)
  448. resChActiveMins := ctx.Query(queryActiveMins)
  449. resLBCost, _ := resChLBCost.Await()
  450. resActiveMins, _ := resChActiveMins.Await()
  451. if ctx.HasErrors() {
  452. return nil, ctx.ErrorCollection()
  453. }
  454. loadBalancerMap := map[string]*LoadBalancer{}
  455. for _, result := range resLBCost {
  456. cluster, err := result.GetString(env.GetPromClusterLabel())
  457. if err != nil {
  458. cluster = env.GetClusterID()
  459. }
  460. namespace, err := result.GetString("namespace")
  461. if err != nil {
  462. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  463. continue
  464. }
  465. serviceName, err := result.GetString("service_name")
  466. if err != nil {
  467. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  468. continue
  469. }
  470. providerID, err := result.GetString("ingress_ip")
  471. if err != nil {
  472. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  473. providerID = ""
  474. }
  475. lbCost := result.Values[0].Value
  476. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  477. if _, ok := loadBalancerMap[key]; !ok {
  478. loadBalancerMap[key] = &LoadBalancer{
  479. Cluster: cluster,
  480. Name: namespace + "/" + serviceName,
  481. ProviderID: cloud.ParseLBID(providerID),
  482. }
  483. }
  484. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  485. // Prevents there from being a duplicate LoadBalancers on the same day
  486. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  487. loadBalancerMap[key].ProviderID = providerID
  488. }
  489. loadBalancerMap[key].Cost += lbCost
  490. }
  491. for _, result := range resActiveMins {
  492. cluster, err := result.GetString(env.GetPromClusterLabel())
  493. if err != nil {
  494. cluster = env.GetClusterID()
  495. }
  496. namespace, err := result.GetString("namespace")
  497. if err != nil {
  498. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  499. continue
  500. }
  501. serviceName, err := result.GetString("service_name")
  502. if err != nil {
  503. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  504. continue
  505. }
  506. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  507. if len(result.Values) == 0 {
  508. continue
  509. }
  510. if lb, ok := loadBalancerMap[key]; ok {
  511. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  512. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  513. mins := e.Sub(s).Minutes()
  514. lb.Start = s
  515. lb.Minutes = mins
  516. } else {
  517. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  518. }
  519. }
  520. return loadBalancerMap, nil
  521. }
  522. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  523. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  524. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  525. start, end := timeutil.ParseTimeRange(window, offset)
  526. mins := end.Sub(start).Minutes()
  527. // minsPerResolution determines accuracy and resource use for the following
  528. // queries. Smaller values (higher resolution) result in better accuracy,
  529. // but more expensive queries, and vice-a-versa.
  530. minsPerResolution := 5
  531. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  532. // value, converts it to a cumulative value; i.e.
  533. // [$/hr] * [min/res]*[hr/min] = [$/res]
  534. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  535. const fmtQueryDataCount = `
  536. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  537. `
  538. const fmtQueryTotalGPU = `
  539. sum(
  540. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  541. ) by (%s)
  542. `
  543. const fmtQueryTotalCPU = `
  544. sum(
  545. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  546. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  547. ) by (%s)
  548. `
  549. const fmtQueryTotalRAM = `
  550. sum(
  551. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  552. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  553. ) by (%s)
  554. `
  555. const fmtQueryTotalStorage = `
  556. sum(
  557. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  558. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  559. ) by (%s)
  560. `
  561. const fmtQueryCPUModePct = `
  562. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  563. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  564. `
  565. const fmtQueryRAMSystemPct = `
  566. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  567. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  568. `
  569. const fmtQueryRAMUserPct = `
  570. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  571. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  572. `
  573. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  574. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  575. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  576. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  577. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  578. if queryTotalLocalStorage != "" {
  579. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  580. }
  581. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  582. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, minsPerResolution)
  583. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  584. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  585. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  586. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  587. ctx := prom.NewContext(client)
  588. resChs := ctx.QueryAll(
  589. queryDataCount,
  590. queryTotalGPU,
  591. queryTotalCPU,
  592. queryTotalRAM,
  593. queryTotalStorage,
  594. )
  595. // Only submit the local storage query if it is valid. Otherwise Prometheus
  596. // will return errors. Always append something to resChs, regardless, to
  597. // maintain indexing.
  598. if queryTotalLocalStorage != "" {
  599. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  600. } else {
  601. resChs = append(resChs, nil)
  602. }
  603. if withBreakdown {
  604. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, env.GetPromClusterLabel(), window, fmtOffset, env.GetPromClusterLabel())
  605. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  606. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  607. bdResChs := ctx.QueryAll(
  608. queryCPUModePct,
  609. queryRAMSystemPct,
  610. queryRAMUserPct,
  611. )
  612. // Only submit the local storage query if it is valid. Otherwise Prometheus
  613. // will return errors. Always append something to resChs, regardless, to
  614. // maintain indexing.
  615. if queryUsedLocalStorage != "" {
  616. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  617. } else {
  618. bdResChs = append(bdResChs, nil)
  619. }
  620. resChs = append(resChs, bdResChs...)
  621. }
  622. resDataCount, _ := resChs[0].Await()
  623. resTotalGPU, _ := resChs[1].Await()
  624. resTotalCPU, _ := resChs[2].Await()
  625. resTotalRAM, _ := resChs[3].Await()
  626. resTotalStorage, _ := resChs[4].Await()
  627. if ctx.HasErrors() {
  628. return nil, ctx.ErrorCollection()
  629. }
  630. defaultClusterID := env.GetClusterID()
  631. dataMinsByCluster := map[string]float64{}
  632. for _, result := range resDataCount {
  633. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  634. if clusterID == "" {
  635. clusterID = defaultClusterID
  636. }
  637. dataMins := mins
  638. if len(result.Values) > 0 {
  639. dataMins = result.Values[0].Value
  640. } else {
  641. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  642. }
  643. dataMinsByCluster[clusterID] = dataMins
  644. }
  645. // Determine combined discount
  646. discount, customDiscount := 0.0, 0.0
  647. c, err := a.CloudProvider.GetConfig()
  648. if err == nil {
  649. discount, err = ParsePercentString(c.Discount)
  650. if err != nil {
  651. discount = 0.0
  652. }
  653. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  654. if err != nil {
  655. customDiscount = 0.0
  656. }
  657. }
  658. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  659. costData := make(map[string]map[string]float64)
  660. // Helper function to iterate over Prom query results, parsing the raw values into
  661. // the intermediate costData structure.
  662. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  663. for _, result := range results {
  664. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  665. if clusterID == "" {
  666. clusterID = defaultClusterID
  667. }
  668. if _, ok := costData[clusterID]; !ok {
  669. costData[clusterID] = map[string]float64{}
  670. }
  671. if len(result.Values) > 0 {
  672. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  673. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  674. }
  675. }
  676. }
  677. // Apply both sustained use and custom discounts to RAM and CPU
  678. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  679. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  680. // Apply only custom discount to GPU and storage
  681. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  682. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  683. if queryTotalLocalStorage != "" {
  684. resTotalLocalStorage, err := resChs[5].Await()
  685. if err != nil {
  686. return nil, err
  687. }
  688. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  689. }
  690. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  691. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  692. pvUsedCostMap := map[string]float64{}
  693. if withBreakdown {
  694. resCPUModePct, _ := resChs[6].Await()
  695. resRAMSystemPct, _ := resChs[7].Await()
  696. resRAMUserPct, _ := resChs[8].Await()
  697. if ctx.HasErrors() {
  698. return nil, ctx.ErrorCollection()
  699. }
  700. for _, result := range resCPUModePct {
  701. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  702. if clusterID == "" {
  703. clusterID = defaultClusterID
  704. }
  705. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  706. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  707. }
  708. cpuBD := cpuBreakdownMap[clusterID]
  709. mode, err := result.GetString("mode")
  710. if err != nil {
  711. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  712. mode = "other"
  713. }
  714. switch mode {
  715. case "idle":
  716. cpuBD.Idle += result.Values[0].Value
  717. case "system":
  718. cpuBD.System += result.Values[0].Value
  719. case "user":
  720. cpuBD.User += result.Values[0].Value
  721. default:
  722. cpuBD.Other += result.Values[0].Value
  723. }
  724. }
  725. for _, result := range resRAMSystemPct {
  726. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  727. if clusterID == "" {
  728. clusterID = defaultClusterID
  729. }
  730. if _, ok := ramBreakdownMap[clusterID]; !ok {
  731. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  732. }
  733. ramBD := ramBreakdownMap[clusterID]
  734. ramBD.System += result.Values[0].Value
  735. }
  736. for _, result := range resRAMUserPct {
  737. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  738. if clusterID == "" {
  739. clusterID = defaultClusterID
  740. }
  741. if _, ok := ramBreakdownMap[clusterID]; !ok {
  742. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  743. }
  744. ramBD := ramBreakdownMap[clusterID]
  745. ramBD.User += result.Values[0].Value
  746. }
  747. for _, ramBD := range ramBreakdownMap {
  748. remaining := 1.0
  749. remaining -= ramBD.Other
  750. remaining -= ramBD.System
  751. remaining -= ramBD.User
  752. ramBD.Idle = remaining
  753. }
  754. if queryUsedLocalStorage != "" {
  755. resUsedLocalStorage, err := resChs[9].Await()
  756. if err != nil {
  757. return nil, err
  758. }
  759. for _, result := range resUsedLocalStorage {
  760. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  761. if clusterID == "" {
  762. clusterID = defaultClusterID
  763. }
  764. pvUsedCostMap[clusterID] += result.Values[0].Value
  765. }
  766. }
  767. }
  768. if ctx.HasErrors() {
  769. for _, err := range ctx.Errors() {
  770. log.Errorf("ComputeClusterCosts: %s", err)
  771. }
  772. return nil, ctx.ErrorCollection()
  773. }
  774. // Convert intermediate structure to Costs instances
  775. costsByCluster := map[string]*ClusterCosts{}
  776. for id, cd := range costData {
  777. dataMins, ok := dataMinsByCluster[id]
  778. if !ok {
  779. dataMins = mins
  780. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  781. }
  782. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  783. if err != nil {
  784. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  785. return nil, err
  786. }
  787. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  788. costs.CPUBreakdown = cpuBD
  789. }
  790. if ramBD, ok := ramBreakdownMap[id]; ok {
  791. costs.RAMBreakdown = ramBD
  792. }
  793. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  794. if pvUC, ok := pvUsedCostMap[id]; ok {
  795. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  796. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  797. }
  798. costs.DataMinutes = dataMins
  799. costsByCluster[id] = costs
  800. }
  801. return costsByCluster, nil
  802. }
  // Totals bundles the cost time series produced by ClusterCostsOverTime. Each
  // field is a list of two-element rows of the form [timestamp, value], with
  // both entries already rendered as strings.
  type Totals struct {
  	// TotalCost is the series for the whole cluster.
  	TotalCost [][]string `json:"totalcost"`
  	// CPUCost is the series for CPU.
  	CPUCost [][]string `json:"cpucost"`
  	// MemCost is the series for RAM.
  	MemCost [][]string `json:"memcost"`
  	// StorageCost is the series for storage. Note that its JSON key is
  	// camel-cased, unlike the other fields.
  	StorageCost [][]string `json:"storageCost"`
  }
  809. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  810. if len(qrs) == 0 {
  811. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  812. }
  813. result := qrs[0]
  814. totals := [][]string{}
  815. for _, value := range result.Values {
  816. d0 := fmt.Sprintf("%f", value.Timestamp)
  817. d1 := fmt.Sprintf("%f", value.Value)
  818. toAppend := []string{
  819. d0,
  820. d1,
  821. }
  822. totals = append(totals, toAppend)
  823. }
  824. return totals, nil
  825. }
  826. // ClusterCostsOverTime gives the full cluster costs over time
  827. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  828. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  829. if localStorageQuery != "" {
  830. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  831. }
  832. layout := "2006-01-02T15:04:05.000Z"
  833. start, err := time.Parse(layout, startString)
  834. if err != nil {
  835. klog.V(1).Infof("Error parsing time %s. Error: %s", startString, err.Error())
  836. return nil, err
  837. }
  838. end, err := time.Parse(layout, endString)
  839. if err != nil {
  840. klog.V(1).Infof("Error parsing time %s. Error: %s", endString, err.Error())
  841. return nil, err
  842. }
  843. fmtWindow := timeutil.DurationString(window)
  844. if fmtWindow == "" {
  845. err := fmt.Errorf("window value invalid or missing")
  846. klog.V(1).Infof("Error parsing time %v. Error: %s", window, err.Error())
  847. return nil, err
  848. }
  849. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  850. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  851. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  852. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  853. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  854. ctx := prom.NewContext(cli)
  855. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  856. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  857. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  858. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  859. resultClusterCores, err := resChClusterCores.Await()
  860. if err != nil {
  861. return nil, err
  862. }
  863. resultClusterRAM, err := resChClusterRAM.Await()
  864. if err != nil {
  865. return nil, err
  866. }
  867. resultStorage, err := resChStorage.Await()
  868. if err != nil {
  869. return nil, err
  870. }
  871. resultTotal, err := resChTotal.Await()
  872. if err != nil {
  873. return nil, err
  874. }
  875. coreTotal, err := resultToTotals(resultClusterCores)
  876. if err != nil {
  877. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  878. return nil, err
  879. }
  880. ramTotal, err := resultToTotals(resultClusterRAM)
  881. if err != nil {
  882. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  883. return nil, err
  884. }
  885. storageTotal, err := resultToTotals(resultStorage)
  886. if err != nil {
  887. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  888. }
  889. clusterTotal, err := resultToTotals(resultTotal)
  890. if err != nil {
  891. // If clusterTotal query failed, it's likely because there are no PVs, which
  892. // causes the qTotal query to return no data. Instead, query only node costs.
  893. // If that fails, return an error because something is actually wrong.
  894. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  895. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  896. for _, warning := range warnings {
  897. log.Warningf(warning)
  898. }
  899. if err != nil {
  900. return nil, err
  901. }
  902. clusterTotal, err = resultToTotals(resultNodes)
  903. if err != nil {
  904. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  905. return nil, err
  906. }
  907. }
  908. return &Totals{
  909. TotalCost: clusterTotal,
  910. CPUCost: coreTotal,
  911. MemCost: ramTotal,
  912. StorageCost: storageTotal,
  913. }, nil
  914. }
  915. func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult) {
  916. for _, result := range resActiveMins {
  917. cluster, err := result.GetString(env.GetPromClusterLabel())
  918. if err != nil {
  919. cluster = env.GetClusterID()
  920. }
  921. name, err := result.GetString("persistentvolume")
  922. if err != nil {
  923. log.Warningf("ClusterDisks: active mins missing pv name")
  924. continue
  925. }
  926. if len(result.Values) == 0 {
  927. continue
  928. }
  929. key := fmt.Sprintf("%s/%s", cluster, name)
  930. if _, ok := diskMap[key]; !ok {
  931. diskMap[key] = &Disk{
  932. Cluster: cluster,
  933. Name: name,
  934. Breakdown: &ClusterCostsBreakdown{},
  935. }
  936. }
  937. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  938. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  939. mins := e.Sub(s).Minutes()
  940. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  941. diskMap[key].End = e
  942. diskMap[key].Start = s
  943. diskMap[key].Minutes = mins
  944. }
  945. for _, result := range resPVSize {
  946. cluster, err := result.GetString(env.GetPromClusterLabel())
  947. if err != nil {
  948. cluster = env.GetClusterID()
  949. }
  950. name, err := result.GetString("persistentvolume")
  951. if err != nil {
  952. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  953. continue
  954. }
  955. // TODO niko/assets storage class
  956. bytes := result.Values[0].Value
  957. key := fmt.Sprintf("%s/%s", cluster, name)
  958. if _, ok := diskMap[key]; !ok {
  959. diskMap[key] = &Disk{
  960. Cluster: cluster,
  961. Name: name,
  962. Breakdown: &ClusterCostsBreakdown{},
  963. }
  964. }
  965. diskMap[key].Bytes = bytes
  966. }
  967. for _, result := range resPVCost {
  968. cluster, err := result.GetString(env.GetPromClusterLabel())
  969. if err != nil {
  970. cluster = env.GetClusterID()
  971. }
  972. name, err := result.GetString("persistentvolume")
  973. if err != nil {
  974. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  975. continue
  976. }
  977. // TODO niko/assets storage class
  978. cost := result.Values[0].Value
  979. key := fmt.Sprintf("%s/%s", cluster, name)
  980. if _, ok := diskMap[key]; !ok {
  981. diskMap[key] = &Disk{
  982. Cluster: cluster,
  983. Name: name,
  984. Breakdown: &ClusterCostsBreakdown{},
  985. }
  986. }
  987. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  988. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  989. if providerID != "" {
  990. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  991. }
  992. }
  993. }