| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- package costmodel
- import (
- "fmt"
- "time"
- "github.com/opencost/opencost/core/pkg/opencost"
- "github.com/opencost/opencost/core/pkg/source"
- "github.com/opencost/opencost/core/pkg/util/timeutil"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/pkg/env"
- )
- // CanCompute should return true if CostModel can act as a valid source for the
- // given time range. In the case of CostModel we want to attempt to compute as
- // long as the range starts in the past. If the CostModel ends up not having
- // data to match, that's okay, and should be communicated with an error
- // response from ComputeAllocation.
- func (cm *CostModel) CanCompute(start, end time.Time) bool {
- return start.Before(time.Now())
- }
- // Name returns the name of the Source
- func (cm *CostModel) Name() string {
- return "CostModel"
- }
- // ComputeAllocation uses the CostModel instance to compute an AllocationSet
- // for the window defined by the given start and end times. The Allocations
- // returned are unaggregated (i.e. down to the container level).
- func (cm *CostModel) ComputeAllocation(start, end time.Time) (*opencost.AllocationSet, error) {
- // If the duration is short enough, compute the AllocationSet directly
- if end.Sub(start) <= cm.BatchDuration {
- as, _, err := cm.computeAllocation(start, end)
- return as, err
- }
- // If the duration exceeds the configured MaxPrometheusQueryDuration, then
- // query for maximum-sized AllocationSets, collect them, and accumulate.
- // s and e track the coverage of the entire given window over multiple
- // internal queries.
- s, e := start, start
- // Collect AllocationSets in a range, then accumulate
- // TODO optimize by collecting consecutive AllocationSets, accumulating as we go
- asr := opencost.NewAllocationSetRange()
- for e.Before(end) {
- // By default, query for the full remaining duration. But do not let
- // any individual query duration exceed the configured max Prometheus
- // query duration.
- duration := end.Sub(e)
- if duration > cm.BatchDuration {
- duration = cm.BatchDuration
- }
- // Set start and end parameters (s, e) for next individual computation.
- e = s.Add(duration)
- // Compute the individual AllocationSet for just (s, e)
- as, _, err := cm.computeAllocation(s, e)
- if err != nil {
- return opencost.NewAllocationSet(start, end), fmt.Errorf("error computing allocation for %s: %s", opencost.NewClosedWindow(s, e), err)
- }
- // Append to the range
- asr.Append(as)
- // Set s equal to e to set up the next query, if one exists.
- s = e
- }
- // Populate annotations, labels, and services on each Allocation. This is
- // necessary because Properties.Intersection does not propagate any values
- // stored in maps or slices for performance reasons. In this case, however,
- // it is both acceptable and necessary to do so.
- allocationAnnotations := map[string]map[string]string{}
- allocationLabels := map[string]map[string]string{}
- allocationServices := map[string]map[string]bool{}
- // Also record errors and warnings, then append them to the results later.
- errors := []string{}
- warnings := []string{}
- for _, as := range asr.Allocations {
- for k, a := range as.Allocations {
- if len(a.Properties.Annotations) > 0 {
- if _, ok := allocationAnnotations[k]; !ok {
- allocationAnnotations[k] = map[string]string{}
- }
- for name, val := range a.Properties.Annotations {
- allocationAnnotations[k][name] = val
- }
- }
- if len(a.Properties.Labels) > 0 {
- if _, ok := allocationLabels[k]; !ok {
- allocationLabels[k] = map[string]string{}
- }
- for name, val := range a.Properties.Labels {
- allocationLabels[k][name] = val
- }
- }
- if len(a.Properties.Services) > 0 {
- if _, ok := allocationServices[k]; !ok {
- allocationServices[k] = map[string]bool{}
- }
- for _, val := range a.Properties.Services {
- allocationServices[k][val] = true
- }
- }
- }
- errors = append(errors, as.Errors...)
- warnings = append(warnings, as.Warnings...)
- }
- // Accumulate to yield the result AllocationSet. After this step, we will
- // be nearly complete, but without the raw allocation data, which must be
- // recomputed.
- resultASR, err := asr.Accumulate(opencost.AccumulateOptionAll)
- if err != nil {
- return opencost.NewAllocationSet(start, end), fmt.Errorf("error accumulating data for %s: %s", opencost.NewClosedWindow(s, e), err)
- }
- if resultASR != nil && len(resultASR.Allocations) == 0 {
- return opencost.NewAllocationSet(start, end), nil
- }
- if length := len(resultASR.Allocations); length != 1 {
- return opencost.NewAllocationSet(start, end), fmt.Errorf("expected 1 accumulated allocation set, found %d sets", length)
- }
- result := resultASR.Allocations[0]
- // Apply the annotations, labels, and services to the post-accumulation
- // results. (See above for why this is necessary.)
- for k, a := range result.Allocations {
- if annotations, ok := allocationAnnotations[k]; ok {
- a.Properties.Annotations = annotations
- }
- if labels, ok := allocationLabels[k]; ok {
- a.Properties.Labels = labels
- }
- if services, ok := allocationServices[k]; ok {
- a.Properties.Services = []string{}
- for s := range services {
- a.Properties.Services = append(a.Properties.Services, s)
- }
- }
- // Expand the Window of all Allocations within the AllocationSet
- // to match the Window of the AllocationSet, which gets expanded
- // at the end of this function.
- a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
- }
- // Maintain RAM and CPU max usage values by iterating over the range,
- // computing maximums on a rolling basis, and setting on the result set.
- for _, as := range asr.Allocations {
- for key, alloc := range as.Allocations {
- resultAlloc := result.Get(key)
- if resultAlloc == nil {
- continue
- }
- if resultAlloc.RawAllocationOnly == nil {
- resultAlloc.RawAllocationOnly = &opencost.RawAllocationOnlyData{}
- }
- if alloc.RawAllocationOnly == nil {
- // This will happen inevitably for unmounted disks, but should
- // ideally not happen for any allocation with CPU and RAM data.
- if !alloc.IsUnmounted() {
- log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
- }
- continue
- }
- if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
- resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
- }
- if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
- resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
- }
- if alloc.RawAllocationOnly.GPUUsageMax != nil && resultAlloc.RawAllocationOnly.GPUUsageMax != nil {
- if *alloc.RawAllocationOnly.GPUUsageMax > *resultAlloc.RawAllocationOnly.GPUUsageMax {
- resultAlloc.RawAllocationOnly.GPUUsageMax = alloc.RawAllocationOnly.GPUUsageMax
- }
- }
- }
- }
- // Expand the window to match the queried time range.
- result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
- // Append errors and warnings
- result.Errors = errors
- result.Warnings = warnings
- // Convert any NaNs to 0 to avoid JSON marshaling issues and avoid cascading NaN appearances elsewhere
- result.SanitizeNaN()
- return result, nil
- }
- // DateRange checks the data (up to 90 days in the past), and returns the oldest and newest sample timestamp from opencost scraping metric
- // it supposed to be a good indicator of available allocation data
- func (cm *CostModel) DateRange(limitDays int) (time.Time, time.Time, error) {
- return cm.DataSource.Metrics().QueryDataCoverage(limitDays)
- }
- func (cm *CostModel) computeAllocation(start, end time.Time) (*opencost.AllocationSet, map[nodeKey]*nodePricing, error) {
- // 1. Build out Pod map from resolution-tuned, batched Pod start/end query
- // 2. Run and apply the results of the remaining queries to
- // 3. Build out AllocationSet from completed Pod map
- resolution := cm.DataSource.Resolution()
- // Create a window spanning the requested query
- window := opencost.NewWindow(&start, &end)
- // Create an empty AllocationSet. For safety, in the case of an error, we
- // should prefer to return this empty set with the error. (In the case of
- // no error, of course we populate the set and return it.)
- allocSet := opencost.NewAllocationSet(start, end)
- // (1) Build out Pod map
- // Build out a map of Allocations as a mapping from pod-to-container-to-
- // underlying-Allocation instance, starting with (start, end) so that we
- // begin with minutes, from which we compute resource allocation and cost
- // totals from measured rate data.
- podMap := map[podKey]*pod{}
- // If ingesting pod UID, we query kube_pod_container_status_running avg
- // by uid as well as the default values, and all podKeys/pods have their
- // names changed to "<pod_name> <pod_uid>". Because other metrics need
- // to generate keys to match pods but don't have UIDs, podUIDKeyMap
- // stores values of format:
- // default podKey : []{edited podkey 1, edited podkey 2}
- // This is because ingesting UID allows us to catch uncontrolled pods
- // with the same names. However, this will lead to a many-to-one metric
- // to podKey relation, so this map allows us to map the metric's
- // "<pod_name>" key to the edited "<pod_name> <pod_uid>" keys in podMap.
- ingestPodUID := env.IsIngestingPodUID()
- podUIDKeyMap := make(map[podKey][]podKey)
- if ingestPodUID {
- log.Debugf("CostModel.ComputeAllocation: ingesting UID data from KSM metrics...")
- }
- err := cm.buildPodMap(window, podMap, ingestPodUID, podUIDKeyMap)
- if err != nil {
- log.Errorf("CostModel.ComputeAllocation: failed to build pod map: %s", err.Error())
- }
- // (2) Run and apply remaining queries
- // Query for the duration between start and end
- durStr := timeutil.DurationString(end.Sub(start))
- if durStr == "" {
- return allocSet, nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
- }
- grp := source.NewQueryGroup()
- ds := cm.DataSource.Metrics()
- resChRAMBytesAllocated := source.WithGroup(grp, ds.QueryRAMBytesAllocated(start, end))
- resChRAMRequests := source.WithGroup(grp, ds.QueryRAMRequests(start, end))
- resChRAMLimits := source.WithGroup(grp, ds.QueryRAMLimits(start, end))
- resChRAMUsageAvg := source.WithGroup(grp, ds.QueryRAMUsageAvg(start, end))
- resChRAMUsageMax := source.WithGroup(grp, ds.QueryRAMUsageMax(start, end))
- resChCPUCoresAllocated := source.WithGroup(grp, ds.QueryCPUCoresAllocated(start, end))
- resChCPURequests := source.WithGroup(grp, ds.QueryCPURequests(start, end))
- resChCPULimits := source.WithGroup(grp, ds.QueryCPULimits(start, end))
- resChCPUUsageAvg := source.WithGroup(grp, ds.QueryCPUUsageAvg(start, end))
- resChCPUUsageMax := source.WithGroup(grp, ds.QueryCPUUsageMax(start, end))
- resCPUUsageMax, _ := resChCPUUsageMax.Await()
- // This avoids logspam if there is no data for either metric (e.g. if
- // the Prometheus didn't exist in the queried window of time).
- if len(resCPUUsageMax) > 0 {
- log.Debugf("CPU usage recording rule query returned an empty result when queried at %s over %s. Fell back to subquery. Consider setting up Kubecost CPU usage recording role to reduce query load on Prometheus; subqueries are expensive.", end.String(), durStr)
- }
- // GPU Queries
- resChIsGpuShared := source.WithGroup(grp, ds.QueryIsGPUShared(start, end))
- resChGPUsAllocated := source.WithGroup(grp, ds.QueryGPUsAllocated(start, end))
- resChGPUsRequested := source.WithGroup(grp, ds.QueryGPUsRequested(start, end))
- resChGPUsUsageAvg := source.WithGroup(grp, ds.QueryGPUsUsageAvg(start, end))
- resChGPUsUsageMax := source.WithGroup(grp, ds.QueryGPUsUsageMax(start, end))
- resChGetGPUInfo := source.WithGroup(grp, ds.QueryGPUInfo(start, end))
- resChNodeCostPerCPUHr := source.WithGroup(grp, ds.QueryNodeCPUPricePerHr(start, end))
- resChNodeCostPerRAMGiBHr := source.WithGroup(grp, ds.QueryNodeRAMPricePerGiBHr(start, end))
- resChNodeCostPerGPUHr := source.WithGroup(grp, ds.QueryNodeGPUPricePerHr(start, end))
- resChNodeIsSpot := source.WithGroup(grp, ds.QueryNodeIsSpot(start, end))
- resChPVCInfo := source.WithGroup(grp, ds.QueryPVCInfo(start, end))
- resChPodPVCAllocation := source.WithGroup(grp, ds.QueryPodPVCAllocation(start, end))
- resChPVCBytesRequested := source.WithGroup(grp, ds.QueryPVCBytesRequested(start, end))
- resChPVActiveMins := source.WithGroup(grp, ds.QueryPVActiveMinutes(start, end))
- resChPVBytes := source.WithGroup(grp, ds.QueryPVBytes(start, end))
- resChPVCostPerGiBHour := source.WithGroup(grp, ds.QueryPVPricePerGiBHour(start, end))
- resChPVMeta := source.WithGroup(grp, ds.QueryPVInfo(start, end))
- resChNetTransferBytes := source.WithGroup(grp, ds.QueryNetTransferBytes(start, end))
- resChNetReceiveBytes := source.WithGroup(grp, ds.QueryNetReceiveBytes(start, end))
- resChNetZoneGiB := source.WithGroup(grp, ds.QueryNetZoneGiB(start, end))
- resChNetZonePricePerGiB := source.WithGroup(grp, ds.QueryNetZonePricePerGiB(start, end))
- resChNetRegionGiB := source.WithGroup(grp, ds.QueryNetRegionGiB(start, end))
- resChNetRegionPricePerGiB := source.WithGroup(grp, ds.QueryNetRegionPricePerGiB(start, end))
- resChNetInternetGiB := source.WithGroup(grp, ds.QueryNetInternetGiB(start, end))
- resChNetInternetPricePerGiB := source.WithGroup(grp, ds.QueryNetInternetPricePerGiB(start, end))
- resChNetNatGatewayGiB := source.WithGroup(grp, ds.QueryNetNatGatewayGiB(start, end))
- resChNetNatGatewayEgressPricePerGiB := source.WithGroup(grp, ds.QueryNetNatGatewayPricePerGiB(start, end))
- resChNetNatGatewayIngressGiB := source.WithGroup(grp, ds.QueryNetNatGatewayIngressGiB(start, end))
- resChNetNatGatewayIngressPricePerGiB := source.WithGroup(grp, ds.QueryNetNatGatewayIngressPricePerGiB(start, end))
- var resChNodeLabels *source.QueryGroupFuture[source.NodeLabelsResult]
- if env.IsAllocationNodeLabelsEnabled() {
- resChNodeLabels = source.WithGroup(grp, ds.QueryNodeLabels(start, end))
- }
- resChNamespaceLabels := source.WithGroup(grp, ds.QueryNamespaceLabels(start, end))
- resChNamespaceAnnotations := source.WithGroup(grp, ds.QueryNamespaceAnnotations(start, end))
- resChPodLabels := source.WithGroup(grp, ds.QueryPodLabels(start, end))
- resChPodAnnotations := source.WithGroup(grp, ds.QueryPodAnnotations(start, end))
- resChServiceLabels := source.WithGroup(grp, ds.QueryServiceLabels(start, end))
- resChDeploymentLabels := source.WithGroup(grp, ds.QueryDeploymentLabels(start, end))
- resChStatefulSetLabels := source.WithGroup(grp, ds.QueryStatefulSetLabels(start, end))
- resChDaemonSetLabels := source.WithGroup(grp, ds.QueryDaemonSetLabels(start, end))
- resChPodsWithReplicaSetOwner := source.WithGroup(grp, ds.QueryPodsWithReplicaSetOwner(start, end))
- resChReplicaSetsWithoutOwners := source.WithGroup(grp, ds.QueryReplicaSetsWithoutOwners(start, end))
- resChReplicaSetsWithRolloutOwner := source.WithGroup(grp, ds.QueryReplicaSetsWithRollout(start, end))
- resChJobLabels := source.WithGroup(grp, ds.QueryJobLabels(start, end))
- resChLBCostPerHr := source.WithGroup(grp, ds.QueryLBPricePerHr(start, end))
- resChLBActiveMins := source.WithGroup(grp, ds.QueryLBActiveMinutes(start, end))
- resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
- resCPURequests, _ := resChCPURequests.Await()
- resCPULimits, _ := resChCPULimits.Await()
- resCPUUsageAvg, _ := resChCPUUsageAvg.Await()
- resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
- resRAMRequests, _ := resChRAMRequests.Await()
- resRAMLimits, _ := resChRAMLimits.Await()
- resRAMUsageAvg, _ := resChRAMUsageAvg.Await()
- resRAMUsageMax, _ := resChRAMUsageMax.Await()
- resGPUsRequested, _ := resChGPUsRequested.Await()
- resGPUsUsageAvg, _ := resChGPUsUsageAvg.Await()
- resGPUsUsageMax, _ := resChGPUsUsageMax.Await()
- resGPUsAllocated, _ := resChGPUsAllocated.Await()
- resIsGpuShared, _ := resChIsGpuShared.Await()
- resGetGPUInfo, _ := resChGetGPUInfo.Await()
- resNodeCostPerCPUHr, _ := resChNodeCostPerCPUHr.Await()
- resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
- resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
- resNodeIsSpot, _ := resChNodeIsSpot.Await()
- nodeExtendedData, _ := queryExtendedNodeData(grp, ds, start, end)
- resPVActiveMins, _ := resChPVActiveMins.Await()
- resPVBytes, _ := resChPVBytes.Await()
- resPVCostPerGiBHour, _ := resChPVCostPerGiBHour.Await()
- resPVMeta, _ := resChPVMeta.Await()
- resPVCInfo, _ := resChPVCInfo.Await()
- resPVCBytesRequested, _ := resChPVCBytesRequested.Await()
- resPodPVCAllocation, _ := resChPodPVCAllocation.Await()
- resNetTransferBytes, _ := resChNetTransferBytes.Await()
- resNetReceiveBytes, _ := resChNetReceiveBytes.Await()
- resNetZoneGiB, _ := resChNetZoneGiB.Await()
- resNetZonePricePerGiB, _ := resChNetZonePricePerGiB.Await()
- resNetRegionGiB, _ := resChNetRegionGiB.Await()
- resNetRegionPricePerGiB, _ := resChNetRegionPricePerGiB.Await()
- resNetInternetGiB, _ := resChNetInternetGiB.Await()
- resNetInternetPricePerGiB, _ := resChNetInternetPricePerGiB.Await()
- resNetNatGatewayGiB, _ := resChNetNatGatewayGiB.Await()
- resNetNatGatewayEgressPricePerGiB, _ := resChNetNatGatewayEgressPricePerGiB.Await()
- resNetNatGatewayIngressGiB, _ := resChNetNatGatewayIngressGiB.Await()
- resNetNatGatewayIngressPricePerGiB, _ := resChNetNatGatewayIngressPricePerGiB.Await()
- var resNodeLabels []*source.NodeLabelsResult
- if env.IsAllocationNodeLabelsEnabled() {
- resNodeLabels, _ = resChNodeLabels.Await()
- }
- resNamespaceLabels, _ := resChNamespaceLabels.Await()
- resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
- resPodLabels, _ := resChPodLabels.Await()
- resPodAnnotations, _ := resChPodAnnotations.Await()
- resServiceLabels, _ := resChServiceLabels.Await()
- resDeploymentLabels, _ := resChDeploymentLabels.Await()
- resStatefulSetLabels, _ := resChStatefulSetLabels.Await()
- resDaemonSetLabels, _ := resChDaemonSetLabels.Await()
- resPodsWithReplicaSetOwner, _ := resChPodsWithReplicaSetOwner.Await()
- resReplicaSetsWithoutOwners, _ := resChReplicaSetsWithoutOwners.Await()
- resReplicaSetsWithRolloutOwner, _ := resChReplicaSetsWithRolloutOwner.Await()
- resJobLabels, _ := resChJobLabels.Await()
- resLBCostPerHr, _ := resChLBCostPerHr.Await()
- resLBActiveMins, _ := resChLBActiveMins.Await()
- if grp.HasErrors() {
- for _, err := range grp.Errors() {
- log.Errorf("CostModel.ComputeAllocation: query context error %s", err)
- }
- return allocSet, nil, grp.Error()
- }
- // We choose to apply allocation before requests in the cases of RAM and
- // CPU so that we can assert that allocation should always be greater than
- // or equal to request.
- applyCPUCoresAllocated(podMap, resCPUCoresAllocated, podUIDKeyMap)
- applyCPUCoresRequested(podMap, resCPURequests, podUIDKeyMap)
- applyCPUCoresLimits(podMap, resCPULimits, podUIDKeyMap)
- applyCPUCoresUsedAvg(podMap, resCPUUsageAvg, podUIDKeyMap)
- applyCPUCoresUsedMax(podMap, resCPUUsageMax, podUIDKeyMap)
- applyRAMBytesAllocated(podMap, resRAMBytesAllocated, podUIDKeyMap)
- applyRAMBytesRequested(podMap, resRAMRequests, podUIDKeyMap)
- applyRAMBytesLimits(podMap, resRAMLimits, podUIDKeyMap)
- applyRAMBytesUsedAvg(podMap, resRAMUsageAvg, podUIDKeyMap)
- applyRAMBytesUsedMax(podMap, resRAMUsageMax, podUIDKeyMap)
- applyGPUUsageAvg(podMap, resGPUsUsageAvg, podUIDKeyMap)
- applyGPUUsageMax(podMap, resGPUsUsageMax, podUIDKeyMap)
- applyGPUUsageShared(podMap, resIsGpuShared, podUIDKeyMap)
- applyGPUInfo(podMap, resGetGPUInfo, podUIDKeyMap)
- applyGPUsAllocated(podMap, resGPUsRequested, resGPUsAllocated, podUIDKeyMap)
- applyNetworkTotals(podMap, resNetTransferBytes, resNetReceiveBytes, podUIDKeyMap)
- applyNetworkAllocation(podMap, resNetZoneGiB, resNetZonePricePerGiB, podUIDKeyMap, applyCrossZoneNetworkAllocation)
- applyNetworkAllocation(podMap, resNetRegionGiB, resNetRegionPricePerGiB, podUIDKeyMap, applyCrossRegionNetworkAllocation)
- applyNetworkAllocation(podMap, resNetInternetGiB, resNetInternetPricePerGiB, podUIDKeyMap, applyInternetNetworkAllocation)
- applyNetworkAllocation(podMap, resNetNatGatewayGiB, resNetNatGatewayEgressPricePerGiB, podUIDKeyMap, applyNatGatewayEgressAllocation)
- applyNetworkAllocation(podMap, resNetNatGatewayIngressGiB, resNetNatGatewayIngressPricePerGiB, podUIDKeyMap, applyNatGatewayIngressAllocation)
- // In the case that a two pods with the same name had different containers,
- // we will double-count the containers. There is no way to associate each
- // container with the proper pod from the usage metrics above. This will
- // show up as a pod having two Allocations running for the whole pod runtime.
- // Other than that case, Allocations should be associated with pods by the
- // above functions.
- // At this point, we expect "Node" to be set by one of the above functions
- // (e.g. applyCPUCoresAllocated, etc.) -- otherwise, node labels will fail
- // to correctly apply to the pods.
- var nodeLabels map[nodeKey]map[string]string
- if env.IsAllocationNodeLabelsEnabled() {
- nodeLabels = resToNodeLabels(resNodeLabels)
- }
- namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
- podLabels := resToPodLabels(resPodLabels, podUIDKeyMap, ingestPodUID)
- namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
- podAnnotations := resToPodAnnotations(resPodAnnotations, podUIDKeyMap, ingestPodUID)
- applyLabels(podMap, nodeLabels, namespaceLabels, podLabels)
- applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
- podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))
- podStatefulSetMap := labelsToPodControllerMap(podLabels, resToStatefulSetLabels(resStatefulSetLabels))
- podDaemonSetMap := resToPodDaemonSetMap(resDaemonSetLabels, podUIDKeyMap, ingestPodUID)
- podJobMap := resToPodJobMap(resJobLabels, podUIDKeyMap, ingestPodUID)
- podReplicaSetMap := resToPodReplicaSetMap(resPodsWithReplicaSetOwner, resReplicaSetsWithoutOwners, resReplicaSetsWithRolloutOwner, podUIDKeyMap, ingestPodUID)
- applyControllersToPods(podMap, podDeploymentMap)
- applyControllersToPods(podMap, podStatefulSetMap)
- applyControllersToPods(podMap, podDaemonSetMap)
- applyControllersToPods(podMap, podJobMap)
- applyControllersToPods(podMap, podReplicaSetMap)
- serviceLabels := getServiceLabels(resServiceLabels)
- allocsByService := map[serviceKey][]*opencost.Allocation{}
- applyServicesToPods(podMap, podLabels, allocsByService, serviceLabels)
- // TODO breakdown network costs?
- // Build out the map of all PVs with class, size and cost-per-hour.
- // Note: this does not record time running, which we may want to
- // include later for increased PV precision. (As long as the PV has
- // a PVC, we get time running there, so this is only inaccurate
- // for short-lived, unmounted PVs.)
- pvMap := map[pvKey]*pv{}
- buildPVMap(resolution, pvMap, resPVCostPerGiBHour, resPVActiveMins, resPVMeta, window)
- applyPVBytes(pvMap, resPVBytes)
- // Build out the map of all PVCs with time running, bytes requested,
- // and connect to the correct PV from pvMap. (If no PV exists, that
- // is noted, but does not result in any allocation/cost.)
- pvcMap := map[pvcKey]*pvc{}
- buildPVCMap(resolution, pvcMap, pvMap, resPVCInfo, window)
- applyPVCBytesRequested(pvcMap, resPVCBytesRequested)
- // Build out the relationships of pods to their PVCs. This step
- // populates the pvc.Count field so that pvc allocation can be
- // split appropriately among each pod's container allocation.
- podPVCMap := map[podKey][]*pvc{}
- buildPodPVCMap(podPVCMap, pvMap, pvcMap, podMap, resPodPVCAllocation, podUIDKeyMap, ingestPodUID)
- applyPVCsToPods(window, podMap, podPVCMap, pvcMap)
- // Identify PVCs without pods and add pv costs to the unmounted Allocation for the pvc's cluster
- applyUnmountedPVCs(window, podMap, pvcMap)
- // Identify PVs without PVCs and add PV costs to the unmounted Allocation for the PV's cluster
- applyUnmountedPVs(window, podMap, pvMap, pvcMap)
- lbMap := make(map[serviceKey]*lbCost)
- getLoadBalancerCosts(lbMap, resLBCostPerHr, resLBActiveMins, resolution, window)
- applyLoadBalancersToPods(window, podMap, lbMap, allocsByService)
- // Build out a map of Nodes with resource costs, discounts, and node types
- // for converting resource allocation data to cumulative costs.
- nodeMap := map[nodeKey]*nodePricing{}
- applyNodeCostPerCPUHr(nodeMap, resNodeCostPerCPUHr)
- applyNodeCostPerRAMGiBHr(nodeMap, resNodeCostPerRAMGiBHr)
- applyNodeCostPerGPUHr(nodeMap, resNodeCostPerGPUHr)
- applyNodeSpot(nodeMap, resNodeIsSpot)
- applyNodeDiscount(nodeMap, cm)
- applyExtendedNodeData(nodeMap, nodeExtendedData)
- cm.applyNodesToPod(podMap, nodeMap)
- // (3) Build out AllocationSet from Pod map
- for _, pod := range podMap {
- for _, alloc := range pod.Allocations {
- cluster := alloc.Properties.Cluster
- nodeName := alloc.Properties.Node
- namespace := alloc.Properties.Namespace
- podName := alloc.Properties.Pod
- container := alloc.Properties.Container
- // Make sure that the name is correct (node may not be present at this
- // point due to it missing from queryMinutes) then insert.
- alloc.Name = fmt.Sprintf("%s/%s/%s/%s/%s", cluster, nodeName, namespace, podName, container)
- allocSet.Set(alloc)
- }
- }
- return allocSet, nodeMap, nil
- }
|