// allocation.go
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/opencost"
  6. "github.com/opencost/opencost/core/pkg/util/timeutil"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/pkg/env"
  9. "github.com/opencost/opencost/pkg/prom"
  10. )
// Prometheus query format strings used by the allocation pipeline. Each is
// completed with fmt.Sprintf; the %s placeholders are (in order, where
// present) the cluster filter, the range/window duration, and the cluster
// label, with subqueries additionally taking a resolution step.
const (
	// https://kubecost.atlassian.net/browse/BURNDOWN-234
	// upstream KSM has implementation change vs OC internal KSM - it sets metric to 0 when pod goes down
	// VS OC implementation which stops emitting it
	// by adding != 0 filter, we keep just the active times in the prom result
	queryFmtPods    = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%s]`
	queryFmtPodsUID = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%s]`

	// RAM allocation, requests, and working-set usage (average and max).
	queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s, provider_id)`
	queryFmtRAMRequests       = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
	queryFmtRAMUsageAvg       = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
	queryFmtRAMUsageMax       = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`

	// CPU allocation, requests, and rate-based usage.
	queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
	queryFmtCPURequests       = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
	queryFmtCPUUsageAvg       = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`

	// GPU requests, DCGM engine-activity usage, and allocation.
	queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
	queryFmtGPUsUsageAvg  = `avg(avg_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, %s)`
	queryFmtGPUsUsageMax  = `max(max_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, %s)`
	queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`

	// Node hourly pricing (per CPU, per RAM GiB, per GPU) and spot status.
	queryFmtNodeCostPerCPUHr    = `avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
	queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
	queryFmtNodeCostPerGPUHr    = `avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
	queryFmtNodeIsSpot          = `avg_over_time(kubecost_node_is_spot{%s}[%s])`

	// PersistentVolume / PersistentVolumeClaim metadata, sizing, and pricing.
	queryFmtPVCInfo           = `avg(kube_persistentvolumeclaim_info{volumename != "", %s}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
	queryFmtPodPVCAllocation  = `avg(avg_over_time(pod_pvc_allocation{%s}[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
	queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{%s}[%s])) by (persistentvolumeclaim, namespace, %s)`
	queryFmtPVActiveMins      = `count(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%s]`
	queryFmtPVBytes           = `avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (persistentvolume, %s)`
	queryFmtPVCostPerGiBHour  = `avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (volumename, %s)`
	queryFmtPVMeta            = `avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, provider_id)`

	// Network egress volumes (cross-zone, cross-region, internet) and their
	// per-GiB costs, plus raw receive/transmit byte counts.
	queryFmtNetZoneGiB            = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
	queryFmtNetZoneCostPerGiB     = `avg(avg_over_time(kubecost_network_zone_egress_cost{%s}[%s])) by (%s)`
	queryFmtNetRegionGiB          = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
	queryFmtNetRegionCostPerGiB   = `avg(avg_over_time(kubecost_network_region_egress_cost{%s}[%s])) by (%s)`
	queryFmtNetInternetGiB        = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
	queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{%s}[%s])) by (%s)`
	queryFmtNetReceiveBytes       = `sum(increase(container_network_receive_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
	queryFmtNetTransferBytes      = `sum(increase(container_network_transmit_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`

	// Labels, annotations, and workload-owner relationships used to attach
	// metadata and controller ownership to allocations.
	queryFmtNodeLabels                  = `avg_over_time(kube_node_labels{%s}[%s])`
	queryFmtNamespaceLabels             = `avg_over_time(kube_namespace_labels{%s}[%s])`
	queryFmtNamespaceAnnotations        = `avg_over_time(kube_namespace_annotations{%s}[%s])`
	queryFmtPodLabels                   = `avg_over_time(kube_pod_labels{%s}[%s])`
	queryFmtPodAnnotations              = `avg_over_time(kube_pod_annotations{%s}[%s])`
	queryFmtServiceLabels               = `avg_over_time(service_selector_labels{%s}[%s])`
	queryFmtDeploymentLabels            = `avg_over_time(deployment_match_labels{%s}[%s])`
	queryFmtStatefulSetLabels           = `avg_over_time(statefulSet_match_labels{%s}[%s])`
	queryFmtDaemonSetLabels             = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet", %s}[%s])) by (pod, owner_name, namespace, %s)`
	queryFmtJobLabels                   = `sum(avg_over_time(kube_pod_owner{owner_kind="Job", %s}[%s])) by (pod, owner_name, namespace ,%s)`
	queryFmtPodsWithReplicaSetOwner     = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet", %s}[%s])) by (pod, owner_name, namespace ,%s)`
	queryFmtReplicaSetsWithoutOwners    = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>", %s}[%s])) by (replicaset, namespace, %s)`
	queryFmtReplicaSetsWithRolloutOwner = `avg(avg_over_time(kube_replicaset_owner{owner_kind="Rollout", %s}[%s])) by (replicaset, namespace, owner_kind, owner_name, %s)`

	// Load balancer pricing and active time.
	queryFmtLBCostPerHr  = `avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, ingress_ip, %s)`
	queryFmtLBActiveMins = `count(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s)[%s:%s]`

	// Oldest/newest sample timestamps of node_cpu_hourly_cost; used by
	// DateRange to report the span of available data.
	queryFmtOldestSample = `min_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`
	queryFmtNewestSample = `max_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`

	// Shared-GPU detection and per-device GPU info (model, UUID) from DCGM.
	queryFmtIsGPuShared = `avg(avg_over_time(kube_pod_container_resource_requests{container!="", node != "", pod != "", container!= "", unit = "integer", %s}[%s])) by (container, pod, namespace, node, resource)`
	queryFmtGetGPuInfo  = `avg(avg_over_time(DCGM_FI_DEV_DEC_UTIL{container!="",%s}[%s])) by (container, pod, namespace, device, modelName, UUID)`

	// Because we use container_cpu_usage_seconds_total to calculate CPU usage
	// at any given "instant" of time, we need to use an irate or rate. To then
	// calculate a max (or any aggregation) we have to perform an aggregation
	// query on top of an instant-by-instant maximum. Prometheus supports this
	// type of query with a "subquery" [1], however it is reportedly expensive
	// to make such a query. By default, Kubecost's Prometheus config includes
	// a recording rule that keeps track of the instant-by-instant irate for CPU
	// usage. The metric in this query is created by that recording rule.
	//
	// [1] https://prometheus.io/blog/2019/01/28/subquery-support/
	//
	// If changing the name of the recording rule, make sure to update the
	// corresponding diagnostic query to avoid confusion.
	queryFmtCPUUsageMaxRecordingRule = `max(max_over_time(kubecost_container_cpu_usage_irate{%s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`

	// This is the subquery equivalent of the above recording rule query. It is
	// more expensive, but does not require the recording rule. It should be
	// used as a fallback query if the recording rule data does not exist.
	//
	// The parameter after the colon [:<thisone>] in the subquery affects the
	// resolution of the subquery.
	// The parameter after the metric ...{}[<thisone>] should be set to 2x
	// the resolution, to make sure the irate always has two points to query
	// in case the Prom scrape duration has been reduced to be equal to the
	// ETL resolution.
	queryFmtCPUUsageMaxSubquery = `max(max_over_time(irate(container_cpu_usage_seconds_total{container!="POD", container!="", %s}[%s])[%s:%s])) by (container, pod_name, pod, namespace, instance, %s)`
)
// Constants for Network Cost Subtype
const (
	// networkCrossZoneCost labels egress between zones within the same region
	// (matches queryFmtNetZoneGiB: same_zone="false", same_region="true").
	networkCrossZoneCost = "NetworkCrossZoneCost"
	// networkCrossRegionCost labels egress between regions
	// (matches queryFmtNetRegionGiB: same_region="false").
	networkCrossRegionCost = "NetworkCrossRegionCost"
	// networkInternetCost labels egress to the internet
	// (matches queryFmtNetInternetGiB: internet="true").
	networkInternetCost = "NetworkInternetCost"
)
  99. // CanCompute should return true if CostModel can act as a valid source for the
  100. // given time range. In the case of CostModel we want to attempt to compute as
  101. // long as the range starts in the past. If the CostModel ends up not having
  102. // data to match, that's okay, and should be communicated with an error
  103. // response from ComputeAllocation.
  104. func (cm *CostModel) CanCompute(start, end time.Time) bool {
  105. return start.Before(time.Now())
  106. }
  107. // Name returns the name of the Source
  108. func (cm *CostModel) Name() string {
  109. return "CostModel"
  110. }
  111. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  112. // for the window defined by the given start and end times. The Allocations
  113. // returned are unaggregated (i.e. down to the container level).
  114. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
  115. // If the duration is short enough, compute the AllocationSet directly
  116. if end.Sub(start) <= cm.MaxPrometheusQueryDuration {
  117. as, _, err := cm.computeAllocation(start, end, resolution)
  118. return as, err
  119. }
  120. // If the duration exceeds the configured MaxPrometheusQueryDuration, then
  121. // query for maximum-sized AllocationSets, collect them, and accumulate.
  122. // s and e track the coverage of the entire given window over multiple
  123. // internal queries.
  124. s, e := start, start
  125. // Collect AllocationSets in a range, then accumulate
  126. // TODO optimize by collecting consecutive AllocationSets, accumulating as we go
  127. asr := opencost.NewAllocationSetRange()
  128. for e.Before(end) {
  129. // By default, query for the full remaining duration. But do not let
  130. // any individual query duration exceed the configured max Prometheus
  131. // query duration.
  132. duration := end.Sub(e)
  133. if duration > cm.MaxPrometheusQueryDuration {
  134. duration = cm.MaxPrometheusQueryDuration
  135. }
  136. // Set start and end parameters (s, e) for next individual computation.
  137. e = s.Add(duration)
  138. // Compute the individual AllocationSet for just (s, e)
  139. as, _, err := cm.computeAllocation(s, e, resolution)
  140. if err != nil {
  141. return opencost.NewAllocationSet(start, end), fmt.Errorf("error computing allocation for %s: %s", opencost.NewClosedWindow(s, e), err)
  142. }
  143. // Append to the range
  144. asr.Append(as)
  145. // Set s equal to e to set up the next query, if one exists.
  146. s = e
  147. }
  148. // Populate annotations, labels, and services on each Allocation. This is
  149. // necessary because Properties.Intersection does not propagate any values
  150. // stored in maps or slices for performance reasons. In this case, however,
  151. // it is both acceptable and necessary to do so.
  152. allocationAnnotations := map[string]map[string]string{}
  153. allocationLabels := map[string]map[string]string{}
  154. allocationServices := map[string]map[string]bool{}
  155. // Also record errors and warnings, then append them to the results later.
  156. errors := []string{}
  157. warnings := []string{}
  158. for _, as := range asr.Allocations {
  159. for k, a := range as.Allocations {
  160. if len(a.Properties.Annotations) > 0 {
  161. if _, ok := allocationAnnotations[k]; !ok {
  162. allocationAnnotations[k] = map[string]string{}
  163. }
  164. for name, val := range a.Properties.Annotations {
  165. allocationAnnotations[k][name] = val
  166. }
  167. }
  168. if len(a.Properties.Labels) > 0 {
  169. if _, ok := allocationLabels[k]; !ok {
  170. allocationLabels[k] = map[string]string{}
  171. }
  172. for name, val := range a.Properties.Labels {
  173. allocationLabels[k][name] = val
  174. }
  175. }
  176. if len(a.Properties.Services) > 0 {
  177. if _, ok := allocationServices[k]; !ok {
  178. allocationServices[k] = map[string]bool{}
  179. }
  180. for _, val := range a.Properties.Services {
  181. allocationServices[k][val] = true
  182. }
  183. }
  184. }
  185. errors = append(errors, as.Errors...)
  186. warnings = append(warnings, as.Warnings...)
  187. }
  188. // Accumulate to yield the result AllocationSet. After this step, we will
  189. // be nearly complete, but without the raw allocation data, which must be
  190. // recomputed.
  191. resultASR, err := asr.Accumulate(opencost.AccumulateOptionAll)
  192. if err != nil {
  193. return opencost.NewAllocationSet(start, end), fmt.Errorf("error accumulating data for %s: %s", opencost.NewClosedWindow(s, e), err)
  194. }
  195. if resultASR != nil && len(resultASR.Allocations) == 0 {
  196. return opencost.NewAllocationSet(start, end), nil
  197. }
  198. if length := len(resultASR.Allocations); length != 1 {
  199. return opencost.NewAllocationSet(start, end), fmt.Errorf("expected 1 accumulated allocation set, found %d sets", length)
  200. }
  201. result := resultASR.Allocations[0]
  202. // Apply the annotations, labels, and services to the post-accumulation
  203. // results. (See above for why this is necessary.)
  204. for k, a := range result.Allocations {
  205. if annotations, ok := allocationAnnotations[k]; ok {
  206. a.Properties.Annotations = annotations
  207. }
  208. if labels, ok := allocationLabels[k]; ok {
  209. a.Properties.Labels = labels
  210. }
  211. if services, ok := allocationServices[k]; ok {
  212. a.Properties.Services = []string{}
  213. for s := range services {
  214. a.Properties.Services = append(a.Properties.Services, s)
  215. }
  216. }
  217. // Expand the Window of all Allocations within the AllocationSet
  218. // to match the Window of the AllocationSet, which gets expanded
  219. // at the end of this function.
  220. a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
  221. }
  222. // Maintain RAM and CPU max usage values by iterating over the range,
  223. // computing maximums on a rolling basis, and setting on the result set.
  224. for _, as := range asr.Allocations {
  225. for key, alloc := range as.Allocations {
  226. resultAlloc := result.Get(key)
  227. if resultAlloc == nil {
  228. continue
  229. }
  230. if resultAlloc.RawAllocationOnly == nil {
  231. resultAlloc.RawAllocationOnly = &opencost.RawAllocationOnlyData{}
  232. }
  233. if alloc.RawAllocationOnly == nil {
  234. // This will happen inevitably for unmounted disks, but should
  235. // ideally not happen for any allocation with CPU and RAM data.
  236. if !alloc.IsUnmounted() {
  237. log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
  238. }
  239. continue
  240. }
  241. if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
  242. resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
  243. }
  244. if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
  245. resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
  246. }
  247. if alloc.RawAllocationOnly.GPUUsageMax != nil {
  248. if *alloc.RawAllocationOnly.GPUUsageMax > *resultAlloc.RawAllocationOnly.GPUUsageMax {
  249. resultAlloc.RawAllocationOnly.GPUUsageMax = alloc.RawAllocationOnly.GPUUsageMax
  250. }
  251. }
  252. }
  253. }
  254. // Expand the window to match the queried time range.
  255. result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
  256. // Append errors and warnings
  257. result.Errors = errors
  258. result.Warnings = warnings
  259. // Convert any NaNs to 0 to avoid JSON marshaling issues and avoid cascading NaN appearances elsewhere
  260. result.SanitizeNaN()
  261. return result, nil
  262. }
  263. // DateRange checks the data (up to 90 days in the past), and returns the oldest and newest sample timestamp from opencost scraping metric
  264. // it supposed to be a good indicator of available allocation data
  265. func (cm *CostModel) DateRange() (time.Time, time.Time, error) {
  266. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  267. exportCsvDaysFmt := fmt.Sprintf("%dd", env.GetExportCSVMaxDays())
  268. resOldest, _, err := ctx.QuerySync(fmt.Sprintf(queryFmtOldestSample, env.GetPromClusterFilter(), exportCsvDaysFmt, "1h"))
  269. if err != nil {
  270. return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: %w", err)
  271. }
  272. if len(resOldest) == 0 || len(resOldest[0].Values) == 0 {
  273. return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: no results")
  274. }
  275. oldest := time.Unix(int64(resOldest[0].Values[0].Value), 0)
  276. resNewest, _, err := ctx.QuerySync(fmt.Sprintf(queryFmtNewestSample, env.GetPromClusterFilter(), exportCsvDaysFmt, "1h"))
  277. if err != nil {
  278. return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: %w", err)
  279. }
  280. if len(resNewest) == 0 || len(resNewest[0].Values) == 0 {
  281. return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: no results")
  282. }
  283. newest := time.Unix(int64(resNewest[0].Values[0].Value), 0)
  284. return oldest, newest, nil
  285. }
  286. func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, map[nodeKey]*nodePricing, error) {
  287. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  288. // 2. Run and apply the results of the remaining queries to
  289. // 3. Build out AllocationSet from completed Pod map
  290. // Create a window spanning the requested query
  291. window := opencost.NewWindow(&start, &end)
  292. // Create an empty AllocationSet. For safety, in the case of an error, we
  293. // should prefer to return this empty set with the error. (In the case of
  294. // no error, of course we populate the set and return it.)
  295. allocSet := opencost.NewAllocationSet(start, end)
  296. // (1) Build out Pod map
  297. // Build out a map of Allocations as a mapping from pod-to-container-to-
  298. // underlying-Allocation instance, starting with (start, end) so that we
  299. // begin with minutes, from which we compute resource allocation and cost
  300. // totals from measured rate data.
  301. podMap := map[podKey]*pod{}
  302. // clusterStarts and clusterEnds record the earliest start and latest end
  303. // times, respectively, on a cluster-basis. These are used for unmounted
  304. // PVs and other "virtual" Allocations so that minutes are maximally
  305. // accurate during start-up or spin-down of a cluster
  306. clusterStart := map[string]time.Time{}
  307. clusterEnd := map[string]time.Time{}
  308. // If ingesting pod UID, we query kube_pod_container_status_running avg
  309. // by uid as well as the default values, and all podKeys/pods have their
  310. // names changed to "<pod_name> <pod_uid>". Because other metrics need
  311. // to generate keys to match pods but don't have UIDs, podUIDKeyMap
  312. // stores values of format:
  313. // default podKey : []{edited podkey 1, edited podkey 2}
  314. // This is because ingesting UID allows us to catch uncontrolled pods
  315. // with the same names. However, this will lead to a many-to-one metric
  316. // to podKey relation, so this map allows us to map the metric's
  317. // "<pod_name>" key to the edited "<pod_name> <pod_uid>" keys in podMap.
  318. ingestPodUID := env.IsIngestingPodUID()
  319. podUIDKeyMap := make(map[podKey][]podKey)
  320. if ingestPodUID {
  321. log.Debugf("CostModel.ComputeAllocation: ingesting UID data from KSM metrics...")
  322. }
  323. // TODO:CLEANUP remove "max batch" idea and clusterStart/End
  324. err := cm.buildPodMap(window, resolution, env.GetETLMaxPrometheusQueryDuration(), podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
  325. if err != nil {
  326. log.Errorf("CostModel.ComputeAllocation: failed to build pod map: %s", err.Error())
  327. }
  328. // (2) Run and apply remaining queries
  329. // Query for the duration between start and end
  330. durStr := timeutil.DurationString(end.Sub(start))
  331. if durStr == "" {
  332. return allocSet, nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
  333. }
  334. // Convert resolution duration to a query-ready string
  335. resStr := timeutil.DurationString(resolution)
  336. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  337. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  338. resChRAMBytesAllocated := ctx.QueryAtTime(queryRAMBytesAllocated, end)
  339. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  340. resChRAMRequests := ctx.QueryAtTime(queryRAMRequests, end)
  341. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  342. resChRAMUsageAvg := ctx.QueryAtTime(queryRAMUsageAvg, end)
  343. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  344. resChRAMUsageMax := ctx.QueryAtTime(queryRAMUsageMax, end)
  345. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  346. resChCPUCoresAllocated := ctx.QueryAtTime(queryCPUCoresAllocated, end)
  347. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  348. resChCPURequests := ctx.QueryAtTime(queryCPURequests, end)
  349. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  350. resChCPUUsageAvg := ctx.QueryAtTime(queryCPUUsageAvg, end)
  351. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMaxRecordingRule, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  352. resChCPUUsageMax := ctx.QueryAtTime(queryCPUUsageMax, end)
  353. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  354. // If the recording rule has no data, try to fall back to the subquery.
  355. if len(resCPUUsageMax) == 0 {
  356. // The parameter after the metric ...{}[<thisone>] should be set to 2x
  357. // the resolution, to make sure the irate always has two points to query
  358. // in case the Prom scrape duration has been reduced to be equal to the
  359. // resolution.
  360. doubleResStr := timeutil.DurationString(2 * resolution)
  361. queryCPUUsageMax = fmt.Sprintf(queryFmtCPUUsageMaxSubquery, env.GetPromClusterFilter(), doubleResStr, durStr, resStr, env.GetPromClusterLabel())
  362. resChCPUUsageMax = ctx.QueryAtTime(queryCPUUsageMax, end)
  363. resCPUUsageMax, _ = resChCPUUsageMax.Await()
  364. // This avoids logspam if there is no data for either metric (e.g. if
  365. // the Prometheus didn't exist in the queried window of time).
  366. if len(resCPUUsageMax) > 0 {
  367. log.Debugf("CPU usage recording rule query returned an empty result when queried at %s over %s. Fell back to subquery. Consider setting up Kubecost CPU usage recording role to reduce query load on Prometheus; subqueries are expensive.", end.String(), durStr)
  368. }
  369. }
  370. // GPU Queries
  371. //queryIsGpuShared := fmt.Sprintf(queryFmtIsGPuShared, durStr)
  372. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  373. resChGPUsRequested := ctx.QueryAtTime(queryGPUsRequested, end)
  374. queryGPUsUsageAvg := fmt.Sprintf(queryFmtGPUsUsageAvg, durStr, env.GetPromClusterLabel())
  375. resChGPUsUsageAvg := ctx.Query(queryGPUsUsageAvg)
  376. queryGPUsUsageMax := fmt.Sprintf(queryFmtGPUsUsageMax, durStr, env.GetPromClusterLabel())
  377. resChGPUsUsageMax := ctx.Query(queryGPUsUsageMax)
  378. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  379. resChGPUsAllocated := ctx.QueryAtTime(queryGPUsAllocated, end)
  380. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  381. resChNodeCostPerCPUHr := ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
  382. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  383. resChNodeCostPerRAMGiBHr := ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
  384. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  385. resChNodeCostPerGPUHr := ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
  386. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, env.GetPromClusterFilter(), durStr)
  387. resChNodeIsSpot := ctx.QueryAtTime(queryNodeIsSpot, end)
  388. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
  389. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, end)
  390. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  391. resChPodPVCAllocation := ctx.QueryAtTime(queryPodPVCAllocation, end)
  392. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  393. resChPVCBytesRequested := ctx.QueryAtTime(queryPVCBytesRequested, end)
  394. queryPVActiveMins := fmt.Sprintf(queryFmtPVActiveMins, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
  395. resChPVActiveMins := ctx.QueryAtTime(queryPVActiveMins, end)
  396. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  397. resChPVBytes := ctx.QueryAtTime(queryPVBytes, end)
  398. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  399. resChPVCostPerGiBHour := ctx.QueryAtTime(queryPVCostPerGiBHour, end)
  400. queryPVMeta := fmt.Sprintf(queryFmtPVMeta, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  401. resChPVMeta := ctx.QueryAtTime(queryPVMeta, end)
  402. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  403. resChNetTransferBytes := ctx.QueryAtTime(queryNetTransferBytes, end)
  404. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  405. resChNetReceiveBytes := ctx.QueryAtTime(queryNetReceiveBytes, end)
  406. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  407. resChNetZoneGiB := ctx.QueryAtTime(queryNetZoneGiB, end)
  408. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  409. resChNetZoneCostPerGiB := ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
  410. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  411. resChNetRegionGiB := ctx.QueryAtTime(queryNetRegionGiB, end)
  412. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  413. resChNetRegionCostPerGiB := ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
  414. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  415. resChNetInternetGiB := ctx.QueryAtTime(queryNetInternetGiB, end)
  416. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  417. resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
  418. //GPU Queries
  419. queryIsGpuShared := fmt.Sprintf(queryFmtIsGPuShared, env.GetPromClusterFilter(), durStr)
  420. resChIsGpuShared := ctx.QueryAtTime(queryIsGpuShared, end)
  421. queryGetGPUInfo := fmt.Sprintf(queryFmtGetGPuInfo, env.GetPromClusterFilter(), durStr)
  422. resChGetGPUInfo := ctx.QueryAtTime(queryGetGPUInfo, end)
  423. var resChNodeLabels prom.QueryResultsChan
  424. if env.GetAllocationNodeLabelsEnabled() {
  425. queryNodeLabels := fmt.Sprintf(queryFmtNodeLabels, env.GetPromClusterFilter(), durStr)
  426. resChNodeLabels = ctx.QueryAtTime(queryNodeLabels, end)
  427. }
  428. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, env.GetPromClusterFilter(), durStr)
  429. resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
  430. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, env.GetPromClusterFilter(), durStr)
  431. resChNamespaceAnnotations := ctx.QueryAtTime(queryNamespaceAnnotations, end)
  432. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, env.GetPromClusterFilter(), durStr)
  433. resChPodLabels := ctx.QueryAtTime(queryPodLabels, end)
  434. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, env.GetPromClusterFilter(), durStr)
  435. resChPodAnnotations := ctx.QueryAtTime(queryPodAnnotations, end)
  436. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, env.GetPromClusterFilter(), durStr)
  437. resChServiceLabels := ctx.QueryAtTime(queryServiceLabels, end)
  438. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, env.GetPromClusterFilter(), durStr)
  439. resChDeploymentLabels := ctx.QueryAtTime(queryDeploymentLabels, end)
  440. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, env.GetPromClusterFilter(), durStr)
  441. resChStatefulSetLabels := ctx.QueryAtTime(queryStatefulSetLabels, end)
  442. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  443. resChDaemonSetLabels := ctx.QueryAtTime(queryDaemonSetLabels, end)
  444. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  445. resChPodsWithReplicaSetOwner := ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
  446. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  447. resChReplicaSetsWithoutOwners := ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
  448. queryReplicaSetsWithRolloutOwner := fmt.Sprintf(queryFmtReplicaSetsWithRolloutOwner, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  449. resChReplicaSetsWithRolloutOwner := ctx.QueryAtTime(queryReplicaSetsWithRolloutOwner, end)
  450. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  451. resChJobLabels := ctx.QueryAtTime(queryJobLabels, end)
  452. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
  453. resChLBCostPerHr := ctx.QueryAtTime(queryLBCostPerHr, end)
  454. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
  455. resChLBActiveMins := ctx.QueryAtTime(queryLBActiveMins, end)
  456. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  457. resCPURequests, _ := resChCPURequests.Await()
  458. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  459. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  460. resRAMRequests, _ := resChRAMRequests.Await()
  461. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  462. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  463. resGPUsRequested, _ := resChGPUsRequested.Await()
  464. resGPUsUsageAvg, _ := resChGPUsUsageAvg.Await()
  465. resGPUsUsageMax, _ := resChGPUsUsageMax.Await()
  466. resGPUsAllocated, _ := resChGPUsAllocated.Await()
  467. resIsGpuShared, _ := resChIsGpuShared.Await()
  468. resGetGPUInfo, _ := resChGetGPUInfo.Await()
  469. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  470. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  471. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  472. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  473. nodeExtendedData, _ := queryExtendedNodeData(ctx, start, end, durStr, resStr)
  474. resPVActiveMins, _ := resChPVActiveMins.Await()
  475. resPVBytes, _ := resChPVBytes.Await()
  476. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  477. resPVMeta, _ := resChPVMeta.Await()
  478. resPVCInfo, _ := resChPVCInfo.Await()
  479. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  480. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  481. resNetTransferBytes, _ := resChNetTransferBytes.Await()
  482. resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
  483. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  484. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  485. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  486. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  487. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  488. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  489. var resNodeLabels []*prom.QueryResult
  490. if env.GetAllocationNodeLabelsEnabled() {
  491. if env.GetAllocationNodeLabelsEnabled() {
  492. resNodeLabels, _ = resChNodeLabels.Await()
  493. }
  494. }
  495. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  496. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  497. resPodLabels, _ := resChPodLabels.Await()
  498. resPodAnnotations, _ := resChPodAnnotations.Await()
  499. resServiceLabels, _ := resChServiceLabels.Await()
  500. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  501. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  502. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  503. resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
  504. resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
  505. resReplicaSetsWithRolloutOwner, _ := resChReplicaSetsWithRolloutOwner.Await()
  506. resJobLabels, _ := resChJobLabels.Await()
  507. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  508. resLBActiveMins, _ := resChLBActiveMins.Await()
  509. if ctx.HasErrors() {
  510. for _, err := range ctx.Errors() {
  511. log.Errorf("CostModel.ComputeAllocation: query context error %s", err)
  512. }
  513. return allocSet, nil, ctx.ErrorCollection()
  514. }
  515. // We choose to apply allocation before requests in the cases of RAM and
  516. // CPU so that we can assert that allocation should always be greater than
  517. // or equal to request.
  518. applyCPUCoresAllocated(podMap, resCPUCoresAllocated, podUIDKeyMap)
  519. applyCPUCoresRequested(podMap, resCPURequests, podUIDKeyMap)
  520. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg, podUIDKeyMap)
  521. applyCPUCoresUsedMax(podMap, resCPUUsageMax, podUIDKeyMap)
  522. applyRAMBytesAllocated(podMap, resRAMBytesAllocated, podUIDKeyMap)
  523. applyRAMBytesRequested(podMap, resRAMRequests, podUIDKeyMap)
  524. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg, podUIDKeyMap)
  525. applyRAMBytesUsedMax(podMap, resRAMUsageMax, podUIDKeyMap)
  526. applyGPUUsage(podMap, resGPUsUsageAvg, podUIDKeyMap, GpuUsageAverageMode)
  527. applyGPUUsage(podMap, resGPUsUsageMax, podUIDKeyMap, GpuUsageMaxMode)
  528. applyGPUUsage(podMap, resIsGpuShared, podUIDKeyMap, GpuIsSharedMode)
  529. applyGPUUsage(podMap, resGetGPUInfo, podUIDKeyMap, GpuInfoMode)
  530. applyGPUsAllocated(podMap, resGPUsRequested, resGPUsAllocated, podUIDKeyMap)
  531. applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes, podUIDKeyMap)
  532. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB, podUIDKeyMap, networkCrossZoneCost)
  533. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB, podUIDKeyMap, networkCrossRegionCost)
  534. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB, podUIDKeyMap, networkInternetCost)
  535. // In the case that a two pods with the same name had different containers,
  536. // we will double-count the containers. There is no way to associate each
  537. // container with the proper pod from the usage metrics above. This will
  538. // show up as a pod having two Allocations running for the whole pod runtime.
  539. // Other than that case, Allocations should be associated with pods by the
  540. // above functions.
  541. // At this point, we expect "Node" to be set by one of the above functions
  542. // (e.g. applyCPUCoresAllocated, etc.) -- otherwise, node labels will fail
  543. // to correctly apply to the pods.
  544. var nodeLabels map[nodeKey]map[string]string
  545. if env.GetAllocationNodeLabelsEnabled() {
  546. nodeLabels = resToNodeLabels(resNodeLabels)
  547. }
  548. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  549. podLabels := resToPodLabels(resPodLabels, podUIDKeyMap, ingestPodUID)
  550. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  551. podAnnotations := resToPodAnnotations(resPodAnnotations, podUIDKeyMap, ingestPodUID)
  552. applyLabels(podMap, nodeLabels, namespaceLabels, podLabels)
  553. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  554. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  555. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  556. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels, podUIDKeyMap, ingestPodUID)
  557. podJobMap := resToPodJobMap(resJobLabels, podUIDKeyMap, ingestPodUID)
  558. podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners, resReplicaSetsWithRolloutOwner, podUIDKeyMap, ingestPodUID)
  559. applyControllersToPods(podMap, podDeploymentMap)
  560. applyControllersToPods(podMap, podStatefulSetMap)
  561. applyControllersToPods(podMap, podDaemonSetMap)
  562. applyControllersToPods(podMap, podJobMap)
  563. applyControllersToPods(podMap, podReplicaSetMap)
  564. serviceLabels := getServiceLabels(resServiceLabels)
  565. allocsByService := map[serviceKey][]*opencost.Allocation{}
  566. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  567. // TODO breakdown network costs?
  568. // Build out the map of all PVs with class, size and cost-per-hour.
  569. // Note: this does not record time running, which we may want to
  570. // include later for increased PV precision. (As long as the PV has
  571. // a PVC, we get time running there, so this is only inaccurate
  572. // for short-lived, unmounted PVs.)
  573. pvMap := map[pvKey]*pv{}
  574. buildPVMap(resolution, pvMap, resPVCostPerGiBHour, resPVActiveMins, resPVMeta, window)
  575. applyPVBytes(pvMap, resPVBytes)
  576. // Build out the map of all PVCs with time running, bytes requested,
  577. // and connect to the correct PV from pvMap. (If no PV exists, that
  578. // is noted, but does not result in any allocation/cost.)
  579. pvcMap := map[pvcKey]*pvc{}
  580. buildPVCMap(resolution, pvcMap, pvMap, resPVCInfo, window)
  581. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  582. // Build out the relationships of pods to their PVCs. This step
  583. // populates the pvc.Count field so that pvc allocation can be
  584. // split appropriately among each pod's container allocation.
  585. podPVCMap := map[podKey][]*pvc{}
  586. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation, podUIDKeyMap, ingestPodUID)
  587. applyPVCsToPods(window, podMap, podPVCMap, pvcMap)
  588. // Identify PVCs without pods and add pv costs to the unmounted Allocation for the pvc's cluster
  589. applyUnmountedPVCs(window, podMap, pvcMap)
  590. // Identify PVs without PVCs and add PV costs to the unmounted Allocation for the PV's cluster
  591. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  592. lbMap := make(map[serviceKey]*lbCost)
  593. getLoadBalancerCosts(lbMap, resLBCostPerHr, resLBActiveMins, resolution, window)
  594. applyLoadBalancersToPods(window, podMap, lbMap, allocsByService)
  595. // Build out a map of Nodes with resource costs, discounts, and node types
  596. // for converting resource allocation data to cumulative costs.
  597. nodeMap := map[nodeKey]*nodePricing{}
  598. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  599. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  600. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  601. applyNodeSpot(nodeMap, resNodeIsSpot)
  602. applyNodeDiscount(nodeMap, cm)
  603. applyExtendedNodeData(nodeMap, nodeExtendedData)
  604. cm.applyNodesToPod(podMap, nodeMap)
  605. // (3) Build out AllocationSet from Pod map
  606. for _, pod := range podMap {
  607. for _, alloc := range pod.Allocations {
  608. cluster := alloc.Properties.Cluster
  609. nodeName := alloc.Properties.Node
  610. namespace := alloc.Properties.Namespace
  611. podName := alloc.Properties.Pod
  612. container := alloc.Properties.Container
  613. // Make sure that the name is correct (node may not be present at this
  614. // point due to it missing from queryMinutes) then insert.
  615. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, podName, container)
  616. allocSet.Set(alloc)
  617. }
  618. }
  619. return allocSet, nodeMap, nil
  620. }