cluster.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229
  1. package costmodel
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/kubecost/cost-model/pkg/kubecost"
  7. "github.com/kubecost/cost-model/pkg/util/timeutil"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/log"
  11. "github.com/kubecost/cost-model/pkg/prom"
  12. prometheus "github.com/prometheus/client_golang/api"
  13. )
// PromQL templates for cluster-level monthly cost estimates, completed by the
// caller with fmt.Sprintf (callers are not visible in this view). The literal
// 730 converts hourly prices into monthly ones, and "/ 1024 / 1024 / 1024"
// converts bytes to GiB.
// NOTE(review): the %s placeholders presumably receive a range duration, an
// offset clause, the cluster label, and (for a trailing %s) an extra suffix —
// confirm the exact argument order against the callers.
const (
	// queryClusterCores: monthly CPU cost (capacity cores × CPU hourly price
	// × 730) plus GPU hourly price × 730, summed by the final grouping label.
	queryClusterCores = `sum(
avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryClusterRAM: monthly RAM cost (capacity GiB × RAM hourly price × 730),
	// summed by the final grouping label.
	queryClusterRAM = `sum(
avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
) by (%s)`
	// queryStorage: monthly persistent-volume cost (PV hourly price × 730 ×
	// capacity GiB), summed by the grouping label, with a trailing suffix.
	queryStorage = `sum(
avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryTotal: monthly node cost plus monthly persistent-volume cost. Unlike
	// the templates above, its PV ranges are fixed at [1h] and it has no
	// offset placeholders.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
sum(
avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
) by (%s) %s`
	// queryNodes: monthly node cost only, with a trailing suffix.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
)
// maxLocalDiskSize is the largest local (root) disk size, in GiB, that
// ClusterDisks keeps; larger local disks are deleted from the analysis.
// AWS limits root disks to 100 Gi, and occasional metric errors in filesystem
// size should not contribute to large costs.
const maxLocalDiskSize = 200
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by CPU, GPU, RAM, and storage; CPU, RAM, and
// storage may additionally carry a fractional usage breakdown.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes has no json tag, so encoding/json emits it under the Go
	// field name "DataMinutes".
	DataMinutes float64
}
// ClusterCostsBreakdown provides a fractional breakdown of a resource into
// categories: User for user-space (i.e. non-system) usage, System, Other, and
// Idle. Values are fractions of the whole rather than 0–100 percentages; the
// cluster code in this file sets Idle to 1.0 minus the sum of the others.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  62. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  63. // the associated monthly rate data, and returns the Costs.
  64. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  65. start, end := timeutil.ParseTimeRange(window, offset)
  66. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  67. if dataHours == 0 {
  68. dataHours = end.Sub(start).Hours()
  69. }
  70. // Do not allow zero-length windows to prevent divide-by-zero issues
  71. if dataHours == 0 {
  72. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  73. }
  74. cc := &ClusterCosts{
  75. Start: &start,
  76. End: &end,
  77. CPUCumulative: cpu,
  78. GPUCumulative: gpu,
  79. RAMCumulative: ram,
  80. StorageCumulative: storage,
  81. TotalCumulative: cpu + gpu + ram + storage,
  82. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  83. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  84. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  85. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  86. }
  87. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  88. return cc, nil
  89. }
// Disk describes one disk's cost, size, and activity over a queried window,
// as assembled by ClusterDisks.
type Disk struct {
	Cluster    string
	Name       string
	ProviderID string
	// Cost is the cost accumulated over the queried window. For local disks
	// it is built from a sum_over_time subquery; NOTE(review): presumably the
	// same holds for disks filled in by pvCosts — confirm there.
	Cost float64
	// Bytes is the disk size in bytes (average over the window for local disks).
	Bytes float64
	// Local is true for node-local (root) storage derived from container
	// filesystem metrics rather than persistent-volume metrics.
	Local bool
	// Start, End, and Minutes describe the span over which the disk was seen.
	Start   time.Time
	End     time.Time
	Minutes float64
	// Breakdown holds fractional System/Other/User shares; Idle is the remainder.
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier is the map key for disks: a disk name within a cluster.
// Keep the field order stable; unkeyed composite literals such as
// DiskIdentifier{cluster, name} depend on it.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  106. func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  107. // Query for the duration between start and end
  108. durStr := timeutil.DurationString(end.Sub(start))
  109. if durStr == "" {
  110. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  111. }
  112. // Start from the time "end", querying backwards
  113. t := end
  114. // minsPerResolution determines accuracy and resource use for the following
  115. // queries. Smaller values (higher resolution) result in better accuracy,
  116. // but more expensive queries, and vice-a-versa.
  117. minsPerResolution := 1
  118. resolution := time.Duration(minsPerResolution) * time.Minute
  119. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  120. // value, converts it to a cumulative value; i.e.
  121. // [$/hr] * [min/res]*[hr/min] = [$/res]
  122. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  123. // TODO niko/assets how do we not hard-code this price?
  124. costPerGBHr := 0.04 / 730.0
  125. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  126. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
  127. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
  128. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  129. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  130. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  131. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  132. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  133. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  134. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  135. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  136. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  137. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  138. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  139. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  140. resPVCost, _ := resChPVCost.Await()
  141. resPVSize, _ := resChPVSize.Await()
  142. resActiveMins, _ := resChActiveMins.Await()
  143. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  144. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  145. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  146. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  147. if ctx.HasErrors() {
  148. return nil, ctx.ErrorCollection()
  149. }
  150. diskMap := map[DiskIdentifier]*Disk{}
  151. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, provider)
  152. for _, result := range resLocalStorageCost {
  153. cluster, err := result.GetString(env.GetPromClusterLabel())
  154. if err != nil {
  155. cluster = env.GetClusterID()
  156. }
  157. name, err := result.GetString("instance")
  158. if err != nil {
  159. log.Warnf("ClusterDisks: local storage data missing instance")
  160. continue
  161. }
  162. cost := result.Values[0].Value
  163. key := DiskIdentifier{cluster, name}
  164. if _, ok := diskMap[key]; !ok {
  165. diskMap[key] = &Disk{
  166. Cluster: cluster,
  167. Name: name,
  168. Breakdown: &ClusterCostsBreakdown{},
  169. Local: true,
  170. }
  171. }
  172. diskMap[key].Cost += cost
  173. }
  174. for _, result := range resLocalStorageUsedCost {
  175. cluster, err := result.GetString(env.GetPromClusterLabel())
  176. if err != nil {
  177. cluster = env.GetClusterID()
  178. }
  179. name, err := result.GetString("instance")
  180. if err != nil {
  181. log.Warnf("ClusterDisks: local storage usage data missing instance")
  182. continue
  183. }
  184. cost := result.Values[0].Value
  185. key := DiskIdentifier{cluster, name}
  186. if _, ok := diskMap[key]; !ok {
  187. diskMap[key] = &Disk{
  188. Cluster: cluster,
  189. Name: name,
  190. Breakdown: &ClusterCostsBreakdown{},
  191. Local: true,
  192. }
  193. }
  194. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  195. }
  196. for _, result := range resLocalStorageBytes {
  197. cluster, err := result.GetString(env.GetPromClusterLabel())
  198. if err != nil {
  199. cluster = env.GetClusterID()
  200. }
  201. name, err := result.GetString("instance")
  202. if err != nil {
  203. log.Warnf("ClusterDisks: local storage data missing instance")
  204. continue
  205. }
  206. bytes := result.Values[0].Value
  207. key := DiskIdentifier{cluster, name}
  208. if _, ok := diskMap[key]; !ok {
  209. diskMap[key] = &Disk{
  210. Cluster: cluster,
  211. Name: name,
  212. Breakdown: &ClusterCostsBreakdown{},
  213. Local: true,
  214. }
  215. }
  216. diskMap[key].Bytes = bytes
  217. if bytes/1024/1024/1024 > maxLocalDiskSize {
  218. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  219. delete(diskMap, key)
  220. }
  221. }
  222. for _, result := range resLocalActiveMins {
  223. cluster, err := result.GetString(env.GetPromClusterLabel())
  224. if err != nil {
  225. cluster = env.GetClusterID()
  226. }
  227. name, err := result.GetString("node")
  228. if err != nil {
  229. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  230. continue
  231. }
  232. key := DiskIdentifier{cluster, name}
  233. if _, ok := diskMap[key]; !ok {
  234. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  235. continue
  236. }
  237. if len(result.Values) == 0 {
  238. continue
  239. }
  240. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  241. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  242. mins := e.Sub(s).Minutes()
  243. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  244. diskMap[key].End = e
  245. diskMap[key].Start = s
  246. diskMap[key].Minutes = mins
  247. }
  248. for _, disk := range diskMap {
  249. // Apply all remaining RAM to Idle
  250. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  251. // Set provider Id to the name for reconciliation on Azure
  252. if fmt.Sprintf("%T", provider) == "*provider.Azure" {
  253. if disk.ProviderID == "" {
  254. disk.ProviderID = disk.Name
  255. }
  256. }
  257. }
  258. return diskMap, nil
  259. }
// Node describes one node's cost, capacity, and usage over a queried window,
// as assembled by ClusterNodes (through buildNodeMap, defined outside this view).
type Node struct {
	Cluster    string
	Name       string
	ProviderID string
	NodeType   string
	// ClusterNodes scales the hourly CPU/RAM/GPU prices by active hours (and
	// by core or byte count for CPU and RAM) before building nodes.
	// NOTE(review): confirm buildNodeMap copies those scaled values into the
	// *Cost fields unchanged.
	CPUCost  float64
	CPUCores float64
	GPUCost  float64
	GPUCount float64
	RAMCost  float64
	RAMBytes float64
	// Discount is set by ClusterNodes from cp.CombinedDiscountForNode.
	Discount    float64
	Preemptible bool
	// CPUBreakdown and RAMBreakdown hold fractional shares; ClusterNodes sets
	// Idle to the remainder.
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// NOTE(review): the CostPer* unit rates are populated outside this view.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases (the shared-core e2 machine types listed below).
// NOTE(review): the code that consults this table is outside this view.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier is the map key for per-node data: a node name within a
// cluster, qualified by the cloud provider ID.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID keys per-node data whose source queries carry no
// provider_id label (e.g. core and memory capacity), so it can be matched
// against NodeIdentifier by cluster and name alone.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  300. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  301. for k, v := range activeDataMap {
  302. keyNon := nodeIdentifierNoProviderID{
  303. Cluster: k.Cluster,
  304. Name: k.Name,
  305. }
  306. if cost, ok := costMap[k]; ok {
  307. minutes := v.minutes
  308. count := 1.0
  309. if c, ok := resourceCountMap[keyNon]; ok {
  310. count = c
  311. }
  312. costMap[k] = cost * (minutes / 60) * count
  313. }
  314. }
  315. }
  316. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  317. for k, v := range activeDataMap {
  318. if cost, ok := costMap[k]; ok {
  319. minutes := v.minutes
  320. costMap[k] = cost * (minutes / 60)
  321. }
  322. }
  323. }
// ClusterNodes queries Prometheus for node pricing, capacity, activity, and
// usage over [start, end] and returns one Node per NodeIdentifier.
//
// Pricing, capacity, GPU, active-minute, and spot queries are required: if any
// of them fails, each error is logged and the collected errors are returned.
// CPU-mode, RAM-usage, and label queries are optional: failures are logged
// as warnings and the corresponding data is simply absent. Discounts from the
// provider config are applied per node, and each node's CPU/RAM breakdown
// assigns whatever is not System/Other/User to Idle.
func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
	// Query for the duration between start and end
	durStr := timeutil.DurationString(end.Sub(start))
	if durStr == "" {
		return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
	}

	// Start from the time "end", querying backwards
	t := end

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	minsPerResolution := 1
	resolution := time.Duration(minsPerResolution) * time.Minute

	requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
	optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)

	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
	// RAM price is divided by 1024^3 so it can later be multiplied by a byte count.
	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	// Subquery at 1m resolution; the first and last sample timestamps give
	// each node's active span.
	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
	// Note: the spot and label queries are not aggregated by the cluster label.
	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)

	// Return errors if these fail
	resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
	resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
	resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
	resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
	resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
	resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
	resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
	resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)

	// Do not return errors if these fail, but log warnings
	resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
	resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
	resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
	resChLabels := optionalCtx.QueryAtTime(queryLabels, t)

	// Per-query errors are also collected on the contexts and checked below.
	resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
	resNodeCPUCores, _ := resChNodeCPUCores.Await()
	resNodeGPUCount, _ := resChNodeGPUCount.Await()
	resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
	resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
	resIsSpot, _ := resChIsSpot.Await()
	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
	resActiveMins, _ := resChActiveMins.Await()
	resLabels, _ := resChLabels.Await()

	if optionalCtx.HasErrors() {
		for _, err := range optionalCtx.Errors() {
			log.Warnf("ClusterNodes: %s", err)
		}
	}
	if requiredCtx.HasErrors() {
		for _, err := range requiredCtx.Errors() {
			log.Errorf("ClusterNodes: %s", err)
		}

		return nil, requiredCtx.ErrorCollection()
	}

	activeDataMap := buildActiveDataMap(resActiveMins, resolution)

	gpuCountMap := buildGPUCountMap(resNodeGPUCount)
	preemptibleMap := buildPreemptibleMap(resIsSpot)

	// Each cost builder also reports the node types it saw; merge them so
	// every node gets a type from whichever price query carried one.
	cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
	ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
	gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)

	clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
	clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)

	cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
	ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)

	ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
	ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)

	cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)

	labelsMap := buildLabelsMap(resLabels)

	// Turn hourly prices into cumulative costs over each node's active time
	// (CPU and RAM are also multiplied by core/byte counts).
	costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
	costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
	costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID

	nodeMap := buildNodeMap(
		cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
		cpuCoresMap, ramBytesMap, ramUserPctMap,
		ramSystemPctMap,
		cpuBreakdownMap,
		activeDataMap,
		preemptibleMap,
		labelsMap,
		clusterAndNameToType,
		resolution,
	)

	c, err := cp.GetConfig()
	if err != nil {
		return nil, err
	}

	discount, err := ParsePercentString(c.Discount)
	if err != nil {
		return nil, err
	}

	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
	if err != nil {
		return nil, err
	}

	for _, node := range nodeMap {
		// TODO take GKE Reserved Instances into account
		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)

		// Apply all remaining resources to Idle
		// NOTE(review): assumes buildNodeMap always sets non-nil breakdowns.
		node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
		node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
	}

	return nodeMap, nil
}
// LoadBalancerIdentifier is the map key for load balancers: a Kubernetes
// service (its namespace and service_name label) within a cluster.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
// LoadBalancer describes one service load balancer's cost and activity over a
// queried window, as assembled by ClusterLoadBalancers.
type LoadBalancer struct {
	Cluster   string
	Namespace string
	// Name is "<namespace>/<service name>", kept in that form for backward
	// compatibility by ClusterLoadBalancers.
	Name string
	// ProviderID is derived from the ingress_ip label via cloud.ParseLBID.
	ProviderID string
	// Cost is the average hourly price times the hours the LB was active.
	Cost    float64
	Start   time.Time
	End     time.Time
	Minutes float64
}
  452. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  453. // Query for the duration between start and end
  454. durStr := timeutil.DurationString(end.Sub(start))
  455. if durStr == "" {
  456. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  457. }
  458. // Start from the time "end", querying backwards
  459. t := end
  460. // minsPerResolution determines accuracy and resource use for the following
  461. // queries. Smaller values (higher resolution) result in better accuracy,
  462. // but more expensive queries, and vice-a-versa.
  463. minsPerResolution := 1
  464. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  465. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
  466. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  467. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  468. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  469. resLBCost, _ := resChLBCost.Await()
  470. resActiveMins, _ := resChActiveMins.Await()
  471. if ctx.HasErrors() {
  472. return nil, ctx.ErrorCollection()
  473. }
  474. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  475. for _, result := range resActiveMins {
  476. cluster, err := result.GetString(env.GetPromClusterLabel())
  477. if err != nil {
  478. cluster = env.GetClusterID()
  479. }
  480. namespace, err := result.GetString("namespace")
  481. if err != nil {
  482. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  483. continue
  484. }
  485. name, err := result.GetString("service_name")
  486. if err != nil {
  487. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  488. continue
  489. }
  490. providerID, err := result.GetString("ingress_ip")
  491. if err != nil {
  492. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  493. providerID = ""
  494. }
  495. key := LoadBalancerIdentifier{
  496. Cluster: cluster,
  497. Namespace: namespace,
  498. Name: name,
  499. }
  500. // Skip if there are no data
  501. if len(result.Values) == 0 {
  502. continue
  503. }
  504. // Add load balancer to the set of load balancers
  505. if _, ok := loadBalancerMap[key]; !ok {
  506. loadBalancerMap[key] = &LoadBalancer{
  507. Cluster: cluster,
  508. Namespace: namespace,
  509. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  510. ProviderID: cloud.ParseLBID(providerID),
  511. }
  512. }
  513. // Append start, end, and minutes. This should come before all other data.
  514. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  515. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  516. loadBalancerMap[key].Start = s
  517. loadBalancerMap[key].End = e
  518. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  519. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  520. // Prevents there from being a duplicate LoadBalancers on the same day
  521. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  522. loadBalancerMap[key].ProviderID = providerID
  523. }
  524. }
  525. for _, result := range resLBCost {
  526. cluster, err := result.GetString(env.GetPromClusterLabel())
  527. if err != nil {
  528. cluster = env.GetClusterID()
  529. }
  530. namespace, err := result.GetString("namespace")
  531. if err != nil {
  532. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  533. continue
  534. }
  535. name, err := result.GetString("service_name")
  536. if err != nil {
  537. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  538. continue
  539. }
  540. key := LoadBalancerIdentifier{
  541. Cluster: cluster,
  542. Namespace: namespace,
  543. Name: name,
  544. }
  545. // Apply cost as price-per-hour * hours
  546. if lb, ok := loadBalancerMap[key]; ok {
  547. lbPricePerHr := result.Values[0].Value
  548. hrs := lb.Minutes / 60.0
  549. lb.Cost += lbPricePerHr * hrs
  550. } else {
  551. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  552. }
  553. }
  554. return loadBalancerMap, nil
  555. }
  556. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  557. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  558. if window < 10*time.Minute {
  559. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  560. }
  561. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  562. start, end := timeutil.ParseTimeRange(window, offset)
  563. mins := end.Sub(start).Minutes()
  564. windowStr := timeutil.DurationString(window)
  565. // minsPerResolution determines accuracy and resource use for the following
  566. // queries. Smaller values (higher resolution) result in better accuracy,
  567. // but more expensive queries, and vice-a-versa.
  568. minsPerResolution := 5
  569. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  570. // value, converts it to a cumulative value; i.e.
  571. // [$/hr] * [min/res]*[hr/min] = [$/res]
  572. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  573. const fmtQueryDataCount = `
  574. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  575. `
  576. const fmtQueryTotalGPU = `
  577. sum(
  578. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  579. ) by (%s)
  580. `
  581. const fmtQueryTotalCPU = `
  582. sum(
  583. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  584. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  585. ) by (%s)
  586. `
  587. const fmtQueryTotalRAM = `
  588. sum(
  589. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  590. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  591. ) by (%s)
  592. `
  593. const fmtQueryTotalStorage = `
  594. sum(
  595. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  596. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  597. ) by (%s)
  598. `
  599. const fmtQueryCPUModePct = `
  600. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  601. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  602. `
  603. const fmtQueryRAMSystemPct = `
  604. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  605. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  606. `
  607. const fmtQueryRAMUserPct = `
  608. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  609. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  610. `
  611. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  612. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  613. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  614. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  615. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  616. if queryTotalLocalStorage != "" {
  617. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  618. }
  619. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  620. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
  621. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  622. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  623. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  624. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  625. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  626. resChs := ctx.QueryAll(
  627. queryDataCount,
  628. queryTotalGPU,
  629. queryTotalCPU,
  630. queryTotalRAM,
  631. queryTotalStorage,
  632. )
  633. // Only submit the local storage query if it is valid. Otherwise Prometheus
  634. // will return errors. Always append something to resChs, regardless, to
  635. // maintain indexing.
  636. if queryTotalLocalStorage != "" {
  637. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  638. } else {
  639. resChs = append(resChs, nil)
  640. }
  641. if withBreakdown {
  642. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, windowStr, fmtOffset, env.GetPromClusterLabel(), windowStr, fmtOffset, env.GetPromClusterLabel())
  643. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  644. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  645. bdResChs := ctx.QueryAll(
  646. queryCPUModePct,
  647. queryRAMSystemPct,
  648. queryRAMUserPct,
  649. )
  650. // Only submit the local storage query if it is valid. Otherwise Prometheus
  651. // will return errors. Always append something to resChs, regardless, to
  652. // maintain indexing.
  653. if queryUsedLocalStorage != "" {
  654. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  655. } else {
  656. bdResChs = append(bdResChs, nil)
  657. }
  658. resChs = append(resChs, bdResChs...)
  659. }
  660. resDataCount, _ := resChs[0].Await()
  661. resTotalGPU, _ := resChs[1].Await()
  662. resTotalCPU, _ := resChs[2].Await()
  663. resTotalRAM, _ := resChs[3].Await()
  664. resTotalStorage, _ := resChs[4].Await()
  665. if ctx.HasErrors() {
  666. return nil, ctx.ErrorCollection()
  667. }
  668. defaultClusterID := env.GetClusterID()
  669. dataMinsByCluster := map[string]float64{}
  670. for _, result := range resDataCount {
  671. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  672. if clusterID == "" {
  673. clusterID = defaultClusterID
  674. }
  675. dataMins := mins
  676. if len(result.Values) > 0 {
  677. dataMins = result.Values[0].Value
  678. } else {
  679. log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
  680. }
  681. dataMinsByCluster[clusterID] = dataMins
  682. }
  683. // Determine combined discount
  684. discount, customDiscount := 0.0, 0.0
  685. c, err := a.CloudProvider.GetConfig()
  686. if err == nil {
  687. discount, err = ParsePercentString(c.Discount)
  688. if err != nil {
  689. discount = 0.0
  690. }
  691. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  692. if err != nil {
  693. customDiscount = 0.0
  694. }
  695. }
  696. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  697. costData := make(map[string]map[string]float64)
  698. // Helper function to iterate over Prom query results, parsing the raw values into
  699. // the intermediate costData structure.
  700. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  701. for _, result := range results {
  702. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  703. if clusterID == "" {
  704. clusterID = defaultClusterID
  705. }
  706. if _, ok := costData[clusterID]; !ok {
  707. costData[clusterID] = map[string]float64{}
  708. }
  709. if len(result.Values) > 0 {
  710. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  711. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  712. }
  713. }
  714. }
  715. // Apply both sustained use and custom discounts to RAM and CPU
  716. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  717. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  718. // Apply only custom discount to GPU and storage
  719. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  720. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  721. if queryTotalLocalStorage != "" {
  722. resTotalLocalStorage, err := resChs[5].Await()
  723. if err != nil {
  724. return nil, err
  725. }
  726. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  727. }
  728. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  729. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  730. pvUsedCostMap := map[string]float64{}
  731. if withBreakdown {
  732. resCPUModePct, _ := resChs[6].Await()
  733. resRAMSystemPct, _ := resChs[7].Await()
  734. resRAMUserPct, _ := resChs[8].Await()
  735. if ctx.HasErrors() {
  736. return nil, ctx.ErrorCollection()
  737. }
  738. for _, result := range resCPUModePct {
  739. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  740. if clusterID == "" {
  741. clusterID = defaultClusterID
  742. }
  743. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  744. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  745. }
  746. cpuBD := cpuBreakdownMap[clusterID]
  747. mode, err := result.GetString("mode")
  748. if err != nil {
  749. log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
  750. mode = "other"
  751. }
  752. switch mode {
  753. case "idle":
  754. cpuBD.Idle += result.Values[0].Value
  755. case "system":
  756. cpuBD.System += result.Values[0].Value
  757. case "user":
  758. cpuBD.User += result.Values[0].Value
  759. default:
  760. cpuBD.Other += result.Values[0].Value
  761. }
  762. }
  763. for _, result := range resRAMSystemPct {
  764. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  765. if clusterID == "" {
  766. clusterID = defaultClusterID
  767. }
  768. if _, ok := ramBreakdownMap[clusterID]; !ok {
  769. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  770. }
  771. ramBD := ramBreakdownMap[clusterID]
  772. ramBD.System += result.Values[0].Value
  773. }
  774. for _, result := range resRAMUserPct {
  775. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  776. if clusterID == "" {
  777. clusterID = defaultClusterID
  778. }
  779. if _, ok := ramBreakdownMap[clusterID]; !ok {
  780. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  781. }
  782. ramBD := ramBreakdownMap[clusterID]
  783. ramBD.User += result.Values[0].Value
  784. }
  785. for _, ramBD := range ramBreakdownMap {
  786. remaining := 1.0
  787. remaining -= ramBD.Other
  788. remaining -= ramBD.System
  789. remaining -= ramBD.User
  790. ramBD.Idle = remaining
  791. }
  792. if queryUsedLocalStorage != "" {
  793. resUsedLocalStorage, err := resChs[9].Await()
  794. if err != nil {
  795. return nil, err
  796. }
  797. for _, result := range resUsedLocalStorage {
  798. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  799. if clusterID == "" {
  800. clusterID = defaultClusterID
  801. }
  802. pvUsedCostMap[clusterID] += result.Values[0].Value
  803. }
  804. }
  805. }
  806. if ctx.HasErrors() {
  807. for _, err := range ctx.Errors() {
  808. log.Errorf("ComputeClusterCosts: %s", err)
  809. }
  810. return nil, ctx.ErrorCollection()
  811. }
  812. // Convert intermediate structure to Costs instances
  813. costsByCluster := map[string]*ClusterCosts{}
  814. for id, cd := range costData {
  815. dataMins, ok := dataMinsByCluster[id]
  816. if !ok {
  817. dataMins = mins
  818. log.Warnf("Cluster cost data count not found for cluster %s", id)
  819. }
  820. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  821. if err != nil {
  822. log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  823. return nil, err
  824. }
  825. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  826. costs.CPUBreakdown = cpuBD
  827. }
  828. if ramBD, ok := ramBreakdownMap[id]; ok {
  829. costs.RAMBreakdown = ramBD
  830. }
  831. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  832. if pvUC, ok := pvUsedCostMap[id]; ok {
  833. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  834. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  835. }
  836. costs.DataMinutes = dataMins
  837. costsByCluster[id] = costs
  838. }
  839. return costsByCluster, nil
  840. }
// Totals holds cluster cost time series. Each field is a list of
// [timestamp, value] string pairs, as produced by resultToTotals in
// ClusterCostsOverTime.
type Totals struct {
	// TotalCost is the total cluster cost series. When the total query
	// returns no data, ClusterCostsOverTime fills it from a node-only query.
	TotalCost [][]string `json:"totalcost"`
	// CPUCost is the cluster CPU cost series.
	CPUCost [][]string `json:"cpucost"`
	// MemCost is the cluster RAM cost series.
	MemCost [][]string `json:"memcost"`
	// StorageCost is the storage cost series; it may be empty when no
	// storage data was returned.
	// NOTE(review): this tag is camelCase unlike the others; changing it
	// would alter the JSON contract, so it is left as-is.
	StorageCost [][]string `json:"storageCost"`
}
  847. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  848. if len(qrs) == 0 {
  849. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  850. }
  851. result := qrs[0]
  852. totals := [][]string{}
  853. for _, value := range result.Values {
  854. d0 := fmt.Sprintf("%f", value.Timestamp)
  855. d1 := fmt.Sprintf("%f", value.Value)
  856. toAppend := []string{
  857. d0,
  858. d1,
  859. }
  860. totals = append(totals, toAppend)
  861. }
  862. return totals, nil
  863. }
  864. // ClusterCostsOverTime gives the full cluster costs over time
  865. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  866. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  867. if localStorageQuery != "" {
  868. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  869. }
  870. layout := "2006-01-02T15:04:05.000Z"
  871. start, err := time.Parse(layout, startString)
  872. if err != nil {
  873. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  874. return nil, err
  875. }
  876. end, err := time.Parse(layout, endString)
  877. if err != nil {
  878. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  879. return nil, err
  880. }
  881. fmtWindow := timeutil.DurationString(window)
  882. if fmtWindow == "" {
  883. err := fmt.Errorf("window value invalid or missing")
  884. log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
  885. return nil, err
  886. }
  887. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  888. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  889. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  890. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  891. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  892. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  893. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  894. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  895. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  896. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  897. resultClusterCores, err := resChClusterCores.Await()
  898. if err != nil {
  899. return nil, err
  900. }
  901. resultClusterRAM, err := resChClusterRAM.Await()
  902. if err != nil {
  903. return nil, err
  904. }
  905. resultStorage, err := resChStorage.Await()
  906. if err != nil {
  907. return nil, err
  908. }
  909. resultTotal, err := resChTotal.Await()
  910. if err != nil {
  911. return nil, err
  912. }
  913. coreTotal, err := resultToTotals(resultClusterCores)
  914. if err != nil {
  915. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  916. return nil, err
  917. }
  918. ramTotal, err := resultToTotals(resultClusterRAM)
  919. if err != nil {
  920. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  921. return nil, err
  922. }
  923. storageTotal, err := resultToTotals(resultStorage)
  924. if err != nil {
  925. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  926. }
  927. clusterTotal, err := resultToTotals(resultTotal)
  928. if err != nil {
  929. // If clusterTotal query failed, it's likely because there are no PVs, which
  930. // causes the qTotal query to return no data. Instead, query only node costs.
  931. // If that fails, return an error because something is actually wrong.
  932. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  933. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  934. for _, warning := range warnings {
  935. log.Warnf(warning)
  936. }
  937. if err != nil {
  938. return nil, err
  939. }
  940. clusterTotal, err = resultToTotals(resultNodes)
  941. if err != nil {
  942. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  943. return nil, err
  944. }
  945. }
  946. return &Totals{
  947. TotalCost: clusterTotal,
  948. CPUCost: coreTotal,
  949. MemCost: ramTotal,
  950. StorageCost: storageTotal,
  951. }, nil
  952. }
  953. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
  954. for _, result := range resActiveMins {
  955. cluster, err := result.GetString(env.GetPromClusterLabel())
  956. if err != nil {
  957. cluster = env.GetClusterID()
  958. }
  959. name, err := result.GetString("persistentvolume")
  960. if err != nil {
  961. log.Warnf("ClusterDisks: active mins missing pv name")
  962. continue
  963. }
  964. if len(result.Values) == 0 {
  965. continue
  966. }
  967. key := DiskIdentifier{cluster, name}
  968. if _, ok := diskMap[key]; !ok {
  969. diskMap[key] = &Disk{
  970. Cluster: cluster,
  971. Name: name,
  972. Breakdown: &ClusterCostsBreakdown{},
  973. }
  974. }
  975. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  976. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  977. mins := e.Sub(s).Minutes()
  978. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  979. diskMap[key].End = e
  980. diskMap[key].Start = s
  981. diskMap[key].Minutes = mins
  982. }
  983. for _, result := range resPVSize {
  984. cluster, err := result.GetString(env.GetPromClusterLabel())
  985. if err != nil {
  986. cluster = env.GetClusterID()
  987. }
  988. name, err := result.GetString("persistentvolume")
  989. if err != nil {
  990. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  991. continue
  992. }
  993. // TODO niko/assets storage class
  994. bytes := result.Values[0].Value
  995. key := DiskIdentifier{cluster, name}
  996. if _, ok := diskMap[key]; !ok {
  997. diskMap[key] = &Disk{
  998. Cluster: cluster,
  999. Name: name,
  1000. Breakdown: &ClusterCostsBreakdown{},
  1001. }
  1002. }
  1003. diskMap[key].Bytes = bytes
  1004. }
  1005. customPricingEnabled := cloud.CustomPricesEnabled(cp)
  1006. customPricingConfig, err := cp.GetConfig()
  1007. if err != nil {
  1008. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1009. }
  1010. for _, result := range resPVCost {
  1011. cluster, err := result.GetString(env.GetPromClusterLabel())
  1012. if err != nil {
  1013. cluster = env.GetClusterID()
  1014. }
  1015. name, err := result.GetString("persistentvolume")
  1016. if err != nil {
  1017. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1018. continue
  1019. }
  1020. // TODO niko/assets storage class
  1021. var cost float64
  1022. if customPricingEnabled && customPricingConfig != nil {
  1023. customPVCostStr := customPricingConfig.Storage
  1024. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1025. if err != nil {
  1026. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1027. }
  1028. cost = customPVCost
  1029. } else {
  1030. cost = result.Values[0].Value
  1031. }
  1032. key := DiskIdentifier{cluster, name}
  1033. if _, ok := diskMap[key]; !ok {
  1034. diskMap[key] = &Disk{
  1035. Cluster: cluster,
  1036. Name: name,
  1037. Breakdown: &ClusterCostsBreakdown{},
  1038. }
  1039. }
  1040. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1041. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1042. if providerID != "" {
  1043. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  1044. }
  1045. }
  1046. }