cluster.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197
  1. package costmodel
  2. import (
  3. "fmt"
  4. "strconv"
  5. "time"
  6. "github.com/kubecost/cost-model/pkg/util/timeutil"
  7. "github.com/kubecost/cost-model/pkg/cloud"
  8. "github.com/kubecost/cost-model/pkg/env"
  9. "github.com/kubecost/cost-model/pkg/log"
  10. "github.com/kubecost/cost-model/pkg/prom"
  11. prometheus "github.com/prometheus/client_golang/api"
  12. "k8s.io/klog"
  13. )
// PromQL templates for cumulative cluster cost queries. Each template is
// filled in with a range duration, an optional offset, and the configured
// cluster label (via fmt.Sprintf at the call site). The factor 730 converts
// an hourly rate to an approximate monthly rate (hours per month).
const (
	// queryClusterCores: monthly cost of node CPU capacity plus GPU cost,
	// summed per cluster label.
	queryClusterCores = `sum(
		avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
		avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
	) by (%s)`

	// queryClusterRAM: monthly cost of node memory capacity (bytes -> GiB),
	// summed per cluster label.
	queryClusterRAM = `sum(
		avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
	) by (%s)`

	// queryStorage: monthly cost of persistent volume capacity (bytes -> GiB);
	// the trailing %s allows an optional additive term (e.g. local storage).
	queryStorage = `sum(
		avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
	) by (%s) %s`

	// queryTotal: monthly node cost plus monthly PV cost; trailing %s is an
	// optional additive term.
	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
	sum(
		avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
	) by (%s) %s`

	// queryNodes: monthly node cost only; trailing %s is an optional
	// additive term.
	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
)
// maxLocalDiskSize is the per-disk size cap, in GiB, above which a local disk
// is dropped from analysis. AWS limits root disks to 100 Gi, and occasional
// metric errors in filesystem size should not contribute to large costs.
const maxLocalDiskSize = 200
// ClusterCosts represents cumulative and monthly cluster costs over a given
// duration. Costs are broken down by cores, memory, and storage.
type ClusterCosts struct {
	Start             *time.Time             `json:"startTime"`
	End               *time.Time             `json:"endTime"`
	CPUCumulative     float64                `json:"cpuCumulativeCost"`
	CPUMonthly        float64                `json:"cpuMonthlyCost"`
	CPUBreakdown      *ClusterCostsBreakdown `json:"cpuBreakdown"`
	GPUCumulative     float64                `json:"gpuCumulativeCost"`
	GPUMonthly        float64                `json:"gpuMonthlyCost"`
	RAMCumulative     float64                `json:"ramCumulativeCost"`
	RAMMonthly        float64                `json:"ramMonthlyCost"`
	RAMBreakdown      *ClusterCostsBreakdown `json:"ramBreakdown"`
	StorageCumulative float64                `json:"storageCumulativeCost"`
	StorageMonthly    float64                `json:"storageMonthlyCost"`
	StorageBreakdown  *ClusterCostsBreakdown `json:"storageBreakdown"`
	TotalCumulative   float64                `json:"totalCumulativeCost"`
	TotalMonthly      float64                `json:"totalMonthlyCost"`
	// DataMinutes is the number of minutes of data behind these figures;
	// it is not serialized to JSON.
	DataMinutes float64
}
// ClusterCostsBreakdown provides percentage-based breakdown of a resource by
// categories: user for user-space (i.e. non-system) usage, system, and idle.
// The fields are fractions expected to sum to 1.0.
type ClusterCostsBreakdown struct {
	Idle   float64 `json:"idle"`
	Other  float64 `json:"other"`
	System float64 `json:"system"`
	User   float64 `json:"user"`
}
  62. // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
  63. // the associated monthly rate data, and returns the Costs.
  64. func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset time.Duration, dataHours float64) (*ClusterCosts, error) {
  65. start, end := timeutil.ParseTimeRange(window, offset)
  66. // If the number of hours is not given (i.e. is zero) compute one from the window and offset
  67. if dataHours == 0 {
  68. dataHours = end.Sub(start).Hours()
  69. }
  70. // Do not allow zero-length windows to prevent divide-by-zero issues
  71. if dataHours == 0 {
  72. return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
  73. }
  74. cc := &ClusterCosts{
  75. Start: &start,
  76. End: &end,
  77. CPUCumulative: cpu,
  78. GPUCumulative: gpu,
  79. RAMCumulative: ram,
  80. StorageCumulative: storage,
  81. TotalCumulative: cpu + gpu + ram + storage,
  82. CPUMonthly: cpu / dataHours * (timeutil.HoursPerMonth),
  83. GPUMonthly: gpu / dataHours * (timeutil.HoursPerMonth),
  84. RAMMonthly: ram / dataHours * (timeutil.HoursPerMonth),
  85. StorageMonthly: storage / dataHours * (timeutil.HoursPerMonth),
  86. }
  87. cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
  88. return cc, nil
  89. }
// Disk represents the cost of a single disk — either a persistent volume or
// a node-local root disk — over an observed time range.
type Disk struct {
	Cluster    string  // cluster ID the disk belongs to
	Name       string  // PV name, or node instance name for local disks
	ProviderID string  // cloud provider's identifier, when available
	Cost       float64 // cumulative cost over the observed range
	Bytes      float64 // disk capacity in bytes
	Local      bool    // true for node-local storage, false for PVs
	Start      time.Time
	End        time.Time
	Minutes    float64 // minutes the disk was observed active
	Breakdown  *ClusterCostsBreakdown
}
  102. func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, error) {
  103. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  104. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  105. if offset < time.Minute {
  106. offsetStr = ""
  107. }
  108. // minsPerResolution determines accuracy and resource use for the following
  109. // queries. Smaller values (higher resolution) result in better accuracy,
  110. // but more expensive queries, and vice-a-versa.
  111. minsPerResolution := 1
  112. resolution := time.Duration(minsPerResolution) * time.Minute
  113. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  114. // value, converts it to a cumulative value; i.e.
  115. // [$/hr] * [min/res]*[hr/min] = [$/res]
  116. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  117. // TODO niko/assets how do we not hard-code this price?
  118. costPerGBHr := 0.04 / 730.0
  119. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  120. queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s]%s)) by (%s, persistentvolume,provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
  121. queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (%s, persistentvolume)`, durationStr, offsetStr, env.GetPromClusterLabel())
  122. queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  123. queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  124. queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
  125. queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s)`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  126. queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  127. resChPVCost := ctx.Query(queryPVCost)
  128. resChPVSize := ctx.Query(queryPVSize)
  129. resChActiveMins := ctx.Query(queryActiveMins)
  130. resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
  131. resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
  132. resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
  133. resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
  134. resPVCost, _ := resChPVCost.Await()
  135. resPVSize, _ := resChPVSize.Await()
  136. resActiveMins, _ := resChActiveMins.Await()
  137. resLocalStorageCost, _ := resChLocalStorageCost.Await()
  138. resLocalStorageUsedCost, _ := resChLocalStorageUsedCost.Await()
  139. resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
  140. resLocalActiveMins, _ := resChLocalActiveMins.Await()
  141. if ctx.HasErrors() {
  142. return nil, ctx.ErrorCollection()
  143. }
  144. diskMap := map[string]*Disk{}
  145. pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, provider)
  146. for _, result := range resLocalStorageCost {
  147. cluster, err := result.GetString(env.GetPromClusterLabel())
  148. if err != nil {
  149. cluster = env.GetClusterID()
  150. }
  151. name, err := result.GetString("instance")
  152. if err != nil {
  153. log.Warningf("ClusterDisks: local storage data missing instance")
  154. continue
  155. }
  156. cost := result.Values[0].Value
  157. key := fmt.Sprintf("%s/%s", cluster, name)
  158. if _, ok := diskMap[key]; !ok {
  159. diskMap[key] = &Disk{
  160. Cluster: cluster,
  161. Name: name,
  162. Breakdown: &ClusterCostsBreakdown{},
  163. Local: true,
  164. }
  165. }
  166. diskMap[key].Cost += cost
  167. }
  168. for _, result := range resLocalStorageUsedCost {
  169. cluster, err := result.GetString(env.GetPromClusterLabel())
  170. if err != nil {
  171. cluster = env.GetClusterID()
  172. }
  173. name, err := result.GetString("instance")
  174. if err != nil {
  175. log.Warningf("ClusterDisks: local storage usage data missing instance")
  176. continue
  177. }
  178. cost := result.Values[0].Value
  179. key := fmt.Sprintf("%s/%s", cluster, name)
  180. if _, ok := diskMap[key]; !ok {
  181. diskMap[key] = &Disk{
  182. Cluster: cluster,
  183. Name: name,
  184. Breakdown: &ClusterCostsBreakdown{},
  185. Local: true,
  186. }
  187. }
  188. diskMap[key].Breakdown.System = cost / diskMap[key].Cost
  189. }
  190. for _, result := range resLocalStorageBytes {
  191. cluster, err := result.GetString(env.GetPromClusterLabel())
  192. if err != nil {
  193. cluster = env.GetClusterID()
  194. }
  195. name, err := result.GetString("instance")
  196. if err != nil {
  197. log.Warningf("ClusterDisks: local storage data missing instance")
  198. continue
  199. }
  200. bytes := result.Values[0].Value
  201. key := fmt.Sprintf("%s/%s", cluster, name)
  202. if _, ok := diskMap[key]; !ok {
  203. diskMap[key] = &Disk{
  204. Cluster: cluster,
  205. Name: name,
  206. Breakdown: &ClusterCostsBreakdown{},
  207. Local: true,
  208. }
  209. }
  210. diskMap[key].Bytes = bytes
  211. if bytes/1024/1024/1024 > maxLocalDiskSize {
  212. log.DedupedWarningf(5, "Deleting large root disk/localstorage disk from analysis")
  213. delete(diskMap, key)
  214. }
  215. }
  216. for _, result := range resLocalActiveMins {
  217. cluster, err := result.GetString(env.GetPromClusterLabel())
  218. if err != nil {
  219. cluster = env.GetClusterID()
  220. }
  221. name, err := result.GetString("node")
  222. if err != nil {
  223. log.DedupedWarningf(5, "ClusterDisks: local active mins data missing instance")
  224. continue
  225. }
  226. key := fmt.Sprintf("%s/%s", cluster, name)
  227. if _, ok := diskMap[key]; !ok {
  228. log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
  229. continue
  230. }
  231. if len(result.Values) == 0 {
  232. continue
  233. }
  234. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  235. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  236. mins := e.Sub(s).Minutes()
  237. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  238. diskMap[key].End = e
  239. diskMap[key].Start = s
  240. diskMap[key].Minutes = mins
  241. }
  242. for _, disk := range diskMap {
  243. // Apply all remaining RAM to Idle
  244. disk.Breakdown.Idle = 1.0 - (disk.Breakdown.System + disk.Breakdown.Other + disk.Breakdown.User)
  245. // Set provider Id to the name for reconciliation on Azure
  246. if fmt.Sprintf("%T", provider) == "*provider.Azure" {
  247. if disk.ProviderID == "" {
  248. disk.ProviderID = disk.Name
  249. }
  250. }
  251. }
  252. return diskMap, nil
  253. }
// Node represents a single cluster node's costs, capacities, and usage
// breakdowns over an observed time range.
type Node struct {
	Cluster         string
	Name            string
	ProviderID      string
	NodeType        string // cloud instance type
	CPUCost         float64
	CPUCores        float64
	GPUCost         float64
	GPUCount        float64
	RAMCost         float64
	RAMBytes        float64
	Discount        float64 // combined discount applied by the provider config
	Preemptible     bool    // true for spot/preemptible nodes
	CPUBreakdown    *ClusterCostsBreakdown
	RAMBreakdown    *ClusterCostsBreakdown
	Start           time.Time
	End             time.Time
	Minutes         float64 // minutes the node was observed active
	Labels          map[string]string
	CostPerCPUHr    float64
	CostPerRAMGiBHr float64
	CostPerGPUHr    float64
}
// GKE lies about the number of cores e2 nodes have. This table
// contains a mapping from node type -> actual CPU cores
// for those cases.
var partialCPUMap = map[string]float64{
	"e2-micro":  0.25,
	"e2-small":  0.5,
	"e2-medium": 1.0,
}
// NodeIdentifier uniquely identifies a node by cluster, name, and the cloud
// provider's ID.
type NodeIdentifier struct {
	Cluster    string
	Name       string
	ProviderID string
}
// nodeIdentifierNoProviderID identifies a node by cluster and name only, for
// matching metrics that do not carry a provider_id label.
type nodeIdentifierNoProviderID struct {
	Cluster string
	Name    string
}
  294. func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
  295. for k, v := range activeDataMap {
  296. keyNon := nodeIdentifierNoProviderID{
  297. Cluster: k.Cluster,
  298. Name: k.Name,
  299. }
  300. if cost, ok := costMap[k]; ok {
  301. minutes := v.minutes
  302. count := 1.0
  303. if c, ok := resourceCountMap[keyNon]; ok {
  304. count = c
  305. }
  306. costMap[k] = cost * (minutes / 60) * count
  307. }
  308. }
  309. }
  310. func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
  311. for k, v := range activeDataMap {
  312. if cost, ok := costMap[k]; ok {
  313. minutes := v.minutes
  314. costMap[k] = cost * (minutes / 60)
  315. }
  316. }
  317. }
// ClusterNodes queries Prometheus for per-node cost, capacity, and usage data
// over the given duration and offset, and assembles a map of NodeIdentifier
// to Node. Required queries (costs, capacities, active minutes, spot status)
// cause an error return on failure; optional queries (CPU mode, RAM usage
// breakdowns, labels) only log warnings.
func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[NodeIdentifier]*Node, error) {
	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
	if offset < time.Minute {
		offsetStr = ""
	}

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	minsPerResolution := 1
	resolution := time.Duration(minsPerResolution) * time.Minute

	// Required queries fail the call; optional queries only warn.
	requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
	optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)

	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s]%s)) by (%s, node, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, %s, mode)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel())
	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)

	// Return errors if these fail
	resChNodeCPUHourlyCost := requiredCtx.Query(queryNodeCPUHourlyCost)
	resChNodeCPUCores := requiredCtx.Query(queryNodeCPUCores)
	resChNodeRAMHourlyCost := requiredCtx.Query(queryNodeRAMHourlyCost)
	resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
	resChNodeGPUCount := requiredCtx.Query(queryNodeGPUCount)
	resChNodeGPUHourlyCost := requiredCtx.Query(queryNodeGPUHourlyCost)
	resChActiveMins := requiredCtx.Query(queryActiveMins)
	resChIsSpot := requiredCtx.Query(queryIsSpot)

	// Do not return errors if these fail, but log warnings
	resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
	resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
	resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
	resChLabels := optionalCtx.Query(queryLabels)

	// Await errors are dropped here; they are collected on the contexts and
	// surfaced via HasErrors/ErrorCollection below.
	resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
	resNodeCPUCores, _ := resChNodeCPUCores.Await()
	resNodeGPUCount, _ := resChNodeGPUCount.Await()
	resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
	resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
	resIsSpot, _ := resChIsSpot.Await()
	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
	resActiveMins, _ := resChActiveMins.Await()
	resLabels, _ := resChLabels.Await()

	if optionalCtx.HasErrors() {
		for _, err := range optionalCtx.Errors() {
			log.Warningf("ClusterNodes: %s", err)
		}
	}
	if requiredCtx.HasErrors() {
		for _, err := range requiredCtx.Errors() {
			log.Errorf("ClusterNodes: %s", err)
		}
		return nil, requiredCtx.ErrorCollection()
	}

	// Build intermediate lookup maps from the raw query results.
	activeDataMap := buildActiveDataMap(resActiveMins, resolution)
	gpuCountMap := buildGPUCountMap(resNodeGPUCount)
	preemptibleMap := buildPreemptibleMap(resIsSpot)
	cpuCostMap, clusterAndNameToType1 := buildCPUCostMap(resNodeCPUHourlyCost, cp, preemptibleMap)
	ramCostMap, clusterAndNameToType2 := buildRAMCostMap(resNodeRAMHourlyCost, cp, preemptibleMap)
	gpuCostMap, clusterAndNameToType3 := buildGPUCostMap(resNodeGPUHourlyCost, gpuCountMap, cp, preemptibleMap)

	// Merge the three partial node-type maps into one.
	clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
	clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)

	cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
	ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
	ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
	ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
	cpuBreakdownMap := buildCPUBreakdownMap(resNodeCPUModeTotal)
	labelsMap := buildLabelsMap(resLabels)

	// Convert hourly rates to cumulative costs in place.
	costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
	costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
	costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID

	nodeMap := buildNodeMap(
		cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
		cpuCoresMap, ramBytesMap, ramUserPctMap,
		ramSystemPctMap,
		cpuBreakdownMap,
		activeDataMap,
		preemptibleMap,
		labelsMap,
		clusterAndNameToType,
	)

	c, err := cp.GetConfig()
	if err != nil {
		return nil, err
	}

	discount, err := ParsePercentString(c.Discount)
	if err != nil {
		return nil, err
	}

	negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
	if err != nil {
		return nil, err
	}

	for _, node := range nodeMap {
		// TODO take GKE Reserved Instances into account
		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)

		// Apply all remaining resources to Idle
		node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
		node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
	}

	return nodeMap, nil
}
// LoadBalancer represents the cost of a single load balancer service over an
// observed time range, identified by "cluster/namespace/serviceName".
type LoadBalancer struct {
	Cluster    string
	Name       string // "namespace/serviceName"
	ProviderID string // parsed from the service's ingress IP, when available
	Cost       float64
	Start      time.Time
	Minutes    float64 // minutes the load balancer was observed active
}
  436. func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
  437. durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
  438. offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
  439. if offset < time.Minute {
  440. offsetStr = ""
  441. }
  442. // minsPerResolution determines accuracy and resource use for the following
  443. // queries. Smaller values (higher resolution) result in better accuracy,
  444. // but more expensive queries, and vice-a-versa.
  445. minsPerResolution := 5
  446. // hourlyToCumulative is a scaling factor that, when multiplied by an hourly
  447. // value, converts it to a cumulative value; i.e.
  448. // [$/hr] * [min/res]*[hr/min] = [$/res]
  449. hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
  450. ctx := prom.NewNamedContext(client, prom.ClusterContextName)
  451. queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip))[%s:%dm]%s) * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
  452. queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
  453. resChLBCost := ctx.Query(queryLBCost)
  454. resChActiveMins := ctx.Query(queryActiveMins)
  455. resLBCost, _ := resChLBCost.Await()
  456. resActiveMins, _ := resChActiveMins.Await()
  457. if ctx.HasErrors() {
  458. return nil, ctx.ErrorCollection()
  459. }
  460. loadBalancerMap := map[string]*LoadBalancer{}
  461. for _, result := range resLBCost {
  462. cluster, err := result.GetString(env.GetPromClusterLabel())
  463. if err != nil {
  464. cluster = env.GetClusterID()
  465. }
  466. namespace, err := result.GetString("namespace")
  467. if err != nil {
  468. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  469. continue
  470. }
  471. serviceName, err := result.GetString("service_name")
  472. if err != nil {
  473. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  474. continue
  475. }
  476. providerID, err := result.GetString("ingress_ip")
  477. if err != nil {
  478. log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
  479. providerID = ""
  480. }
  481. lbCost := result.Values[0].Value
  482. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  483. if _, ok := loadBalancerMap[key]; !ok {
  484. loadBalancerMap[key] = &LoadBalancer{
  485. Cluster: cluster,
  486. Name: namespace + "/" + serviceName,
  487. ProviderID: cloud.ParseLBID(providerID),
  488. }
  489. }
  490. // Fill in Provider ID if it is available and missing in the loadBalancerMap
  491. // Prevents there from being a duplicate LoadBalancers on the same day
  492. if providerID != "" && loadBalancerMap[key].ProviderID == "" {
  493. loadBalancerMap[key].ProviderID = providerID
  494. }
  495. loadBalancerMap[key].Cost += lbCost
  496. }
  497. for _, result := range resActiveMins {
  498. cluster, err := result.GetString(env.GetPromClusterLabel())
  499. if err != nil {
  500. cluster = env.GetClusterID()
  501. }
  502. namespace, err := result.GetString("namespace")
  503. if err != nil {
  504. log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
  505. continue
  506. }
  507. serviceName, err := result.GetString("service_name")
  508. if err != nil {
  509. log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
  510. continue
  511. }
  512. key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
  513. if len(result.Values) == 0 {
  514. continue
  515. }
  516. if lb, ok := loadBalancerMap[key]; ok {
  517. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  518. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
  519. mins := e.Sub(s).Minutes()
  520. lb.Start = s
  521. lb.Minutes = mins
  522. } else {
  523. log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
  524. }
  525. }
  526. return loadBalancerMap, nil
  527. }
// ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
//
// Costs for CPU, GPU, RAM, storage, and (optionally) provider-specific local
// storage are computed from concurrent Prometheus queries, keyed by cluster ID,
// and discounted per the provider config. When withBreakdown is true, extra
// queries split CPU and RAM cost into idle/system/user/other fractions.
func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset time.Duration, withBreakdown bool) (map[string]*ClusterCosts, error) {
	// Reject windows too small for the 5m-resolution subqueries below.
	if window < 10*time.Minute {
		return nil, fmt.Errorf("minimum window of 10m required; got %s", window)
	}

	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
	start, end := timeutil.ParseTimeRange(window, offset)
	mins := end.Sub(start).Minutes()

	windowStr := timeutil.DurationString(window)

	// minsPerResolution determines accuracy and resource use for the following
	// queries. Smaller values (higher resolution) result in better accuracy,
	// but more expensive queries, and vice-a-versa.
	minsPerResolution := 5

	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
	// value, converts it to a cumulative value; i.e.
	// [$/hr] * [min/res]*[hr/min] = [$/res]
	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)

	// Counts scrape samples present in the window so that missing data can be
	// detected and scaled for per cluster (see dataMinsByCluster below).
	const fmtQueryDataCount = `
count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
`

	const fmtQueryTotalGPU = `
sum(
sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
) by (%s)
`

	const fmtQueryTotalCPU = `
sum(
sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
) by (%s)
`

	const fmtQueryTotalRAM = `
sum(
sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
) by (%s)
`

	const fmtQueryTotalStorage = `
sum(
sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
) by (%s)
`

	// Fraction of CPU time spent in each mode (idle/system/user/...), per cluster.
	const fmtQueryCPUModePct = `
sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
`

	// Fraction of RAM capacity consumed by kube-system workloads, per cluster.
	const fmtQueryRAMSystemPct = `
sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
`

	// Fraction of RAM capacity consumed by user workloads (working set), per cluster.
	const fmtQueryRAMUserPct = `
sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
`

	// TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
	// const fmtQueryPVStorageUsePct = `(sum(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass,namespace) + on (persistentvolumeclaim,namespace)
	// group_right(storageclass) sum(kubelet_volume_stats_used_bytes) by (persistentvolumeclaim,namespace))`

	// Provider-specific local storage queries; either may be empty if the
	// provider does not support local storage pricing.
	queryUsedLocalStorage := provider.GetLocalStorageQuery(window, offset, false, true)

	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false, false)
	if queryTotalLocalStorage != "" {
		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
	}

	fmtOffset := timeutil.DurationToPromOffsetString(offset)

	queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, minsPerResolution)
	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, windowStr, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())

	ctx := prom.NewNamedContext(client, prom.ClusterContextName)

	// resChs index layout (positional; Await calls below rely on this order):
	//   [0] data count  [1] GPU  [2] CPU  [3] RAM  [4] storage
	//   [5] total local storage (nil placeholder when unsupported)
	//   [6] CPU mode pct  [7] RAM system pct  [8] RAM user pct   (withBreakdown only)
	//   [9] used local storage (nil placeholder when unsupported; withBreakdown only)
	resChs := ctx.QueryAll(
		queryDataCount,
		queryTotalGPU,
		queryTotalCPU,
		queryTotalRAM,
		queryTotalStorage,
	)

	// Only submit the local storage query if it is valid. Otherwise Prometheus
	// will return errors. Always append something to resChs, regardless, to
	// maintain indexing.
	if queryTotalLocalStorage != "" {
		resChs = append(resChs, ctx.Query(queryTotalLocalStorage))
	} else {
		resChs = append(resChs, nil)
	}

	if withBreakdown {
		queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, windowStr, fmtOffset, env.GetPromClusterLabel(), windowStr, fmtOffset, env.GetPromClusterLabel())
		queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
		queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), windowStr, minsPerResolution, fmtOffset, env.GetPromClusterLabel())

		bdResChs := ctx.QueryAll(
			queryCPUModePct,
			queryRAMSystemPct,
			queryRAMUserPct,
		)

		// Only submit the local storage query if it is valid. Otherwise Prometheus
		// will return errors. Always append something to resChs, regardless, to
		// maintain indexing.
		if queryUsedLocalStorage != "" {
			bdResChs = append(bdResChs, ctx.Query(queryUsedLocalStorage))
		} else {
			bdResChs = append(bdResChs, nil)
		}

		resChs = append(resChs, bdResChs...)
	}

	// Individual Await errors are intentionally dropped here; the shared
	// context accumulates them and they are surfaced via ctx.HasErrors below.
	resDataCount, _ := resChs[0].Await()
	resTotalGPU, _ := resChs[1].Await()
	resTotalCPU, _ := resChs[2].Await()
	resTotalRAM, _ := resChs[3].Await()
	resTotalStorage, _ := resChs[4].Await()
	if ctx.HasErrors() {
		return nil, ctx.ErrorCollection()
	}

	defaultClusterID := env.GetClusterID()

	// dataMinsByCluster records how many minutes of scrape data each cluster
	// actually has in the window, falling back to the full window length.
	dataMinsByCluster := map[string]float64{}
	for _, result := range resDataCount {
		clusterID, _ := result.GetString(env.GetPromClusterLabel())
		if clusterID == "" {
			clusterID = defaultClusterID
		}
		dataMins := mins
		if len(result.Values) > 0 {
			dataMins = result.Values[0].Value
		} else {
			klog.V(3).Infof("[Warning] cluster cost data count returned no results for cluster %s", clusterID)
		}
		dataMinsByCluster[clusterID] = dataMins
	}

	// Determine combined discount
	discount, customDiscount := 0.0, 0.0
	c, err := a.CloudProvider.GetConfig()
	if err == nil {
		// Unparseable percent strings simply fall back to 0.0 (no discount).
		discount, err = ParsePercentString(c.Discount)
		if err != nil {
			discount = 0.0
		}
		customDiscount, err = ParsePercentString(c.NegotiatedDiscount)
		if err != nil {
			customDiscount = 0.0
		}
	}

	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
	costData := make(map[string]map[string]float64)

	// Helper function to iterate over Prom query results, parsing the raw values into
	// the intermediate costData structure.
	setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
		for _, result := range results {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := costData[clusterID]; !ok {
				costData[clusterID] = map[string]float64{}
			}
			if len(result.Values) > 0 {
				// Both discounts compound multiplicatively.
				costData[clusterID][name] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
				costData[clusterID]["total"] += result.Values[0].Value * (1.0 - discount) * (1.0 - customDiscount)
			}
		}
	}

	// Apply both sustained use and custom discounts to RAM and CPU
	setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
	setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
	// Apply only custom discount to GPU and storage
	setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
	setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
	if queryTotalLocalStorage != "" {
		resTotalLocalStorage, err := resChs[5].Await()
		if err != nil {
			return nil, err
		}
		setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
	}

	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
	pvUsedCostMap := map[string]float64{}
	if withBreakdown {
		resCPUModePct, _ := resChs[6].Await()
		resRAMSystemPct, _ := resChs[7].Await()
		resRAMUserPct, _ := resChs[8].Await()
		if ctx.HasErrors() {
			return nil, ctx.ErrorCollection()
		}

		// NOTE(review): the loops below index result.Values[0] without a
		// length check, unlike the data-count loop above — empty series
		// would panic here; confirm Prometheus never returns them for
		// these queries.
		for _, result := range resCPUModePct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := cpuBreakdownMap[clusterID]; !ok {
				cpuBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			cpuBD := cpuBreakdownMap[clusterID]

			mode, err := result.GetString("mode")
			if err != nil {
				klog.V(3).Infof("[Warning] ComputeClusterCosts: unable to read CPU mode: %s", err)
				mode = "other"
			}

			switch mode {
			case "idle":
				cpuBD.Idle += result.Values[0].Value
			case "system":
				cpuBD.System += result.Values[0].Value
			case "user":
				cpuBD.User += result.Values[0].Value
			default:
				cpuBD.Other += result.Values[0].Value
			}
		}

		for _, result := range resRAMSystemPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.System += result.Values[0].Value
		}
		for _, result := range resRAMUserPct {
			clusterID, _ := result.GetString(env.GetPromClusterLabel())
			if clusterID == "" {
				clusterID = defaultClusterID
			}
			if _, ok := ramBreakdownMap[clusterID]; !ok {
				ramBreakdownMap[clusterID] = &ClusterCostsBreakdown{}
			}
			ramBD := ramBreakdownMap[clusterID]
			ramBD.User += result.Values[0].Value
		}

		// RAM idle is whatever fraction is left after system/user/other.
		for _, ramBD := range ramBreakdownMap {
			remaining := 1.0
			remaining -= ramBD.Other
			remaining -= ramBD.System
			remaining -= ramBD.User
			ramBD.Idle = remaining
		}

		if queryUsedLocalStorage != "" {
			resUsedLocalStorage, err := resChs[9].Await()
			if err != nil {
				return nil, err
			}
			for _, result := range resUsedLocalStorage {
				clusterID, _ := result.GetString(env.GetPromClusterLabel())
				if clusterID == "" {
					clusterID = defaultClusterID
				}
				pvUsedCostMap[clusterID] += result.Values[0].Value
			}
		}
	}

	if ctx.HasErrors() {
		for _, err := range ctx.Errors() {
			log.Errorf("ComputeClusterCosts: %s", err)
		}
		return nil, ctx.ErrorCollection()
	}

	// Convert intermediate structure to Costs instances
	costsByCluster := map[string]*ClusterCosts{}
	for id, cd := range costData {
		dataMins, ok := dataMinsByCluster[id]
		if !ok {
			dataMins = mins
			klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
		}
		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/timeutil.MinsPerHour)
		if err != nil {
			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
			return nil, err
		}

		if cpuBD, ok := cpuBreakdownMap[id]; ok {
			costs.CPUBreakdown = cpuBD
		}
		if ramBD, ok := ramBreakdownMap[id]; ok {
			costs.RAMBreakdown = ramBD
		}
		// Storage idle/user split is derived from the used-local-storage cost
		// relative to cumulative storage cost.
		costs.StorageBreakdown = &ClusterCostsBreakdown{}
		if pvUC, ok := pvUsedCostMap[id]; ok {
			costs.StorageBreakdown.Idle = (costs.StorageCumulative - pvUC) / costs.StorageCumulative
			costs.StorageBreakdown.User = pvUC / costs.StorageCumulative
		}
		costs.DataMinutes = dataMins
		costsByCluster[id] = costs
	}

	return costsByCluster, nil
}
// Totals holds per-cluster cost series over time, as assembled by
// ClusterCostsOverTime. Each inner slice is a [timestamp, value] pair of
// "%f"-formatted strings (see resultToTotals).
//
// NOTE(review): the "storageCost" JSON tag is camelCase while the others are
// all-lowercase; kept as-is since API consumers depend on the exact keys.
type Totals struct {
	TotalCost   [][]string `json:"totalcost"`
	CPUCost     [][]string `json:"cpucost"`
	MemCost     [][]string `json:"memcost"`
	StorageCost [][]string `json:"storageCost"`
}
  819. func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
  820. if len(qrs) == 0 {
  821. return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
  822. }
  823. result := qrs[0]
  824. totals := [][]string{}
  825. for _, value := range result.Values {
  826. d0 := fmt.Sprintf("%f", value.Timestamp)
  827. d1 := fmt.Sprintf("%f", value.Value)
  828. toAppend := []string{
  829. d0,
  830. d1,
  831. }
  832. totals = append(totals, toAppend)
  833. }
  834. return totals, nil
  835. }
  836. // ClusterCostsOverTime gives the full cluster costs over time
  837. func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString string, window, offset time.Duration) (*Totals, error) {
  838. localStorageQuery := provider.GetLocalStorageQuery(window, offset, true, false)
  839. if localStorageQuery != "" {
  840. localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
  841. }
  842. layout := "2006-01-02T15:04:05.000Z"
  843. start, err := time.Parse(layout, startString)
  844. if err != nil {
  845. klog.V(1).Infof("Error parsing time %s. Error: %s", startString, err.Error())
  846. return nil, err
  847. }
  848. end, err := time.Parse(layout, endString)
  849. if err != nil {
  850. klog.V(1).Infof("Error parsing time %s. Error: %s", endString, err.Error())
  851. return nil, err
  852. }
  853. fmtWindow := timeutil.DurationString(window)
  854. if fmtWindow == "" {
  855. err := fmt.Errorf("window value invalid or missing")
  856. klog.V(1).Infof("Error parsing time %v. Error: %s", window, err.Error())
  857. return nil, err
  858. }
  859. fmtOffset := timeutil.DurationToPromOffsetString(offset)
  860. qCores := fmt.Sprintf(queryClusterCores, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  861. qRAM := fmt.Sprintf(queryClusterRAM, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
  862. qStorage := fmt.Sprintf(queryStorage, fmtWindow, fmtOffset, env.GetPromClusterLabel(), fmtWindow, fmtOffset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  863. qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
  864. ctx := prom.NewNamedContext(cli, prom.ClusterContextName)
  865. resChClusterCores := ctx.QueryRange(qCores, start, end, window)
  866. resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
  867. resChStorage := ctx.QueryRange(qStorage, start, end, window)
  868. resChTotal := ctx.QueryRange(qTotal, start, end, window)
  869. resultClusterCores, err := resChClusterCores.Await()
  870. if err != nil {
  871. return nil, err
  872. }
  873. resultClusterRAM, err := resChClusterRAM.Await()
  874. if err != nil {
  875. return nil, err
  876. }
  877. resultStorage, err := resChStorage.Await()
  878. if err != nil {
  879. return nil, err
  880. }
  881. resultTotal, err := resChTotal.Await()
  882. if err != nil {
  883. return nil, err
  884. }
  885. coreTotal, err := resultToTotals(resultClusterCores)
  886. if err != nil {
  887. klog.Infof("[Warning] ClusterCostsOverTime: no cpu data: %s", err)
  888. return nil, err
  889. }
  890. ramTotal, err := resultToTotals(resultClusterRAM)
  891. if err != nil {
  892. klog.Infof("[Warning] ClusterCostsOverTime: no ram data: %s", err)
  893. return nil, err
  894. }
  895. storageTotal, err := resultToTotals(resultStorage)
  896. if err != nil {
  897. klog.Infof("[Warning] ClusterCostsOverTime: no storage data: %s", err)
  898. }
  899. clusterTotal, err := resultToTotals(resultTotal)
  900. if err != nil {
  901. // If clusterTotal query failed, it's likely because there are no PVs, which
  902. // causes the qTotal query to return no data. Instead, query only node costs.
  903. // If that fails, return an error because something is actually wrong.
  904. qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
  905. resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
  906. for _, warning := range warnings {
  907. log.Warningf(warning)
  908. }
  909. if err != nil {
  910. return nil, err
  911. }
  912. clusterTotal, err = resultToTotals(resultNodes)
  913. if err != nil {
  914. klog.Infof("[Warning] ClusterCostsOverTime: no node data: %s", err)
  915. return nil, err
  916. }
  917. }
  918. return &Totals{
  919. TotalCost: clusterTotal,
  920. CPUCost: coreTotal,
  921. MemCost: ramTotal,
  922. StorageCost: storageTotal,
  923. }, nil
  924. }
  925. func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
  926. for _, result := range resActiveMins {
  927. cluster, err := result.GetString(env.GetPromClusterLabel())
  928. if err != nil {
  929. cluster = env.GetClusterID()
  930. }
  931. name, err := result.GetString("persistentvolume")
  932. if err != nil {
  933. log.Warningf("ClusterDisks: active mins missing pv name")
  934. continue
  935. }
  936. if len(result.Values) == 0 {
  937. continue
  938. }
  939. key := fmt.Sprintf("%s/%s", cluster, name)
  940. if _, ok := diskMap[key]; !ok {
  941. diskMap[key] = &Disk{
  942. Cluster: cluster,
  943. Name: name,
  944. Breakdown: &ClusterCostsBreakdown{},
  945. }
  946. }
  947. s := time.Unix(int64(result.Values[0].Timestamp), 0)
  948. e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
  949. mins := e.Sub(s).Minutes()
  950. // TODO niko/assets if mins >= threshold, interpolate for missing data?
  951. diskMap[key].End = e
  952. diskMap[key].Start = s
  953. diskMap[key].Minutes = mins
  954. }
  955. for _, result := range resPVSize {
  956. cluster, err := result.GetString(env.GetPromClusterLabel())
  957. if err != nil {
  958. cluster = env.GetClusterID()
  959. }
  960. name, err := result.GetString("persistentvolume")
  961. if err != nil {
  962. log.Warningf("ClusterDisks: PV size data missing persistentvolume")
  963. continue
  964. }
  965. // TODO niko/assets storage class
  966. bytes := result.Values[0].Value
  967. key := fmt.Sprintf("%s/%s", cluster, name)
  968. if _, ok := diskMap[key]; !ok {
  969. diskMap[key] = &Disk{
  970. Cluster: cluster,
  971. Name: name,
  972. Breakdown: &ClusterCostsBreakdown{},
  973. }
  974. }
  975. diskMap[key].Bytes = bytes
  976. }
  977. customPricingEnabled := cloud.CustomPricesEnabled(cp)
  978. customPricingConfig, err := cp.GetConfig()
  979. if err != nil {
  980. log.Warningf("ClusterDisks: failed to load custom pricing: %s", err)
  981. }
  982. for _, result := range resPVCost {
  983. cluster, err := result.GetString(env.GetPromClusterLabel())
  984. if err != nil {
  985. cluster = env.GetClusterID()
  986. }
  987. name, err := result.GetString("persistentvolume")
  988. if err != nil {
  989. log.Warningf("ClusterDisks: PV cost data missing persistentvolume")
  990. continue
  991. }
  992. // TODO niko/assets storage class
  993. var cost float64
  994. if customPricingEnabled && customPricingConfig != nil {
  995. customPVCostStr := customPricingConfig.Storage
  996. customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
  997. if err != nil {
  998. log.Warningf("ClusterDisks: error parsing custom PV price: %s", customPVCostStr)
  999. }
  1000. cost = customPVCost
  1001. } else {
  1002. cost = result.Values[0].Value
  1003. }
  1004. key := fmt.Sprintf("%s/%s", cluster, name)
  1005. if _, ok := diskMap[key]; !ok {
  1006. diskMap[key] = &Disk{
  1007. Cluster: cluster,
  1008. Name: name,
  1009. Breakdown: &ClusterCostsBreakdown{},
  1010. }
  1011. }
  1012. diskMap[key].Cost = cost * (diskMap[key].Bytes / 1024 / 1024 / 1024) * (diskMap[key].Minutes / 60)
  1013. providerID, _ := result.GetString("provider_id") // just put the providerID set up here, it's the simplest query.
  1014. if providerID != "" {
  1015. diskMap[key].ProviderID = cloud.ParsePVID(providerID)
  1016. }
  1017. }
  1018. }