cluster.go 39 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/kubecost/cost-model/pkg/cloud"
  6. "github.com/kubecost/cost-model/pkg/env"
  7. "github.com/kubecost/cost-model/pkg/log"
  8. "github.com/kubecost/cost-model/pkg/prom"
  9. "github.com/kubecost/cost-model/pkg/util"
  10. prometheus "github.com/prometheus/client_golang/api"
  11. "k8s.io/klog"
  12. )
  13. const (
  14. queryClusterCores = `sum(
  15. avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
  16. avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
  17. ) by (cluster_id)`
  18. queryClusterRAM = `sum(
  19. avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
  20. ) by (cluster_id)`
  21. queryStorage = `sum(
  22. avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
  23. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
  24. ) by (cluster_id) %s`
  25. queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
  26. sum(
  27. avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
  28. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
  29. ) by (cluster_id) %s`
  30. queryNodes = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 %s`
  31. )
  32. // Costs represents cumulative and monthly cluster costs over a given duration. Costs
  33. // are broken down by cores, memory, and storage.
  34. type ClusterCosts struct {
  35. Start *time.Time `json:"startTime"`
  36. End *time.Time `json:"endTime"`
  37. CPUCumulative float64 `json:"cpuCumulativeCost"`
  38. CPUMonthly float64 `json:"cpuMonthlyCost"`
  39. CPUBreakdown *ClusterCostsBreakdown `json:"cpuBreakdown"`
  40. GPUCumulative float64 `json:"gpuCumulativeCost"`
  41. GPUMonthly float64 `json:"gpuMonthlyCost"`
  42. RAMCumulative float64 `json:"ramCumulativeCost"`
  43. RAMMonthly float64 `json:"ramMonthlyCost"`
  44. RAMBreakdown *ClusterCostsBreakdown `json:"ramBreakdown"`
  45. StorageCumulative float64 `json:"storageCumulativeCost"`
  46. StorageMonthly float64 `json:"storageMonthlyCost"`
  47. StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
  48. TotalCumulative float64 `json:"totalCumulativeCost"`
  49. TotalMonthly float64 `json:"totalMonthlyCost"`
  50. DataMinutes float64
  51. }
  52. // ClusterCostsBreakdown provides percentage-based breakdown of a resource by
  53. // categories: user for user-space (i.e. non-system) usage, system, and idle.
  54. type ClusterCostsBreakdown struct {
  55. Idle float64 `json:"idle"`
  56. Other float64 `json:"other"`
  57. System float64 `json:"system"`
  58. User float64 `json:"user"`
  59. }
  60. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  61. // the associated monthly rate data, and returns the Costs.
  62. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
  63. start, end, err := util.ParseTimeRange(window, offset)
  64. if err != nil {
  65. return nil, err
  66. }
  67. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  68. if dataHours == 0 {
  69. dataHours = end.Sub(*start).Hours()
  70. }
  71. // Do not allow zero-length windows to prevent divide-by-zero issues
  72. if dataHours == 0 {
  73. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  74. }
  75. cc := &ClusterCosts{
  76. Start: start,
  77. End: end,
  78. CPUCumulative: cpu,
  79. GPUCumulative: gpu,
  80. RAMCumulative: ram,
  81. StorageCumulative: storage,
  82. TotalCumulative: cpu + gpu + ram + storage,
  83. CPUMonthly: cpu / dataHours * (util.HoursPerMonth),
  84. GPUMonthly: gpu / dataHours * (util.HoursPerMonth),
  85. RAMMonthly: ram / dataHours * (util.HoursPerMonth),
  86. StorageMonthly: storage / dataHours * (util.HoursPerMonth),
  87. }
  88. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  89. return cc, nil
  90. }
  91. type Disk struct {
  92. Cluster string
  93. Name string
  94. ProviderID string
  95. Cost float64
  96. Bytes float64
  97. Local bool
  98. Start time.Time
  99. End time.Time
  100. Minutes float64
  101. Breakdown *ClusterCostsBreakdown
  102. }
  103. func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, error) {
  104. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  105. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  106. if offset < time.Minute {
  107. offsetStr = ""
  108. }
  109. // minsPerResolution determines accuracy and resource use for the following
  110. // queries. Smaller values (higher resolution) result in better accuracy,
  111. // but more expensive queries, and vice-a-versa.
  112. minsPerResolution := 1
  113. resolution := time.Duration(minsPerResolution) * time.Minute
  114. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  115. // value, converts it to a cumulative value; i.e.
  116. // [$/hr] * [min/res]*[hr/min] = [$/res]
  117. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  118. // TODO niko/assets how do we not hard-code this price?
  119. costPerGBHr := 0.04 / 730.0
  120. ctx := prom.NewContext(client)
  121. queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume) * on(cluster_id, persistentvolume) group_right avg(pv_hourly_cost) by (cluster_id, persistentvolume,provider_id))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  122. queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  123. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (cluster_id, persistentvolume)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  124. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  125. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  126. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  127. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  128. resChPVCost := ctx.Query(queryPVCost)
  129. resChPVSize := ctx.Query(queryPVSize)
  130. resChActiveMins := ctx.Query(queryActiveMins)
  131. resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
  132. resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
  133. resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
  134. resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
  135. resPVCost, _ := resChPVCost.Await()
  136. resPVSize, _ := resChPVSize.Await()
  137. resActiveMins, _ := resChActiveMins.Await()
  138. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  139. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  140. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  141. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  142. if ctx.HasErrors() {
  143. return nil, ctx.ErrorCollection()
  144. }
  145. diskMap := map[string]*Disk{}
  146. for _, result := range resPVCost {
  147. cluster, err := result.GetString("cluster_id")
  148. if err != nil {
  149. cluster = env.GetClusterID()
  150. }
  151. name, err := result.GetString("persistentvolume")
  152. if err != nil {
  153. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  154. continue
  155. }
  156. // TODO niko/assets storage class
  157. cost := result.Values[0].Value
  158. key := fmt.Sprintf("%s/%s", cluster, name)
  159. if _, ok := diskMap[key]; !ok {
  160. diskMap[key] = &Disk{
  161. Cluster: cluster,
  162. Name: name,
  163. Breakdown: &ClusterCostsBreakdown{},
  164. }
  165. }
  166. diskMap[key].Cost += cost
  167. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  168. if providerID != "" {
  169. diskMap[key].ProviderID = provider.ParsePVID(providerID)
  170. }
  171. }
  172. for _, result := range resPVSize {
  173. cluster, err := result.GetString("cluster_id")
  174. if err != nil {
  175. cluster = env.GetClusterID()
  176. }
  177. name, err := result.GetString("persistentvolume")
  178. if err != nil {
  179. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  180. continue
  181. }
  182. // TODO niko/assets storage class
  183. bytes := result.Values[0].Value
  184. key := fmt.Sprintf("%s/%s", cluster, name)
  185. if _, ok := diskMap[key]; !ok {
  186. diskMap[key] = &Disk{
  187. Cluster: cluster,
  188. Name: name,
  189. Breakdown: &ClusterCostsBreakdown{},
  190. }
  191. }
  192. diskMap[key].Bytes = bytes
  193. }
  194. for _, result := range resLocalStorageCost {
  195. cluster, err := result.GetString("cluster_id")
  196. if err != nil {
  197. cluster = env.GetClusterID()
  198. }
  199. name, err := result.GetString("instance")
  200. if err != nil {
  201. log.Warningf("ClusterDisks: local storage data missing instance")
  202. continue
  203. }
  204. cost := result.Values[0].Value
  205. key := fmt.Sprintf("%s/%s", cluster, name)
  206. if _, ok := diskMap[key]; !ok {
  207. diskMap[key] = &Disk{
  208. Cluster: cluster,
  209. Name: name,
  210. Breakdown: &ClusterCostsBreakdown{},
  211. Local: true,
  212. }
  213. }
  214. diskMap[key].Cost += cost
  215. }
  216. for _, result := range resLocalStorageUsedCost {
  217. cluster, err := result.GetString("cluster_id")
  218. if err != nil {
  219. cluster = env.GetClusterID()
  220. }
  221. name, err := result.GetString("instance")
  222. if err != nil {
  223. log.Warningf("ClusterDisks: local storage usage data missing instance")
  224. continue
  225. }
  226. cost := result.Values[0].Value
  227. key := fmt.Sprintf("%s/%s", cluster, name)
  228. if _, ok := diskMap[key]; !ok {
  229. diskMap[key] = &Disk{
  230. Cluster: cluster,
  231. Name: name,
  232. Breakdown: &ClusterCostsBreakdown{},
  233. Local: true,
  234. }
  235. }
  236. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  237. }
  238. for _, result := range resLocalStorageBytes {
  239. cluster, err := result.GetString("cluster_id")
  240. if err != nil {
  241. cluster = env.GetClusterID()
  242. }
  243. name, err := result.GetString("instance")
  244. if err != nil {
  245. log.Warningf("ClusterDisks: local storage data missing instance")
  246. continue
  247. }
  248. bytes := result.Values[0].Value
  249. key := fmt.Sprintf("%s/%s", cluster, name)
  250. if _, ok := diskMap[key]; !ok {
  251. diskMap[key] = &Disk{
  252. Cluster: cluster,
  253. Name: name,
  254. Breakdown: &ClusterCostsBreakdown{},
  255. Local: true,
  256. }
  257. }
  258. diskMap[key].Bytes = bytes
  259. }
  260. for _, result := range resActiveMins {
  261. cluster, err := result.GetString("cluster_id")
  262. if err != nil {
  263. cluster = env.GetClusterID()
  264. }
  265. name, err := result.GetString("persistentvolume")
  266. if err != nil {
  267. log.Warningf("ClusterDisks: active mins missing instance")
  268. continue
  269. }
  270. key := fmt.Sprintf("%s/%s", cluster, name)
  271. if _, ok := diskMap[key]; !ok {
  272. log.Warningf("ClusterDisks: active mins for unidentified disk")
  273. continue
  274. }
  275. if len(result.Values) == 0 {
  276. continue
  277. }
  278. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  279. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  280. mins := e.Sub(s).Minutes()
  281. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  282. diskMap[key].End = e
  283. diskMap[key].Start = s
  284. diskMap[key].Minutes = mins
  285. }
  286. for _, result := range resLocalActiveMins {
  287. cluster, err := result.GetString("cluster_id")
  288. if err != nil {
  289. cluster = env.GetClusterID()
  290. }
  291. name, err := result.GetString("node")
  292. if err != nil {
  293. log.Warningf("ClusterDisks: local active mins data missing instance")
  294. continue
  295. }
  296. key := fmt.Sprintf("%s/%s", cluster, name)
  297. if _, ok := diskMap[key]; !ok {
  298. log.Warningf("ClusterDisks: local active mins for unidentified disk")
  299. continue
  300. }
  301. if len(result.Values) == 0 {
  302. continue
  303. }
  304. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  305. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  306. mins := e.Sub(s).Minutes()
  307. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  308. diskMap[key].End = e
  309. diskMap[key].Start = s
  310. diskMap[key].Minutes = mins
  311. }
  312. for _, disk := range diskMap {
  313. // Apply all remaining RAM to Idle
  314. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  315. // Set provider Id to the name for reconciliation on Azure
  316. if fmt.Sprintf("%T", provider) == "*provider.Azure"{
  317. if disk.ProviderID == "" {
  318. disk.ProviderID = disk.Name
  319. }
  320. }
  321. }
  322. return diskMap, nil
  323. }
  324. type Node struct {
  325. Cluster string
  326. Name string
  327. ProviderID string
  328. NodeType string
  329. CPUCost float64
  330. CPUCores float64
  331. GPUCost float64
  332. RAMCost float64
  333. RAMBytes float64
  334. Discount float64
  335. Preemptible bool
  336. CPUBreakdown *ClusterCostsBreakdown
  337. RAMBreakdown *ClusterCostsBreakdown
  338. Start time.Time
  339. End time.Time
  340. Minutes float64
  341. Labels map[string]string
  342. CostPerCPUHr float64
  343. CostPerRAMGiBHr float64
  344. CostPerGPUHr float64
  345. }
  346. // GKE lies about the number of cores e2 nodes have. This table
  347. // contains a mapping from node type -> actual CPU cores
  348. // for those cases.
  349. var partialCPUMap = map[string]float64{
  350. "e2-micro": 0.25,
  351. "e2-small": 0.5,
  352. "e2-medium": 1.0,
  353. }
  354. type NodeIdentifier struct {
  355. Cluster string
  356. Name string
  357. ProviderID string
  358. }
  359. type nodeIdentifierNoProviderID struct {
  360. Cluster string
  361. Name string
  362. }
  363. func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[NodeIdentifier]*Node, error) {
  364. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  365. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  366. if offset < time.Minute {
  367. offsetStr = ""
  368. }
  369. // minsPerResolution determines accuracy and resource use for the following
  370. // queries. Smaller values (higher resolution) result in better accuracy,
  371. // but more expensive queries, and vice-a-versa.
  372. minsPerResolution := 1
  373. resolution := time.Duration(minsPerResolution) * time.Minute
  374. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  375. // value, converts it to a cumulative value; i.e.
  376. // [$/hr] * [min/res]*[hr/min] = [$/res]
  377. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  378. requiredCtx := prom.NewContext(client)
  379. optionalCtx := prom.NewContext(client)
  380. queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  381. queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  382. queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  383. queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  384. queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost * %d.0 / 60.0) by (cluster_id, node, provider_id))[%s:%dm]%s)`, minsPerResolution, durationStr, minsPerResolution, offsetStr)
  385. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
  386. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
  387. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
  388. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, cluster_id, provider_id)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  389. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  390. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
  391. // Return errors if these fail
  392. resChNodeCPUCost := requiredCtx.Query(queryNodeCPUCost)
  393. resChNodeCPUCores := requiredCtx.Query(queryNodeCPUCores)
  394. resChNodeRAMCost := requiredCtx.Query(queryNodeRAMCost)
  395. resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
  396. resChNodeGPUCost := requiredCtx.Query(queryNodeGPUCost)
  397. resChActiveMins := requiredCtx.Query(queryActiveMins)
  398. resChIsSpot := requiredCtx.Query(queryIsSpot)
  399. // Do not return errors if these fail, but log warnings
  400. resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
  401. resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
  402. resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
  403. resChLabels := optionalCtx.Query(queryLabels)
  404. resNodeCPUCost, _ := resChNodeCPUCost.Await()
  405. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  406. resNodeGPUCost, _ := resChNodeGPUCost.Await()
  407. resNodeRAMCost, _ := resChNodeRAMCost.Await()
  408. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  409. resIsSpot, _ := resChIsSpot.Await()
  410. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  411. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  412. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  413. resActiveMins, _ := resChActiveMins.Await()
  414. resLabels, _ := resChLabels.Await()
  415. if optionalCtx.HasErrors() {
  416. for _, err := range optionalCtx.Errors() {
  417. log.Warningf("ClusterNodes: %s", err)
  418. }
  419. }
  420. if requiredCtx.HasErrors() {
  421. for _, err := range requiredCtx.Errors() {
  422. log.Errorf("ClusterNodes: %s", err)
  423. }
  424. return nil, requiredCtx.ErrorCollection()
  425. }
  426. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUCost, cp.ParseID)
  427. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMCost, cp.ParseID)
  428. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUCost, cp.ParseID)
  429. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  430. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  431. cpuCoresMap := buildCPUCoresMap(resNodeCPUCores, clusterAndNameToType)
  432. ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
  433. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  434. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  435. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  436. activeDataMap := buildActiveDataMap(resActiveMins, resolution, cp.ParseID)
  437. preemptibleMap := buildPreemptibleMap(resIsSpot, cp.ParseID)
  438. labelsMap := buildLabelsMap(resLabels)
  439. nodeMap := buildNodeMap(
  440. cpuCostMap, ramCostMap, gpuCostMap,
  441. cpuCoresMap, ramBytesMap, ramUserPctMap,
  442. ramSystemPctMap,
  443. cpuBreakdownMap,
  444. activeDataMap,
  445. preemptibleMap,
  446. labelsMap,
  447. clusterAndNameToType,
  448. )
  449. c, err := cp.GetConfig()
  450. if err != nil {
  451. return nil, err
  452. }
  453. discount, err := ParsePercentString(c.Discount)
  454. if err != nil {
  455. return nil, err
  456. }
  457. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  458. if err != nil {
  459. return nil, err
  460. }
  461. for _, node := range nodeMap {
  462. // TODO take GKE Reserved Instances into account
  463. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  464. // Apply all remaining resources to Idle
  465. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  466. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  467. }
  468. return nodeMap, nil
  469. }
  470. type LoadBalancer struct {
  471. Cluster string
  472. Name string
  473. ProviderID string
  474. Cost float64
  475. Start time.Time
  476. Minutes float64
  477. }
  478. func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
  479. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  480. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  481. if offset < time.Minute {
  482. offsetStr = ""
  483. }
  484. // minsPerResolution determines accuracy and resource use for the following
  485. // queries. Smaller values (higher resolution) result in better accuracy,
  486. // but more expensive queries, and vice-a-versa.
  487. minsPerResolution := 5
  488. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  489. // value, converts it to a cumulative value; i.e.
  490. // [$/hr] * [min/res]*[hr/min] = [$/res]
  491. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  492. ctx := prom.NewContext(client)
  493. queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  494. queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
  495. resChLBCost := ctx.Query(queryLBCost)
  496. resChActiveMins := ctx.Query(queryActiveMins)
  497. resLBCost, _ := resChLBCost.Await()
  498. resActiveMins, _ := resChActiveMins.Await()
  499. if ctx.HasErrors() {
  500. return nil, ctx.ErrorCollection()
  501. }
  502. loadBalancerMap := map[string]*LoadBalancer{}
  503. for _, result := range resLBCost {
  504. cluster, err := result.GetString("cluster_id")
  505. if err != nil {
  506. cluster = env.GetClusterID()
  507. }
  508. namespace, err := result.GetString("namespace")
  509. if err != nil {
  510. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  511. continue
  512. }
  513. serviceName, err := result.GetString("service_name")
  514. if err != nil {
  515. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  516. continue
  517. }
  518. providerID := ""
  519. lbCost := result.Values[0].Value
  520. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  521. if _, ok := loadBalancerMap[key]; !ok {
  522. loadBalancerMap[key] = &LoadBalancer{
  523. Cluster: cluster,
  524. Name: namespace + "/" + serviceName,
  525. ProviderID: providerID, // cp.ParseID(providerID) if providerID does get recorded later
  526. }
  527. }
  528. loadBalancerMap[key].Cost += lbCost
  529. }
  530. for _, result := range resActiveMins {
  531. cluster, err := result.GetString("cluster_id")
  532. if err != nil {
  533. cluster = env.GetClusterID()
  534. }
  535. namespace, err := result.GetString("namespace")
  536. if err != nil {
  537. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  538. continue
  539. }
  540. serviceName, err := result.GetString("service_name")
  541. if err != nil {
  542. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  543. continue
  544. }
  545. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  546. if len(result.Values) == 0 {
  547. continue
  548. }
  549. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  550. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  551. mins := e.Sub(s).Minutes()
  552. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  553. loadBalancerMap[key].Start = s
  554. loadBalancerMap[key].Minutes = mins
  555. }
  556. return loadBalancerMap, nil
  557. }
  558. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  559. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string, withBreakdown bool) (map[string]*ClusterCosts, error) {
  560. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  561. start, end, err := util.ParseTimeRange(window, offset)
  562. if err != nil {
  563. return nil, err
  564. }
  565. mins := end.Sub(*start).Minutes()
  566. // minsPerResolution determines accuracy and resource use for the following
  567. // queries. Smaller values (higher resolution) result in better accuracy,
  568. // but more expensive queries, and vice-a-versa.
  569. minsPerResolution := 5
  570. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  571. // value, converts it to a cumulative value; i.e.
  572. // [$/hr] * [min/res]*[hr/min] = [$/res]
  573. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  574. const fmtQueryDataCount = `
  575. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (cluster_id)[%s:%dm]%s) * %d
  576. `
  577. const fmtQueryTotalGPU = `
  578. sum(
  579. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  580. ) by (cluster_id)
  581. `
  582. const fmtQueryTotalCPU = `
  583. sum(
  584. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, cluster_id)[%s:%dm]%s) *
  585. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
  586. ) by (cluster_id)
  587. `
  588. const fmtQueryTotalRAM = `
  589. sum(
  590. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  591. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
  592. ) by (cluster_id)
  593. `
  594. const fmtQueryTotalStorage = `
  595. sum(
  596. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  597. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, cluster_id) * %f
  598. ) by (cluster_id)
  599. `
  600. const fmtQueryCPUModePct = `
  601. sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
  602. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)
  603. `
  604. const fmtQueryRAMSystemPct = `
  605. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (cluster_id)
  606. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
  607. `
  608. const fmtQueryRAMUserPct = `
  609. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (cluster_id)
  610. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
  611. `
  612. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  613. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  614. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  615. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  616. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  617. if queryTotalLocalStorage != "" {
  618. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  619. }
  620. fmtOffset := ""
  621. if offset != "" {
  622. fmtOffset = fmt.Sprintf("offset %s", offset)
  623. }
  624. queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, minsPerResolution, fmtOffset, minsPerResolution)
  625. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  626. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  627. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  628. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
  629. ctx := prom.NewContext(client)
  630. resChs := ctx.QueryAll(
  631. queryDataCount,
  632. queryTotalGPU,
  633. queryTotalCPU,
  634. queryTotalRAM,
  635. queryTotalStorage,
  636. )
  637. // Only submit the local storage query if it is valid. Otherwise Prometheus
  638. // will return errors. Always append something to resChs, regardless, to
  639. // maintain indexing.
  640. if queryTotalLocalStorage != "" {
  641. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  642. } else {
  643. resChs = append(resChs, nil)
  644. }
  645. if withBreakdown {
  646. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
  647. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
  648. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
  649. bdResChs := ctx.QueryAll(
  650. queryCPUModePct,
  651. queryRAMSystemPct,
  652. queryRAMUserPct,
  653. )
  654. // Only submit the local storage query if it is valid. Otherwise Prometheus
  655. // will return errors. Always append something to resChs, regardless, to
  656. // maintain indexing.
  657. if queryUsedLocalStorage != "" {
  658. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  659. } else {
  660. bdResChs = append(bdResChs, nil)
  661. }
  662. resChs = append(resChs, bdResChs...)
  663. }
  664. resDataCount, _ := resChs[0].Await()
  665. resTotalGPU, _ := resChs[1].Await()
  666. resTotalCPU, _ := resChs[2].Await()
  667. resTotalRAM, _ := resChs[3].Await()
  668. resTotalStorage, _ := resChs[4].Await()
  669. if ctx.HasErrors() {
  670. return nil, ctx.ErrorCollection()
  671. }
  672. defaultClusterID := env.GetClusterID()
  673. dataMinsByCluster := map[string]float64{}
  674. for _, result := range resDataCount {
  675. clusterID, _ := result.GetString("cluster_id")
  676. if clusterID == "" {
  677. clusterID = defaultClusterID
  678. }
  679. dataMins := mins
  680. if len(result.Values) > 0 {
  681. dataMins = result.Values[0].Value
  682. } else {
  683. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  684. }
  685. dataMinsByCluster[clusterID] = dataMins
  686. }
  687. // Determine combined discount
  688. discount, customDiscount := 0.0, 0.0
  689. c, err := a.CloudProvider.GetConfig()
  690. if err == nil {
  691. discount, err = ParsePercentString(c.Discount)
  692. if err != nil {
  693. discount = 0.0
  694. }
  695. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  696. if err != nil {
  697. customDiscount = 0.0
  698. }
  699. }
  700. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  701. costData := make(map[string]map[string]float64)
  702. // Helper function to iterate over Prom query results, parsing the raw values into
  703. // the intermediate costData structure.
  704. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  705. for _, result := range results {
  706. clusterID, _ := result.GetString("cluster_id")
  707. if clusterID == "" {
  708. clusterID = defaultClusterID
  709. }
  710. if _, ok := costData[clusterID]; !ok {
  711. costData[clusterID] = map[string]float64{}
  712. }
  713. if len(result.Values) > 0 {
  714. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  715. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  716. }
  717. }
  718. }
  719. // Apply both sustained use and custom discounts to RAM and CPU
  720. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  721. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  722. // Apply only custom discount to GPU and storage
  723. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  724. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  725. if queryTotalLocalStorage != "" {
  726. resTotalLocalStorage, err := resChs[5].Await()
  727. if err != nil {
  728. return nil, err
  729. }
  730. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  731. }
  732. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  733. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  734. pvUsedCostMap := map[string]float64{}
  735. if withBreakdown {
  736. resCPUModePct, _ := resChs[6].Await()
  737. resRAMSystemPct, _ := resChs[7].Await()
  738. resRAMUserPct, _ := resChs[8].Await()
  739. if ctx.HasErrors() {
  740. return nil, ctx.ErrorCollection()
  741. }
  742. for _, result := range resCPUModePct {
  743. clusterID, _ := result.GetString("cluster_id")
  744. if clusterID == "" {
  745. clusterID = defaultClusterID
  746. }
  747. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  748. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  749. }
  750. cpuBD := cpuBreakdownMap[clusterID]
  751. mode, err := result.GetString("mode")
  752. if err != nil {
  753. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  754. mode = "other"
  755. }
  756. switch mode {
  757. case "idle":
  758. cpuBD.Idle += result.Values[0].Value
  759. case "system":
  760. cpuBD.System += result.Values[0].Value
  761. case "user":
  762. cpuBD.User += result.Values[0].Value
  763. default:
  764. cpuBD.Other += result.Values[0].Value
  765. }
  766. }
  767. for _, result := range resRAMSystemPct {
  768. clusterID, _ := result.GetString("cluster_id")
  769. if clusterID == "" {
  770. clusterID = defaultClusterID
  771. }
  772. if _, ok := ramBreakdownMap[clusterID]; !ok {
  773. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  774. }
  775. ramBD := ramBreakdownMap[clusterID]
  776. ramBD.System += result.Values[0].Value
  777. }
  778. for _, result := range resRAMUserPct {
  779. clusterID, _ := result.GetString("cluster_id")
  780. if clusterID == "" {
  781. clusterID = defaultClusterID
  782. }
  783. if _, ok := ramBreakdownMap[clusterID]; !ok {
  784. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  785. }
  786. ramBD := ramBreakdownMap[clusterID]
  787. ramBD.User += result.Values[0].Value
  788. }
  789. for _, ramBD := range ramBreakdownMap {
  790. remaining := 1.0
  791. remaining -= ramBD.Other
  792. remaining -= ramBD.System
  793. remaining -= ramBD.User
  794. ramBD.Idle = remaining
  795. }
  796. if queryUsedLocalStorage != "" {
  797. resUsedLocalStorage, err := resChs[9].Await()
  798. if err != nil {
  799. return nil, err
  800. }
  801. for _, result := range resUsedLocalStorage {
  802. clusterID, _ := result.GetString("cluster_id")
  803. if clusterID == "" {
  804. clusterID = defaultClusterID
  805. }
  806. pvUsedCostMap[clusterID] += result.Values[0].Value
  807. }
  808. }
  809. }
  810. if ctx.HasErrors() {
  811. for _, err := range ctx.Errors() {
  812. log.Errorf("ComputeClusterCosts: %s", err)
  813. }
  814. return nil, ctx.ErrorCollection()
  815. }
  816. // Convert intermediate structure to Costs instances
  817. costsByCluster := map[string]*ClusterCosts{}
  818. for id, cd := range costData {
  819. dataMins, ok := dataMinsByCluster[id]
  820. if !ok {
  821. dataMins = mins
  822. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  823. }
  824. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/util.MinsPerHour)
  825. if err != nil {
  826. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  827. return nil, err
  828. }
  829. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  830. costs.CPUBreakdown = cpuBD
  831. }
  832. if ramBD, ok := ramBreakdownMap[id]; ok {
  833. costs.RAMBreakdown = ramBD
  834. }
  835. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  836. if pvUC, ok := pvUsedCostMap[id]; ok {
  837. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  838. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  839. }
  840. costs.DataMinutes = dataMins
  841. costsByCluster[id] = costs
  842. }
  843. return costsByCluster, nil
  844. }
  845. type Totals struct {
  846. TotalCost [][]string `json:"totalcost"`
  847. CPUCost [][]string `json:"cpucost"`
  848. MemCost [][]string `json:"memcost"`
  849. StorageCost [][]string `json:"storageCost"`
  850. }
  851. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  852. if len(qrs) == 0 {
  853. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  854. }
  855. result := qrs[0]
  856. totals := [][]string{}
  857. for _, value := range result.Values {
  858. d0 := fmt.Sprintf("%f", value.Timestamp)
  859. d1 := fmt.Sprintf("%f", value.Value)
  860. toAppend := []string{
  861. d0,
  862. d1,
  863. }
  864. totals = append(totals, toAppend)
  865. }
  866. return totals, nil
  867. }
  868. // ClusterCostsOverTime gives the full cluster costs over time
  869. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
  870. localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true, false)
  871. if localStorageQuery != "" {
  872. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  873. }
  874. layout := "2006-01-02T15:04:05.000Z"
  875. start, err := time.Parse(layout, startString)
  876. if err != nil {
  877. klog.V(1).Infof("Error parsing time " + startString + ". Error: " + err.Error())
  878. return nil, err
  879. }
  880. end, err := time.Parse(layout, endString)
  881. if err != nil {
  882. klog.V(1).Infof("Error parsing time " + endString + ". Error: " + err.Error())
  883. return nil, err
  884. }
  885. window, err := time.ParseDuration(windowString)
  886. if err != nil {
  887. klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
  888. return nil, err
  889. }
  890. // turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
  891. if offset != "" {
  892. offset = fmt.Sprintf("offset %s", offset)
  893. }
  894. qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
  895. qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
  896. qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
  897. qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
  898. ctx := prom.NewContext(cli)
  899. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  900. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  901. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  902. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  903. resultClusterCores, err := resChClusterCores.Await()
  904. if err != nil {
  905. return nil, err
  906. }
  907. resultClusterRAM, err := resChClusterRAM.Await()
  908. if err != nil {
  909. return nil, err
  910. }
  911. resultStorage, err := resChStorage.Await()
  912. if err != nil {
  913. return nil, err
  914. }
  915. resultTotal, err := resChTotal.Await()
  916. if err != nil {
  917. return nil, err
  918. }
  919. coreTotal, err := resultToTotals(resultClusterCores)
  920. if err != nil {
  921. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  922. return nil, err
  923. }
  924. ramTotal, err := resultToTotals(resultClusterRAM)
  925. if err != nil {
  926. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  927. return nil, err
  928. }
  929. storageTotal, err := resultToTotals(resultStorage)
  930. if err != nil {
  931. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  932. }
  933. clusterTotal, err := resultToTotals(resultTotal)
  934. if err != nil {
  935. // If clusterTotal query failed, it's likely because there are no PVs, which
  936. // causes the qTotal query to return no data. Instead, query only node costs.
  937. // If that fails, return an error because something is actually wrong.
  938. qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
  939. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  940. for _, warning := range warnings {
  941. log.Warningf(warning)
  942. }
  943. if err != nil {
  944. return nil, err
  945. }
  946. clusterTotal, err = resultToTotals(resultNodes)
  947. if err != nil {
  948. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  949. return nil, err
  950. }
  951. }
  952. return &Totals{
  953. TotalCost: clusterTotal,
  954. CPUCost: coreTotal,
  955. MemCost: ramTotal,
  956. StorageCost: storageTotal,
  957. }, nil
  958. }