cluster.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220
  1. package costmodel
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/kubecost/cost-model/pkg/util/timeutil"
  7. "github.com/kubecost/cost-model/pkg/cloud"
  8. "github.com/kubecost/cost-model/pkg/env"
  9. "github.com/kubecost/cost-model/pkg/log"
  10. "github.com/kubecost/cost-model/pkg/prom"
  11. prometheus "github.com/prometheus/client_golang/api"
  12. "k8s.io/klog"
  13. )
  // PromQL query templates, filled in with fmt.Sprintf. In these templates the
// "[%s] %s" pair inside avg_over_time takes a range duration followed by a
// second fragment (presumably an offset clause — confirm at the call site),
// and each "%s" inside a "by (...)" clause takes a label name (presumably the
// Prometheus cluster label — confirm at the call site). Hourly prices are
// multiplied by 730, the hours-per-month factor also used for costPerGBHr.
const (
	// queryClusterCores: per-node CPU capacity * hourly CPU price, plus hourly
	// GPU price, each scaled to a month, summed by the grouping label.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryClusterRAM: per-node RAM capacity in GiB * hourly RAM price, scaled
	// to a month, summed by the grouping label.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryStorage: per-PV hourly price scaled to a month * PV capacity in GiB,
	// summed by the grouping label. The trailing %s is appended verbatim after
	// the sum (NOTE(review): presumably an extra "+ ..." term — confirm).
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryTotal: monthly node cost from node_total_hourly_cost plus monthly
	// PV cost computed over a fixed 1h range. The trailing %s is appended
	// verbatim after the sum.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryNodes: monthly node cost from node_total_hourly_cost. The trailing
	// %s is appended verbatim.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
)

// maxLocalDiskSize is the largest local (root) disk, in GiB, that ClusterDisks
// keeps; larger local disks are dropped from the analysis. AWS limits root
// disks to 100 Gi, and occasional metric errors in filesystem size should not
// contribute to large costs.
const maxLocalDiskSize = 200
  // ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by CPU, GPU, RAM, and storage; CPU, RAM, and
// storage additionally carry a ClusterCostsBreakdown of how the resource was
// used.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes has no json tag, so encoding/json emits it under the key
	// "DataMinutes".
	DataMinutes float64
}
  // ClusterCostsBreakdown provides a fractional breakdown of a resource by
// category: User for user-space (i.e. non-system) usage, System, Other, and
// Idle. Within this file Idle is derived as 1.0 minus the sum of the other
// three, so the fields are fractions of the whole rather than absolute values.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  62. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  63. // the associated monthly rate data, and returns the Costs.
  64. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  65. start, end := timeutil.ParseTimeRange(window, offset)
  66. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  67. if dataHours == 0 {
  68. dataHours = end.Sub(start).Hours()
  69. }
  70. // Do not allow zero-length windows to prevent divide-by-zero issues
  71. if dataHours == 0 {
  72. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  73. }
  74. cc := &ClusterCosts{
  75. Start: &start,
  76. End: &end,
  77. CPUCumulative: cpu,
  78. GPUCumulative: gpu,
  79. RAMCumulative: ram,
  80. StorageCumulative: storage,
  81. TotalCumulative: cpu + gpu + ram + storage,
  82. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  83. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  84. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  85. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  86. }
  87. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  88. return cc, nil
  89. }
  // Disk describes the cost and size of a single disk over a time window. Local
// is true for node-local (root filesystem) storage discovered by ClusterDisks;
// other entries are presumably persistent volumes added by pvCosts — confirm.
type Disk struct {
	Cluster    string
	Name       string
	ProviderID string
	Cost       float64 // cumulative cost over the window
	Bytes      float64 // disk size in bytes
	Local      bool
	Start      time.Time
	End        time.Time
	Minutes    float64 // minutes between Start and End
	Breakdown  *ClusterCostsBreakdown
}
  // DiskIdentifier is the map key for disks returned by ClusterDisks. For local
// disks, Name is the Prometheus "instance" label of the node.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  106. func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  107. // Query for the duration between start and end
  108. durStr := timeutil.DurationString(end.Sub(start))
  109. // Start from the time "end", querying backwards
  110. t := end
  111. // minsPerResolution determines accuracy and resource use for the following
  112. // queries. Smaller values (higher resolution) result in better accuracy,
  113. // but more expensive queries, and vice-a-versa.
  114. minsPerResolution := 1
  115. resolution := time.Duration(minsPerResolution) * time.Minute
  116. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  117. // value, converts it to a cumulative value; i.e.
  118. // [$/hr] * [min/res]*[hr/min] = [$/res]
  119. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  120. // TODO niko/assets how do we not hard-code this price?
  121. costPerGBHr := 0.04 / 730.0
  122. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  123. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
  124. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
  125. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  126. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  127. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  128. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  129. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  130. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  131. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  132. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  133. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  134. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  135. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  136. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  137. resPVCost, _ := resChPVCost.Await()
  138. resPVSize, _ := resChPVSize.Await()
  139. resActiveMins, _ := resChActiveMins.Await()
  140. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  141. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  142. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  143. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  144. if ctx.HasErrors() {
  145. return nil, ctx.ErrorCollection()
  146. }
  147. diskMap := map[DiskIdentifier]*Disk{}
  148. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, provider)
  149. for _, result := range resLocalStorageCost {
  150. cluster, err := result.GetString(env.GetPromClusterLabel())
  151. if err != nil {
  152. cluster = env.GetClusterID()
  153. }
  154. name, err := result.GetString("instance")
  155. if err != nil {
  156. log.Warningf("ClusterDisks: local storage data missing instance")
  157. continue
  158. }
  159. cost := result.Values[0].Value
  160. key := DiskIdentifier{cluster, name}
  161. if _, ok := diskMap[key]; !ok {
  162. diskMap[key] = &Disk{
  163. Cluster: cluster,
  164. Name: name,
  165. Breakdown: &ClusterCostsBreakdown{},
  166. Local: true,
  167. }
  168. }
  169. diskMap[key].Cost += cost
  170. }
  171. for _, result := range resLocalStorageUsedCost {
  172. cluster, err := result.GetString(env.GetPromClusterLabel())
  173. if err != nil {
  174. cluster = env.GetClusterID()
  175. }
  176. name, err := result.GetString("instance")
  177. if err != nil {
  178. log.Warningf("ClusterDisks: local storage usage data missing instance")
  179. continue
  180. }
  181. cost := result.Values[0].Value
  182. key := DiskIdentifier{cluster, name}
  183. if _, ok := diskMap[key]; !ok {
  184. diskMap[key] = &Disk{
  185. Cluster: cluster,
  186. Name: name,
  187. Breakdown: &ClusterCostsBreakdown{},
  188. Local: true,
  189. }
  190. }
  191. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  192. }
  193. for _, result := range resLocalStorageBytes {
  194. cluster, err := result.GetString(env.GetPromClusterLabel())
  195. if err != nil {
  196. cluster = env.GetClusterID()
  197. }
  198. name, err := result.GetString("instance")
  199. if err != nil {
  200. log.Warningf("ClusterDisks: local storage data missing instance")
  201. continue
  202. }
  203. bytes := result.Values[0].Value
  204. key := DiskIdentifier{cluster, name}
  205. if _, ok := diskMap[key]; !ok {
  206. diskMap[key] = &Disk{
  207. Cluster: cluster,
  208. Name: name,
  209. Breakdown: &ClusterCostsBreakdown{},
  210. Local: true,
  211. }
  212. }
  213. diskMap[key].Bytes = bytes
  214. if bytes/1024/1024/1024 > maxLocalDiskSize {
  215. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  216. delete(diskMap, key)
  217. }
  218. }
  219. for _, result := range resLocalActiveMins {
  220. cluster, err := result.GetString(env.GetPromClusterLabel())
  221. if err != nil {
  222. cluster = env.GetClusterID()
  223. }
  224. name, err := result.GetString("node")
  225. if err != nil {
  226. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  227. continue
  228. }
  229. key := DiskIdentifier{cluster, name}
  230. if _, ok := diskMap[key]; !ok {
  231. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  232. continue
  233. }
  234. if len(result.Values) == 0 {
  235. continue
  236. }
  237. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  238. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  239. mins := e.Sub(s).Minutes()
  240. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  241. diskMap[key].End = e
  242. diskMap[key].Start = s
  243. diskMap[key].Minutes = mins
  244. }
  245. for _, disk := range diskMap {
  246. // Apply all remaining RAM to Idle
  247. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  248. // Set provider Id to the name for reconciliation on Azure
  249. if fmt.Sprintf("%T", provider) == "*provider.Azure" {
  250. if disk.ProviderID == "" {
  251. disk.ProviderID = disk.Name
  252. }
  253. }
  254. }
  255. return diskMap, nil
  256. }
  // Node describes the cost, capacity, and usage of a single cluster node over a
// time window, as assembled by ClusterNodes.
type Node struct {
	Cluster         string
	Name            string
	ProviderID      string
	NodeType        string
	CPUCost         float64
	CPUCores        float64
	GPUCost         float64
	GPUCount        float64
	RAMCost         float64
	RAMBytes        float64
	Discount        float64 // set from cp.CombinedDiscountForNode in ClusterNodes
	Preemptible     bool
	CPUBreakdown    *ClusterCostsBreakdown
	RAMBreakdown    *ClusterCostsBreakdown
	Start           time.Time
	End             time.Time
	Minutes         float64
	Labels          map[string]string
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
  // partialCPUMap maps GKE shared-core e2 node types to the number of CPU cores
// they actually provide. GKE reports more cores than these types really have,
// so this table overrides the reported count.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
  // NodeIdentifier is the map key for nodes returned by ClusterNodes: a node is
// identified by its cluster, its name, and its cloud provider ID.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}

// nodeIdentifierNoProviderID identifies a node by cluster and name only. It is
// used to look up per-node resource counts that are not labelled with a
// provider ID (see costTimesMinuteAndCount).
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  297. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  298. for k, v := range activeDataMap {
  299. keyNon := nodeIdentifierNoProviderID{
  300. Cluster: k.Cluster,
  301. Name: k.Name,
  302. }
  303. if cost, ok := costMap[k]; ok {
  304. minutes := v.minutes
  305. count := 1.0
  306. if c, ok := resourceCountMap[keyNon]; ok {
  307. count = c
  308. }
  309. costMap[k] = cost * (minutes / 60) * count
  310. }
  311. }
  312. }
  313. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  314. for k, v := range activeDataMap {
  315. if cost, ok := costMap[k]; ok {
  316. minutes := v.minutes
  317. costMap[k] = cost * (minutes / 60)
  318. }
  319. }
  320. }
  321. func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  322. // Query for the duration between start and end
  323. durStr := timeutil.DurationString(end.Sub(start))
  324. // Start from the time "end", querying backwards
  325. t := end
  326. // minsPerResolution determines accuracy and resource use for the following
  327. // queries. Smaller values (higher resolution) result in better accuracy,
  328. // but more expensive queries, and vice-a-versa.
  329. minsPerResolution := 1
  330. resolution := time.Duration(minsPerResolution) * time.Minute
  331. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  332. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  333. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  334. queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  335. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
  336. queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  337. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
  338. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  339. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
  340. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  341. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  342. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  343. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
  344. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)
  345. // Return errors if these fail
  346. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  347. resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
  348. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  349. resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
  350. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  351. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  352. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  353. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  354. // Do not return errors if these fail, but log warnings
  355. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  356. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  357. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  358. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  359. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  360. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  361. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  362. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  363. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  364. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  365. resIsSpot, _ := resChIsSpot.Await()
  366. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  367. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  368. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  369. resActiveMins, _ := resChActiveMins.Await()
  370. resLabels, _ := resChLabels.Await()
  371. if optionalCtx.HasErrors() {
  372. for _, err := range optionalCtx.Errors() {
  373. log.Warningf("ClusterNodes: %s", err)
  374. }
  375. }
  376. if requiredCtx.HasErrors() {
  377. for _, err := range requiredCtx.Errors() {
  378. log.Errorf("ClusterNodes: %s", err)
  379. }
  380. return nil, requiredCtx.ErrorCollection()
  381. }
  382. activeDataMap := buildActiveDataMap(resActiveMins, resolution)
  383. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  384. preemptibleMap := buildPreemptibleMap(resIsSpot)
  385. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  386. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  387. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  388. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  389. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  390. cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
  391. ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
  392. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  393. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  394. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  395. labelsMap := buildLabelsMap(resLabels)
  396. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
  397. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
  398. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  399. nodeMap := buildNodeMap(
  400. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  401. cpuCoresMap, ramBytesMap, ramUserPctMap,
  402. ramSystemPctMap,
  403. cpuBreakdownMap,
  404. activeDataMap,
  405. preemptibleMap,
  406. labelsMap,
  407. clusterAndNameToType,
  408. resolution,
  409. )
  410. c, err := cp.GetConfig()
  411. if err != nil {
  412. return nil, err
  413. }
  414. discount, err := ParsePercentString(c.Discount)
  415. if err != nil {
  416. return nil, err
  417. }
  418. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  419. if err != nil {
  420. return nil, err
  421. }
  422. for _, node := range nodeMap {
  423. // TODO take GKE Reserved Instances into account
  424. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  425. // Apply all remaining resources to Idle
  426. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  427. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  428. }
  429. return nodeMap, nil
  430. }
  // LoadBalancerIdentifier is the map key for load balancers returned by
// ClusterLoadBalancers. Name is the Kubernetes service name (the
// "service_name" label), without the namespace prefix.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
  // LoadBalancer describes the cost of a single load balancer service over a
// time window, as built by ClusterLoadBalancers.
type LoadBalancer struct {
	Cluster    string
	Namespace  string
	Name       string // "namespace/service_name", kept for backwards-compatibility
	ProviderID string // ingress IP/hostname as normalized by cloud.ParseLBID
	Cost       float64
	Start      time.Time
	End        time.Time
	Minutes    float64 // minutes between Start and End
}
  446. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  447. // Query for the duration between start and end
  448. durStr := timeutil.DurationString(end.Sub(start))
  449. // Start from the time "end", querying backwards
  450. t := end
  451. // minsPerResolution determines accuracy and resource use for the following
  452. // queries. Smaller values (higher resolution) result in better accuracy,
  453. // but more expensive queries, and vice-a-versa.
  454. minsPerResolution := 1
  455. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  456. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
  457. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  458. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  459. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  460. resLBCost, _ := resChLBCost.Await()
  461. resActiveMins, _ := resChActiveMins.Await()
  462. if ctx.HasErrors() {
  463. return nil, ctx.ErrorCollection()
  464. }
  465. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  466. for _, result := range resActiveMins {
  467. cluster, err := result.GetString(env.GetPromClusterLabel())
  468. if err != nil {
  469. cluster = env.GetClusterID()
  470. }
  471. namespace, err := result.GetString("namespace")
  472. if err != nil {
  473. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  474. continue
  475. }
  476. name, err := result.GetString("service_name")
  477. if err != nil {
  478. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  479. continue
  480. }
  481. providerID, err := result.GetString("ingress_ip")
  482. if err != nil {
  483. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  484. providerID = ""
  485. }
  486. key := LoadBalancerIdentifier{
  487. Cluster: cluster,
  488. Namespace: namespace,
  489. Name: name,
  490. }
  491. // Skip if there are no data
  492. if len(result.Values) == 0 {
  493. continue
  494. }
  495. // Add load balancer to the set of load balancers
  496. if _, ok := loadBalancerMap[key]; !ok {
  497. loadBalancerMap[key] = &LoadBalancer{
  498. Cluster: cluster,
  499. Namespace: namespace,
  500. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  501. ProviderID: cloud.ParseLBID(providerID),
  502. }
  503. }
  504. // Append start, end, and minutes. This should come before all other data.
  505. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  506. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  507. loadBalancerMap[key].Start = s
  508. loadBalancerMap[key].End = e
  509. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  510. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  511. // Prevents there from being a duplicate LoadBalancers on the same day
  512. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  513. loadBalancerMap[key].ProviderID = providerID
  514. }
  515. }
  516. for _, result := range resLBCost {
  517. cluster, err := result.GetString(env.GetPromClusterLabel())
  518. if err != nil {
  519. cluster = env.GetClusterID()
  520. }
  521. namespace, err := result.GetString("namespace")
  522. if err != nil {
  523. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  524. continue
  525. }
  526. name, err := result.GetString("service_name")
  527. if err != nil {
  528. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  529. continue
  530. }
  531. key := LoadBalancerIdentifier{
  532. Cluster: cluster,
  533. Namespace: namespace,
  534. Name: name,
  535. }
  536. // Apply cost as price-per-hour * hours
  537. if lb, ok := loadBalancerMap[key]; ok {
  538. lbPricePerHr := result.Values[0].Value
  539. hrs := lb.Minutes / 60.0
  540. lb.Cost += lbPricePerHr * hrs
  541. } else {
  542. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  543. }
  544. }
  545. return loadBalancerMap, nil
  546. }
  547. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  548. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  549. if window < 10*time.Minute {
  550. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  551. }
  552. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  553. start, end := timeutil.ParseTimeRange(window, offset)
  554. mins := end.Sub(start).Minutes()
  555. windowStr := timeutil.DurationString(window)
  556. // minsPerResolution determines accuracy and resource use for the following
  557. // queries. Smaller values (higher resolution) result in better accuracy,
  558. // but more expensive queries, and vice-a-versa.
  559. minsPerResolution := 5
  560. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  561. // value, converts it to a cumulative value; i.e.
  562. // [$/hr] * [min/res]*[hr/min] = [$/res]
  563. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  564. const fmtQueryDataCount = `
  565. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  566. `
  567. const fmtQueryTotalGPU = `
  568. sum(
  569. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  570. ) by (%s)
  571. `
  572. const fmtQueryTotalCPU = `
  573. sum(
  574. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  575. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  576. ) by (%s)
  577. `
  578. const fmtQueryTotalRAM = `
  579. sum(
  580. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  581. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  582. ) by (%s)
  583. `
  584. const fmtQueryTotalStorage = `
  585. sum(
  586. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  587. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  588. ) by (%s)
  589. `
  590. const fmtQueryCPUModePct = `
  591. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  592. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  593. `
  594. const fmtQueryRAMSystemPct = `
  595. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  596. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  597. `
  598. const fmtQueryRAMUserPct = `
  599. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  600. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  601. `
  602. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  603. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  604. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  605. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  606. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  607. if queryTotalLocalStorage != "" {
  608. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  609. }
  610. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  611. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
  612. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  613. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  614. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  615. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  616. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  617. resChs := ctx.QueryAll(
  618. queryDataCount,
  619. queryTotalGPU,
  620. queryTotalCPU,
  621. queryTotalRAM,
  622. queryTotalStorage,
  623. )
  624. // Only submit the local storage query if it is valid. Otherwise Prometheus
  625. // will return errors. Always append something to resChs, regardless, to
  626. // maintain indexing.
  627. if queryTotalLocalStorage != "" {
  628. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  629. } else {
  630. resChs = append(resChs, nil)
  631. }
  632. if withBreakdown {
  633. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, windowStr, fmtOffset, env.GetPromClusterLabel(), windowStr, fmtOffset, env.GetPromClusterLabel())
  634. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  635. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  636. bdResChs := ctx.QueryAll(
  637. queryCPUModePct,
  638. queryRAMSystemPct,
  639. queryRAMUserPct,
  640. )
  641. // Only submit the local storage query if it is valid. Otherwise Prometheus
  642. // will return errors. Always append something to resChs, regardless, to
  643. // maintain indexing.
  644. if queryUsedLocalStorage != "" {
  645. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  646. } else {
  647. bdResChs = append(bdResChs, nil)
  648. }
  649. resChs = append(resChs, bdResChs...)
  650. }
  651. resDataCount, _ := resChs[0].Await()
  652. resTotalGPU, _ := resChs[1].Await()
  653. resTotalCPU, _ := resChs[2].Await()
  654. resTotalRAM, _ := resChs[3].Await()
  655. resTotalStorage, _ := resChs[4].Await()
  656. if ctx.HasErrors() {
  657. return nil, ctx.ErrorCollection()
  658. }
  659. defaultClusterID := env.GetClusterID()
  660. dataMinsByCluster := map[string]float64{}
  661. for _, result := range resDataCount {
  662. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  663. if clusterID == "" {
  664. clusterID = defaultClusterID
  665. }
  666. dataMins := mins
  667. if len(result.Values) > 0 {
  668. dataMins = result.Values[0].Value
  669. } else {
  670. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  671. }
  672. dataMinsByCluster[clusterID] = dataMins
  673. }
  674. // Determine combined discount
  675. discount, customDiscount := 0.0, 0.0
  676. c, err := a.CloudProvider.GetConfig()
  677. if err == nil {
  678. discount, err = ParsePercentString(c.Discount)
  679. if err != nil {
  680. discount = 0.0
  681. }
  682. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  683. if err != nil {
  684. customDiscount = 0.0
  685. }
  686. }
  687. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  688. costData := make(map[string]map[string]float64)
  689. // Helper function to iterate over Prom query results, parsing the raw values into
  690. // the intermediate costData structure.
  691. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  692. for _, result := range results {
  693. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  694. if clusterID == "" {
  695. clusterID = defaultClusterID
  696. }
  697. if _, ok := costData[clusterID]; !ok {
  698. costData[clusterID] = map[string]float64{}
  699. }
  700. if len(result.Values) > 0 {
  701. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  702. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  703. }
  704. }
  705. }
  706. // Apply both sustained use and custom discounts to RAM and CPU
  707. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  708. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  709. // Apply only custom discount to GPU and storage
  710. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  711. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  712. if queryTotalLocalStorage != "" {
  713. resTotalLocalStorage, err := resChs[5].Await()
  714. if err != nil {
  715. return nil, err
  716. }
  717. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  718. }
  719. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  720. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  721. pvUsedCostMap := map[string]float64{}
  722. if withBreakdown {
  723. resCPUModePct, _ := resChs[6].Await()
  724. resRAMSystemPct, _ := resChs[7].Await()
  725. resRAMUserPct, _ := resChs[8].Await()
  726. if ctx.HasErrors() {
  727. return nil, ctx.ErrorCollection()
  728. }
  729. for _, result := range resCPUModePct {
  730. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  731. if clusterID == "" {
  732. clusterID = defaultClusterID
  733. }
  734. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  735. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  736. }
  737. cpuBD := cpuBreakdownMap[clusterID]
  738. mode, err := result.GetString("mode")
  739. if err != nil {
  740. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  741. mode = "other"
  742. }
  743. switch mode {
  744. case "idle":
  745. cpuBD.Idle += result.Values[0].Value
  746. case "system":
  747. cpuBD.System += result.Values[0].Value
  748. case "user":
  749. cpuBD.User += result.Values[0].Value
  750. default:
  751. cpuBD.Other += result.Values[0].Value
  752. }
  753. }
  754. for _, result := range resRAMSystemPct {
  755. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  756. if clusterID == "" {
  757. clusterID = defaultClusterID
  758. }
  759. if _, ok := ramBreakdownMap[clusterID]; !ok {
  760. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  761. }
  762. ramBD := ramBreakdownMap[clusterID]
  763. ramBD.System += result.Values[0].Value
  764. }
  765. for _, result := range resRAMUserPct {
  766. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  767. if clusterID == "" {
  768. clusterID = defaultClusterID
  769. }
  770. if _, ok := ramBreakdownMap[clusterID]; !ok {
  771. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  772. }
  773. ramBD := ramBreakdownMap[clusterID]
  774. ramBD.User += result.Values[0].Value
  775. }
  776. for _, ramBD := range ramBreakdownMap {
  777. remaining := 1.0
  778. remaining -= ramBD.Other
  779. remaining -= ramBD.System
  780. remaining -= ramBD.User
  781. ramBD.Idle = remaining
  782. }
  783. if queryUsedLocalStorage != "" {
  784. resUsedLocalStorage, err := resChs[9].Await()
  785. if err != nil {
  786. return nil, err
  787. }
  788. for _, result := range resUsedLocalStorage {
  789. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  790. if clusterID == "" {
  791. clusterID = defaultClusterID
  792. }
  793. pvUsedCostMap[clusterID] += result.Values[0].Value
  794. }
  795. }
  796. }
  797. if ctx.HasErrors() {
  798. for _, err := range ctx.Errors() {
  799. log.Errorf("ComputeClusterCosts: %s", err)
  800. }
  801. return nil, ctx.ErrorCollection()
  802. }
  803. // Convert intermediate structure to Costs instances
  804. costsByCluster := map[string]*ClusterCosts{}
  805. for id, cd := range costData {
  806. dataMins, ok := dataMinsByCluster[id]
  807. if !ok {
  808. dataMins = mins
  809. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  810. }
  811. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  812. if err != nil {
  813. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  814. return nil, err
  815. }
  816. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  817. costs.CPUBreakdown = cpuBD
  818. }
  819. if ramBD, ok := ramBreakdownMap[id]; ok {
  820. costs.RAMBreakdown = ramBD
  821. }
  822. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  823. if pvUC, ok := pvUsedCostMap[id]; ok {
  824. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  825. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  826. }
  827. costs.DataMinutes = dataMins
  828. costsByCluster[id] = costs
  829. }
  830. return costsByCluster, nil
  831. }
  832. type Totals struct {
  833. TotalCost [][]string `json:"totalcost"`
  834. CPUCost [][]string `json:"cpucost"`
  835. MemCost [][]string `json:"memcost"`
  836. StorageCost [][]string `json:"storageCost"`
  837. }
  838. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  839. if len(qrs) == 0 {
  840. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  841. }
  842. result := qrs[0]
  843. totals := [][]string{}
  844. for _, value := range result.Values {
  845. d0 := fmt.Sprintf("%f", value.Timestamp)
  846. d1 := fmt.Sprintf("%f", value.Value)
  847. toAppend := []string{
  848. d0,
  849. d1,
  850. }
  851. totals = append(totals, toAppend)
  852. }
  853. return totals, nil
  854. }
  855. // ClusterCostsOverTime gives the full cluster costs over time
  856. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  857. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  858. if localStorageQuery != "" {
  859. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  860. }
  861. layout := "2006-01-02T15:04:05.000Z"
  862. start, err := time.Parse(layout, startString)
  863. if err != nil {
  864. klog.V(1).Infof("Error parsing time %s. Error: %s", startString, err.Error())
  865. return nil, err
  866. }
  867. end, err := time.Parse(layout, endString)
  868. if err != nil {
  869. klog.V(1).Infof("Error parsing time %s. Error: %s", endString, err.Error())
  870. return nil, err
  871. }
  872. fmtWindow := timeutil.DurationString(window)
  873. if fmtWindow == "" {
  874. err := fmt.Errorf("window value invalid or missing")
  875. klog.V(1).Infof("Error parsing time %v. Error: %s", window, err.Error())
  876. return nil, err
  877. }
  878. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  879. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  880. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  881. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  882. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  883. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  884. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  885. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  886. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  887. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  888. resultClusterCores, err := resChClusterCores.Await()
  889. if err != nil {
  890. return nil, err
  891. }
  892. resultClusterRAM, err := resChClusterRAM.Await()
  893. if err != nil {
  894. return nil, err
  895. }
  896. resultStorage, err := resChStorage.Await()
  897. if err != nil {
  898. return nil, err
  899. }
  900. resultTotal, err := resChTotal.Await()
  901. if err != nil {
  902. return nil, err
  903. }
  904. coreTotal, err := resultToTotals(resultClusterCores)
  905. if err != nil {
  906. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  907. return nil, err
  908. }
  909. ramTotal, err := resultToTotals(resultClusterRAM)
  910. if err != nil {
  911. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  912. return nil, err
  913. }
  914. storageTotal, err := resultToTotals(resultStorage)
  915. if err != nil {
  916. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  917. }
  918. clusterTotal, err := resultToTotals(resultTotal)
  919. if err != nil {
  920. // If clusterTotal query failed, it's likely because there are no PVs, which
  921. // causes the qTotal query to return no data. Instead, query only node costs.
  922. // If that fails, return an error because something is actually wrong.
  923. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  924. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  925. for _, warning := range warnings {
  926. log.Warningf(warning)
  927. }
  928. if err != nil {
  929. return nil, err
  930. }
  931. clusterTotal, err = resultToTotals(resultNodes)
  932. if err != nil {
  933. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  934. return nil, err
  935. }
  936. }
  937. return &Totals{
  938. TotalCost: clusterTotal,
  939. CPUCost: coreTotal,
  940. MemCost: ramTotal,
  941. StorageCost: storageTotal,
  942. }, nil
  943. }
  944. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
  945. for _, result := range resActiveMins {
  946. cluster, err := result.GetString(env.GetPromClusterLabel())
  947. if err != nil {
  948. cluster = env.GetClusterID()
  949. }
  950. name, err := result.GetString("persistentvolume")
  951. if err != nil {
  952. log.Warningf("ClusterDisks: active mins missing pv name")
  953. continue
  954. }
  955. if len(result.Values) == 0 {
  956. continue
  957. }
  958. key := DiskIdentifier{cluster, name}
  959. if _, ok := diskMap[key]; !ok {
  960. diskMap[key] = &Disk{
  961. Cluster: cluster,
  962. Name: name,
  963. Breakdown: &ClusterCostsBreakdown{},
  964. }
  965. }
  966. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  967. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  968. mins := e.Sub(s).Minutes()
  969. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  970. diskMap[key].End = e
  971. diskMap[key].Start = s
  972. diskMap[key].Minutes = mins
  973. }
  974. for _, result := range resPVSize {
  975. cluster, err := result.GetString(env.GetPromClusterLabel())
  976. if err != nil {
  977. cluster = env.GetClusterID()
  978. }
  979. name, err := result.GetString("persistentvolume")
  980. if err != nil {
  981. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  982. continue
  983. }
  984. // TODO niko/assets storage class
  985. bytes := result.Values[0].Value
  986. key := DiskIdentifier{cluster, name}
  987. if _, ok := diskMap[key]; !ok {
  988. diskMap[key] = &Disk{
  989. Cluster: cluster,
  990. Name: name,
  991. Breakdown: &ClusterCostsBreakdown{},
  992. }
  993. }
  994. diskMap[key].Bytes = bytes
  995. }
  996. customPricingEnabled := cloud.CustomPricesEnabled(cp)
  997. customPricingConfig, err := cp.GetConfig()
  998. if err != nil {
  999. log.Warningf("ClusterDisks: failed to load custom pricing: %s", err)
  1000. }
  1001. for _, result := range resPVCost {
  1002. cluster, err := result.GetString(env.GetPromClusterLabel())
  1003. if err != nil {
  1004. cluster = env.GetClusterID()
  1005. }
  1006. name, err := result.GetString("persistentvolume")
  1007. if err != nil {
  1008. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  1009. continue
  1010. }
  1011. // TODO niko/assets storage class
  1012. var cost float64
  1013. if customPricingEnabled && customPricingConfig != nil {
  1014. customPVCostStr := customPricingConfig.Storage
  1015. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1016. if err != nil {
  1017. log.Warningf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1018. }
  1019. cost = customPVCost
  1020. } else {
  1021. cost = result.Values[0].Value
  1022. }
  1023. key := DiskIdentifier{cluster, name}
  1024. if _, ok := diskMap[key]; !ok {
  1025. diskMap[key] = &Disk{
  1026. Cluster: cluster,
  1027. Name: name,
  1028. Breakdown: &ClusterCostsBreakdown{},
  1029. }
  1030. }
  1031. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1032. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1033. if providerID != "" {
  1034. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  1035. }
  1036. }
  1037. }