cluster.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295
  1. package costmodel
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/opencost/opencost/pkg/kubecost"
  7. "github.com/opencost/opencost/pkg/util/timeutil"
  8. "golang.org/x/exp/slices"
  9. "github.com/opencost/opencost/pkg/cloud"
  10. "github.com/opencost/opencost/pkg/env"
  11. "github.com/opencost/opencost/pkg/log"
  12. "github.com/opencost/opencost/pkg/prom"
  13. prometheus "github.com/prometheus/client_golang/api"
  14. )
  // PromQL query templates for cluster-level cost rates. Each template is a
  // fmt format string whose %s verbs must be filled in by the caller before
  // the query is issued.
  //
  // NOTE(review): the callers are not visible here. The `[%s] %s` pairs look
  // like a range duration followed by an optional offset clause, the `%s`
  // inside `by (...)` looks like the cluster label, and the trailing `%s`
  // looks like an optional suffix -- confirm against the call sites.
  // The factor 730 presumably converts an hourly rate to a monthly one.
  const (
  	// queryClusterCores multiplies per-node CPU core capacity by the per-node
  	// hourly CPU cost, adds the per-node hourly GPU cost, scales both by 730,
  	// and sums the result by the final grouping label.
  	queryClusterCores = `sum(
  avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
  avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
  ) by (%s)`
  	// queryClusterRAM converts per-node memory capacity from bytes to GiB,
  	// multiplies by the per-node hourly RAM cost, and scales by 730.
  	queryClusterRAM = `sum(
  avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
  ) by (%s)`
  	// queryStorage multiplies each persistent volume's hourly cost (scaled by
  	// 730) by its capacity in GiB.
  	queryStorage = `sum(
  avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
  * avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
  ) by (%s) %s`
  	// queryTotal adds the summed node_total_hourly_cost (scaled by 730) to the
  	// persistent volume cost computed over a fixed 1h range.
  	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
  sum(
  avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
  * avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
  ) by (%s) %s`
  	// queryNodes is the node-only portion of queryTotal.
  	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
  )
  // maxLocalDiskSize is the largest local (root) disk size, in GiB, that
  // ClusterDisks keeps in its results; larger local disks are removed from the
  // analysis. AWS limits root disks to 100 Gi, and occasional metric errors in
  // filesystem size should not contribute to large costs.
  //
  // NOTE(review): the rationale above cites 100 Gi while the threshold is 200;
  // confirm which value is intended.
  const maxLocalDiskSize = 200
  // ClusterCosts represents cumulative and monthly cluster costs over a given
  // duration. Costs are broken down by CPU, GPU, memory, and storage, with
  // optional per-resource usage breakdowns.
  type ClusterCosts struct {
  	// Start and End bound the time range the costs were computed over.
  	Start *time.Time `json:"startTime"`
  	End   *time.Time `json:"endTime"`
  	// CPU costs: total over the range and the equivalent monthly rate.
  	CPUCumulative float64                `json:"cpuCumulativeCost"`
  	CPUMonthly    float64                `json:"cpuMonthlyCost"`
  	CPUBreakdown  *ClusterCostsBreakdown `json:"cpuBreakdown"`
  	// GPU costs: total over the range and the equivalent monthly rate.
  	GPUCumulative float64 `json:"gpuCumulativeCost"`
  	GPUMonthly    float64 `json:"gpuMonthlyCost"`
  	// RAM costs: total over the range and the equivalent monthly rate.
  	RAMCumulative float64                `json:"ramCumulativeCost"`
  	RAMMonthly    float64                `json:"ramMonthlyCost"`
  	RAMBreakdown  *ClusterCostsBreakdown `json:"ramBreakdown"`
  	// Storage costs: total over the range and the equivalent monthly rate.
  	StorageCumulative float64                `json:"storageCumulativeCost"`
  	StorageMonthly    float64                `json:"storageMonthlyCost"`
  	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
  	// TotalCumulative and TotalMonthly are the sums of the per-resource
  	// cumulative and monthly values.
  	TotalCumulative float64 `json:"totalCumulativeCost"`
  	TotalMonthly    float64 `json:"totalMonthlyCost"`
  	// DataMinutes has no JSON tag, so it is encoded under its Go field name.
  	// NOTE(review): presumably the number of minutes of data backing these
  	// costs; it is not set anywhere in the visible code.
  	DataMinutes float64
  }
  // ClusterCostsBreakdown provides a fractional breakdown of a resource by
  // category: user for user-space (i.e. non-system) usage, system, other, and
  // idle. Callers in this file derive Idle as 1.0 minus the sum of the other
  // three fields, so the four values are expected to add up to 1.0.
  type ClusterCostsBreakdown struct {
  	Idle   float64 `json:"idle"`
  	Other  float64 `json:"other"`
  	System float64 `json:"system"`
  	User   float64 `json:"user"`
  }
  63. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  64. // the associated monthly rate data, and returns the Costs.
  65. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  66. start, end := timeutil.ParseTimeRange(window, offset)
  67. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  68. if dataHours == 0 {
  69. dataHours = end.Sub(start).Hours()
  70. }
  71. // Do not allow zero-length windows to prevent divide-by-zero issues
  72. if dataHours == 0 {
  73. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  74. }
  75. cc := &ClusterCosts{
  76. Start: &start,
  77. End: &end,
  78. CPUCumulative: cpu,
  79. GPUCumulative: gpu,
  80. RAMCumulative: ram,
  81. StorageCumulative: storage,
  82. TotalCumulative: cpu + gpu + ram + storage,
  83. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  84. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  85. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  86. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  87. }
  88. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  89. return cc, nil
  90. }
  // Disk describes a single disk (a persistent volume or a node's local
  // storage) together with its cost and activity over a queried window.
  type Disk struct {
  	Cluster string
  	Name    string
  	// ProviderID defaults to Name in ClusterDisks when it is otherwise empty.
  	ProviderID   string
  	StorageClass string
  	Cost         float64
  	Bytes        float64
  	// Local is true for disks created from local (root filesystem) storage
  	// metrics.
  	Local bool
  	// Start and End are the timestamps of the first and last samples seen for
  	// the disk; Minutes is the span between them.
  	Start   time.Time
  	End     time.Time
  	Minutes float64
  	// Breakdown holds the fractional usage breakdown of the disk.
  	Breakdown *ClusterCostsBreakdown
  }
  // DiskIdentifier is the map key used to identify a Disk: the cluster it
  // belongs to and its name. The field order matters because this file builds
  // values with unkeyed composite literals.
  type DiskIdentifier struct {
  	Cluster string
  	Name    string
  }
  108. func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  109. // Query for the duration between start and end
  110. durStr := timeutil.DurationString(end.Sub(start))
  111. if durStr == "" {
  112. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  113. }
  114. // Start from the time "end", querying backwards
  115. t := end
  116. // minsPerResolution determines accuracy and resource use for the following
  117. // queries. Smaller values (higher resolution) result in better accuracy,
  118. // but more expensive queries, and vice-a-versa.
  119. resolution := env.GetETLResolution()
  120. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  121. var minsPerResolution int
  122. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  123. minsPerResolution = 1
  124. log.DedupedWarningf(3, "ClusterDisks(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  125. }
  126. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  127. // value, converts it to a cumulative value; i.e.
  128. // [$/hr] * [min/res]*[hr/min] = [$/res]
  129. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  130. // TODO niko/assets how do we not hard-code this price?
  131. costPerGBHr := 0.04 / 730.0
  132. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  133. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
  134. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
  135. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  136. queryPVStorageClass := fmt.Sprintf(`avg(avg_over_time(kubecost_pv_info[%s])) by (%s, persistentvolume, storageclass)`, durStr, env.GetPromClusterLabel())
  137. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  138. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  139. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  140. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  141. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  142. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  143. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  144. resChPVStorageClass := ctx.QueryAtTime(queryPVStorageClass, t)
  145. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  146. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  147. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  148. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  149. resPVCost, _ := resChPVCost.Await()
  150. resPVSize, _ := resChPVSize.Await()
  151. resActiveMins, _ := resChActiveMins.Await()
  152. resPVStorageClass, _ := resChPVStorageClass.Await()
  153. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  154. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  155. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  156. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  157. if ctx.HasErrors() {
  158. return nil, ctx.ErrorCollection()
  159. }
  160. diskMap := map[DiskIdentifier]*Disk{}
  161. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, provider)
  162. for _, result := range resLocalStorageCost {
  163. cluster, err := result.GetString(env.GetPromClusterLabel())
  164. if err != nil {
  165. cluster = env.GetClusterID()
  166. }
  167. name, err := result.GetString("instance")
  168. if err != nil {
  169. log.Warnf("ClusterDisks: local storage data missing instance")
  170. continue
  171. }
  172. cost := result.Values[0].Value
  173. key := DiskIdentifier{cluster, name}
  174. if _, ok := diskMap[key]; !ok {
  175. diskMap[key] = &Disk{
  176. Cluster: cluster,
  177. Name: name,
  178. Breakdown: &ClusterCostsBreakdown{},
  179. Local: true,
  180. }
  181. }
  182. diskMap[key].Cost += cost
  183. //Assigning explicitly the storage class of local storage to local
  184. diskMap[key].StorageClass = kubecost.LocalStorageClass
  185. }
  186. for _, result := range resLocalStorageUsedCost {
  187. cluster, err := result.GetString(env.GetPromClusterLabel())
  188. if err != nil {
  189. cluster = env.GetClusterID()
  190. }
  191. name, err := result.GetString("instance")
  192. if err != nil {
  193. log.Warnf("ClusterDisks: local storage usage data missing instance")
  194. continue
  195. }
  196. cost := result.Values[0].Value
  197. key := DiskIdentifier{cluster, name}
  198. if _, ok := diskMap[key]; !ok {
  199. diskMap[key] = &Disk{
  200. Cluster: cluster,
  201. Name: name,
  202. Breakdown: &ClusterCostsBreakdown{},
  203. Local: true,
  204. }
  205. }
  206. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  207. }
  208. for _, result := range resLocalStorageBytes {
  209. cluster, err := result.GetString(env.GetPromClusterLabel())
  210. if err != nil {
  211. cluster = env.GetClusterID()
  212. }
  213. name, err := result.GetString("instance")
  214. if err != nil {
  215. log.Warnf("ClusterDisks: local storage data missing instance")
  216. continue
  217. }
  218. bytes := result.Values[0].Value
  219. key := DiskIdentifier{cluster, name}
  220. if _, ok := diskMap[key]; !ok {
  221. diskMap[key] = &Disk{
  222. Cluster: cluster,
  223. Name: name,
  224. Breakdown: &ClusterCostsBreakdown{},
  225. Local: true,
  226. }
  227. }
  228. diskMap[key].Bytes = bytes
  229. if bytes/1024/1024/1024 > maxLocalDiskSize {
  230. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  231. delete(diskMap, key)
  232. }
  233. }
  234. for _, result := range resLocalActiveMins {
  235. cluster, err := result.GetString(env.GetPromClusterLabel())
  236. if err != nil {
  237. cluster = env.GetClusterID()
  238. }
  239. name, err := result.GetString("node")
  240. if err != nil {
  241. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  242. continue
  243. }
  244. key := DiskIdentifier{cluster, name}
  245. if _, ok := diskMap[key]; !ok {
  246. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  247. continue
  248. }
  249. if len(result.Values) == 0 {
  250. continue
  251. }
  252. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  253. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  254. mins := e.Sub(s).Minutes()
  255. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  256. diskMap[key].End = e
  257. diskMap[key].Start = s
  258. diskMap[key].Minutes = mins
  259. }
  260. var unTracedDiskLogData []DiskIdentifier
  261. //Iterating through Persistent Volume given by custom metrics kubecost_pv_info and assign the storage class if known and __unknown__ if not populated.
  262. for _, result := range resPVStorageClass {
  263. cluster, err := result.GetString(env.GetPromClusterLabel())
  264. if err != nil {
  265. cluster = env.GetClusterID()
  266. }
  267. name, _ := result.GetString("persistentvolume")
  268. key := DiskIdentifier{cluster, name}
  269. if _, ok := diskMap[key]; !ok {
  270. if !slices.Contains(unTracedDiskLogData, key) {
  271. unTracedDiskLogData = append(unTracedDiskLogData, key)
  272. }
  273. continue
  274. }
  275. if len(result.Values) == 0 {
  276. continue
  277. }
  278. storageClass, err := result.GetString("storageclass")
  279. if err != nil {
  280. diskMap[key].StorageClass = kubecost.UnknownStorageClass
  281. } else {
  282. diskMap[key].StorageClass = storageClass
  283. }
  284. }
  285. // Logging the unidentified disk information outside the loop
  286. for _, unIdentifiedDisk := range unTracedDiskLogData {
  287. log.Warnf("ClusterDisks: Cluster %s has Storage Class information for unidentified disk %s or disk deleted from analysis", unIdentifiedDisk.Cluster, unIdentifiedDisk.Name)
  288. }
  289. for _, disk := range diskMap {
  290. // Apply all remaining RAM to Idle
  291. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  292. // Set provider Id to the name for reconciliation
  293. if disk.ProviderID == "" {
  294. disk.ProviderID = disk.Name
  295. }
  296. }
  297. return diskMap, nil
  298. }
  // Node describes a single cluster node together with its per-resource costs,
  // capacities, and activity over a queried window.
  type Node struct {
  	Cluster    string
  	Name       string
  	ProviderID string
  	NodeType   string
  	// Per-resource cost paired with the quantity of that resource.
  	CPUCost  float64
  	CPUCores float64
  	GPUCost  float64
  	GPUCount float64
  	RAMCost  float64
  	RAMBytes float64
  	// Discount is set in ClusterNodes from the provider's combined discount
  	// for the node's type and preemptibility.
  	Discount    float64
  	Preemptible bool
  	// CPUBreakdown and RAMBreakdown hold fractional usage breakdowns; their
  	// Idle fields are derived in ClusterNodes as the remainder of 1.0.
  	CPUBreakdown *ClusterCostsBreakdown
  	RAMBreakdown *ClusterCostsBreakdown
  	Start        time.Time
  	End          time.Time
  	Minutes      float64
  	Labels       map[string]string
  	// Hourly unit prices. NOTE(review): these are not populated in the
  	// visible code; confirm where they are set.
  	CostPerCPUHr    float64
  	CostPerRAMGiBHr float64
  	CostPerGPUHr    float64
  }
  // partialCPUMap maps a node type to its actual number of CPU cores.
  // GKE lies about the number of cores e2 nodes have; this table holds the
  // real (fractional) values for those cases.
  var partialCPUMap = map[string]float64{
  	"e2-micro":  0.25,
  	"e2-small":  0.5,
  	"e2-medium": 1.0,
  }
  // NodeIdentifier is the map key used to identify a Node: its cluster, its
  // name, and its provider ID.
  type NodeIdentifier struct {
  	Cluster    string
  	Name       string
  	ProviderID string
  }
  // nodeIdentifierNoProviderID identifies a node by cluster and name only. It
  // keys maps built from metrics that are not grouped by provider ID, such as
  // the resource count maps passed to costTimesMinuteAndCount.
  type nodeIdentifierNoProviderID struct {
  	Cluster string
  	Name    string
  }
  339. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  340. for k, v := range activeDataMap {
  341. keyNon := nodeIdentifierNoProviderID{
  342. Cluster: k.Cluster,
  343. Name: k.Name,
  344. }
  345. if cost, ok := costMap[k]; ok {
  346. minutes := v.minutes
  347. count := 1.0
  348. if c, ok := resourceCountMap[keyNon]; ok {
  349. count = c
  350. }
  351. costMap[k] = cost * (minutes / 60) * count
  352. }
  353. }
  354. }
  355. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  356. for k, v := range activeDataMap {
  357. if cost, ok := costMap[k]; ok {
  358. minutes := v.minutes
  359. costMap[k] = cost * (minutes / 60)
  360. }
  361. }
  362. }
  363. func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  364. // Query for the duration between start and end
  365. durStr := timeutil.DurationString(end.Sub(start))
  366. if durStr == "" {
  367. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  368. }
  369. // Start from the time "end", querying backwards
  370. t := end
  371. // minsPerResolution determines accuracy and resource use for the following
  372. // queries. Smaller values (higher resolution) result in better accuracy,
  373. // but more expensive queries, and vice-a-versa.
  374. resolution := env.GetETLResolution()
  375. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  376. var minsPerResolution int
  377. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  378. minsPerResolution = 1
  379. log.DedupedWarningf(3, "ClusterNodes(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  380. }
  381. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  382. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  383. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  384. queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  385. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
  386. queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  387. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
  388. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  389. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
  390. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  391. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  392. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  393. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
  394. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)
  395. // Return errors if these fail
  396. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  397. resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
  398. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  399. resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
  400. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  401. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  402. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  403. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  404. // Do not return errors if these fail, but log warnings
  405. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  406. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  407. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  408. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  409. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  410. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  411. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  412. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  413. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  414. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  415. resIsSpot, _ := resChIsSpot.Await()
  416. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  417. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  418. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  419. resActiveMins, _ := resChActiveMins.Await()
  420. resLabels, _ := resChLabels.Await()
  421. if optionalCtx.HasErrors() {
  422. for _, err := range optionalCtx.Errors() {
  423. log.Warnf("ClusterNodes: %s", err)
  424. }
  425. }
  426. if requiredCtx.HasErrors() {
  427. for _, err := range requiredCtx.Errors() {
  428. log.Errorf("ClusterNodes: %s", err)
  429. }
  430. return nil, requiredCtx.ErrorCollection()
  431. }
  432. activeDataMap := buildActiveDataMap(resActiveMins, resolution)
  433. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  434. preemptibleMap := buildPreemptibleMap(resIsSpot)
  435. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  436. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  437. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  438. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  439. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  440. cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
  441. ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
  442. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  443. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  444. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  445. labelsMap := buildLabelsMap(resLabels)
  446. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
  447. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
  448. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  449. nodeMap := buildNodeMap(
  450. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  451. cpuCoresMap, ramBytesMap, ramUserPctMap,
  452. ramSystemPctMap,
  453. cpuBreakdownMap,
  454. activeDataMap,
  455. preemptibleMap,
  456. labelsMap,
  457. clusterAndNameToType,
  458. resolution,
  459. )
  460. c, err := cp.GetConfig()
  461. if err != nil {
  462. return nil, err
  463. }
  464. discount, err := ParsePercentString(c.Discount)
  465. if err != nil {
  466. return nil, err
  467. }
  468. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  469. if err != nil {
  470. return nil, err
  471. }
  472. for _, node := range nodeMap {
  473. // TODO take GKE Reserved Instances into account
  474. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  475. // Apply all remaining resources to Idle
  476. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  477. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  478. }
  479. return nodeMap, nil
  480. }
  // LoadBalancerIdentifier is the map key used to identify a LoadBalancer: its
  // cluster, the namespace of its service, and the service name.
  type LoadBalancerIdentifier struct {
  	Cluster   string
  	Namespace string
  	Name      string
  }
  // LoadBalancer describes a single load balancer together with its cost and
  // activity over a queried window.
  type LoadBalancer struct {
  	Cluster   string
  	Namespace string
  	// Name is set by ClusterLoadBalancers to "<namespace>/<service name>".
  	Name string
  	// ProviderID is derived from the load balancer's ingress IP.
  	ProviderID string
  	// Cost is the hourly price multiplied by the active hours.
  	Cost float64
  	// Start and End are the timestamps of the first and last samples seen;
  	// Minutes is the span between them.
  	Start   time.Time
  	End     time.Time
  	Minutes float64
  }
  496. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  497. // Query for the duration between start and end
  498. durStr := timeutil.DurationString(end.Sub(start))
  499. if durStr == "" {
  500. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  501. }
  502. // Start from the time "end", querying backwards
  503. t := end
  504. // minsPerResolution determines accuracy and resource use for the following
  505. // queries. Smaller values (higher resolution) result in better accuracy,
  506. // but more expensive queries, and vice-a-versa.
  507. resolution := env.GetETLResolution()
  508. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  509. var minsPerResolution int
  510. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) == 0 {
  511. minsPerResolution = 1
  512. log.DedupedWarningf(3, "ClusterLoadBalancers(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  513. }
  514. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  515. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
  516. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  517. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  518. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  519. resLBCost, _ := resChLBCost.Await()
  520. resActiveMins, _ := resChActiveMins.Await()
  521. if ctx.HasErrors() {
  522. return nil, ctx.ErrorCollection()
  523. }
  524. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  525. for _, result := range resActiveMins {
  526. cluster, err := result.GetString(env.GetPromClusterLabel())
  527. if err != nil {
  528. cluster = env.GetClusterID()
  529. }
  530. namespace, err := result.GetString("namespace")
  531. if err != nil {
  532. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  533. continue
  534. }
  535. name, err := result.GetString("service_name")
  536. if err != nil {
  537. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  538. continue
  539. }
  540. providerID, err := result.GetString("ingress_ip")
  541. if err != nil {
  542. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  543. providerID = ""
  544. }
  545. key := LoadBalancerIdentifier{
  546. Cluster: cluster,
  547. Namespace: namespace,
  548. Name: name,
  549. }
  550. // Skip if there are no data
  551. if len(result.Values) == 0 {
  552. continue
  553. }
  554. // Add load balancer to the set of load balancers
  555. if _, ok := loadBalancerMap[key]; !ok {
  556. loadBalancerMap[key] = &LoadBalancer{
  557. Cluster: cluster,
  558. Namespace: namespace,
  559. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  560. ProviderID: cloud.ParseLBID(providerID),
  561. }
  562. }
  563. // Append start, end, and minutes. This should come before all other data.
  564. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  565. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  566. loadBalancerMap[key].Start = s
  567. loadBalancerMap[key].End = e
  568. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  569. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  570. // Prevents there from being a duplicate LoadBalancers on the same day
  571. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  572. loadBalancerMap[key].ProviderID = providerID
  573. }
  574. }
  575. for _, result := range resLBCost {
  576. cluster, err := result.GetString(env.GetPromClusterLabel())
  577. if err != nil {
  578. cluster = env.GetClusterID()
  579. }
  580. namespace, err := result.GetString("namespace")
  581. if err != nil {
  582. log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
  583. continue
  584. }
  585. name, err := result.GetString("service_name")
  586. if err != nil {
  587. log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
  588. continue
  589. }
  590. key := LoadBalancerIdentifier{
  591. Cluster: cluster,
  592. Namespace: namespace,
  593. Name: name,
  594. }
  595. // Apply cost as price-per-hour * hours
  596. if lb, ok := loadBalancerMap[key]; ok {
  597. lbPricePerHr := result.Values[0].Value
  598. hrs := lb.Minutes / 60.0
  599. lb.Cost += lbPricePerHr * hrs
  600. } else {
  601. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  602. }
  603. }
  604. return loadBalancerMap, nil
  605. }
  606. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  607. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  608. if window < 10*time.Minute {
  609. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  610. }
  611. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  612. start, end := timeutil.ParseTimeRange(window, offset)
  613. mins := end.Sub(start).Minutes()
  614. windowStr := timeutil.DurationString(window)
  615. // minsPerResolution determines accuracy and resource use for the following
  616. // queries. Smaller values (higher resolution) result in better accuracy,
  617. // but more expensive queries, and vice-a-versa.
  618. resolution := env.GetETLResolution()
  619. //Ensuring if ETL_RESOLUTION_SECONDS is less than 60s default it to 1m
  620. var minsPerResolution int
  621. if minsPerResolution = int(resolution.Minutes()); int(resolution.Minutes()) < 1 {
  622. minsPerResolution = 1
  623. log.DedupedWarningf(3, "ComputeClusterCosts(): Configured ETL resolution (%d seconds) is below the 60 seconds threshold. Overriding with 1 minute.", int(resolution.Seconds()))
  624. }
  625. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  626. // value, converts it to a cumulative value; i.e.
  627. // [$/hr] * [min/res]*[hr/min] = [$/res]
  628. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  629. const fmtQueryDataCount = `
  630. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  631. `
  632. const fmtQueryTotalGPU = `
  633. sum(
  634. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  635. ) by (%s)
  636. `
  637. const fmtQueryTotalCPU = `
  638. sum(
  639. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  640. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  641. ) by (%s)
  642. `
  643. const fmtQueryTotalRAM = `
  644. sum(
  645. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  646. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  647. ) by (%s)
  648. `
  649. const fmtQueryTotalStorage = `
  650. sum(
  651. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  652. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  653. ) by (%s)
  654. `
  655. const fmtQueryCPUModePct = `
  656. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  657. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  658. `
  659. const fmtQueryRAMSystemPct = `
  660. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  661. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  662. `
  663. const fmtQueryRAMUserPct = `
  664. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  665. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  666. `
  667. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  668. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  669. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  670. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  671. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  672. if queryTotalLocalStorage != "" {
  673. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  674. }
  675. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  676. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
  677. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  678. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  679. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  680. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  681. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  682. resChs := ctx.QueryAll(
  683. queryDataCount,
  684. queryTotalGPU,
  685. queryTotalCPU,
  686. queryTotalRAM,
  687. queryTotalStorage,
  688. )
  689. // Only submit the local storage query if it is valid. Otherwise Prometheus
  690. // will return errors. Always append something to resChs, regardless, to
  691. // maintain indexing.
  692. if queryTotalLocalStorage != "" {
  693. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  694. } else {
  695. resChs = append(resChs, nil)
  696. }
  697. if withBreakdown {
  698. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, windowStr, fmtOffset, env.GetPromClusterLabel(), windowStr, fmtOffset, env.GetPromClusterLabel())
  699. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  700. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  701. bdResChs := ctx.QueryAll(
  702. queryCPUModePct,
  703. queryRAMSystemPct,
  704. queryRAMUserPct,
  705. )
  706. // Only submit the local storage query if it is valid. Otherwise Prometheus
  707. // will return errors. Always append something to resChs, regardless, to
  708. // maintain indexing.
  709. if queryUsedLocalStorage != "" {
  710. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  711. } else {
  712. bdResChs = append(bdResChs, nil)
  713. }
  714. resChs = append(resChs, bdResChs...)
  715. }
  716. resDataCount, _ := resChs[0].Await()
  717. resTotalGPU, _ := resChs[1].Await()
  718. resTotalCPU, _ := resChs[2].Await()
  719. resTotalRAM, _ := resChs[3].Await()
  720. resTotalStorage, _ := resChs[4].Await()
  721. if ctx.HasErrors() {
  722. return nil, ctx.ErrorCollection()
  723. }
  724. defaultClusterID := env.GetClusterID()
  725. dataMinsByCluster := map[string]float64{}
  726. for _, result := range resDataCount {
  727. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  728. if clusterID == "" {
  729. clusterID = defaultClusterID
  730. }
  731. dataMins := mins
  732. if len(result.Values) > 0 {
  733. dataMins = result.Values[0].Value
  734. } else {
  735. log.Warnf("Cluster cost data count returned no results for cluster %s", clusterID)
  736. }
  737. dataMinsByCluster[clusterID] = dataMins
  738. }
  739. // Determine combined discount
  740. discount, customDiscount := 0.0, 0.0
  741. c, err := a.CloudProvider.GetConfig()
  742. if err == nil {
  743. discount, err = ParsePercentString(c.Discount)
  744. if err != nil {
  745. discount = 0.0
  746. }
  747. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  748. if err != nil {
  749. customDiscount = 0.0
  750. }
  751. }
  752. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  753. costData := make(map[string]map[string]float64)
  754. // Helper function to iterate over Prom query results, parsing the raw values into
  755. // the intermediate costData structure.
  756. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  757. for _, result := range results {
  758. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  759. if clusterID == "" {
  760. clusterID = defaultClusterID
  761. }
  762. if _, ok := costData[clusterID]; !ok {
  763. costData[clusterID] = map[string]float64{}
  764. }
  765. if len(result.Values) > 0 {
  766. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  767. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  768. }
  769. }
  770. }
  771. // Apply both sustained use and custom discounts to RAM and CPU
  772. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  773. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  774. // Apply only custom discount to GPU and storage
  775. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  776. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  777. if queryTotalLocalStorage != "" {
  778. resTotalLocalStorage, err := resChs[5].Await()
  779. if err != nil {
  780. return nil, err
  781. }
  782. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  783. }
  784. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  785. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  786. pvUsedCostMap := map[string]float64{}
  787. if withBreakdown {
  788. resCPUModePct, _ := resChs[6].Await()
  789. resRAMSystemPct, _ := resChs[7].Await()
  790. resRAMUserPct, _ := resChs[8].Await()
  791. if ctx.HasErrors() {
  792. return nil, ctx.ErrorCollection()
  793. }
  794. for _, result := range resCPUModePct {
  795. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  796. if clusterID == "" {
  797. clusterID = defaultClusterID
  798. }
  799. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  800. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  801. }
  802. cpuBD := cpuBreakdownMap[clusterID]
  803. mode, err := result.GetString("mode")
  804. if err != nil {
  805. log.Warnf("ComputeClusterCosts: unable to read CPU mode: %s", err)
  806. mode = "other"
  807. }
  808. switch mode {
  809. case "idle":
  810. cpuBD.Idle += result.Values[0].Value
  811. case "system":
  812. cpuBD.System += result.Values[0].Value
  813. case "user":
  814. cpuBD.User += result.Values[0].Value
  815. default:
  816. cpuBD.Other += result.Values[0].Value
  817. }
  818. }
  819. for _, result := range resRAMSystemPct {
  820. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  821. if clusterID == "" {
  822. clusterID = defaultClusterID
  823. }
  824. if _, ok := ramBreakdownMap[clusterID]; !ok {
  825. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  826. }
  827. ramBD := ramBreakdownMap[clusterID]
  828. ramBD.System += result.Values[0].Value
  829. }
  830. for _, result := range resRAMUserPct {
  831. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  832. if clusterID == "" {
  833. clusterID = defaultClusterID
  834. }
  835. if _, ok := ramBreakdownMap[clusterID]; !ok {
  836. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  837. }
  838. ramBD := ramBreakdownMap[clusterID]
  839. ramBD.User += result.Values[0].Value
  840. }
  841. for _, ramBD := range ramBreakdownMap {
  842. remaining := 1.0
  843. remaining -= ramBD.Other
  844. remaining -= ramBD.System
  845. remaining -= ramBD.User
  846. ramBD.Idle = remaining
  847. }
  848. if queryUsedLocalStorage != "" {
  849. resUsedLocalStorage, err := resChs[9].Await()
  850. if err != nil {
  851. return nil, err
  852. }
  853. for _, result := range resUsedLocalStorage {
  854. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  855. if clusterID == "" {
  856. clusterID = defaultClusterID
  857. }
  858. pvUsedCostMap[clusterID] += result.Values[0].Value
  859. }
  860. }
  861. }
  862. if ctx.HasErrors() {
  863. for _, err := range ctx.Errors() {
  864. log.Errorf("ComputeClusterCosts: %s", err)
  865. }
  866. return nil, ctx.ErrorCollection()
  867. }
  868. // Convert intermediate structure to Costs instances
  869. costsByCluster := map[string]*ClusterCosts{}
  870. for id, cd := range costData {
  871. dataMins, ok := dataMinsByCluster[id]
  872. if !ok {
  873. dataMins = mins
  874. log.Warnf("Cluster cost data count not found for cluster %s", id)
  875. }
  876. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  877. if err != nil {
  878. log.Warnf("Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  879. return nil, err
  880. }
  881. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  882. costs.CPUBreakdown = cpuBD
  883. }
  884. if ramBD, ok := ramBreakdownMap[id]; ok {
  885. costs.RAMBreakdown = ramBD
  886. }
  887. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  888. if pvUC, ok := pvUsedCostMap[id]; ok {
  889. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  890. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  891. }
  892. costs.DataMinutes = dataMins
  893. costsByCluster[id] = costs
  894. }
  895. return costsByCluster, nil
  896. }
  897. type Totals struct {
  898. TotalCost [][]string `json:"totalcost"`
  899. CPUCost [][]string `json:"cpucost"`
  900. MemCost [][]string `json:"memcost"`
  901. StorageCost [][]string `json:"storageCost"`
  902. }
  903. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  904. if len(qrs) == 0 {
  905. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  906. }
  907. result := qrs[0]
  908. totals := [][]string{}
  909. for _, value := range result.Values {
  910. d0 := fmt.Sprintf("%f", value.Timestamp)
  911. d1 := fmt.Sprintf("%f", value.Value)
  912. toAppend := []string{
  913. d0,
  914. d1,
  915. }
  916. totals = append(totals, toAppend)
  917. }
  918. return totals, nil
  919. }
  920. // ClusterCostsOverTime gives the full cluster costs over time
  921. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  922. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  923. if localStorageQuery != "" {
  924. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  925. }
  926. layout := "2006-01-02T15:04:05.000Z"
  927. start, err := time.Parse(layout, startString)
  928. if err != nil {
  929. log.Errorf("Error parsing time %s. Error: %s", startString, err.Error())
  930. return nil, err
  931. }
  932. end, err := time.Parse(layout, endString)
  933. if err != nil {
  934. log.Errorf("Error parsing time %s. Error: %s", endString, err.Error())
  935. return nil, err
  936. }
  937. fmtWindow := timeutil.DurationString(window)
  938. if fmtWindow == "" {
  939. err := fmt.Errorf("window value invalid or missing")
  940. log.Errorf("Error parsing time %v. Error: %s", window, err.Error())
  941. return nil, err
  942. }
  943. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  944. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  945. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  946. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  947. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  948. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  949. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  950. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  951. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  952. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  953. resultClusterCores, err := resChClusterCores.Await()
  954. if err != nil {
  955. return nil, err
  956. }
  957. resultClusterRAM, err := resChClusterRAM.Await()
  958. if err != nil {
  959. return nil, err
  960. }
  961. resultStorage, err := resChStorage.Await()
  962. if err != nil {
  963. return nil, err
  964. }
  965. resultTotal, err := resChTotal.Await()
  966. if err != nil {
  967. return nil, err
  968. }
  969. coreTotal, err := resultToTotals(resultClusterCores)
  970. if err != nil {
  971. log.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  972. return nil, err
  973. }
  974. ramTotal, err := resultToTotals(resultClusterRAM)
  975. if err != nil {
  976. log.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  977. return nil, err
  978. }
  979. storageTotal, err := resultToTotals(resultStorage)
  980. if err != nil {
  981. log.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  982. }
  983. clusterTotal, err := resultToTotals(resultTotal)
  984. if err != nil {
  985. // If clusterTotal query failed, it's likely because there are no PVs, which
  986. // causes the qTotal query to return no data. Instead, query only node costs.
  987. // If that fails, return an error because something is actually wrong.
  988. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  989. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  990. for _, warning := range warnings {
  991. log.Warnf(warning)
  992. }
  993. if err != nil {
  994. return nil, err
  995. }
  996. clusterTotal, err = resultToTotals(resultNodes)
  997. if err != nil {
  998. log.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  999. return nil, err
  1000. }
  1001. }
  1002. return &Totals{
  1003. TotalCost: clusterTotal,
  1004. CPUCost: coreTotal,
  1005. MemCost: ramTotal,
  1006. StorageCost: storageTotal,
  1007. }, nil
  1008. }
  1009. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
  1010. for _, result := range resActiveMins {
  1011. cluster, err := result.GetString(env.GetPromClusterLabel())
  1012. if err != nil {
  1013. cluster = env.GetClusterID()
  1014. }
  1015. name, err := result.GetString("persistentvolume")
  1016. if err != nil {
  1017. log.Warnf("ClusterDisks: active mins missing pv name")
  1018. continue
  1019. }
  1020. if len(result.Values) == 0 {
  1021. continue
  1022. }
  1023. key := DiskIdentifier{cluster, name}
  1024. if _, ok := diskMap[key]; !ok {
  1025. diskMap[key] = &Disk{
  1026. Cluster: cluster,
  1027. Name: name,
  1028. Breakdown: &ClusterCostsBreakdown{},
  1029. }
  1030. }
  1031. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  1032. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  1033. mins := e.Sub(s).Minutes()
  1034. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  1035. diskMap[key].End = e
  1036. diskMap[key].Start = s
  1037. diskMap[key].Minutes = mins
  1038. }
  1039. for _, result := range resPVSize {
  1040. cluster, err := result.GetString(env.GetPromClusterLabel())
  1041. if err != nil {
  1042. cluster = env.GetClusterID()
  1043. }
  1044. name, err := result.GetString("persistentvolume")
  1045. if err != nil {
  1046. log.Warnf("ClusterDisks: PV size data missing persistentvolume")
  1047. continue
  1048. }
  1049. // TODO niko/assets storage class
  1050. bytes := result.Values[0].Value
  1051. key := DiskIdentifier{cluster, name}
  1052. if _, ok := diskMap[key]; !ok {
  1053. diskMap[key] = &Disk{
  1054. Cluster: cluster,
  1055. Name: name,
  1056. Breakdown: &ClusterCostsBreakdown{},
  1057. }
  1058. }
  1059. diskMap[key].Bytes = bytes
  1060. }
  1061. customPricingEnabled := cloud.CustomPricesEnabled(cp)
  1062. customPricingConfig, err := cp.GetConfig()
  1063. if err != nil {
  1064. log.Warnf("ClusterDisks: failed to load custom pricing: %s", err)
  1065. }
  1066. for _, result := range resPVCost {
  1067. cluster, err := result.GetString(env.GetPromClusterLabel())
  1068. if err != nil {
  1069. cluster = env.GetClusterID()
  1070. }
  1071. name, err := result.GetString("persistentvolume")
  1072. if err != nil {
  1073. log.Warnf("ClusterDisks: PV cost data missing persistentvolume")
  1074. continue
  1075. }
  1076. // TODO niko/assets storage class
  1077. var cost float64
  1078. if customPricingEnabled && customPricingConfig != nil {
  1079. customPVCostStr := customPricingConfig.Storage
  1080. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1081. if err != nil {
  1082. log.Warnf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1083. }
  1084. cost = customPVCost
  1085. } else {
  1086. cost = result.Values[0].Value
  1087. }
  1088. key := DiskIdentifier{cluster, name}
  1089. if _, ok := diskMap[key]; !ok {
  1090. diskMap[key] = &Disk{
  1091. Cluster: cluster,
  1092. Name: name,
  1093. Breakdown: &ClusterCostsBreakdown{},
  1094. }
  1095. }
  1096. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1097. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1098. if providerID != "" {
  1099. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  1100. }
  1101. }
  1102. }