cluster.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230
  1. package costmodel
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/kubecost/cost-model/pkg/kubecost"
  7. "github.com/kubecost/cost-model/pkg/util/timeutil"
  8. "github.com/kubecost/cost-model/pkg/cloud"
  9. "github.com/kubecost/cost-model/pkg/env"
  10. "github.com/kubecost/cost-model/pkg/log"
  11. "github.com/kubecost/cost-model/pkg/prom"
  12. prometheus "github.com/prometheus/client_golang/api"
  13. "k8s.io/klog"
  14. )
  15. const (
  16. queryClusterCores = `sum(
  17. avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
  18. avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
  19. ) by (%s)`
  20. queryClusterRAM = `sum(
  21. avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
  22. ) by (%s)`
  23. queryStorage = `sum(
  24. avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
  25. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
  26. ) by (%s) %s`
  27. queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
  28. sum(
  29. avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
  30. * avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
  31. ) by (%s) %s`
  32. queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
  33. )
  34. const maxLocalDiskSize = 200 // AWS limits root disks to 100 Gi, and occasional metric errors in filesystem size should not contribute to large costs.
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by CPU, GPU, memory, and storage.
type ClusterCosts struct {
	// Start and End bound the time range the costs cover.
	Start *time.Time `json:"startTime"`
	End   *time.Time `json:"endTime"`

	// Each resource has a cumulative cost over the range and the equivalent
	// monthly rate.
	CPUCumulative float64 `json:"cpuCumulativeCost"`
	CPUMonthly    float64 `json:"cpuMonthlyCost"`
	// CPUBreakdown is nil unless set by the caller; NewClusterCostsFromCumulative
	// does not populate it.
	CPUBreakdown  *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative float64                `json:"gpuCumulativeCost"`
	GPUMonthly    float64                `json:"gpuMonthlyCost"`
	RAMCumulative float64                `json:"ramCumulativeCost"`
	RAMMonthly    float64                `json:"ramMonthlyCost"`
	// RAMBreakdown is nil unless set by the caller.
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	// StorageBreakdown is nil unless set by the caller.
	StorageBreakdown *ClusterCostsBreakdown `json:"storageBreakdown"`
	// TotalCumulative and TotalMonthly are the sums of the per-resource values.
	TotalCumulative float64 `json:"totalCumulativeCost"`
	TotalMonthly    float64 `json:"totalMonthlyCost"`
	// DataMinutes has no json tag, so encoding/json emits it under the Go field
	// name "DataMinutes". NOTE(review): presumably the number of minutes of data
	// backing these costs; it is not set by NewClusterCostsFromCumulative, so
	// confirm against whatever code sets it.
	DataMinutes float64
}
// ClusterCostsBreakdown provides a fractional breakdown of a resource into four
// categories: User for user-space (i.e. non-system) usage, System, Other, and
// Idle. Code in this package computes Idle as 1 minus the sum of the other
// three, so the fields are meant to sum to 1 rather than to 100.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  63. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  64. // the associated monthly rate data, and returns the Costs.
  65. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  66. start, end := timeutil.ParseTimeRange(window, offset)
  67. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  68. if dataHours == 0 {
  69. dataHours = end.Sub(start).Hours()
  70. }
  71. // Do not allow zero-length windows to prevent divide-by-zero issues
  72. if dataHours == 0 {
  73. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  74. }
  75. cc := &ClusterCosts{
  76. Start: &start,
  77. End: &end,
  78. CPUCumulative: cpu,
  79. GPUCumulative: gpu,
  80. RAMCumulative: ram,
  81. StorageCumulative: storage,
  82. TotalCumulative: cpu + gpu + ram + storage,
  83. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  84. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  85. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  86. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  87. }
  88. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  89. return cc, nil
  90. }
// Disk describes a persistent volume or node-local (root) disk over a query
// window, as assembled by ClusterDisks.
type Disk struct {
	Cluster string
	// Name is the persistent volume name or, for local disks, the value of the
	// `instance` label.
	Name string
	// ProviderID is the cloud provider's identifier for the disk, when known.
	// ClusterDisks falls back to Name for Azure providers.
	ProviderID string
	// Cost is the cumulative cost over the window. For local disks it comes
	// from a hard-coded per-GiB-hour price; persistent volumes are filled in by
	// pvCosts.
	Cost float64
	// Bytes is the disk size; for local disks it is the window-average
	// filesystem limit.
	Bytes float64
	// Local is true for node-local disks created by ClusterDisks.
	Local bool
	// Start, End and Minutes describe the span of samples in which the disk
	// was observed.
	Start   time.Time
	End     time.Time
	Minutes float64
	// Breakdown splits the disk into System/Other/User/Idle fractions.
	Breakdown *ClusterCostsBreakdown
}
// DiskIdentifier keys disks by cluster and disk name. It is constructed with
// positional literals (DiskIdentifier{cluster, name}), so the field order is
// significant.
type DiskIdentifier struct {
	Cluster string
	Name    string
}
  107. func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
  108. // Query for the duration between start and end
  109. durStr := timeutil.DurationString(end.Sub(start))
  110. if durStr == "" {
  111. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  112. }
  113. // Start from the time "end", querying backwards
  114. t := end
  115. // minsPerResolution determines accuracy and resource use for the following
  116. // queries. Smaller values (higher resolution) result in better accuracy,
  117. // but more expensive queries, and vice-a-versa.
  118. minsPerResolution := 1
  119. resolution := time.Duration(minsPerResolution) * time.Minute
  120. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  121. // value, converts it to a cumulative value; i.e.
  122. // [$/hr] * [min/res]*[hr/min] = [$/res]
  123. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  124. // TODO niko/assets how do we not hard-code this price?
  125. costPerGBHr := 0.04 / 730.0
  126. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  127. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
  128. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
  129. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  130. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  131. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
  132. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  133. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  134. resChPVCost := ctx.QueryAtTime(queryPVCost, t)
  135. resChPVSize := ctx.QueryAtTime(queryPVSize, t)
  136. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  137. resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
  138. resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
  139. resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
  140. resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
  141. resPVCost, _ := resChPVCost.Await()
  142. resPVSize, _ := resChPVSize.Await()
  143. resActiveMins, _ := resChActiveMins.Await()
  144. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  145. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  146. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  147. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  148. if ctx.HasErrors() {
  149. return nil, ctx.ErrorCollection()
  150. }
  151. diskMap := map[DiskIdentifier]*Disk{}
  152. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, provider)
  153. for _, result := range resLocalStorageCost {
  154. cluster, err := result.GetString(env.GetPromClusterLabel())
  155. if err != nil {
  156. cluster = env.GetClusterID()
  157. }
  158. name, err := result.GetString("instance")
  159. if err != nil {
  160. log.Warningf("ClusterDisks: local storage data missing instance")
  161. continue
  162. }
  163. cost := result.Values[0].Value
  164. key := DiskIdentifier{cluster, name}
  165. if _, ok := diskMap[key]; !ok {
  166. diskMap[key] = &Disk{
  167. Cluster: cluster,
  168. Name: name,
  169. Breakdown: &ClusterCostsBreakdown{},
  170. Local: true,
  171. }
  172. }
  173. diskMap[key].Cost += cost
  174. }
  175. for _, result := range resLocalStorageUsedCost {
  176. cluster, err := result.GetString(env.GetPromClusterLabel())
  177. if err != nil {
  178. cluster = env.GetClusterID()
  179. }
  180. name, err := result.GetString("instance")
  181. if err != nil {
  182. log.Warningf("ClusterDisks: local storage usage data missing instance")
  183. continue
  184. }
  185. cost := result.Values[0].Value
  186. key := DiskIdentifier{cluster, name}
  187. if _, ok := diskMap[key]; !ok {
  188. diskMap[key] = &Disk{
  189. Cluster: cluster,
  190. Name: name,
  191. Breakdown: &ClusterCostsBreakdown{},
  192. Local: true,
  193. }
  194. }
  195. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  196. }
  197. for _, result := range resLocalStorageBytes {
  198. cluster, err := result.GetString(env.GetPromClusterLabel())
  199. if err != nil {
  200. cluster = env.GetClusterID()
  201. }
  202. name, err := result.GetString("instance")
  203. if err != nil {
  204. log.Warningf("ClusterDisks: local storage data missing instance")
  205. continue
  206. }
  207. bytes := result.Values[0].Value
  208. key := DiskIdentifier{cluster, name}
  209. if _, ok := diskMap[key]; !ok {
  210. diskMap[key] = &Disk{
  211. Cluster: cluster,
  212. Name: name,
  213. Breakdown: &ClusterCostsBreakdown{},
  214. Local: true,
  215. }
  216. }
  217. diskMap[key].Bytes = bytes
  218. if bytes/1024/1024/1024 > maxLocalDiskSize {
  219. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  220. delete(diskMap, key)
  221. }
  222. }
  223. for _, result := range resLocalActiveMins {
  224. cluster, err := result.GetString(env.GetPromClusterLabel())
  225. if err != nil {
  226. cluster = env.GetClusterID()
  227. }
  228. name, err := result.GetString("node")
  229. if err != nil {
  230. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  231. continue
  232. }
  233. key := DiskIdentifier{cluster, name}
  234. if _, ok := diskMap[key]; !ok {
  235. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  236. continue
  237. }
  238. if len(result.Values) == 0 {
  239. continue
  240. }
  241. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  242. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  243. mins := e.Sub(s).Minutes()
  244. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  245. diskMap[key].End = e
  246. diskMap[key].Start = s
  247. diskMap[key].Minutes = mins
  248. }
  249. for _, disk := range diskMap {
  250. // Apply all remaining RAM to Idle
  251. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  252. // Set provider Id to the name for reconciliation on Azure
  253. if fmt.Sprintf("%T", provider) == "*provider.Azure" {
  254. if disk.ProviderID == "" {
  255. disk.ProviderID = disk.Name
  256. }
  257. }
  258. }
  259. return diskMap, nil
  260. }
// Node describes a cluster node's cost, capacity, and utilization over a query
// window, as assembled by ClusterNodes.
type Node struct {
	Cluster    string
	Name       string
	ProviderID string
	// NodeType is passed to the provider when computing Discount.
	// NOTE(review): presumably the instance type, since ClusterNodes' cost
	// queries group by instance_type. Confirm in buildNodeMap.
	NodeType string
	// CPUCost, GPUCost and RAMCost hold costs derived from hourly prices.
	// ClusterNodes scales the price maps by active minutes (and by CPUCores or
	// RAMBytes for CPU and RAM) before building nodes.
	CPUCost  float64
	CPUCores float64
	GPUCost  float64
	GPUCount float64
	RAMCost  float64
	RAMBytes float64
	// Discount is set by ClusterNodes from the provider's combined discount for
	// this node.
	Discount    float64
	Preemptible bool
	// CPUBreakdown and RAMBreakdown get their Idle share as the remainder after
	// System, Other and User.
	CPUBreakdown *ClusterCostsBreakdown
	RAMBreakdown *ClusterCostsBreakdown
	Start        time.Time
	End          time.Time
	Minutes      float64
	Labels       map[string]string
	// NOTE(review): the per-unit price fields below are not set by the code in
	// this chunk; confirm where they are populated.
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
  284. // GKE lies about the number of cores e2 nodes have. This table
  285. // contains a mapping from node type -> actual CPU cores
  286. // for those cases.
  287. var partialCPUMap = map[string]float64{
  288. "e2-micro": 0.25,
  289. "e2-small": 0.5,
  290. "e2-medium": 1.0,
  291. }
// NodeIdentifier keys nodes by cluster, node name, and cloud provider ID.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID keys nodes by cluster and name only. It is used for
// per-node resource counts (e.g. CPU cores, RAM bytes) whose queries carry no
// provider_id label. costTimesMinuteAndCount matches them to NodeIdentifier
// entries by Cluster and Name.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  301. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  302. for k, v := range activeDataMap {
  303. keyNon := nodeIdentifierNoProviderID{
  304. Cluster: k.Cluster,
  305. Name: k.Name,
  306. }
  307. if cost, ok := costMap[k]; ok {
  308. minutes := v.minutes
  309. count := 1.0
  310. if c, ok := resourceCountMap[keyNon]; ok {
  311. count = c
  312. }
  313. costMap[k] = cost * (minutes / 60) * count
  314. }
  315. }
  316. }
  317. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  318. for k, v := range activeDataMap {
  319. if cost, ok := costMap[k]; ok {
  320. minutes := v.minutes
  321. costMap[k] = cost * (minutes / 60)
  322. }
  323. }
  324. }
  325. func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
  326. // Query for the duration between start and end
  327. durStr := timeutil.DurationString(end.Sub(start))
  328. if durStr == "" {
  329. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  330. }
  331. // Start from the time "end", querying backwards
  332. t := end
  333. // minsPerResolution determines accuracy and resource use for the following
  334. // queries. Smaller values (higher resolution) result in better accuracy,
  335. // but more expensive queries, and vice-a-versa.
  336. minsPerResolution := 1
  337. resolution := time.Duration(minsPerResolution) * time.Minute
  338. requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
  339. optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
  340. queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  341. queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  342. queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
  343. queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
  344. queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
  345. queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
  346. queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
  347. queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  348. queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  349. queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  350. queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
  351. queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)
  352. // Return errors if these fail
  353. resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
  354. resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
  355. resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
  356. resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
  357. resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
  358. resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
  359. resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
  360. resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
  361. // Do not return errors if these fail, but log warnings
  362. resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
  363. resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
  364. resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
  365. resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
  366. resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
  367. resNodeCPUCores, _ := resChNodeCPUCores.Await()
  368. resNodeGPUCount, _ := resChNodeGPUCount.Await()
  369. resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
  370. resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
  371. resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
  372. resIsSpot, _ := resChIsSpot.Await()
  373. resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
  374. resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
  375. resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
  376. resActiveMins, _ := resChActiveMins.Await()
  377. resLabels, _ := resChLabels.Await()
  378. if optionalCtx.HasErrors() {
  379. for _, err := range optionalCtx.Errors() {
  380. log.Warningf("ClusterNodes: %s", err)
  381. }
  382. }
  383. if requiredCtx.HasErrors() {
  384. for _, err := range requiredCtx.Errors() {
  385. log.Errorf("ClusterNodes: %s", err)
  386. }
  387. return nil, requiredCtx.ErrorCollection()
  388. }
  389. activeDataMap := buildActiveDataMap(resActiveMins, resolution)
  390. gpuCountMap := buildGPUCountMap(resNodeGPUCount)
  391. preemptibleMap := buildPreemptibleMap(resIsSpot)
  392. cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
  393. ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
  394. gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)
  395. clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
  396. clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
  397. cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
  398. ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
  399. ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
  400. ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
  401. cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
  402. labelsMap := buildLabelsMap(resLabels)
  403. costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
  404. costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
  405. costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
  406. nodeMap := buildNodeMap(
  407. cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
  408. cpuCoresMap, ramBytesMap, ramUserPctMap,
  409. ramSystemPctMap,
  410. cpuBreakdownMap,
  411. activeDataMap,
  412. preemptibleMap,
  413. labelsMap,
  414. clusterAndNameToType,
  415. resolution,
  416. )
  417. c, err := cp.GetConfig()
  418. if err != nil {
  419. return nil, err
  420. }
  421. discount, err := ParsePercentString(c.Discount)
  422. if err != nil {
  423. return nil, err
  424. }
  425. negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
  426. if err != nil {
  427. return nil, err
  428. }
  429. for _, node := range nodeMap {
  430. // TODO take GKE Reserved Instances into account
  431. node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
  432. // Apply all remaining resources to Idle
  433. node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
  434. node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
  435. }
  436. return nodeMap, nil
  437. }
// LoadBalancerIdentifier keys load balancers by cluster, namespace, and service
// name. ClusterLoadBalancers sets Name to the bare service_name label, without
// the namespace.
type LoadBalancerIdentifier struct {
	Cluster   string
	Namespace string
	Name      string
}
// LoadBalancer describes a load balancer service and its cost over a query
// window, as assembled by ClusterLoadBalancers.
type LoadBalancer struct {
	Cluster   string
	Namespace string
	// Name is stored as "<namespace>/<service name>" by ClusterLoadBalancers,
	// kept for backwards compatibility.
	Name string
	// ProviderID is derived from the service's ingress_ip label.
	ProviderID string
	// Cost is the window-average hourly price multiplied by the active hours.
	Cost float64
	// Start, End and Minutes span the first and last active samples.
	Start   time.Time
	End     time.Time
	Minutes float64
}
  453. func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
  454. // Query for the duration between start and end
  455. durStr := timeutil.DurationString(end.Sub(start))
  456. if durStr == "" {
  457. return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  458. }
  459. // Start from the time "end", querying backwards
  460. t := end
  461. // minsPerResolution determines accuracy and resource use for the following
  462. // queries. Smaller values (higher resolution) result in better accuracy,
  463. // but more expensive queries, and vice-a-versa.
  464. minsPerResolution := 1
  465. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  466. queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
  467. queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
  468. resChLBCost := ctx.QueryAtTime(queryLBCost, t)
  469. resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
  470. resLBCost, _ := resChLBCost.Await()
  471. resActiveMins, _ := resChActiveMins.Await()
  472. if ctx.HasErrors() {
  473. return nil, ctx.ErrorCollection()
  474. }
  475. loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
  476. for _, result := range resActiveMins {
  477. cluster, err := result.GetString(env.GetPromClusterLabel())
  478. if err != nil {
  479. cluster = env.GetClusterID()
  480. }
  481. namespace, err := result.GetString("namespace")
  482. if err != nil {
  483. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  484. continue
  485. }
  486. name, err := result.GetString("service_name")
  487. if err != nil {
  488. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  489. continue
  490. }
  491. providerID, err := result.GetString("ingress_ip")
  492. if err != nil {
  493. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  494. providerID = ""
  495. }
  496. key := LoadBalancerIdentifier{
  497. Cluster: cluster,
  498. Namespace: namespace,
  499. Name: name,
  500. }
  501. // Skip if there are no data
  502. if len(result.Values) == 0 {
  503. continue
  504. }
  505. // Add load balancer to the set of load balancers
  506. if _, ok := loadBalancerMap[key]; !ok {
  507. loadBalancerMap[key] = &LoadBalancer{
  508. Cluster: cluster,
  509. Namespace: namespace,
  510. Name: fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
  511. ProviderID: cloud.ParseLBID(providerID),
  512. }
  513. }
  514. // Append start, end, and minutes. This should come before all other data.
  515. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  516. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  517. loadBalancerMap[key].Start = s
  518. loadBalancerMap[key].End = e
  519. loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
  520. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  521. // Prevents there from being a duplicate LoadBalancers on the same day
  522. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  523. loadBalancerMap[key].ProviderID = providerID
  524. }
  525. }
  526. for _, result := range resLBCost {
  527. cluster, err := result.GetString(env.GetPromClusterLabel())
  528. if err != nil {
  529. cluster = env.GetClusterID()
  530. }
  531. namespace, err := result.GetString("namespace")
  532. if err != nil {
  533. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  534. continue
  535. }
  536. name, err := result.GetString("service_name")
  537. if err != nil {
  538. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  539. continue
  540. }
  541. key := LoadBalancerIdentifier{
  542. Cluster: cluster,
  543. Namespace: namespace,
  544. Name: name,
  545. }
  546. // Apply cost as price-per-hour * hours
  547. if lb, ok := loadBalancerMap[key]; ok {
  548. lbPricePerHr := result.Values[0].Value
  549. hrs := lb.Minutes / 60.0
  550. lb.Cost += lbPricePerHr * hrs
  551. } else {
  552. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  553. }
  554. }
  555. return loadBalancerMap, nil
  556. }
  557. // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
  558. func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
  559. if window < 10*time.Minute {
  560. return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
  561. }
  562. // Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
  563. start, end := timeutil.ParseTimeRange(window, offset)
  564. mins := end.Sub(start).Minutes()
  565. windowStr := timeutil.DurationString(window)
  566. // minsPerResolution determines accuracy and resource use for the following
  567. // queries. Smaller values (higher resolution) result in better accuracy,
  568. // but more expensive queries, and vice-a-versa.
  569. minsPerResolution := 5
  570. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  571. // value, converts it to a cumulative value; i.e.
  572. // [$/hr] * [min/res]*[hr/min] = [$/res]
  573. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  574. const fmtQueryDataCount = `
  575. count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
  576. `
  577. const fmtQueryTotalGPU = `
  578. sum(
  579. sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
  580. ) by (%s)
  581. `
  582. const fmtQueryTotalCPU = `
  583. sum(
  584. sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
  585. avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  586. ) by (%s)
  587. `
  588. const fmtQueryTotalRAM = `
  589. sum(
  590. sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  591. avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
  592. ) by (%s)
  593. `
  594. const fmtQueryTotalStorage = `
  595. sum(
  596. sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
  597. avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
  598. ) by (%s)
  599. `
  600. const fmtQueryCPUModePct = `
  601. sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
  602. group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
  603. `
  604. const fmtQueryRAMSystemPct = `
  605. sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
  606. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  607. `
  608. const fmtQueryRAMUserPct = `
  609. sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
  610. / sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
  611. `
  612. // TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
  613. // const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
  614. // group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`
  615. queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)
  616. queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
  617. if queryTotalLocalStorage != "" {
  618. queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
  619. }
  620. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  621. queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
  622. queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
  623. queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  624. queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  625. queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
  626. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  627. resChs := ctx.QueryAll(
  628. queryDataCount,
  629. queryTotalGPU,
  630. queryTotalCPU,
  631. queryTotalRAM,
  632. queryTotalStorage,
  633. )
  634. // Only submit the local storage query if it is valid. Otherwise Prometheus
  635. // will return errors. Always append something to resChs, regardless, to
  636. // maintain indexing.
  637. if queryTotalLocalStorage != "" {
  638. resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
  639. } else {
  640. resChs = append(resChs, nil)
  641. }
  642. if withBreakdown {
  643. queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, windowStr, fmtOffset, env.GetPromClusterLabel(), windowStr, fmtOffset, env.GetPromClusterLabel())
  644. queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  645. queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
  646. bdResChs := ctx.QueryAll(
  647. queryCPUModePct,
  648. queryRAMSystemPct,
  649. queryRAMUserPct,
  650. )
  651. // Only submit the local storage query if it is valid. Otherwise Prometheus
  652. // will return errors. Always append something to resChs, regardless, to
  653. // maintain indexing.
  654. if queryUsedLocalStorage != "" {
  655. bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
  656. } else {
  657. bdResChs = append(bdResChs, nil)
  658. }
  659. resChs = append(resChs, bdResChs...)
  660. }
  661. resDataCount, _ := resChs[0].Await()
  662. resTotalGPU, _ := resChs[1].Await()
  663. resTotalCPU, _ := resChs[2].Await()
  664. resTotalRAM, _ := resChs[3].Await()
  665. resTotalStorage, _ := resChs[4].Await()
  666. if ctx.HasErrors() {
  667. return nil, ctx.ErrorCollection()
  668. }
  669. defaultClusterID := env.GetClusterID()
  670. dataMinsByCluster := map[string]float64{}
  671. for _, result := range resDataCount {
  672. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  673. if clusterID == "" {
  674. clusterID = defaultClusterID
  675. }
  676. dataMins := mins
  677. if len(result.Values) > 0 {
  678. dataMins = result.Values[0].Value
  679. } else {
  680. klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
  681. }
  682. dataMinsByCluster[clusterID] = dataMins
  683. }
  684. // Determine combined discount
  685. discount, customDiscount := 0.0, 0.0
  686. c, err := a.CloudProvider.GetConfig()
  687. if err == nil {
  688. discount, err = ParsePercentString(c.Discount)
  689. if err != nil {
  690. discount = 0.0
  691. }
  692. customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
  693. if err != nil {
  694. customDiscount = 0.0
  695. }
  696. }
  697. // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
  698. costData := make(map[string]map[string]float64)
  699. // Helper function to iterate over Prom query results, parsing the raw values into
  700. // the intermediate costData structure.
  701. setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
  702. for _, result := range results {
  703. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  704. if clusterID == "" {
  705. clusterID = defaultClusterID
  706. }
  707. if _, ok := costData[clusterID]; !ok {
  708. costData[clusterID] = map[string]float64{}
  709. }
  710. if len(result.Values) > 0 {
  711. costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  712. costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
  713. }
  714. }
  715. }
  716. // Apply both sustained use and custom discounts to RAM and CPU
  717. setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
  718. setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
  719. // Apply only custom discount to GPU and storage
  720. setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
  721. setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
  722. if queryTotalLocalStorage != "" {
  723. resTotalLocalStorage, err := resChs[5].Await()
  724. if err != nil {
  725. return nil, err
  726. }
  727. setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
  728. }
  729. cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
  730. ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
  731. pvUsedCostMap := map[string]float64{}
  732. if withBreakdown {
  733. resCPUModePct, _ := resChs[6].Await()
  734. resRAMSystemPct, _ := resChs[7].Await()
  735. resRAMUserPct, _ := resChs[8].Await()
  736. if ctx.HasErrors() {
  737. return nil, ctx.ErrorCollection()
  738. }
  739. for _, result := range resCPUModePct {
  740. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  741. if clusterID == "" {
  742. clusterID = defaultClusterID
  743. }
  744. if _, ok := cpuBreakdownMap[clusterID]; !ok {
  745. cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  746. }
  747. cpuBD := cpuBreakdownMap[clusterID]
  748. mode, err := result.GetString("mode")
  749. if err != nil {
  750. klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
  751. mode = "other"
  752. }
  753. switch mode {
  754. case "idle":
  755. cpuBD.Idle += result.Values[0].Value
  756. case "system":
  757. cpuBD.System += result.Values[0].Value
  758. case "user":
  759. cpuBD.User += result.Values[0].Value
  760. default:
  761. cpuBD.Other += result.Values[0].Value
  762. }
  763. }
  764. for _, result := range resRAMSystemPct {
  765. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  766. if clusterID == "" {
  767. clusterID = defaultClusterID
  768. }
  769. if _, ok := ramBreakdownMap[clusterID]; !ok {
  770. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  771. }
  772. ramBD := ramBreakdownMap[clusterID]
  773. ramBD.System += result.Values[0].Value
  774. }
  775. for _, result := range resRAMUserPct {
  776. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  777. if clusterID == "" {
  778. clusterID = defaultClusterID
  779. }
  780. if _, ok := ramBreakdownMap[clusterID]; !ok {
  781. ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
  782. }
  783. ramBD := ramBreakdownMap[clusterID]
  784. ramBD.User += result.Values[0].Value
  785. }
  786. for _, ramBD := range ramBreakdownMap {
  787. remaining := 1.0
  788. remaining -= ramBD.Other
  789. remaining -= ramBD.System
  790. remaining -= ramBD.User
  791. ramBD.Idle = remaining
  792. }
  793. if queryUsedLocalStorage != "" {
  794. resUsedLocalStorage, err := resChs[9].Await()
  795. if err != nil {
  796. return nil, err
  797. }
  798. for _, result := range resUsedLocalStorage {
  799. clusterID, _ := result.GetString(env.GetPromClusterLabel())
  800. if clusterID == "" {
  801. clusterID = defaultClusterID
  802. }
  803. pvUsedCostMap[clusterID] += result.Values[0].Value
  804. }
  805. }
  806. }
  807. if ctx.HasErrors() {
  808. for _, err := range ctx.Errors() {
  809. log.Errorf("ComputeClusterCosts: %s", err)
  810. }
  811. return nil, ctx.ErrorCollection()
  812. }
  813. // Convert intermediate structure to Costs instances
  814. costsByCluster := map[string]*ClusterCosts{}
  815. for id, cd := range costData {
  816. dataMins, ok := dataMinsByCluster[id]
  817. if !ok {
  818. dataMins = mins
  819. klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
  820. }
  821. costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
  822. if err != nil {
  823. klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
  824. return nil, err
  825. }
  826. if cpuBD, ok := cpuBreakdownMap[id]; ok {
  827. costs.CPUBreakdown = cpuBD
  828. }
  829. if ramBD, ok := ramBreakdownMap[id]; ok {
  830. costs.RAMBreakdown = ramBD
  831. }
  832. costs.StorageBreakdown = &ClusterCostsBreakdown{}
  833. if pvUC, ok := pvUsedCostMap[id]; ok {
  834. costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
  835. costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
  836. }
  837. costs.DataMinutes = dataMins
  838. costsByCluster[id] = costs
  839. }
  840. return costsByCluster, nil
  841. }
  // Totals groups cluster cost time series by category. Each series is a
// list of [timestamp, value] string pairs. The JSON keys are fixed by the
// struct tags; note that "storageCost" is camel-cased while the others are
// all lowercase.
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`   // overall cluster cost series
	CPUCost     [][]string `json:"cpucost"`     // CPU cost series
	MemCost     [][]string `json:"memcost"`     // memory (RAM) cost series
	StorageCost [][]string `json:"storageCost"` // storage cost series
}
  848. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  849. if len(qrs) == 0 {
  850. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  851. }
  852. result := qrs[0]
  853. totals := [][]string{}
  854. for _, value := range result.Values {
  855. d0 := fmt.Sprintf("%f", value.Timestamp)
  856. d1 := fmt.Sprintf("%f", value.Value)
  857. toAppend := []string{
  858. d0,
  859. d1,
  860. }
  861. totals = append(totals, toAppend)
  862. }
  863. return totals, nil
  864. }
  865. // ClusterCostsOverTime gives the full cluster costs over time
  866. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  867. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  868. if localStorageQuery != "" {
  869. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  870. }
  871. layout := "2006-01-02T15:04:05.000Z"
  872. start, err := time.Parse(layout, startString)
  873. if err != nil {
  874. klog.V(1).Infof("Error parsing time %s. Error: %s", startString, err.Error())
  875. return nil, err
  876. }
  877. end, err := time.Parse(layout, endString)
  878. if err != nil {
  879. klog.V(1).Infof("Error parsing time %s. Error: %s", endString, err.Error())
  880. return nil, err
  881. }
  882. fmtWindow := timeutil.DurationString(window)
  883. if fmtWindow == "" {
  884. err := fmt.Errorf("window value invalid or missing")
  885. klog.V(1).Infof("Error parsing time %v. Error: %s", window, err.Error())
  886. return nil, err
  887. }
  888. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  889. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  890. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  891. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  892. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  893. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  894. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  895. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  896. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  897. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  898. resultClusterCores, err := resChClusterCores.Await()
  899. if err != nil {
  900. return nil, err
  901. }
  902. resultClusterRAM, err := resChClusterRAM.Await()
  903. if err != nil {
  904. return nil, err
  905. }
  906. resultStorage, err := resChStorage.Await()
  907. if err != nil {
  908. return nil, err
  909. }
  910. resultTotal, err := resChTotal.Await()
  911. if err != nil {
  912. return nil, err
  913. }
  914. coreTotal, err := resultToTotals(resultClusterCores)
  915. if err != nil {
  916. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  917. return nil, err
  918. }
  919. ramTotal, err := resultToTotals(resultClusterRAM)
  920. if err != nil {
  921. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  922. return nil, err
  923. }
  924. storageTotal, err := resultToTotals(resultStorage)
  925. if err != nil {
  926. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  927. }
  928. clusterTotal, err := resultToTotals(resultTotal)
  929. if err != nil {
  930. // If clusterTotal query failed, it's likely because there are no PVs, which
  931. // causes the qTotal query to return no data. Instead, query only node costs.
  932. // If that fails, return an error because something is actually wrong.
  933. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  934. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  935. for _, warning := range warnings {
  936. log.Warningf(warning)
  937. }
  938. if err != nil {
  939. return nil, err
  940. }
  941. clusterTotal, err = resultToTotals(resultNodes)
  942. if err != nil {
  943. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  944. return nil, err
  945. }
  946. }
  947. return &Totals{
  948. TotalCost: clusterTotal,
  949. CPUCost: coreTotal,
  950. MemCost: ramTotal,
  951. StorageCost: storageTotal,
  952. }, nil
  953. }
  954. func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
  955. for _, result := range resActiveMins {
  956. cluster, err := result.GetString(env.GetPromClusterLabel())
  957. if err != nil {
  958. cluster = env.GetClusterID()
  959. }
  960. name, err := result.GetString("persistentvolume")
  961. if err != nil {
  962. log.Warningf("ClusterDisks: active mins missing pv name")
  963. continue
  964. }
  965. if len(result.Values) == 0 {
  966. continue
  967. }
  968. key := DiskIdentifier{cluster, name}
  969. if _, ok := diskMap[key]; !ok {
  970. diskMap[key] = &Disk{
  971. Cluster: cluster,
  972. Name: name,
  973. Breakdown: &ClusterCostsBreakdown{},
  974. }
  975. }
  976. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  977. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  978. mins := e.Sub(s).Minutes()
  979. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  980. diskMap[key].End = e
  981. diskMap[key].Start = s
  982. diskMap[key].Minutes = mins
  983. }
  984. for _, result := range resPVSize {
  985. cluster, err := result.GetString(env.GetPromClusterLabel())
  986. if err != nil {
  987. cluster = env.GetClusterID()
  988. }
  989. name, err := result.GetString("persistentvolume")
  990. if err != nil {
  991. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  992. continue
  993. }
  994. // TODO niko/assets storage class
  995. bytes := result.Values[0].Value
  996. key := DiskIdentifier{cluster, name}
  997. if _, ok := diskMap[key]; !ok {
  998. diskMap[key] = &Disk{
  999. Cluster: cluster,
  1000. Name: name,
  1001. Breakdown: &ClusterCostsBreakdown{},
  1002. }
  1003. }
  1004. diskMap[key].Bytes = bytes
  1005. }
  1006. customPricingEnabled := cloud.CustomPricesEnabled(cp)
  1007. customPricingConfig, err := cp.GetConfig()
  1008. if err != nil {
  1009. log.Warningf("ClusterDisks: failed to load custom pricing: %s", err)
  1010. }
  1011. for _, result := range resPVCost {
  1012. cluster, err := result.GetString(env.GetPromClusterLabel())
  1013. if err != nil {
  1014. cluster = env.GetClusterID()
  1015. }
  1016. name, err := result.GetString("persistentvolume")
  1017. if err != nil {
  1018. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  1019. continue
  1020. }
  1021. // TODO niko/assets storage class
  1022. var cost float64
  1023. if customPricingEnabled && customPricingConfig != nil {
  1024. customPVCostStr := customPricingConfig.Storage
  1025. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  1026. if err != nil {
  1027. log.Warningf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  1028. }
  1029. cost = customPVCost
  1030. } else {
  1031. cost = result.Values[0].Value
  1032. }
  1033. key := DiskIdentifier{cluster, name}
  1034. if _, ok := diskMap[key]; !ok {
  1035. diskMap[key] = &Disk{
  1036. Cluster: cluster,
  1037. Name: name,
  1038. Breakdown: &ClusterCostsBreakdown{},
  1039. }
  1040. }
  1041. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1042. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1043. if providerID != "" {
  1044. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  1045. }
  1046. }
  1047. }