allocation.go 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  1. package costmodel
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/opencost/opencost/pkg/util/timeutil"
  6. "github.com/opencost/opencost/pkg/env"
  7. "github.com/opencost/opencost/pkg/kubecost"
  8. "github.com/opencost/opencost/pkg/log"
  9. "github.com/opencost/opencost/pkg/prom"
  10. )
  // PromQL query templates used to build Allocations. Every constant is a
  // fmt.Sprintf format string. As used by computeAllocation:
  //   - templates with a single "[%s]" range selector and a "%s" in the "by"
  //     clause take (duration, cluster label);
  //   - templates ending in "[%s:%s]" take (cluster label, duration, resolution);
  //   - templates with only "[%s]" take (duration).

  // Pod presence subqueries. The call site for these two is not visible in this
  // part of the file.
  const (
  	queryFmtPods    = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]`
  	queryFmtPodsUID = `avg(kube_pod_container_status_running{}) by (pod, namespace, uid, %s)[%s:%s]`
  )

  // Memory: allocation, requests, and working-set usage (average and maximum).
  const (
  	queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s, provider_id)`
  	queryFmtRAMRequests       = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  	queryFmtRAMUsageAvg       = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  	queryFmtRAMUsageMax       = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  )

  // CPU: allocation, requests, and usage rate (average and maximum).
  const (
  	queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  	queryFmtCPURequests       = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  	queryFmtCPUUsageAvg       = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  	queryFmtCPUUsageMax       = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
  )

  // GPU: requests and allocation.
  const (
  	queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  	queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
  )

  // Node pricing: hourly cost per resource and the spot indicator.
  const (
  	queryFmtNodeCostPerCPUHr    = `avg(avg_over_time(node_cpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  	queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  	queryFmtNodeCostPerGPUHr    = `avg(avg_over_time(node_gpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
  	queryFmtNodeIsSpot          = `avg_over_time(kubecost_node_is_spot[%s])`
  )

  // Persistent volumes and persistent volume claims.
  const (
  	queryFmtPVCInfo           = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
  	queryFmtPodPVCAllocation  = `avg(avg_over_time(pod_pvc_allocation[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
  	queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s])) by (persistentvolumeclaim, namespace, %s)`
  	queryFmtPVActiveMins      = `count(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%s]`
  	queryFmtPVBytes           = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (persistentvolume, %s)`
  	queryFmtPVCostPerGiBHour  = `avg(avg_over_time(pv_hourly_cost[%s])) by (volumename, %s)`
  )

  // Network: egress volume (divided down by 1024 three times) and its unit
  // cost for the zone, region, and internet classes, plus raw receive and
  // transmit byte totals.
  const (
  	queryFmtNetZoneGiB            = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  	queryFmtNetZoneCostPerGiB     = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s])) by (%s)`
  	queryFmtNetRegionGiB          = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  	queryFmtNetRegionCostPerGiB   = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s])) by (%s)`
  	queryFmtNetInternetGiB        = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
  	queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
  	queryFmtNetReceiveBytes       = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
  	queryFmtNetTransferBytes      = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
  )

  // Labels and annotations for namespaces, pods, services, and controllers.
  const (
  	queryFmtNamespaceLabels      = `avg_over_time(kube_namespace_labels[%s])`
  	queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations[%s])`
  	queryFmtPodLabels            = `avg_over_time(kube_pod_labels[%s])`
  	queryFmtPodAnnotations       = `avg_over_time(kube_pod_annotations[%s])`
  	queryFmtServiceLabels        = `avg_over_time(service_selector_labels[%s])`
  	queryFmtDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
  	queryFmtStatefulSetLabels    = `avg_over_time(statefulSet_match_labels[%s])`
  )

  // Pod ownership by DaemonSet, Job, and ReplicaSet, and ReplicaSets that have
  // no owner themselves.
  const (
  	queryFmtDaemonSetLabels          = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s])) by (pod, owner_name, namespace, %s)`
  	queryFmtJobLabels                = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s])) by (pod, owner_name, namespace ,%s)`
  	queryFmtPodsWithReplicaSetOwner  = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s])) by (pod, owner_name, namespace ,%s)`
  	queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s])) by (replicaset, namespace, %s)`
  )

  // Load balancers: hourly cost and active-time subquery.
  const (
  	queryFmtLBCostPerHr  = `avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s)`
  	queryFmtLBActiveMins = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]`
  )
  // Network cost subtypes. computeAllocation passes one of these as the final
  // argument of each applyNetworkAllocation call to say which class of egress
  // the accompanying query results describe.

  // networkCrossZoneCost tags the zone egress results.
  const networkCrossZoneCost = "NetworkCrossZoneCost"

  // networkCrossRegionCost tags the region egress results.
  const networkCrossRegionCost = "NetworkCrossRegionCost"

  // networkInternetCost tags the internet egress results.
  const networkInternetCost = "NetworkInternetCost"
  62. // CanCompute should return true if CostModel can act as a valid source for the
  63. // given time range. In the case of CostModel we want to attempt to compute as
  64. // long as the range starts in the past. If the CostModel ends up not having
  65. // data to match, that's okay, and should be communicated with an error
  66. // response from ComputeAllocation.
  67. func (cm *CostModel) CanCompute(start, end time.Time) bool {
  68. return start.Before(time.Now())
  69. }
  70. // Name returns the name of the Source
  71. func (cm *CostModel) Name() string {
  72. return "CostModel"
  73. }
  74. // ComputeAllocation uses the CostModel instance to compute an AllocationSet
  75. // for the window defined by the given start and end times. The Allocations
  76. // returned are unaggregated (i.e. down to the container level).
  77. func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  78. // If the duration is short enough, compute the AllocationSet directly
  79. if end.Sub(start) <= cm.MaxPrometheusQueryDuration {
  80. return cm.computeAllocation(start, end, resolution)
  81. }
  82. // If the duration exceeds the configured MaxPrometheusQueryDuration, then
  83. // query for maximum-sized AllocationSets, collect them, and accumulate.
  84. // s and e track the coverage of the entire given window over multiple
  85. // internal queries.
  86. s, e := start, start
  87. // Collect AllocationSets in a range, then accumulate
  88. // TODO optimize by collecting consecutive AllocationSets, accumulating as we go
  89. asr := kubecost.NewAllocationSetRange()
  90. for e.Before(end) {
  91. // By default, query for the full remaining duration. But do not let
  92. // any individual query duration exceed the configured max Prometheus
  93. // query duration.
  94. duration := end.Sub(e)
  95. if duration > cm.MaxPrometheusQueryDuration {
  96. duration = cm.MaxPrometheusQueryDuration
  97. }
  98. // Set start and end parameters (s, e) for next individual computation.
  99. e = s.Add(duration)
  100. // Compute the individual AllocationSet for just (s, e)
  101. as, err := cm.computeAllocation(s, e, resolution)
  102. if err != nil {
  103. return kubecost.NewAllocationSet(start, end), fmt.Errorf("error computing allocation for %s: %s", kubecost.NewClosedWindow(s, e), err)
  104. }
  105. // Append to the range
  106. asr.Append(as)
  107. // Set s equal to e to set up the next query, if one exists.
  108. s = e
  109. }
  110. // Populate annotations, labels, and services on each Allocation. This is
  111. // necessary because Properties.Intersection does not propagate any values
  112. // stored in maps or slices for performance reasons. In this case, however,
  113. // it is both acceptable and necessary to do so.
  114. allocationAnnotations := map[string]map[string]string{}
  115. allocationLabels := map[string]map[string]string{}
  116. allocationServices := map[string]map[string]bool{}
  117. // Also record errors and warnings, then append them to the results later.
  118. errors := []string{}
  119. warnings := []string{}
  120. for _, as := range asr.Allocations {
  121. for k, a := range as.Allocations {
  122. if len(a.Properties.Annotations) > 0 {
  123. if _, ok := allocationAnnotations[k]; !ok {
  124. allocationAnnotations[k] = map[string]string{}
  125. }
  126. for name, val := range a.Properties.Annotations {
  127. allocationAnnotations[k][name] = val
  128. }
  129. }
  130. if len(a.Properties.Labels) > 0 {
  131. if _, ok := allocationLabels[k]; !ok {
  132. allocationLabels[k] = map[string]string{}
  133. }
  134. for name, val := range a.Properties.Labels {
  135. allocationLabels[k][name] = val
  136. }
  137. }
  138. if len(a.Properties.Services) > 0 {
  139. if _, ok := allocationServices[k]; !ok {
  140. allocationServices[k] = map[string]bool{}
  141. }
  142. for _, val := range a.Properties.Services {
  143. allocationServices[k][val] = true
  144. }
  145. }
  146. }
  147. errors = append(errors, as.Errors...)
  148. warnings = append(warnings, as.Warnings...)
  149. }
  150. // Accumulate to yield the result AllocationSet. After this step, we will
  151. // be nearly complete, but without the raw allocation data, which must be
  152. // recomputed.
  153. result, err := asr.Accumulate()
  154. if err != nil {
  155. return kubecost.NewAllocationSet(start, end), fmt.Errorf("error accumulating data for %s: %s", kubecost.NewClosedWindow(s, e), err)
  156. }
  157. // Apply the annotations, labels, and services to the post-accumulation
  158. // results. (See above for why this is necessary.)
  159. for k, a := range result.Allocations {
  160. if annotations, ok := allocationAnnotations[k]; ok {
  161. a.Properties.Annotations = annotations
  162. }
  163. if labels, ok := allocationLabels[k]; ok {
  164. a.Properties.Labels = labels
  165. }
  166. if services, ok := allocationServices[k]; ok {
  167. a.Properties.Services = []string{}
  168. for s := range services {
  169. a.Properties.Services = append(a.Properties.Services, s)
  170. }
  171. }
  172. // Expand the Window of all Allocations within the AllocationSet
  173. // to match the Window of the AllocationSet, which gets expanded
  174. // at the end of this function.
  175. a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
  176. }
  177. // Maintain RAM and CPU max usage values by iterating over the range,
  178. // computing maximums on a rolling basis, and setting on the result set.
  179. for _, as := range asr.Allocations {
  180. for key, alloc := range as.Allocations {
  181. resultAlloc := result.Get(key)
  182. if resultAlloc == nil {
  183. continue
  184. }
  185. if resultAlloc.RawAllocationOnly == nil {
  186. resultAlloc.RawAllocationOnly = &kubecost.RawAllocationOnlyData{}
  187. }
  188. if alloc.RawAllocationOnly == nil {
  189. // This will happen inevitably for unmounted disks, but should
  190. // ideally not happen for any allocation with CPU and RAM data.
  191. if !alloc.IsUnmounted() {
  192. log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
  193. }
  194. continue
  195. }
  196. if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
  197. resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
  198. }
  199. if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
  200. resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
  201. }
  202. }
  203. }
  204. // Expand the window to match the queried time range.
  205. result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
  206. // Append errors and warnings
  207. result.Errors = errors
  208. result.Warnings = warnings
  209. return result, nil
  210. }
  211. func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
  212. // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
  213. // 2. Run and apply the results of the remaining queries to
  214. // 3. Build out AllocationSet from completed Pod map
  215. // Create a window spanning the requested query
  216. window := kubecost.NewWindow(&start, &end)
  217. // Create an empty AllocationSet. For safety, in the case of an error, we
  218. // should prefer to return this empty set with the error. (In the case of
  219. // no error, of course we populate the set and return it.)
  220. allocSet := kubecost.NewAllocationSet(start, end)
  221. // (1) Build out Pod map
  222. // Build out a map of Allocations as a mapping from pod-to-container-to-
  223. // underlying-Allocation instance, starting with (start, end) so that we
  224. // begin with minutes, from which we compute resource allocation and cost
  225. // totals from measured rate data.
  226. podMap := map[podKey]*pod{}
  227. // clusterStarts and clusterEnds record the earliest start and latest end
  228. // times, respectively, on a cluster-basis. These are used for unmounted
  229. // PVs and other "virtual" Allocations so that minutes are maximally
  230. // accurate during start-up or spin-down of a cluster
  231. clusterStart := map[string]time.Time{}
  232. clusterEnd := map[string]time.Time{}
  233. // If ingesting pod UID, we query kube_pod_container_status_running avg
  234. // by uid as well as the default values, and all podKeys/pods have their
  235. // names changed to "<pod_name> <pod_uid>". Because other metrics need
  236. // to generate keys to match pods but don't have UIDs, podUIDKeyMap
  237. // stores values of format:
  238. // default podKey : []{edited podkey 1, edited podkey 2}
  239. // This is because ingesting UID allows us to catch uncontrolled pods
  240. // with the same names. However, this will lead to a many-to-one metric
  241. // to podKey relation, so this map allows us to map the metric's
  242. // "<pod_name>" key to the edited "<pod_name> <pod_uid>" keys in podMap.
  243. ingestPodUID := env.IsIngestingPodUID()
  244. podUIDKeyMap := make(map[podKey][]podKey)
  245. if ingestPodUID {
  246. log.Debugf("CostModel.ComputeAllocation: ingesting UID data from KSM metrics...")
  247. }
  248. // TODO:CLEANUP remove "max batch" idea and clusterStart/End
  249. err := cm.buildPodMap(window, resolution, env.GetETLMaxPrometheusQueryDuration(), podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
  250. if err != nil {
  251. log.Errorf("CostModel.ComputeAllocation: failed to build pod map: %s", err.Error())
  252. }
  253. // (2) Run and apply remaining queries
  254. // Query for the duration between start and end
  255. durStr := timeutil.DurationString(end.Sub(start))
  256. if durStr == "" {
  257. return allocSet, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
  258. }
  259. // Convert resolution duration to a query-ready string
  260. resStr := timeutil.DurationString(resolution)
  261. ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
  262. queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, env.GetPromClusterLabel())
  263. resChRAMBytesAllocated := ctx.QueryAtTime(queryRAMBytesAllocated, end)
  264. queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, env.GetPromClusterLabel())
  265. resChRAMRequests := ctx.QueryAtTime(queryRAMRequests, end)
  266. queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, env.GetPromClusterLabel())
  267. resChRAMUsageAvg := ctx.QueryAtTime(queryRAMUsageAvg, end)
  268. queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, env.GetPromClusterLabel())
  269. resChRAMUsageMax := ctx.QueryAtTime(queryRAMUsageMax, end)
  270. queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, env.GetPromClusterLabel())
  271. resChCPUCoresAllocated := ctx.QueryAtTime(queryCPUCoresAllocated, end)
  272. queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, env.GetPromClusterLabel())
  273. resChCPURequests := ctx.QueryAtTime(queryCPURequests, end)
  274. queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, env.GetPromClusterLabel())
  275. resChCPUUsageAvg := ctx.QueryAtTime(queryCPUUsageAvg, end)
  276. queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, env.GetPromClusterLabel())
  277. resChCPUUsageMax := ctx.QueryAtTime(queryCPUUsageMax, end)
  278. queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, env.GetPromClusterLabel())
  279. resChGPUsRequested := ctx.QueryAtTime(queryGPUsRequested, end)
  280. queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, env.GetPromClusterLabel())
  281. resChGPUsAllocated := ctx.QueryAtTime(queryGPUsAllocated, end)
  282. queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, env.GetPromClusterLabel())
  283. resChNodeCostPerCPUHr := ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
  284. queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, env.GetPromClusterLabel())
  285. resChNodeCostPerRAMGiBHr := ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
  286. queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, env.GetPromClusterLabel())
  287. resChNodeCostPerGPUHr := ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
  288. queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr)
  289. resChNodeIsSpot := ctx.QueryAtTime(queryNodeIsSpot, end)
  290. queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr)
  291. resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, end)
  292. queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, env.GetPromClusterLabel())
  293. resChPodPVCAllocation := ctx.QueryAtTime(queryPodPVCAllocation, end)
  294. queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, env.GetPromClusterLabel())
  295. resChPVCBytesRequested := ctx.QueryAtTime(queryPVCBytesRequested, end)
  296. queryPVActiveMins := fmt.Sprintf(queryFmtPVActiveMins, env.GetPromClusterLabel(), durStr, resStr)
  297. resChPVActiveMins := ctx.QueryAtTime(queryPVActiveMins, end)
  298. queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, env.GetPromClusterLabel())
  299. resChPVBytes := ctx.QueryAtTime(queryPVBytes, end)
  300. queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, env.GetPromClusterLabel())
  301. resChPVCostPerGiBHour := ctx.QueryAtTime(queryPVCostPerGiBHour, end)
  302. queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, env.GetPromClusterLabel())
  303. resChNetTransferBytes := ctx.QueryAtTime(queryNetTransferBytes, end)
  304. queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, env.GetPromClusterLabel())
  305. resChNetReceiveBytes := ctx.QueryAtTime(queryNetReceiveBytes, end)
  306. queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, env.GetPromClusterLabel())
  307. resChNetZoneGiB := ctx.QueryAtTime(queryNetZoneGiB, end)
  308. queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, env.GetPromClusterLabel())
  309. resChNetZoneCostPerGiB := ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
  310. queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, env.GetPromClusterLabel())
  311. resChNetRegionGiB := ctx.QueryAtTime(queryNetRegionGiB, end)
  312. queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, env.GetPromClusterLabel())
  313. resChNetRegionCostPerGiB := ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
  314. queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, env.GetPromClusterLabel())
  315. resChNetInternetGiB := ctx.QueryAtTime(queryNetInternetGiB, end)
  316. queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, env.GetPromClusterLabel())
  317. resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
  318. queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr)
  319. resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
  320. queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr)
  321. resChNamespaceAnnotations := ctx.QueryAtTime(queryNamespaceAnnotations, end)
  322. queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr)
  323. resChPodLabels := ctx.QueryAtTime(queryPodLabels, end)
  324. queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr)
  325. resChPodAnnotations := ctx.QueryAtTime(queryPodAnnotations, end)
  326. queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr)
  327. resChServiceLabels := ctx.QueryAtTime(queryServiceLabels, end)
  328. queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr)
  329. resChDeploymentLabels := ctx.QueryAtTime(queryDeploymentLabels, end)
  330. queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr)
  331. resChStatefulSetLabels := ctx.QueryAtTime(queryStatefulSetLabels, end)
  332. queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, env.GetPromClusterLabel())
  333. resChDaemonSetLabels := ctx.QueryAtTime(queryDaemonSetLabels, end)
  334. queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, env.GetPromClusterLabel())
  335. resChPodsWithReplicaSetOwner := ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
  336. queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, env.GetPromClusterLabel())
  337. resChReplicaSetsWithoutOwners := ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
  338. queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, env.GetPromClusterLabel())
  339. resChJobLabels := ctx.QueryAtTime(queryJobLabels, end)
  340. queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, env.GetPromClusterLabel())
  341. resChLBCostPerHr := ctx.QueryAtTime(queryLBCostPerHr, end)
  342. queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr)
  343. resChLBActiveMins := ctx.QueryAtTime(queryLBActiveMins, end)
  344. resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
  345. resCPURequests, _ := resChCPURequests.Await()
  346. resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
  347. resCPUUsageMax, _ := resChCPUUsageMax.Await()
  348. resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
  349. resRAMRequests, _ := resChRAMRequests.Await()
  350. resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
  351. resRAMUsageMax, _ := resChRAMUsageMax.Await()
  352. resGPUsRequested, _ := resChGPUsRequested.Await()
  353. resGPUsAllocated, _ := resChGPUsAllocated.Await()
  354. resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
  355. resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
  356. resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
  357. resNodeIsSpot, _ := resChNodeIsSpot.Await()
  358. resPVActiveMins, _ := resChPVActiveMins.Await()
  359. resPVBytes, _ := resChPVBytes.Await()
  360. resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
  361. resPVCInfo, _ := resChPVCInfo.Await()
  362. resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
  363. resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
  364. resNetTransferBytes, _ := resChNetTransferBytes.Await()
  365. resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
  366. resNetZoneGiB, _ := resChNetZoneGiB.Await()
  367. resNetZoneCostPerGiB, _ := resChNetZoneCostPerGiB.Await()
  368. resNetRegionGiB, _ := resChNetRegionGiB.Await()
  369. resNetRegionCostPerGiB, _ := resChNetRegionCostPerGiB.Await()
  370. resNetInternetGiB, _ := resChNetInternetGiB.Await()
  371. resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
  372. resNamespaceLabels, _ := resChNamespaceLabels.Await()
  373. resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
  374. resPodLabels, _ := resChPodLabels.Await()
  375. resPodAnnotations, _ := resChPodAnnotations.Await()
  376. resServiceLabels, _ := resChServiceLabels.Await()
  377. resDeploymentLabels, _ := resChDeploymentLabels.Await()
  378. resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
  379. resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
  380. resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
  381. resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
  382. resJobLabels, _ := resChJobLabels.Await()
  383. resLBCostPerHr, _ := resChLBCostPerHr.Await()
  384. resLBActiveMins, _ := resChLBActiveMins.Await()
  385. if ctx.HasErrors() {
  386. for _, err := range ctx.Errors() {
  387. log.Errorf("CostModel.ComputeAllocation: query context error %s", err)
  388. }
  389. return allocSet, ctx.ErrorCollection()
  390. }
  391. // We choose to apply allocation before requests in the cases of RAM and
  392. // CPU so that we can assert that allocation should always be greater than
  393. // or equal to request.
  394. applyCPUCoresAllocated(podMap, resCPUCoresAllocated, podUIDKeyMap)
  395. applyCPUCoresRequested(podMap, resCPURequests, podUIDKeyMap)
  396. applyCPUCoresUsedAvg(podMap, resCPUUsageAvg, podUIDKeyMap)
  397. applyCPUCoresUsedMax(podMap, resCPUUsageMax, podUIDKeyMap)
  398. applyRAMBytesAllocated(podMap, resRAMBytesAllocated, podUIDKeyMap)
  399. applyRAMBytesRequested(podMap, resRAMRequests, podUIDKeyMap)
  400. applyRAMBytesUsedAvg(podMap, resRAMUsageAvg, podUIDKeyMap)
  401. applyRAMBytesUsedMax(podMap, resRAMUsageMax, podUIDKeyMap)
  402. applyGPUsAllocated(podMap, resGPUsRequested, resGPUsAllocated, podUIDKeyMap)
  403. applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes, podUIDKeyMap)
  404. applyNetworkAllocation(podMap, resNetZoneGiB, resNetZoneCostPerGiB, podUIDKeyMap, networkCrossZoneCost)
  405. applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionCostPerGiB, podUIDKeyMap, networkCrossRegionCost)
  406. applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetCostPerGiB, podUIDKeyMap, networkInternetCost)
  407. // In the case that a two pods with the same name had different containers,
  408. // we will double-count the containers. There is no way to associate each
  409. // container with the proper pod from the usage metrics above. This will
  410. // show up as a pod having two Allocations running for the whole pod runtime.
  411. // Other than that case, Allocations should be associated with pods by the
  412. // above functions.
  413. namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
  414. podLabels := resToPodLabels(resPodLabels, podUIDKeyMap, ingestPodUID)
  415. namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
  416. podAnnotations := resToPodAnnotations(resPodAnnotations, podUIDKeyMap, ingestPodUID)
  417. applyLabels(podMap, namespaceLabels, podLabels)
  418. applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
  419. podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
  420. podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
  421. podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels, podUIDKeyMap, ingestPodUID)
  422. podJobMap := resToPodJobMap(resJobLabels, podUIDKeyMap, ingestPodUID)
  423. podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners, podUIDKeyMap, ingestPodUID)
  424. applyControllersToPods(podMap, podDeploymentMap)
  425. applyControllersToPods(podMap, podStatefulSetMap)
  426. applyControllersToPods(podMap, podDaemonSetMap)
  427. applyControllersToPods(podMap, podJobMap)
  428. applyControllersToPods(podMap, podReplicaSetMap)
  429. serviceLabels := getServiceLabels(resServiceLabels)
  430. allocsByService := map[serviceKey][]*kubecost.Allocation{}
  431. applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
  432. // TODO breakdown network costs?
  433. // Build out the map of all PVs with class, size and cost-per-hour.
  434. // Note: this does not record time running, which we may want to
  435. // include later for increased PV precision. (As long as the PV has
  436. // a PVC, we get time running there, so this is only inaccurate
  437. // for short-lived, unmounted PVs.)
  438. pvMap := map[pvKey]*pv{}
  439. buildPVMap(resolution, pvMap, resPVCostPerGiBHour, resPVActiveMins)
  440. applyPVBytes(pvMap, resPVBytes)
  441. // Build out the map of all PVCs with time running, bytes requested,
  442. // and connect to the correct PV from pvMap. (If no PV exists, that
  443. // is noted, but does not result in any allocation/cost.)
  444. pvcMap := map[pvcKey]*pvc{}
  445. buildPVCMap(resolution, pvcMap, pvMap, resPVCInfo)
  446. applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
  447. // Build out the relationships of pods to their PVCs. This step
  448. // populates the pvc.Count field so that pvc allocation can be
  449. // split appropriately among each pod's container allocation.
  450. podPVCMap := map[podKey][]*pvc{}
  451. buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation, podUIDKeyMap, ingestPodUID)
  452. applyPVCsToPods(window, podMap, podPVCMap, pvcMap)
  453. // Identify PVCs without pods and add pv costs to the unmounted Allocation for the pvc's cluster
  454. applyUnmountedPVCs(window, podMap, pvcMap)
  455. // Identify PVs without PVCs and add PV costs to the unmounted Allocation for the PV's cluster
  456. applyUnmountedPVs(window, podMap, pvMap, pvcMap)
  457. lbMap := make(map[serviceKey]*lbCost)
  458. getLoadBalancerCosts(lbMap, resLBCostPerHr, resLBActiveMins, resolution)
  459. applyLoadBalancersToPods(window, podMap, lbMap, allocsByService)
  460. // Build out a map of Nodes with resource costs, discounts, and node types
  461. // for converting resource allocation data to cumulative costs.
  462. nodeMap := map[nodeKey]*nodePricing{}
  463. applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
  464. applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
  465. applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
  466. applyNodeSpot(nodeMap, resNodeIsSpot)
  467. applyNodeDiscount(nodeMap, cm)
  468. cm.applyNodesToPod(podMap, nodeMap)
  469. // (3) Build out AllocationSet from Pod map
  470. for _, pod := range podMap {
  471. for _, alloc := range pod.Allocations {
  472. cluster := alloc.Properties.Cluster
  473. nodeName := alloc.Properties.Node
  474. namespace := alloc.Properties.Namespace
  475. podName := alloc.Properties.Pod
  476. container := alloc.Properties.Container
  477. // Make sure that the name is correct (node may not be present at this
  478. // point due to it missing from queryMinutes) then insert.
  479. alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, podName, container)
  480. allocSet.Set(alloc)
  481. }
  482. }
  483. return allocSet, nil
  484. }