cluster.go 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/kubecost/cost-model/pkg/util/timeutil"
  6. "github.com/kubecost/cost-model/pkg/cloud"
  7. "github.com/kubecost/cost-model/pkg/env"
  8. "github.com/kubecost/cost-model/pkg/log"
  9. "github.com/kubecost/cost-model/pkg/prom"
  10. prometheus "github.com/prometheus/client_golang/api"
  11. "k8s.io/klog"
  12. )
// PromQL query templates. Each template is filled in with fmt.Sprintf; the
// number and order of %s placeholders must match the call site exactly.
// The literal 730 is presumably the number of hours in a month used to turn
// hourly rates into monthly ones -- TODO confirm against timeutil.HoursPerMonth.
const (
	// queryClusterCores has 10 %s placeholders: three
	// (range, modifier, label) triples followed by the outer grouping label.
	// It sums, per group, node CPU core capacity times hourly CPU cost plus
	// hourly GPU cost, each scaled by 730.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryClusterRAM has 7 %s placeholders. Memory capacity in bytes is
	// divided by 1024 three times (bytes -> GiB) before being multiplied by
	// the hourly RAM cost and 730.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryStorage has 8 %s placeholders; the final one sits after the
	// closing aggregation, so callers can append an extra expression
	// (presumably a local-storage term -- verify against the call site).
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryTotal has 5 %s placeholders and uses a fixed 1h range for the
	// persistent volume terms. The final %s is a trailing expression slot.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryNodes has 2 %s placeholders: the grouping label and a trailing
	// expression slot.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
)

// maxLocalDiskSize is the largest local disk size, in GiB, that ClusterDisks
// keeps in its results; larger disks are dropped from the analysis.
// AWS limits root disks to 100 Gi, and occasional metric errors in filesystem
// size should not contribute to large costs.
// NOTE(review): the comment cites 100 Gi while the constant is 200 -- confirm
// which value is intended.
const maxLocalDiskSize = 200
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores (CPU), GPU, memory (RAM), and
// storage, with totals across all of them.
type ClusterCosts struct {
	// Start and End bound the time range the costs cover.
	Start *time.Time `json:"startTime"`
	End   *time.Time `json:"endTime"`
	// *Cumulative fields are costs accrued over the range; *Monthly fields
	// are the same costs expressed as a monthly rate.
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes has no json tag, so encoding/json emits it under the key
	// "DataMinutes". Presumably the number of minutes of data backing the
	// costs -- TODO confirm against the code that populates it.
	DataMinutes float64
}
// ClusterCostsBreakdown provides a proportional breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, idle, and
// other. Code in this file derives Idle as 1.0 minus the sum of the other
// three, so the values are fractions of 1.0 rather than 0-100 percentages.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  61. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  62. // the associated monthly rate data, and returns the Costs.
  63. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  64. start, end := timeutil.ParseTimeRange(window, offset)
  65. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  66. if dataHours == 0 {
  67. dataHours = end.Sub(start).Hours()
  68. }
  69. // Do not allow zero-length windows to prevent divide-by-zero issues
  70. if dataHours == 0 {
  71. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  72. }
  73. cc := &ClusterCosts{
  74. Start: &start,
  75. End: &end,
  76. CPUCumulative: cpu,
  77. GPUCumulative: gpu,
  78. RAMCumulative: ram,
  79. StorageCumulative: storage,
  80. TotalCumulative: cpu + gpu + ram + storage,
  81. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  82. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  83. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  84. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  85. }
  86. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  87. return cc, nil
  88. }
// Disk is one entry of the map returned by ClusterDisks: either a persistent
// volume or a node-local disk.
type Disk struct {
	Cluster    string
	Name       string
	ProviderID string
	// Cost is accumulated from query results in ClusterDisks.
	Cost float64
	// Bytes is the disk size in bytes.
	Bytes float64
	// Local is set to true for entries ClusterDisks creates from the
	// local-storage queries.
	Local bool
	// Start, End and Minutes describe the period over which the disk was
	// observed by the active-minutes queries.
	Start   time.Time
	End     time.Time
	Minutes float64
	// Breakdown splits the disk's cost into idle/other/system/user shares.
	Breakdown *ClusterCostsBreakdown
}
  101. func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, error) {
  102. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  103. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  104. if offset < time.Minute {
  105. offsetStr = ""
  106. }
  107. // minsPerResolution determines accuracy and resource use for the following
  108. // queries. Smaller values (higher resolution) result in better accuracy,
  109. // but more expensive queries, and vice-a-versa.
  110. minsPerResolution := 1
  111. resolution := time.Duration(minsPerResolution) * time.Minute
  112. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  113. // value, converts it to a cumulative value; i.e.
  114. // [$/hr] * [min/res]*[hr/min] = [$/res]
  115. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  116. // TODO niko/assets how do we not hard-code this price?
  117. costPerGBHr := 0.04 / 730.0
  118. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  119. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s]%s)) by (%s, persistentvolume,provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
  120. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (%s, persistentvolume)`, durationStr, offsetStr, env.GetPromClusterLabel())
  121. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  122. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  123. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  124. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s)`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  125. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  126. resChPVCost := ctx.Query(queryPVCost)
  127. resChPVSize := ctx.Query(queryPVSize)
  128. resChActiveMins := ctx.Query(queryActiveMins)
  129. resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
  130. resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
  131. resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
  132. resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
  133. resPVCost, _ := resChPVCost.Await()
  134. resPVSize, _ := resChPVSize.Await()
  135. resActiveMins, _ := resChActiveMins.Await()
  136. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  137. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  138. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  139. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  140. if ctx.HasErrors() {
  141. return nil, ctx.ErrorCollection()
  142. }
  143. diskMap := map[string]*Disk{}
  144. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost)
  145. for _, result := range resLocalStorageCost {
  146. cluster, err := result.GetString(env.GetPromClusterLabel())
  147. if err != nil {
  148. cluster = env.GetClusterID()
  149. }
  150. name, err := result.GetString("instance")
  151. if err != nil {
  152. log.Warningf("ClusterDisks: local storage data missing instance")
  153. continue
  154. }
  155. cost := result.Values[0].Value
  156. key := fmt.Sprintf("%s/%s", cluster, name)
  157. if _, ok := diskMap[key]; !ok {
  158. diskMap[key] = &Disk{
  159. Cluster: cluster,
  160. Name: name,
  161. Breakdown: &ClusterCostsBreakdown{},
  162. Local: true,
  163. }
  164. }
  165. diskMap[key].Cost += cost
  166. }
  167. for _, result := range resLocalStorageUsedCost {
  168. cluster, err := result.GetString(env.GetPromClusterLabel())
  169. if err != nil {
  170. cluster = env.GetClusterID()
  171. }
  172. name, err := result.GetString("instance")
  173. if err != nil {
  174. log.Warningf("ClusterDisks: local storage usage data missing instance")
  175. continue
  176. }
  177. cost := result.Values[0].Value
  178. key := fmt.Sprintf("%s/%s", cluster, name)
  179. if _, ok := diskMap[key]; !ok {
  180. diskMap[key] = &Disk{
  181. Cluster: cluster,
  182. Name: name,
  183. Breakdown: &ClusterCostsBreakdown{},
  184. Local: true,
  185. }
  186. }
  187. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  188. }
  189. for _, result := range resLocalStorageBytes {
  190. cluster, err := result.GetString(env.GetPromClusterLabel())
  191. if err != nil {
  192. cluster = env.GetClusterID()
  193. }
  194. name, err := result.GetString("instance")
  195. if err != nil {
  196. log.Warningf("ClusterDisks: local storage data missing instance")
  197. continue
  198. }
  199. bytes := result.Values[0].Value
  200. key := fmt.Sprintf("%s/%s", cluster, name)
  201. if _, ok := diskMap[key]; !ok {
  202. diskMap[key] = &Disk{
  203. Cluster: cluster,
  204. Name: name,
  205. Breakdown: &ClusterCostsBreakdown{},
  206. Local: true,
  207. }
  208. }
  209. diskMap[key].Bytes = bytes
  210. if bytes/1024/1024/1024 > maxLocalDiskSize {
  211. log.Warningf("Deleting large root disk/localstorage disk from analysis")
  212. delete(diskMap, key)
  213. }
  214. }
  215. for _, result := range resLocalActiveMins {
  216. cluster, err := result.GetString(env.GetPromClusterLabel())
  217. if err != nil {
  218. cluster = env.GetClusterID()
  219. }
  220. name, err := result.GetString("node")
  221. if err != nil {
  222. log.Warningf("ClusterDisks: local active mins data missing instance")
  223. continue
  224. }
  225. key := fmt.Sprintf("%s/%s", cluster, name)
  226. if _, ok := diskMap[key]; !ok {
  227. log.Warningf("ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  228. continue
  229. }
  230. if len(result.Values) == 0 {
  231. continue
  232. }
  233. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  234. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  235. mins := e.Sub(s).Minutes()
  236. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  237. diskMap[key].End = e
  238. diskMap[key].Start = s
  239. diskMap[key].Minutes = mins
  240. }
  241. for _, disk := range diskMap {
  242. // Apply all remaining RAM to Idle
  243. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  244. // Set provider Id to the name for reconciliation on Azure
  245. if fmt.Sprintf("%T", provider) == "*provider.Azure" {
  246. if disk.ProviderID == "" {
  247. disk.ProviderID = disk.Name
  248. }
  249. }
  250. }
  251. return diskMap, nil
  252. }
// Node is one entry of the map returned by ClusterNodes, holding cost,
// capacity and usage-breakdown data for a single cluster node.
type Node struct {
	Cluster    string
	Name       string
	ProviderID string
	NodeType   string
	CPUCost    float64
	CPUCores   float64
	GPUCost    float64
	GPUCount   float64
	RAMCost    float64
	RAMBytes   float64
	// Discount is set by ClusterNodes from the provider's
	// CombinedDiscountForNode.
	Discount    float64
	Preemptible bool
	// CPUBreakdown and RAMBreakdown split usage into idle/other/system/user
	// shares; ClusterNodes fills in Idle as the remainder.
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// Hourly unit prices, per the field names: per CPU, per GiB of RAM, and
	// per GPU.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
// partialCPUMap maps a node (instance) type to its actual CPU core count.
// GKE lies about the number of cores e2 nodes have, so this table lists the
// fractional-core e2 types whose reported capacity must be overridden.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier is the comparable key used for per-node maps (for example
// the result of ClusterNodes): cluster, node name and provider ID together.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID is a NodeIdentifier without the provider ID. It
// keys maps built from metrics that do not carry a provider_id label, such as
// the resource-count maps consumed by costTimesMinuteAndCount.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  293. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  294. for k, v := range activeDataMap {
  295. keyNon := nodeIdentifierNoProviderID{
  296. Cluster: k.Cluster,
  297. Name: k.Name,
  298. }
  299. if cost, ok := costMap[k]; ok {
  300. minutes := v.minutes
  301. count := 1.0
  302. if c, ok := resourceCountMap[keyNon]; ok {
  303. count = c
  304. }
  305. costMap[k] = cost * (minutes / 60) * count
  306. }
  307. }
  308. }
  309. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  310. for k, v := range activeDataMap {
  311. if cost, ok := costMap[k]; ok {
  312. minutes := v.minutes
  313. costMap[k] = cost * (minutes / 60)
  314. }
  315. }
  316. }
// ClusterNodes queries Prometheus for per-node cost, capacity and usage data
// over the given duration (ending offset ago) and returns one Node per
// NodeIdentifier.
//
// Queries are split into two contexts: failures in the "required" context
// are logged and returned as an error, while failures in the "optional"
// context (CPU mode totals, RAM system/user percentages, labels) are only
// logged as warnings. An error is also returned if the provider config cannot
// be read or its discount strings cannot be parsed.
func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[NodeIdentifier]*Node, error) {
	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
	// Offsets shorter than a minute are dropped from the queries entirely.
	if offset < time.Minute {
		offsetStr = ""
	}

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	minsPerResolution := 1
	resolution := time.Duration(minsPerResolution) * time.Minute

	requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
	optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)

	// NOTE: the Sprintf arguments below are positional; their order must match
	// the placeholders in each template.
	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s]%s)) by (%s, node, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, %s, mode)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel())
	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)

	// Return errors if these fail
	resChNodeCPUHourlyCost := requiredCtx.Query(queryNodeCPUHourlyCost)
	resChNodeCPUCores := requiredCtx.Query(queryNodeCPUCores)
	resChNodeRAMHourlyCost := requiredCtx.Query(queryNodeRAMHourlyCost)
	resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
	resChNodeGPUCount := requiredCtx.Query(queryNodeGPUCount)
	resChNodeGPUHourlyCost := requiredCtx.Query(queryNodeGPUHourlyCost)
	resChActiveMins := requiredCtx.Query(queryActiveMins)
	resChIsSpot := requiredCtx.Query(queryIsSpot)

	// Do not return errors if these fail, but log warnings
	resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
	resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
	resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
	resChLabels := optionalCtx.Query(queryLabels)

	// Per-query errors are dropped here; they are inspected through the two
	// contexts below.
	resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
	resNodeCPUCores, _ := resChNodeCPUCores.Await()
	resNodeGPUCount, _ := resChNodeGPUCount.Await()
	resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
	resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
	resIsSpot, _ := resChIsSpot.Await()
	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
	resActiveMins, _ := resChActiveMins.Await()
	resLabels, _ := resChLabels.Await()

	// Optional query failures are logged but do not abort the computation.
	if optionalCtx.HasErrors() {
		for _, err := range optionalCtx.Errors() {
			log.Warningf("ClusterNodes: %s", err)
		}
	}
	// Required query failures are logged and returned to the caller.
	if requiredCtx.HasErrors() {
		for _, err := range requiredCtx.Errors() {
			log.Errorf("ClusterNodes: %s", err)
		}

		return nil, requiredCtx.ErrorCollection()
	}

	// Convert each query result into a lookup map.
	activeDataMap := buildActiveDataMap(resActiveMins, resolution)

	gpuCountMap := buildGPUCountMap(resNodeGPUCount)

	// Each cost builder also returns a map used to resolve node types; the
	// three are merged into one.
	cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost)
	ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost)
	gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap)

	clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
	clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)

	cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)

	ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)

	ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
	ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)

	cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)

	preemptibleMap := buildPreemptibleMap(resIsSpot)

	labelsMap := buildLabelsMap(resLabels)

	// Scale the hourly costs in place by active time (and, for CPU and RAM,
	// by the resource count).
	costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
	costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
	costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID

	nodeMap := buildNodeMap(
		cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
		cpuCoresMap, ramBytesMap, ramUserPctMap,
		ramSystemPctMap,
		cpuBreakdownMap,
		activeDataMap,
		preemptibleMap,
		labelsMap,
		clusterAndNameToType,
	)

	// Discounts come from the provider configuration as percent strings.
	c, err := cp.GetConfig()
	if err != nil {
		return nil, err
	}

	discount, err := ParsePercentString(c.Discount)
	if err != nil {
		return nil, err
	}

	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
	if err != nil {
		return nil, err
	}

	for _, node := range nodeMap {
		// TODO take GKE Reserved Instances into account
		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)

		// Apply all remaining resources to Idle
		// NOTE(review): assumes buildNodeMap always sets CPUBreakdown and
		// RAMBreakdown to non-nil values -- confirm.
		node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
		node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
	}

	return nodeMap, nil
}
// LoadBalancer is one entry of the map returned by ClusterLoadBalancers.
type LoadBalancer struct {
	Cluster string
	// Name is "<namespace>/<service_name>".
	Name string
	// ProviderID is derived from the ingress_ip label of the cost metric.
	ProviderID string
	// Cost is the sum of the cost values found for this load balancer.
	Cost float64
	// Start is the timestamp of the first active sample; Minutes is the span
	// between the first and last active samples.
	Start   time.Time
	Minutes float64
}
  435. func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
  436. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  437. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  438. if offset < time.Minute {
  439. offsetStr = ""
  440. }
  441. // minsPerResolution determines accuracy and resource use for the following
  442. // queries. Smaller values (higher resolution) result in better accuracy,
  443. // but more expensive queries, and vice-a-versa.
  444. minsPerResolution := 5
  445. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  446. // value, converts it to a cumulative value; i.e.
  447. // [$/hr] * [min/res]*[hr/min] = [$/res]
  448. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  449. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  450. queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip))[%s:%dm]%s) * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  451. queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  452. resChLBCost := ctx.Query(queryLBCost)
  453. resChActiveMins := ctx.Query(queryActiveMins)
  454. resLBCost, _ := resChLBCost.Await()
  455. resActiveMins, _ := resChActiveMins.Await()
  456. if ctx.HasErrors() {
  457. return nil, ctx.ErrorCollection()
  458. }
  459. loadBalancerMap := map[string]*LoadBalancer{}
  460. for _, result := range resLBCost {
  461. cluster, err := result.GetString(env.GetPromClusterLabel())
  462. if err != nil {
  463. cluster = env.GetClusterID()
  464. }
  465. namespace, err := result.GetString("namespace")
  466. if err != nil {
  467. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  468. continue
  469. }
  470. serviceName, err := result.GetString("service_name")
  471. if err != nil {
  472. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  473. continue
  474. }
  475. providerID, err := result.GetString("ingress_ip")
  476. if err != nil {
  477. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  478. providerID = ""
  479. }
  480. lbCost := result.Values[0].Value
  481. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  482. if _, ok := loadBalancerMap[key]; !ok {
  483. loadBalancerMap[key] = &LoadBalancer{
  484. Cluster: cluster,
  485. Name: namespace + "/" + serviceName,
  486. ProviderID: cloud.ParseLBID(providerID),
  487. }
  488. }
  489. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  490. // Prevents there from being a duplicate LoadBalancers on the same day
  491. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  492. loadBalancerMap[key].ProviderID = providerID
  493. }
  494. loadBalancerMap[key].Cost += lbCost
  495. }
  496. for _, result := range resActiveMins {
  497. cluster, err := result.GetString(env.GetPromClusterLabel())
  498. if err != nil {
  499. cluster = env.GetClusterID()
  500. }
  501. namespace, err := result.GetString("namespace")
  502. if err != nil {
  503. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  504. continue
  505. }
  506. serviceName, err := result.GetString("service_name")
  507. if err != nil {
  508. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  509. continue
  510. }
  511. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  512. if len(result.Values) == 0 {
  513. continue
  514. }
  515. if lb, ok := loadBalancerMap[key]; ok {
  516. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  517. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  518. mins := e.Sub(s).Minutes()
  519. lb.Start = s
  520. lb.Minutes = mins
  521. } else {
  522. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  523. }
  524. }
  525. return loadBalancerMap, nil
  526. }
  527. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  528. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  529. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  530. start, end := timeutil.ParseTimeRange(window, offset)
  531. mins := end.Sub(start).Minutes()
  532. // minsPerResolution determines accuracy and resource use for the following
  533. // queries. Smaller values (higher resolution) result in better accuracy,
  534. // but more expensive queries, and vice-a-versa.
  535. minsPerResolution := 5
  536. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  537. // value, converts it to a cumulative value; i.e.
  538. // [$/hr] * [min/res]*[hr/min] = [$/res]
  539. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  540. const fmtQueryDataCount = `
  541. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  542. `
  543. const fmtQueryTotalGPU = `
  544. sum(
  545. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  546. ) by (%s)
  547. `
  548. const fmtQueryTotalCPU = `
  549. sum(
  550. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  551. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  552. ) by (%s)
  553. `
  554. const fmtQueryTotalRAM = `
  555. sum(
  556. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  557. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  558. ) by (%s)
  559. `
  560. const fmtQueryTotalStorage = `
  561. sum(
  562. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  563. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  564. ) by (%s)
  565. `
  566. const fmtQueryCPUModePct = `
  567. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  568. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  569. `
  570. const fmtQueryRAMSystemPct = `
  571. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  572. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  573. `
  574. const fmtQueryRAMUserPct = `
  575. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  576. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  577. `
  578. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  579. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  580. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  581. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  582. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  583. if queryTotalLocalStorage != "" {
  584. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  585. }
  586. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  587. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, minsPerResolution)
  588. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  589. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  590. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  591. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  592. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  593. resChs := ctx.QueryAll(
  594. queryDataCount,
  595. queryTotalGPU,
  596. queryTotalCPU,
  597. queryTotalRAM,
  598. queryTotalStorage,
  599. )
  600. // Only submit the local storage query if it is valid. Otherwise Prometheus
  601. // will return errors. Always append something to resChs, regardless, to
  602. // maintain indexing.
  603. if queryTotalLocalStorage != "" {
  604. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  605. } else {
  606. resChs = append(resChs, nil)
  607. }
  608. if withBreakdown {
  609. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, env.GetPromClusterLabel(), window, fmtOffset, env.GetPromClusterLabel())
  610. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  611. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  612. bdResChs := ctx.QueryAll(
  613. queryCPUModePct,
  614. queryRAMSystemPct,
  615. queryRAMUserPct,
  616. )
  617. // Only submit the local storage query if it is valid. Otherwise Prometheus
  618. // will return errors. Always append something to resChs, regardless, to
  619. // maintain indexing.
  620. if queryUsedLocalStorage != "" {
  621. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  622. } else {
  623. bdResChs = append(bdResChs, nil)
  624. }
  625. resChs = append(resChs, bdResChs...)
  626. }
  627. resDataCount, _ := resChs[0].Await()
  628. resTotalGPU, _ := resChs[1].Await()
  629. resTotalCPU, _ := resChs[2].Await()
  630. resTotalRAM, _ := resChs[3].Await()
  631. resTotalStorage, _ := resChs[4].Await()
  632. if ctx.HasErrors() {
  633. return nil, ctx.ErrorCollection()
  634. }
  635. defaultClusterID := env.GetClusterID()
  636. dataMinsByCluster := map[string]float64{}
  637. for _, result := range resDataCount {
  638. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  639. if clusterID == "" {
  640. clusterID = defaultClusterID
  641. }
  642. dataMins := mins
  643. if len(result.Values) > 0 {
  644. dataMins = result.Values[0].Value
  645. } else {
  646. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  647. }
  648. dataMinsByCluster[clusterID] = dataMins
  649. }
  650. // Determine combined discount
  651. discount, customDiscount := 0.0, 0.0
  652. c, err := a.CloudProvider.GetConfig()
  653. if err == nil {
  654. discount, err = ParsePercentString(c.Discount)
  655. if err != nil {
  656. discount = 0.0
  657. }
  658. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  659. if err != nil {
  660. customDiscount = 0.0
  661. }
  662. }
  663. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  664. costData := make(map[string]map[string]float64)
  665. // Helper function to iterate over Prom query results, parsing the raw values into
  666. // the intermediate costData structure.
  667. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  668. for _, result := range results {
  669. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  670. if clusterID == "" {
  671. clusterID = defaultClusterID
  672. }
  673. if _, ok := costData[clusterID]; !ok {
  674. costData[clusterID] = map[string]float64{}
  675. }
  676. if len(result.Values) > 0 {
  677. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  678. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  679. }
  680. }
  681. }
  682. // Apply both sustained use and custom discounts to RAM and CPU
  683. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  684. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  685. // Apply only custom discount to GPU and storage
  686. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  687. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  688. if queryTotalLocalStorage != "" {
  689. resTotalLocalStorage, err := resChs[5].Await()
  690. if err != nil {
  691. return nil, err
  692. }
  693. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  694. }
  695. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  696. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  697. pvUsedCostMap := map[string]float64{}
  698. if withBreakdown {
  699. resCPUModePct, _ := resChs[6].Await()
  700. resRAMSystemPct, _ := resChs[7].Await()
  701. resRAMUserPct, _ := resChs[8].Await()
  702. if ctx.HasErrors() {
  703. return nil, ctx.ErrorCollection()
  704. }
  705. for _, result := range resCPUModePct {
  706. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  707. if clusterID == "" {
  708. clusterID = defaultClusterID
  709. }
  710. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  711. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  712. }
  713. cpuBD := cpuBreakdownMap[clusterID]
  714. mode, err := result.GetString("mode")
  715. if err != nil {
  716. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  717. mode = "other"
  718. }
  719. switch mode {
  720. case "idle":
  721. cpuBD.Idle += result.Values[0].Value
  722. case "system":
  723. cpuBD.System += result.Values[0].Value
  724. case "user":
  725. cpuBD.User += result.Values[0].Value
  726. default:
  727. cpuBD.Other += result.Values[0].Value
  728. }
  729. }
  730. for _, result := range resRAMSystemPct {
  731. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  732. if clusterID == "" {
  733. clusterID = defaultClusterID
  734. }
  735. if _, ok := ramBreakdownMap[clusterID]; !ok {
  736. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  737. }
  738. ramBD := ramBreakdownMap[clusterID]
  739. ramBD.System += result.Values[0].Value
  740. }
  741. for _, result := range resRAMUserPct {
  742. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  743. if clusterID == "" {
  744. clusterID = defaultClusterID
  745. }
  746. if _, ok := ramBreakdownMap[clusterID]; !ok {
  747. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  748. }
  749. ramBD := ramBreakdownMap[clusterID]
  750. ramBD.User += result.Values[0].Value
  751. }
  752. for _, ramBD := range ramBreakdownMap {
  753. remaining := 1.0
  754. remaining -= ramBD.Other
  755. remaining -= ramBD.System
  756. remaining -= ramBD.User
  757. ramBD.Idle = remaining
  758. }
  759. if queryUsedLocalStorage != "" {
  760. resUsedLocalStorage, err := resChs[9].Await()
  761. if err != nil {
  762. return nil, err
  763. }
  764. for _, result := range resUsedLocalStorage {
  765. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  766. if clusterID == "" {
  767. clusterID = defaultClusterID
  768. }
  769. pvUsedCostMap[clusterID] += result.Values[0].Value
  770. }
  771. }
  772. }
  773. if ctx.HasErrors() {
  774. for _, err := range ctx.Errors() {
  775. log.Errorf("ComputeClusterCosts: %s", err)
  776. }
  777. return nil, ctx.ErrorCollection()
  778. }
  779. // Convert intermediate structure to Costs instances
  780. costsByCluster := map[string]*ClusterCosts{}
  781. for id, cd := range costData {
  782. dataMins, ok := dataMinsByCluster[id]
  783. if !ok {
  784. dataMins = mins
  785. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  786. }
  787. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  788. if err != nil {
  789. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  790. return nil, err
  791. }
  792. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  793. costs.CPUBreakdown = cpuBD
  794. }
  795. if ramBD, ok := ramBreakdownMap[id]; ok {
  796. costs.RAMBreakdown = ramBD
  797. }
  798. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  799. if pvUC, ok := pvUsedCostMap[id]; ok {
  800. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  801. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  802. }
  803. costs.DataMinutes = dataMins
  804. costsByCluster[id] = costs
  805. }
  806. return costsByCluster, nil
  807. }
  808. type Totals struct {
  809. TotalCost [][]string `json:"totalcost"`
  810. CPUCost [][]string `json:"cpucost"`
  811. MemCost [][]string `json:"memcost"`
  812. StorageCost [][]string `json:"storageCost"`
  813. }
  814. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  815. if len(qrs) == 0 {
  816. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  817. }
  818. result := qrs[0]
  819. totals := [][]string{}
  820. for _, value := range result.Values {
  821. d0 := fmt.Sprintf("%f", value.Timestamp)
  822. d1 := fmt.Sprintf("%f", value.Value)
  823. toAppend := []string{
  824. d0,
  825. d1,
  826. }
  827. totals = append(totals, toAppend)
  828. }
  829. return totals, nil
  830. }
  831. // ClusterCostsOverTime gives the full cluster costs over time
  832. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  833. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  834. if localStorageQuery != "" {
  835. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  836. }
  837. layout := "2006-01-02T15:04:05.000Z"
  838. start, err := time.Parse(layout, startString)
  839. if err != nil {
  840. klog.V(1).Infof("Error parsing time %s. Error: %s", startString, err.Error())
  841. return nil, err
  842. }
  843. end, err := time.Parse(layout, endString)
  844. if err != nil {
  845. klog.V(1).Infof("Error parsing time %s. Error: %s", endString, err.Error())
  846. return nil, err
  847. }
  848. fmtWindow := timeutil.DurationString(window)
  849. if fmtWindow == "" {
  850. err := fmt.Errorf("window value invalid or missing")
  851. klog.V(1).Infof("Error parsing time %v. Error: %s", window, err.Error())
  852. return nil, err
  853. }
  854. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  855. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  856. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  857. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  858. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  859. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  860. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  861. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  862. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  863. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  864. resultClusterCores, err := resChClusterCores.Await()
  865. if err != nil {
  866. return nil, err
  867. }
  868. resultClusterRAM, err := resChClusterRAM.Await()
  869. if err != nil {
  870. return nil, err
  871. }
  872. resultStorage, err := resChStorage.Await()
  873. if err != nil {
  874. return nil, err
  875. }
  876. resultTotal, err := resChTotal.Await()
  877. if err != nil {
  878. return nil, err
  879. }
  880. coreTotal, err := resultToTotals(resultClusterCores)
  881. if err != nil {
  882. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  883. return nil, err
  884. }
  885. ramTotal, err := resultToTotals(resultClusterRAM)
  886. if err != nil {
  887. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  888. return nil, err
  889. }
  890. storageTotal, err := resultToTotals(resultStorage)
  891. if err != nil {
  892. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  893. }
  894. clusterTotal, err := resultToTotals(resultTotal)
  895. if err != nil {
  896. // If clusterTotal query failed, it's likely because there are no PVs, which
  897. // causes the qTotal query to return no data. Instead, query only node costs.
  898. // If that fails, return an error because something is actually wrong.
  899. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  900. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  901. for _, warning := range warnings {
  902. log.Warningf(warning)
  903. }
  904. if err != nil {
  905. return nil, err
  906. }
  907. clusterTotal, err = resultToTotals(resultNodes)
  908. if err != nil {
  909. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  910. return nil, err
  911. }
  912. }
  913. return &Totals{
  914. TotalCost: clusterTotal,
  915. CPUCost: coreTotal,
  916. MemCost: ramTotal,
  917. StorageCost: storageTotal,
  918. }, nil
  919. }
  920. func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult) {
  921. for _, result := range resActiveMins {
  922. cluster, err := result.GetString(env.GetPromClusterLabel())
  923. if err != nil {
  924. cluster = env.GetClusterID()
  925. }
  926. name, err := result.GetString("persistentvolume")
  927. if err != nil {
  928. log.Warningf("ClusterDisks: active mins missing pv name")
  929. continue
  930. }
  931. if len(result.Values) == 0 {
  932. continue
  933. }
  934. key := fmt.Sprintf("%s/%s", cluster, name)
  935. if _, ok := diskMap[key]; !ok {
  936. diskMap[key] = &Disk{
  937. Cluster: cluster,
  938. Name: name,
  939. Breakdown: &ClusterCostsBreakdown{},
  940. }
  941. }
  942. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  943. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  944. mins := e.Sub(s).Minutes()
  945. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  946. diskMap[key].End = e
  947. diskMap[key].Start = s
  948. diskMap[key].Minutes = mins
  949. }
  950. for _, result := range resPVSize {
  951. cluster, err := result.GetString(env.GetPromClusterLabel())
  952. if err != nil {
  953. cluster = env.GetClusterID()
  954. }
  955. name, err := result.GetString("persistentvolume")
  956. if err != nil {
  957. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  958. continue
  959. }
  960. // TODO niko/assets storage class
  961. bytes := result.Values[0].Value
  962. key := fmt.Sprintf("%s/%s", cluster, name)
  963. if _, ok := diskMap[key]; !ok {
  964. diskMap[key] = &Disk{
  965. Cluster: cluster,
  966. Name: name,
  967. Breakdown: &ClusterCostsBreakdown{},
  968. }
  969. }
  970. diskMap[key].Bytes = bytes
  971. }
  972. for _, result := range resPVCost {
  973. cluster, err := result.GetString(env.GetPromClusterLabel())
  974. if err != nil {
  975. cluster = env.GetClusterID()
  976. }
  977. name, err := result.GetString("persistentvolume")
  978. if err != nil {
  979. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  980. continue
  981. }
  982. // TODO niko/assets storage class
  983. cost := result.Values[0].Value
  984. key := fmt.Sprintf("%s/%s", cluster, name)
  985. if _, ok := diskMap[key]; !ok {
  986. diskMap[key] = &Disk{
  987. Cluster: cluster,
  988. Name: name,
  989. Breakdown: &ClusterCostsBreakdown{},
  990. }
  991. }
  992. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  993. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  994. if providerID != "" {
  995. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  996. }
  997. }
  998. }