Quellcode durchsuchen

Normalize price/cost capture for LBs (load balancers), add cluster management to OpenCost.

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Matt Bolt vor 1 Jahr
Ursprung
Commit
119df30825

+ 63 - 0
core/pkg/clusters/util.go

@@ -0,0 +1,63 @@
+package clusters
+
+import "fmt"
+
+// MapToClusterInfo returns a ClusterInfo using parsed data from a string map. If
+// parsing the map fails for id and/or name, an error is returned.
+func MapToClusterInfo(info map[string]string) (*ClusterInfo, error) {
+	var id string
+	var name string
+
+	if i, ok := info[ClusterInfoIdKey]; ok {
+		id = i
+	} else {
+		return nil, fmt.Errorf("cluster info missing id")
+	}
+	if n, ok := info[ClusterInfoNameKey]; ok {
+		name = n
+	} else {
+		name = id
+	}
+
+	var clusterProfile string
+	var provider string
+	var account string
+	var project string
+	var region string
+	var provisioner string
+
+	if cp, ok := info[ClusterInfoProfileKey]; ok {
+		clusterProfile = cp
+	}
+
+	if pvdr, ok := info[ClusterInfoProviderKey]; ok {
+		provider = pvdr
+	}
+
+	if acct, ok := info[ClusterInfoAccountKey]; ok {
+		account = acct
+	}
+
+	if proj, ok := info[ClusterInfoProjectKey]; ok {
+		project = proj
+	}
+
+	if reg, ok := info[ClusterInfoRegionKey]; ok {
+		region = reg
+	}
+
+	if pvsr, ok := info[ClusterInfoProvisionerKey]; ok {
+		provisioner = pvsr
+	}
+
+	return &ClusterInfo{
+		ID:          id,
+		Name:        name,
+		Profile:     clusterProfile,
+		Provider:    provider,
+		Account:     account,
+		Project:     project,
+		Region:      region,
+		Provisioner: provisioner,
+	}, nil
+}

+ 5 - 3
core/pkg/source/datasource.go

@@ -46,9 +46,13 @@ type ClusterMetricsQuerier interface {
 	QueryNodeRAMUserPercent(start, end time.Time) QueryResultsChan
 
 	// Load Balancers
-	QueryLBCost(start, end time.Time) QueryResultsChan
+	QueryLBPricePerHr(start, end time.Time) QueryResultsChan
 	QueryLBActiveMinutes(start, end time.Time) QueryResultsChan
 
+	// Cluster Management
+	QueryClusterManagementDuration(start, end time.Time) QueryResultsChan
+	QueryClusterManagementPricePerHr(start, end time.Time) QueryResultsChan
+
 	// Cluster Costs
 	QueryDataCount(start, end time.Time) QueryResultsChan
 	QueryTotalGPU(start, end time.Time) QueryResultsChan
@@ -114,8 +118,6 @@ type AllocationMetricsQuerier interface {
 	QueryPodsWithReplicaSetOwner(start, end time.Time) QueryResultsChan
 	QueryReplicaSetsWithoutOwners(start, end time.Time) QueryResultsChan
 	QueryReplicaSetsWithRollout(start, end time.Time) QueryResultsChan
-	QueryLBCostPerHr(start, end time.Time) QueryResultsChan
-	QueryLBActiveMins(start, end time.Time) QueryResultsChan
 	QueryDataCoverage(limitDays int) (time.Time, time.Time, error)
 	QueryIsGPUShared(start, end time.Time) QueryResultsChan
 	QueryGetGPUInfo(start, end time.Time) QueryResultsChan

+ 2 - 62
modules/prometheus-source/pkg/prom/clustermap.go

@@ -166,9 +166,9 @@ func (pcm *PrometheusClusterMap) loadClusters() (map[string]*clusters.ClusterInf
 func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*clusters.ClusterInfo, error) {
 	info := pcm.clusterInfo.GetClusterInfo()
 
-	clusterInfo, err := MapToClusterInfo(info)
+	clusterInfo, err := clusters.MapToClusterInfo(info)
 	if err != nil {
-		return nil, fmt.Errorf("Parsing Local Cluster Info Failed: %s", err)
+		return nil, fmt.Errorf("parsing local cluster info failed: %w", err)
 	}
 
 	return clusterInfo, nil
@@ -276,63 +276,3 @@ func (pcm *PrometheusClusterMap) StopRefresh() {
 		pcm.stop = nil
 	}
 }
-
-// MapToClusterInfo returns a ClusterInfo using parsed data from a string map. If
-// parsing the map fails for id and/or name, an error is returned.
-func MapToClusterInfo(info map[string]string) (*clusters.ClusterInfo, error) {
-	var id string
-	var name string
-
-	if i, ok := info[clusters.ClusterInfoIdKey]; ok {
-		id = i
-	} else {
-		return nil, fmt.Errorf("Cluster Info Missing ID")
-	}
-	if n, ok := info[clusters.ClusterInfoNameKey]; ok {
-		name = n
-	} else {
-		name = id
-	}
-
-	var clusterProfile string
-	var provider string
-	var account string
-	var project string
-	var region string
-	var provisioner string
-
-	if cp, ok := info[clusters.ClusterInfoProfileKey]; ok {
-		clusterProfile = cp
-	}
-
-	if pvdr, ok := info[clusters.ClusterInfoProviderKey]; ok {
-		provider = pvdr
-	}
-
-	if acct, ok := info[clusters.ClusterInfoAccountKey]; ok {
-		account = acct
-	}
-
-	if proj, ok := info[clusters.ClusterInfoProjectKey]; ok {
-		project = proj
-	}
-
-	if reg, ok := info[clusters.ClusterInfoRegionKey]; ok {
-		region = reg
-	}
-
-	if pvsr, ok := info[clusters.ClusterInfoProvisionerKey]; ok {
-		provisioner = pvsr
-	}
-
-	return &clusters.ClusterInfo{
-		ID:          id,
-		Name:        name,
-		Profile:     clusterProfile,
-		Provider:    provider,
-		Account:     account,
-		Project:     project,
-		Region:      region,
-		Provisioner: provisioner,
-	}, nil
-}

+ 64 - 44
modules/prometheus-source/pkg/prom/datasource.go

@@ -519,6 +519,30 @@ func (pds *PrometheusDataSource) prometheusMetrics(w http.ResponseWriter, _ *htt
 	proto.WriteResponse(w, proto.ToResponse(result, nil))
 }
 
// PrometheusClient returns the Prometheus client backing this data source.
func (pds *PrometheusDataSource) PrometheusClient() prometheus.Client {
	return pds.promClient
}

// PrometheusConfig returns the OpenCost Prometheus configuration used by
// this data source.
func (pds *PrometheusDataSource) PrometheusConfig() *OpenCostPrometheusConfig {
	return pds.promConfig
}

// PrometheusContexts returns the factory used to create named Prometheus
// query contexts.
func (pds *PrometheusDataSource) PrometheusContexts() *ContextFactory {
	return pds.promContexts
}

// ThanosClient returns the Thanos client; callers nil-check it to detect
// whether Thanos is configured (see NewClusterMap).
func (pds *PrometheusDataSource) ThanosClient() prometheus.Client {
	return pds.thanosClient
}

// ThanosConfig returns the OpenCost Thanos configuration used by this
// data source.
func (pds *PrometheusDataSource) ThanosConfig() *OpenCostThanosConfig {
	return pds.thanosConfig
}

// ThanosContexts returns the factory used to create named Thanos query
// contexts.
func (pds *PrometheusDataSource) ThanosContexts() *ContextFactory {
	return pds.thanosContexts
}
+
 func (pds *PrometheusDataSource) NewClusterMap(clusterInfoProvider clusters.ClusterInfoProvider) clusters.ClusterMap {
 	if pds.thanosClient != nil {
 		return newPrometheusClusterMap(pds.thanosContexts, clusterInfoProvider, 10*time.Minute)
@@ -1120,27 +1144,25 @@ func (pds *PrometheusDataSource) QueryNodeRAMUserPercent(start, end time.Time) s
 	return ctx.QueryAtTime(queryRAMUserPct, end)
 }
 
-func (pds *PrometheusDataSource) QueryLBCost(start, end time.Time) source.QueryResultsChan {
-	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
-
-	const lbCostQuery = `avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, %s, ingress_ip)`
// QueryLBPricePerHr returns the average load balancer price over
// [start, end), grouped by namespace, service name, ingress IP, and the
// configured cluster label.
func (pds *PrometheusDataSource) QueryLBPricePerHr(start, end time.Time) source.QueryResultsChan {
	// Format args, in order: cfg.ClusterFilter, durStr, cfg.ClusterLabel.
	const queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, ingress_ip, %s)`

	cfg := pds.promConfig

	durStr := timeutil.DurationString(end.Sub(start))
	if durStr == "" {
		// An unrepresentable window is a programming error, not a runtime
		// condition, hence panic rather than an error return.
		panic("failed to parse duration string passed to QueryLBPricePerHr")
	}

	queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
	return ctx.QueryAtTime(queryLBCostPerHr, end)
}
 func (pds *PrometheusDataSource) QueryLBActiveMinutes(start, end time.Time) source.QueryResultsChan {
-	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
-
 	const lbActiveMinutesQuery = `avg(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`
+	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, minsPerResolution)
 
 	cfg := pds.promConfig
 	minsPerResolution := cfg.DataResolutionMinutes
@@ -1155,6 +1177,38 @@ func (pds *PrometheusDataSource) QueryLBActiveMinutes(start, end time.Time) sour
 	return ctx.QueryAtTime(queryLBActiveMins, end)
 }
 
// QueryClusterManagementDuration returns samples of the cluster management
// cost metric over [start, end) at the configured data resolution, averaged
// by the cluster label and provisioner name. The name suggests the sample
// count is used downstream to derive active duration — TODO confirm against
// callers.
func (pds *PrometheusDataSource) QueryClusterManagementDuration(start, end time.Time) source.QueryResultsChan {
	// Subquery form: one sample every minsPerResolution minutes across durStr.
	// Format args, in order: cfg.ClusterFilter, cfg.ClusterLabel, durStr,
	// minsPerResolution.
	const clusterManagementDurationQuery = `avg(kubecost_cluster_management_cost{%s}) by (%s, provisioner_name)[%s:%dm]`

	cfg := pds.promConfig
	minsPerResolution := cfg.DataResolutionMinutes

	durStr := timeutil.DurationString(end.Sub(start))
	if durStr == "" {
		// An unrepresentable window is a programming error, not a runtime
		// condition, hence panic rather than an error return.
		panic("failed to parse duration string passed to QueryClusterManagementDuration")
	}

	queryClusterManagementDuration := fmt.Sprintf(clusterManagementDurationQuery, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
	ctx := pds.promContexts.NewNamedContext(ClusterContextName)
	return ctx.QueryAtTime(queryClusterManagementDuration, end)
}
+
+func (pds *PrometheusDataSource) QueryClusterManagementPricePerHr(start, end time.Time) source.QueryResultsChan {
+	const clusterManagementCostQuery = `avg(avg_over_time(kubecost_cluster_management_cost{%s}[%s])) by (%s, provisioner_name)`
+	// env.GetPromClusterFilter(), durationStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryClusterManagementCost")
+	}
+
+	queryClusterManagementCost := fmt.Sprintf(clusterManagementCostQuery, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(ClusterContextName)
+	return ctx.QueryAtTime(queryClusterManagementCost, end)
+}
+
 func (pds *PrometheusDataSource) QueryDataCount(start, end time.Time) source.QueryResultsChan {
 	const fmtQueryDataCount = `
 		count_over_time(sum(kube_node_status_capacity_cpu_cores{%s}) by (%s)[%s:%dm]) * %d
@@ -2210,40 +2264,6 @@ func (pds *PrometheusDataSource) QueryReplicaSetsWithRollout(start, end time.Tim
 	return ctx.QueryAtTime(queryReplicaSetsWithRolloutOwner, end)
 }
 
-func (pds *PrometheusDataSource) QueryLBCostPerHr(start, end time.Time) source.QueryResultsChan {
-	const queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, ingress_ip, %s)`
-	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-
-	cfg := pds.promConfig
-
-	durStr := timeutil.DurationString(end.Sub(start))
-	if durStr == "" {
-		panic("failed to parse duration string passed to QueryLBCostPerHr")
-	}
-
-	queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
-	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
-	return ctx.QueryAtTime(queryLBCostPerHr, end)
-}
-
-func (pds *PrometheusDataSource) QueryLBActiveMins(start, end time.Time) source.QueryResultsChan {
-	const queryFmtLBActiveMins = `count(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s)[%s:%s]`
-	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
-
-	cfg := pds.promConfig
-	resolution := cfg.DataResolution
-	resStr := timeutil.DurationString(resolution)
-
-	durStr := timeutil.DurationString(end.Sub(start))
-	if durStr == "" {
-		panic("failed to parse duration string passed to QueryLBActiveMins")
-	}
-
-	queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
-	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
-	return ctx.QueryAtTime(queryLBActiveMins, end)
-}
-
 func (pds *PrometheusDataSource) QueryDataCoverage(limitDays int) (time.Time, time.Time, error) {
 	const (
 		queryFmtOldestSample = `min_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`

+ 0 - 614
pkg/costmodel/aggregation.go

@@ -2,15 +2,11 @@ package costmodel
 
 import (
 	"fmt"
-	"math"
 	"net/http"
-	"sort"
-	"strconv"
 	"strings"
 	"time"
 
 	"github.com/julienschmidt/httprouter"
-	"github.com/opencost/opencost/pkg/cloud/provider"
 	"github.com/opencost/opencost/pkg/errors"
 
 	"github.com/opencost/opencost/core/pkg/log"
@@ -18,9 +14,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/util"
 	"github.com/opencost/opencost/core/pkg/util/httputil"
 	"github.com/opencost/opencost/core/pkg/util/json"
-	"github.com/opencost/opencost/core/pkg/util/promutil"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
-	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/env"
 )
 
@@ -35,614 +29,6 @@ const (
 	UnallocatedSubfield = "__unallocated__"
 )
 
-// Aggregation describes aggregated cost data, containing cumulative cost and
-// allocation data per resource, vectors of rate data per resource, efficiency
-// data, and metadata describing the type of aggregation operation.
-type Aggregation struct {
-	Aggregator                 string                         `json:"aggregation"`
-	Subfields                  []string                       `json:"subfields,omitempty"`
-	Environment                string                         `json:"environment"`
-	Cluster                    string                         `json:"cluster,omitempty"`
-	Properties                 *opencost.AllocationProperties `json:"-"`
-	Start                      time.Time                      `json:"-"`
-	End                        time.Time                      `json:"-"`
-	CPUAllocationHourlyAverage float64                        `json:"cpuAllocationAverage"`
-	CPUAllocationVectors       []*util.Vector                 `json:"-"`
-	CPUAllocationTotal         float64                        `json:"-"`
-	CPUCost                    float64                        `json:"cpuCost"`
-	CPUCostVector              []*util.Vector                 `json:"cpuCostVector,omitempty"`
-	CPUEfficiency              float64                        `json:"cpuEfficiency"`
-	CPURequestedVectors        []*util.Vector                 `json:"-"`
-	CPUUsedVectors             []*util.Vector                 `json:"-"`
-	Efficiency                 float64                        `json:"efficiency"`
-	GPUAllocationHourlyAverage float64                        `json:"gpuAllocationAverage"`
-	GPUAllocationVectors       []*util.Vector                 `json:"-"`
-	GPUCost                    float64                        `json:"gpuCost"`
-	GPUCostVector              []*util.Vector                 `json:"gpuCostVector,omitempty"`
-	GPUAllocationTotal         float64                        `json:"-"`
-	RAMAllocationHourlyAverage float64                        `json:"ramAllocationAverage"`
-	RAMAllocationVectors       []*util.Vector                 `json:"-"`
-	RAMAllocationTotal         float64                        `json:"-"`
-	RAMCost                    float64                        `json:"ramCost"`
-	RAMCostVector              []*util.Vector                 `json:"ramCostVector,omitempty"`
-	RAMEfficiency              float64                        `json:"ramEfficiency"`
-	RAMRequestedVectors        []*util.Vector                 `json:"-"`
-	RAMUsedVectors             []*util.Vector                 `json:"-"`
-	PVAllocationHourlyAverage  float64                        `json:"pvAllocationAverage"`
-	PVAllocationVectors        []*util.Vector                 `json:"-"`
-	PVAllocationTotal          float64                        `json:"-"`
-	PVCost                     float64                        `json:"pvCost"`
-	PVCostVector               []*util.Vector                 `json:"pvCostVector,omitempty"`
-	NetworkCost                float64                        `json:"networkCost"`
-	NetworkCostVector          []*util.Vector                 `json:"networkCostVector,omitempty"`
-	SharedCost                 float64                        `json:"sharedCost"`
-	TotalCost                  float64                        `json:"totalCost"`
-	TotalCostVector            []*util.Vector                 `json:"totalCostVector,omitempty"`
-}
-
-// TotalHours determines the amount of hours the Aggregation covers, as a
-// function of the cost vectors and the resolution of those vectors' data
-func (a *Aggregation) TotalHours(resolutionHours float64) float64 {
-	length := 1
-
-	if length < len(a.CPUCostVector) {
-		length = len(a.CPUCostVector)
-	}
-	if length < len(a.RAMCostVector) {
-		length = len(a.RAMCostVector)
-	}
-	if length < len(a.PVCostVector) {
-		length = len(a.PVCostVector)
-	}
-	if length < len(a.GPUCostVector) {
-		length = len(a.GPUCostVector)
-	}
-	if length < len(a.NetworkCostVector) {
-		length = len(a.NetworkCostVector)
-	}
-
-	return float64(length) * resolutionHours
-}
-
-// RateCoefficient computes the coefficient by which the total cost needs to be
-// multiplied in order to convert totals costs into per-rate costs.
-func (a *Aggregation) RateCoefficient(rateStr string, resolutionHours float64) float64 {
-	// monthly rate = (730.0)*(total cost)/(total hours)
-	// daily rate = (24.0)*(total cost)/(total hours)
-	// hourly rate = (1.0)*(total cost)/(total hours)
-
-	// default to hourly rate
-	coeff := 1.0
-	switch rateStr {
-	case "daily":
-		coeff = timeutil.HoursPerDay
-	case "monthly":
-		coeff = timeutil.HoursPerMonth
-	}
-
-	return coeff / a.TotalHours(resolutionHours)
-}
-
-type SharedResourceInfo struct {
-	ShareResources  bool
-	SharedNamespace map[string]bool
-	LabelSelectors  map[string]map[string]bool
-}
-
-type SharedCostInfo struct {
-	Name      string
-	Cost      float64
-	ShareType string
-}
-
-func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
-	// exists in a shared namespace
-	if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
-		return true
-	}
-	// has at least one shared label (OR, not AND in the case of multiple labels)
-	for labelName, labelValues := range s.LabelSelectors {
-		if val, ok := costDatum.Labels[labelName]; ok && labelValues[val] {
-			return true
-		}
-	}
-	return false
-}
-
-func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelNames []string, labelValues []string) *SharedResourceInfo {
-	sr := &SharedResourceInfo{
-		ShareResources:  shareResources,
-		SharedNamespace: make(map[string]bool),
-		LabelSelectors:  make(map[string]map[string]bool),
-	}
-
-	for _, ns := range sharedNamespaces {
-		sr.SharedNamespace[strings.Trim(ns, " ")] = true
-	}
-
-	// Creating a map of label name to label value, but only if
-	// the cardinality matches
-	if len(labelNames) == len(labelValues) {
-		for i := range labelNames {
-			cleanedLname := promutil.SanitizeLabelName(strings.Trim(labelNames[i], " "))
-			if values, ok := sr.LabelSelectors[cleanedLname]; ok {
-				values[strings.Trim(labelValues[i], " ")] = true
-			} else {
-				sr.LabelSelectors[cleanedLname] = map[string]bool{strings.Trim(labelValues[i], " "): true}
-			}
-		}
-	}
-
-	return sr
-}
-
-func GetTotalContainerCost(costData map[string]*CostData, rate string, cp models.Provider, discount float64, customDiscount float64, idleCoefficients map[string]float64) float64 {
-	totalContainerCost := 0.0
-	for _, costDatum := range costData {
-		clusterID := costDatum.ClusterID
-		cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, discount, customDiscount, idleCoefficients[clusterID])
-		totalContainerCost += totalVectors(cpuv)
-		totalContainerCost += totalVectors(ramv)
-		totalContainerCost += totalVectors(gpuv)
-		for _, pv := range pvvs {
-			totalContainerCost += totalVectors(pv)
-		}
-		totalContainerCost += totalVectors(netv)
-	}
-	return totalContainerCost
-}
-
-func (a *Accesses) ComputeIdleCoefficient(costData map[string]*CostData, discount float64, customDiscount float64, window, offset time.Duration) (map[string]float64, error) {
-	coefficients := make(map[string]float64)
-
-	profileName := "ComputeIdleCoefficient: ComputeClusterCosts"
-	profileStart := time.Now()
-
-	var clusterCosts map[string]*ClusterCosts
-	var err error
-	fmtWindow, fmtOffset := timeutil.DurationOffsetStrings(window, offset)
-	key := fmt.Sprintf("%s:%s", fmtWindow, fmtOffset)
-	if data, valid := a.ClusterCostsCache.Get(key); valid {
-		clusterCosts = data.(map[string]*ClusterCosts)
-	} else {
-		clusterCosts, err = a.ComputeClusterCosts(a.DataSource, a.CloudProvider, window, offset, false)
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	measureTime(profileStart, profileThreshold, profileName)
-
-	for cid, costs := range clusterCosts {
-		if costs.CPUCumulative == 0 && costs.RAMCumulative == 0 && costs.StorageCumulative == 0 {
-			log.Warnf("No ClusterCosts data for cluster '%s'. Is it emitting data?", cid)
-			coefficients[cid] = 1.0
-			continue
-		}
-
-		if costs.TotalCumulative == 0 {
-			return nil, fmt.Errorf("TotalCumulative cluster cost for cluster '%s' returned 0 over window '%s' offset '%s'", cid, fmtWindow, fmtOffset)
-		}
-
-		totalContainerCost := 0.0
-		for _, costDatum := range costData {
-			if costDatum.ClusterID == cid {
-				cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(a.CloudProvider, costDatum, discount, customDiscount, 1)
-				totalContainerCost += totalVectors(cpuv)
-				totalContainerCost += totalVectors(ramv)
-				totalContainerCost += totalVectors(gpuv)
-				for _, pv := range pvvs {
-					totalContainerCost += totalVectors(pv)
-				}
-			}
-		}
-
-		coeff := totalContainerCost / costs.TotalCumulative
-		coefficients[cid] = coeff
-	}
-
-	return coefficients, nil
-}
-
-// AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
-type AggregationOptions struct {
-	Discount               float64            // percent by which to discount CPU, RAM, and GPU cost
-	CustomDiscount         float64            // additional custom discount applied to all prices
-	IdleCoefficients       map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
-	IncludeEfficiency      bool               // set to true to receive efficiency/usage data
-	IncludeTimeSeries      bool               // set to true to receive time series data
-	Rate                   string             // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
-	ResolutionHours        float64
-	SharedResourceInfo     *SharedResourceInfo
-	SharedCosts            map[string]*SharedCostInfo
-	FilteredContainerCount int
-	FilteredEnvironments   map[string]int
-	SharedSplit            string
-	TotalContainerCost     float64
-}
-
-// Returns the blended discounts applied to the node as a result of global discounts and reserved instance
-// discounts
-func getDiscounts(costDatum *CostData, cpuCost float64, ramCost float64, discount float64) (float64, float64) {
-	if costDatum.NodeData == nil {
-		return discount, discount
-	}
-	if costDatum.NodeData.IsSpot() {
-		return 0, 0
-	}
-
-	reserved := costDatum.NodeData.Reserved
-
-	// blended discounts
-	blendedCPUDiscount := discount
-	blendedRAMDiscount := discount
-
-	if reserved != nil && reserved.CPUCost > 0 && reserved.RAMCost > 0 {
-		reservedCPUDiscount := 0.0
-		if cpuCost == 0 {
-			log.Warnf("No cpu cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
-		} else {
-			reservedCPUDiscount = 1.0 - (reserved.CPUCost / cpuCost)
-		}
-		reservedRAMDiscount := 0.0
-		if ramCost == 0 {
-			log.Warnf("No ram cost found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
-		} else {
-			reservedRAMDiscount = 1.0 - (reserved.RAMCost / ramCost)
-		}
-
-		// AWS passes the # of reserved CPU and RAM as -1 to represent "All"
-		if reserved.ReservedCPU < 0 && reserved.ReservedRAM < 0 {
-			blendedCPUDiscount = reservedCPUDiscount
-			blendedRAMDiscount = reservedRAMDiscount
-		} else {
-			nodeCPU, ierr := strconv.ParseInt(costDatum.NodeData.VCPU, 10, 64)
-			nodeRAM, ferr := strconv.ParseFloat(costDatum.NodeData.RAMBytes, 64)
-			if ierr == nil && ferr == nil {
-				nodeRAMGB := nodeRAM / 1024 / 1024 / 1024
-				reservedRAMGB := float64(reserved.ReservedRAM) / 1024 / 1024 / 1024
-				nonReservedCPU := nodeCPU - reserved.ReservedCPU
-				nonReservedRAM := nodeRAMGB - reservedRAMGB
-
-				if nonReservedCPU == 0 {
-					blendedCPUDiscount = reservedCPUDiscount
-				} else {
-					if nodeCPU == 0 {
-						log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
-					} else {
-						blendedCPUDiscount = (float64(reserved.ReservedCPU) * reservedCPUDiscount) + (float64(nonReservedCPU)*discount)/float64(nodeCPU)
-					}
-				}
-
-				if nonReservedRAM == 0 {
-					blendedRAMDiscount = reservedRAMDiscount
-				} else {
-					if nodeRAMGB == 0 {
-						log.Warnf("No ram found for cluster '%s' node '%s'", costDatum.ClusterID, costDatum.NodeName)
-					} else {
-						blendedRAMDiscount = (reservedRAMGB * reservedRAMDiscount) + (nonReservedRAM*discount)/nodeRAMGB
-					}
-				}
-			}
-		}
-	}
-
-	return blendedCPUDiscount, blendedRAMDiscount
-}
-
-func parseVectorPricing(cfg *models.CustomPricing, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr string) (float64, float64, float64, float64, bool) {
-	usesCustom := false
-	cpuCost, err := strconv.ParseFloat(cpuCostStr, 64)
-	if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) || cpuCost == 0 {
-		cpuCost, err = strconv.ParseFloat(cfg.CPU, 64)
-		usesCustom = true
-		if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
-			cpuCost = 0
-		}
-	}
-	ramCost, err := strconv.ParseFloat(ramCostStr, 64)
-	if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) || ramCost == 0 {
-		ramCost, err = strconv.ParseFloat(cfg.RAM, 64)
-		usesCustom = true
-		if err != nil || math.IsNaN(ramCost) || math.IsInf(ramCost, 0) {
-			ramCost = 0
-		}
-	}
-	gpuCost, err := strconv.ParseFloat(gpuCostStr, 64)
-	if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
-		gpuCost, err = strconv.ParseFloat(cfg.GPU, 64)
-		if err != nil || math.IsNaN(gpuCost) || math.IsInf(gpuCost, 0) {
-			gpuCost = 0
-		}
-	}
-	pvCost, err := strconv.ParseFloat(pvCostStr, 64)
-	if err != nil || math.IsNaN(cpuCost) || math.IsInf(cpuCost, 0) {
-		pvCost, err = strconv.ParseFloat(cfg.Storage, 64)
-		if err != nil || math.IsNaN(pvCost) || math.IsInf(pvCost, 0) {
-			pvCost = 0
-		}
-	}
-	return cpuCost, ramCost, gpuCost, pvCost, usesCustom
-}
-
-func getPriceVectors(cp models.Provider, costDatum *CostData, discount float64, customDiscount float64, idleCoefficient float64) ([]*util.Vector, []*util.Vector, []*util.Vector, [][]*util.Vector, []*util.Vector) {
-
-	var cpuCost float64
-	var ramCost float64
-	var gpuCost float64
-	var pvCost float64
-	var usesCustom bool
-
-	// If custom pricing is enabled and can be retrieved, replace
-	// default cost values with custom values
-	customPricing, err := cp.GetConfig()
-	if err != nil {
-		log.Errorf("failed to load custom pricing: %s", err)
-	}
-	if provider.CustomPricesEnabled(cp) && err == nil {
-		var cpuCostStr string
-		var ramCostStr string
-		var gpuCostStr string
-		var pvCostStr string
-		if costDatum.NodeData.IsSpot() {
-			cpuCostStr = customPricing.SpotCPU
-			ramCostStr = customPricing.SpotRAM
-			gpuCostStr = customPricing.SpotGPU
-		} else {
-			cpuCostStr = customPricing.CPU
-			ramCostStr = customPricing.RAM
-			gpuCostStr = customPricing.GPU
-		}
-		pvCostStr = customPricing.Storage
-		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
-	} else if costDatum.NodeData == nil && err == nil {
-		cpuCostStr := customPricing.CPU
-		ramCostStr := customPricing.RAM
-		gpuCostStr := customPricing.GPU
-		pvCostStr := customPricing.Storage
-		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
-	} else {
-		cpuCostStr := costDatum.NodeData.VCPUCost
-		ramCostStr := costDatum.NodeData.RAMCost
-		gpuCostStr := costDatum.NodeData.GPUCost
-		pvCostStr := costDatum.NodeData.StorageCost
-		cpuCost, ramCost, gpuCost, pvCost, usesCustom = parseVectorPricing(customPricing, cpuCostStr, ramCostStr, gpuCostStr, pvCostStr)
-	}
-
-	if usesCustom {
-		log.DedupedWarningf(5, "No pricing data found for node `%s` , using custom pricing", costDatum.NodeName)
-	}
-
-	cpuDiscount, ramDiscount := getDiscounts(costDatum, cpuCost, ramCost, discount)
-
-	log.Debugf("Node Name: %s", costDatum.NodeName)
-	log.Debugf("Blended CPU Discount: %f", cpuDiscount)
-	log.Debugf("Blended RAM Discount: %f", ramDiscount)
-
-	// TODO should we try to apply the rate coefficient here or leave it as a totals-only metric?
-	rateCoeff := 1.0
-
-	if idleCoefficient == 0 {
-		idleCoefficient = 1.0
-	}
-
-	cpuv := make([]*util.Vector, 0, len(costDatum.CPUAllocation))
-	for _, val := range costDatum.CPUAllocation {
-		cpuv = append(cpuv, &util.Vector{
-			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     (val.Value * cpuCost * (1 - cpuDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
-		})
-	}
-
-	ramv := make([]*util.Vector, 0, len(costDatum.RAMAllocation))
-	for _, val := range costDatum.RAMAllocation {
-		ramv = append(ramv, &util.Vector{
-			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - ramDiscount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
-		})
-	}
-
-	gpuv := make([]*util.Vector, 0, len(costDatum.GPUReq))
-	for _, val := range costDatum.GPUReq {
-		gpuv = append(gpuv, &util.Vector{
-			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     (val.Value * gpuCost * (1 - discount) * (1 - customDiscount) / idleCoefficient) * rateCoeff,
-		})
-	}
-
-	pvvs := make([][]*util.Vector, 0, len(costDatum.PVCData))
-	for _, pvcData := range costDatum.PVCData {
-		pvv := make([]*util.Vector, 0, len(pvcData.Values))
-		if pvcData.Volume != nil {
-			cost, _ := strconv.ParseFloat(pvcData.Volume.Cost, 64)
-
-			// override with custom pricing if enabled
-			if provider.CustomPricesEnabled(cp) {
-				cost = pvCost
-			}
-
-			for _, val := range pvcData.Values {
-				pvv = append(pvv, &util.Vector{
-					Timestamp: math.Round(val.Timestamp/10) * 10,
-					Value:     ((val.Value / 1024 / 1024 / 1024) * cost * (1 - customDiscount) / idleCoefficient) * rateCoeff,
-				})
-			}
-			pvvs = append(pvvs, pvv)
-		}
-	}
-
-	netv := make([]*util.Vector, 0, len(costDatum.NetworkData))
-	for _, val := range costDatum.NetworkData {
-		netv = append(netv, &util.Vector{
-			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     val.Value,
-		})
-	}
-
-	return cpuv, ramv, gpuv, pvvs, netv
-}
-
-func totalVectors(vectors []*util.Vector) float64 {
-	total := 0.0
-	for _, vector := range vectors {
-		total += vector.Value
-	}
-	return total
-}
-
-// EmptyDataError describes an error caused by empty cost data for some
-// defined interval
-type EmptyDataError struct {
-	err    error
-	window opencost.Window
-}
-
-// Error implements the error interface
-func (ede *EmptyDataError) Error() string {
-	err := fmt.Sprintf("empty data for range: %s", ede.window)
-	if ede.err != nil {
-		err += fmt.Sprintf(": %s", ede.err)
-	}
-	return err
-}
-
-// ScaleHourlyCostData converts per-hour cost data to per-resolution data. If the target resolution is higher (i.e. < 1.0h)
-// then we can do simple multiplication by the fraction-of-an-hour and retain accuracy. If the target resolution is
-// lower (i.e. > 1.0h) then we sum groups of hourly data by resolution to maintain fidelity.
-// e.g. (100 hours of per-hour hourly data, resolutionHours=10) => 10 data points, grouped and summed by 10-hour window
-// e.g. (20 minutes of per-minute hourly data, resolutionHours=1/60) => 20 data points, scaled down by a factor of 60
-func ScaleHourlyCostData(data map[string]*CostData, resolutionHours float64) map[string]*CostData {
-	scaled := map[string]*CostData{}
-
-	for key, datum := range data {
-		datum.RAMReq = scaleVectorSeries(datum.RAMReq, resolutionHours)
-		datum.RAMUsed = scaleVectorSeries(datum.RAMUsed, resolutionHours)
-		datum.RAMAllocation = scaleVectorSeries(datum.RAMAllocation, resolutionHours)
-		datum.CPUReq = scaleVectorSeries(datum.CPUReq, resolutionHours)
-		datum.CPUUsed = scaleVectorSeries(datum.CPUUsed, resolutionHours)
-		datum.CPUAllocation = scaleVectorSeries(datum.CPUAllocation, resolutionHours)
-		datum.GPUReq = scaleVectorSeries(datum.GPUReq, resolutionHours)
-		datum.NetworkData = scaleVectorSeries(datum.NetworkData, resolutionHours)
-
-		for _, pvcDatum := range datum.PVCData {
-			pvcDatum.Values = scaleVectorSeries(pvcDatum.Values, resolutionHours)
-		}
-
-		scaled[key] = datum
-	}
-
-	return scaled
-}
-
-func scaleVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
-	// if scaling to a lower resolution, compress the hourly data for maximum accuracy
-	if resolutionHours > 1.0 {
-		return compressVectorSeries(vs, resolutionHours)
-	}
-
-	// if scaling to a higher resolution, simply scale each value down by the fraction of an hour
-	for _, v := range vs {
-		v.Value *= resolutionHours
-	}
-	return vs
-}
-
-func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Vector {
-	if len(vs) == 0 {
-		return vs
-	}
-
-	compressed := []*util.Vector{}
-
-	threshold := float64(60 * 60 * resolutionHours)
-	var acc *util.Vector
-
-	for i, v := range vs {
-		if acc == nil {
-			// start a new accumulation from current datum
-			acc = &util.Vector{
-				Value:     vs[i].Value,
-				Timestamp: vs[i].Timestamp,
-			}
-			continue
-		}
-		if v.Timestamp-acc.Timestamp < threshold {
-			// v should be accumulated in current datum
-			acc.Value += v.Value
-		} else {
-			// v falls outside current datum's threshold; append and start a new one
-			compressed = append(compressed, acc)
-			acc = &util.Vector{
-				Value:     vs[i].Value,
-				Timestamp: vs[i].Timestamp,
-			}
-		}
-	}
-	// append any remaining, incomplete accumulation
-	if acc != nil {
-		compressed = append(compressed, acc)
-	}
-
-	return compressed
-}
-
-// ScaleAggregationTimeSeries reverses the scaling done by ScaleHourlyCostData, returning
-// the aggregation's time series to hourly data.
-func ScaleAggregationTimeSeries(aggregation *Aggregation, resolutionHours float64) {
-	for _, v := range aggregation.CPUCostVector {
-		v.Value /= resolutionHours
-	}
-
-	for _, v := range aggregation.GPUCostVector {
-		v.Value /= resolutionHours
-	}
-
-	for _, v := range aggregation.RAMCostVector {
-		v.Value /= resolutionHours
-	}
-
-	for _, v := range aggregation.PVCostVector {
-		v.Value /= resolutionHours
-	}
-
-	for _, v := range aggregation.NetworkCostVector {
-		v.Value /= resolutionHours
-	}
-
-	for _, v := range aggregation.TotalCostVector {
-		v.Value /= resolutionHours
-	}
-}
-
-// String returns a string representation of the encapsulated shared resources, which
-// can be used to uniquely identify a set of shared resources. Sorting sets of shared
-// resources ensures that strings representing permutations of the same combination match.
-func (s *SharedResourceInfo) String() string {
-	if s == nil {
-		return ""
-	}
-
-	nss := []string{}
-	for ns := range s.SharedNamespace {
-		nss = append(nss, ns)
-	}
-	sort.Strings(nss)
-	nsStr := strings.Join(nss, ",")
-
-	labels := []string{}
-	for lbl, vals := range s.LabelSelectors {
-		for val := range vals {
-			if lbl != "" && val != "" {
-				labels = append(labels, fmt.Sprintf("%s=%s", lbl, val))
-			}
-		}
-	}
-	sort.Strings(labels)
-	labelStr := strings.Join(labels, ",")
-
-	return fmt.Sprintf("%s:%s", nsStr, labelStr)
-}
-
 // ParseAggregationProperties attempts to parse and return aggregation properties
 // encoded under the given key. If none exist, or if parsing fails, an error
 // is returned with empty AllocationProperties.

+ 0 - 189
pkg/costmodel/aggregation_test.go

@@ -4,197 +4,8 @@ import (
 	"testing"
 
 	"github.com/opencost/opencost/core/pkg/opencost"
-	"github.com/opencost/opencost/core/pkg/util"
 )
 
-func TestScaleHourlyCostData(t *testing.T) {
-	costData := map[string]*CostData{}
-
-	start := 1570000000
-	oneHour := 60 * 60
-
-	generateVectorSeries := func(start, count, _ int, value float64) []*util.Vector {
-		vs := []*util.Vector{}
-		for i := 0; i < count; i++ {
-			v := &util.Vector{
-				Timestamp: float64(start + (i * oneHour)),
-				Value:     value,
-			}
-			vs = append(vs, v)
-		}
-		return vs
-	}
-
-	costData["default"] = &CostData{
-		RAMReq:        generateVectorSeries(start, 100, oneHour, 1.0),
-		RAMUsed:       generateVectorSeries(start, 0, oneHour, 1.0),
-		RAMAllocation: generateVectorSeries(start, 100, oneHour, 107.226),
-		CPUReq:        generateVectorSeries(start, 100, oneHour, 0.00317),
-		CPUUsed:       generateVectorSeries(start, 95, oneHour, 1.0),
-		CPUAllocation: generateVectorSeries(start, 2, oneHour, 123.456),
-		PVCData: []*PersistentVolumeClaimData{
-			{Values: generateVectorSeries(start, 100, oneHour, 1.34)},
-		},
-	}
-
-	compressedData := ScaleHourlyCostData(costData, 10)
-
-	act, ok := compressedData["default"]
-	if !ok {
-		t.Errorf("compressed data should have key \"default\"")
-	}
-
-	// RAMReq
-	if len(act.RAMReq) != 10 {
-		t.Errorf("expected RAMReq to have length %d, was actually %d", 10, len(act.RAMReq))
-	}
-	for _, val := range act.RAMReq {
-		if val.Value != 10.0 {
-			t.Errorf("expected each RAMReq Vector to have Value %f, was actually %f", 10.0, val.Value)
-		}
-	}
-
-	// RAMUsed
-	if len(act.RAMUsed) != 0 {
-		t.Errorf("expected RAMUsed to have length %d, was actually %d", 0, len(act.RAMUsed))
-	}
-
-	// RAMAllocation
-	if len(act.RAMAllocation) != 10 {
-		t.Errorf("expected RAMAllocation to have length %d, was actually %d", 10, len(act.RAMAllocation))
-	}
-	for _, val := range act.RAMAllocation {
-		if val.Value != 1072.26 {
-			t.Errorf("expected each RAMAllocation Vector to have Value %f, was actually %f", 1072.26, val.Value)
-		}
-	}
-
-	// CPUReq
-	if len(act.CPUReq) != 10 {
-		t.Errorf("expected CPUReq to have length %d, was actually %d", 10, len(act.CPUReq))
-	}
-	for _, val := range act.CPUReq {
-		if val.Value != 0.0317 {
-			t.Errorf("expected each CPUReq Vector to have Value %f, was actually %f", 0.0317, val.Value)
-		}
-	}
-
-	// CPUUsed
-	if len(act.CPUUsed) != 10 {
-		t.Errorf("expected CPUUsed to have length %d, was actually %d", 10, len(act.CPUUsed))
-	}
-	for _, val := range act.CPUUsed[:len(act.CPUUsed)-1] {
-		if val.Value != 10.0 {
-			t.Errorf("expected each CPUUsed Vector to have Value %f, was actually %f", 10.0, val.Value)
-		}
-	}
-	if act.CPUUsed[len(act.CPUUsed)-1].Value != 5.0 {
-		t.Errorf("expected each CPUUsed Vector to have Value %f, was actually %f", 5.0, act.CPUUsed[len(act.CPUUsed)-1].Value)
-	}
-
-	// CPUAllocation
-	if len(act.CPUAllocation) != 1 {
-		t.Errorf("expected CPUAllocation to have length %d, was actually %d", 1, len(act.CPUAllocation))
-	}
-	if act.CPUAllocation[0].Value != 246.912 {
-		t.Errorf("expected each CPUAllocation Vector to have Value %f, was actually %f", 246.912, act.CPUAllocation[len(act.CPUAllocation)-1].Value)
-	}
-
-	// PVCData
-	if len(act.PVCData[0].Values) != 10 {
-		t.Errorf("expected PVCData[0] to have length %d, was actually %d", 10, len(act.PVCData[0].Values))
-	}
-	for _, val := range act.PVCData[0].Values {
-		if val.Value != 13.4 {
-			t.Errorf("expected each PVCData[0] Vector to have Value %f, was actually %f", 13.4, val.Value)
-		}
-	}
-
-	costData["default"] = &CostData{
-		RAMReq:        generateVectorSeries(start, 100, oneHour, 1.0),
-		RAMUsed:       generateVectorSeries(start, 0, oneHour, 1.0),
-		RAMAllocation: generateVectorSeries(start, 100, oneHour, 107.226),
-		CPUReq:        generateVectorSeries(start, 100, oneHour, 0.00317),
-		CPUUsed:       generateVectorSeries(start, 95, oneHour, 1.0),
-		CPUAllocation: generateVectorSeries(start, 2, oneHour, 124.6),
-		PVCData: []*PersistentVolumeClaimData{
-			{Values: generateVectorSeries(start, 100, oneHour, 1.34)},
-		},
-	}
-
-	scaledData := ScaleHourlyCostData(costData, 0.1)
-
-	act, ok = scaledData["default"]
-	if !ok {
-		t.Errorf("scaled data should have key \"default\"")
-	}
-
-	// RAMReq
-	if len(act.RAMReq) != 100 {
-		t.Errorf("expected RAMReq to have length %d, was actually %d", 100, len(act.RAMReq))
-	}
-	for _, val := range act.RAMReq {
-		if val.Value != 0.1 {
-			t.Errorf("expected each RAMReq Vector to have Value %f, was actually %f", 0.1, val.Value)
-		}
-	}
-
-	// RAMUsed
-	if len(act.RAMUsed) != 0 {
-		t.Errorf("expected RAMUsed to have length %d, was actually %d", 0, len(act.RAMUsed))
-	}
-
-	// RAMAllocation
-	if len(act.RAMAllocation) != 100 {
-		t.Errorf("expected RAMAllocation to have length %d, was actually %d", 100, len(act.RAMAllocation))
-	}
-	for _, val := range act.RAMAllocation {
-		if val.Value != 10.7226 {
-			t.Errorf("expected each RAMAllocation Vector to have Value %f, was actually %f", 10.7226, val.Value)
-		}
-	}
-
-	// CPUReq
-	if len(act.CPUReq) != 100 {
-		t.Errorf("expected CPUReq to have length %d, was actually %d", 100, len(act.CPUReq))
-	}
-	for _, val := range act.CPUReq {
-		if val.Value != 0.000317 {
-			t.Errorf("expected each CPUReq Vector to have Value %f, was actually %f", 0.000317, val.Value)
-		}
-	}
-
-	// CPUUsed
-	if len(act.CPUUsed) != 95 {
-		t.Errorf("expected CPUUsed to have length %d, was actually %d", 95, len(act.CPUUsed))
-	}
-	for _, val := range act.CPUUsed {
-		if val.Value != 0.1 {
-			t.Errorf("expected each CPUUsed Vector to have Value %f, was actually %f", 0.1, val.Value)
-		}
-	}
-
-	// CPUAllocation
-	if len(act.CPUAllocation) != 2 {
-		t.Errorf("expected CPUAllocation to have length %d, was actually %d", 2, len(act.CPUAllocation))
-	}
-	for _, val := range act.CPUAllocation {
-		if val.Value != 12.46 {
-			t.Errorf("expected each CPUAllocation Vector to have Value %f, was actually %f", 12.46, val.Value)
-		}
-	}
-
-	// PVCData
-	if len(act.PVCData[0].Values) != 100 {
-		t.Errorf("expected PVCData[0] to have length %d, was actually %d", 100, len(act.PVCData[0].Values))
-	}
-	for _, val := range act.PVCData[0].Values {
-		if val.Value != .134 {
-			t.Errorf("expected each PVCData[0] Vector to have Value %f, was actually %f", .134, val.Value)
-		}
-	}
-}
-
 func TestParseAggregationProperties_Default(t *testing.T) {
 	got, err := ParseAggregationProperties([]string{})
 	expected := []string{

+ 2 - 2
pkg/costmodel/allocation.go

@@ -358,8 +358,8 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 
 	resChJobLabels := grp.With(ds.QueryJobLabels(start, end))
 
-	resChLBCostPerHr := grp.With(ds.QueryLBCostPerHr(start, end))
-	resChLBActiveMins := grp.With(ds.QueryLBActiveMins(start, end))
+	resChLBCostPerHr := grp.With(ds.QueryLBPricePerHr(start, end))
+	resChLBActiveMins := grp.With(ds.QueryLBActiveMinutes(start, end))
 
 	resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
 	resCPURequests, _ := resChCPURequests.Await()

+ 17 - 1
pkg/costmodel/assets.go

@@ -14,7 +14,6 @@ func (cm *CostModel) ComputeAssets(start, end time.Time) (*opencost.AssetSet, er
 	nodeMap, err := cm.ClusterNodes(start, end)
 	if err != nil {
 		return nil, fmt.Errorf("error computing node assets for %s: %w", opencost.NewClosedWindow(start, end), err)
-
 	}
 
 	lbMap, err := cm.ClusterLoadBalancers(start, end)
@@ -27,6 +26,11 @@ func (cm *CostModel) ComputeAssets(start, end time.Time) (*opencost.AssetSet, er
 		return nil, fmt.Errorf("error computing disk assets for %s: %w", opencost.NewClosedWindow(start, end), err)
 	}
 
+	clusterManagement, err := cm.ClusterManagement(start, end)
+	if err != nil {
+		return nil, fmt.Errorf("error computing cluster management assets for %s: %w", opencost.NewClosedWindow(start, end), err)
+	}
+
 	for _, d := range diskMap {
 		s := d.Start
 		if s.Before(start) || s.After(end) {
@@ -90,6 +94,14 @@ func (cm *CostModel) ComputeAssets(start, end time.Time) (*opencost.AssetSet, er
 		assetSet.Insert(loadBalancer, nil)
 	}
 
+	for _, cman := range clusterManagement {
+		cmAsset := opencost.NewClusterManagement(cman.Provisioner, cman.Cluster, opencost.NewClosedWindow(start, end)) //src.CostModel.PropertiesFromCluster(cm.Properties)
+		cm.PropertiesFromCluster(cmAsset.Properties)
+		cmAsset.Cost = cman.Cost
+
+		assetSet.Insert(cmAsset, nil)
+	}
+
 	for _, n := range nodeMap {
 		// check label, to see if node from fargate, if so ignore.
 		if n.Labels != nil {
@@ -167,6 +179,10 @@ func (cm *CostModel) ClusterNodes(start, end time.Time) (map[NodeIdentifier]*Nod
 	return ClusterNodes(cm.DataSource, cm.Provider, start, end)
 }
 
+func (cm *CostModel) ClusterManagement(start, end time.Time) (map[ClusterManagementIdentifier]*ClusterManagementCost, error) {
+	return ClusterManagement(cm.DataSource, start, end)
+}
+
 // propertiesFromCluster populates static cluster properties to individual asset properties
 func (cm *CostModel) PropertiesFromCluster(props *opencost.AssetProperties) {
 	// If properties does not have cluster value, do nothing

+ 73 - 92
pkg/costmodel/cluster.go

@@ -533,6 +533,18 @@ type nodeIdentifierNoProviderID struct {
 	Name    string
 }
 
+type ClusterManagementIdentifier struct {
+	Cluster     string
+	Provisioner string
+}
+
+type ClusterManagementCost struct {
+	Cluster     string
+	Provisioner string
+
+	Cost float64
+}
+
 func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64, resourceCountMap map[nodeIdentifierNoProviderID]float64) {
 	for k, v := range activeDataMap {
 		keyNon := nodeIdentifierNoProviderID{
@@ -550,7 +562,7 @@ func costTimesMinuteAndCount(activeDataMap map[NodeIdentifier]activeData, costMa
 	}
 }
 
-func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[NodeIdentifier]float64) {
+func costTimesMinute[T comparable](activeDataMap map[T]activeData, costMap map[T]float64) {
 	for k, v := range activeDataMap {
 		if cost, ok := costMap[k]; ok {
 			minutes := v.minutes
@@ -611,7 +623,7 @@ func ClusterNodes(dataSource source.OpenCostDataSource, cp models.Provider, star
 		return nil, requiredGrp.Error()
 	}
 
-	activeDataMap := buildActiveDataMap(resActiveMins, resolution, opencost.NewClosedWindow(start, end))
+	activeDataMap := buildActiveDataMap(resActiveMins, nodeKeyGen, resolution, opencost.NewClosedWindow(start, end))
 
 	gpuCountMap := buildGPUCountMap(resNodeGPUCount)
 	preemptibleMap := buildPreemptibleMap(resIsSpot)
@@ -684,6 +696,7 @@ type LoadBalancerIdentifier struct {
 	Cluster   string
 	Namespace string
 	Name      string
+	IngressIP string
 }
 
 type LoadBalancer struct {
@@ -704,7 +717,7 @@ func ClusterLoadBalancers(dataSource source.OpenCostDataSource, start, end time.
 
 	grp := source.NewQueryGroup()
 
-	resChLBCost := grp.With(dataSource.QueryLBCost(start, end))
+	resChLBCost := grp.With(dataSource.QueryLBPricePerHr(start, end))
 	resChActiveMins := grp.With(dataSource.QueryLBActiveMinutes(start, end))
 
 	resLBCost, _ := resChLBCost.Await()
@@ -715,118 +728,86 @@ func ClusterLoadBalancers(dataSource source.OpenCostDataSource, start, end time.
 	}
 
 	loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
+	activeMap := buildActiveDataMap(resActiveMins, loadBalancerKeyGen, resolution, opencost.NewClosedWindow(start, end))
 
-	for _, result := range resActiveMins {
-		cluster, err := result.GetCluster()
-		if err != nil {
-			cluster = env.GetClusterID()
-		}
-		namespace, err := result.GetNamespace()
-		if err != nil {
-			log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
-			continue
-		}
-		name, err := result.GetString("service_name")
-		if err != nil {
-			log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
+	for _, result := range resLBCost {
+		key, ok := loadBalancerKeyGen(result)
+		if !ok {
 			continue
 		}
-		providerID, err := result.GetString("ingress_ip")
-		if err != nil {
-			log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
-			providerID = ""
-		}
 
-		key := LoadBalancerIdentifier{
-			Cluster:   cluster,
-			Namespace: namespace,
-			Name:      name,
-		}
+		lbPricePerHr := result.Values[0].Value
 
-		// Skip if there are no data
-		if len(result.Values) == 0 {
-			continue
+		lb := &LoadBalancer{
+			Cluster:   key.Cluster,
+			Namespace: key.Namespace,
+			Name:      key.Name,
+			Cost:      lbPricePerHr, // default to hourly cost, overwrite if active entry exists
+			Ip:        key.IngressIP,
+			Private:   privateIPCheck(key.IngressIP),
 		}
 
-		// Add load balancer to the set of load balancers
-		if _, ok := loadBalancerMap[key]; !ok {
-			loadBalancerMap[key] = &LoadBalancer{
-				Cluster:    cluster,
-				Namespace:  namespace,
-				Name:       fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
-				ProviderID: provider.ParseLBID(providerID),
+		if active, ok := activeMap[key]; ok {
+			lb.Start = active.start
+			lb.End = active.end
+			lb.Minutes = active.minutes
+
+			if active.minutes > 0 {
+				lb.Cost = lbPricePerHr * (active.minutes / 60.0)
+			} else {
+				log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
 			}
 		}
 
-		// Append start, end, and minutes. This should come before all other data.
-		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
-		loadBalancerMap[key].Start = s
-		loadBalancerMap[key].End = e
-		loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
+		loadBalancerMap[key] = lb
+	}
 
-		// Fill in Provider ID if it is available and missing in the loadBalancerMap
-		// Prevents there from being a duplicate LoadBalancers on the same day
-		if providerID != "" && loadBalancerMap[key].ProviderID == "" {
-			loadBalancerMap[key].ProviderID = providerID
-		}
+	return loadBalancerMap, nil
+}
+
+func ClusterManagement(dataSource source.OpenCostDataSource, start, end time.Time) (map[ClusterManagementIdentifier]*ClusterManagementCost, error) {
+	resolution := env.GetETLResolution()
+
+	grp := source.NewQueryGroup()
+
+	resChCMPrice := grp.With(dataSource.QueryClusterManagementPricePerHr(start, end))
+	resChCMDur := grp.With(dataSource.QueryClusterManagementDuration(start, end))
+
+	resCMPrice, _ := resChCMPrice.Await()
+	resCMDur, _ := resChCMDur.Await()
+
+	if grp.HasErrors() {
+		return nil, grp.Error()
 	}
 
-	for _, result := range resLBCost {
-		cluster, err := result.GetCluster()
-		if err != nil {
-			cluster = env.GetClusterID()
-		}
-		namespace, err := result.GetNamespace()
-		if err != nil {
-			log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
-			continue
-		}
-		name, err := result.GetString("service_name")
-		if err != nil {
-			log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
-			continue
-		}
+	clusterManagementPriceMap := make(map[ClusterManagementIdentifier]*ClusterManagementCost, len(resCMDur))
+	activeMap := buildActiveDataMap(resCMDur, clusterManagementKeyGen, resolution, opencost.NewClosedWindow(start, end))
 
-		providerID, err := result.GetString("ingress_ip")
-		if err != nil {
-			log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
-			// only update asset cost when an actual IP was returned
+	for _, result := range resCMPrice {
+		key, ok := clusterManagementKeyGen(result)
+		if !ok {
 			continue
 		}
-		key := LoadBalancerIdentifier{
-			Cluster:   cluster,
-			Namespace: namespace,
-			Name:      name,
-		}
 
-		// Apply cost as price-per-hour * hours
-		if lb, ok := loadBalancerMap[key]; ok {
-			lbPricePerHr := result.Values[0].Value
-
-			// interpolate any missing data
-			resultMins := lb.Minutes
-			if resultMins > 0 {
-				scaleFactor := (resultMins + resolution.Minutes()) / resultMins
+		cmPricePerHr := result.Values[0].Value
+		cm := &ClusterManagementCost{
+			Cluster:     key.Cluster,
+			Provisioner: key.Provisioner,
+			Cost:        cmPricePerHr, // default to hourly cost, overwrite if active entry exists
+		}
 
-				hrs := (lb.Minutes * scaleFactor) / 60.0
-				lb.Cost += lbPricePerHr * hrs
+		if active, ok := activeMap[key]; ok {
+			if active.minutes > 0 {
+				cm.Cost = cmPricePerHr * (active.minutes / 60.0)
 			} else {
-				log.DedupedWarningf(20, "ClusterLoadBalancers: found zero minutes for key: %v", key)
-			}
-
-			if lb.Ip != "" && lb.Ip != providerID {
-				log.DedupedWarningf(5, "ClusterLoadBalancers: multiple IPs per load balancer not supported, using most recent IP")
+				log.DedupedWarningf(20, "ClusterManagement: found zero minutes for key: %v", key)
 			}
-			lb.Ip = providerID
-
-			lb.Private = privateIPCheck(providerID)
-		} else {
-			log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %v", key)
 		}
+
+		clusterManagementPriceMap[key] = cm
 	}
 
-	return loadBalancerMap, nil
+	return clusterManagementPriceMap, nil
 }
 
 // Check if an ip is private.

+ 76 - 59
pkg/costmodel/cluster_helpers.go

@@ -34,11 +34,7 @@ func buildCPUCostMap(
 	resNodeCPUCost []*source.QueryResult,
 	cp models.Provider,
 	preemptible map[NodeIdentifier]bool,
-) (
-	map[NodeIdentifier]float64,
-	map[nodeIdentifierNoProviderID]string,
-) {
-
+) (map[NodeIdentifier]float64, map[nodeIdentifierNoProviderID]string) {
 	cpuCostMap := make(map[NodeIdentifier]float64)
 	clusterAndNameToType := make(map[nodeIdentifierNoProviderID]string)
 
@@ -108,11 +104,7 @@ func buildRAMCostMap(
 	resNodeRAMCost []*source.QueryResult,
 	cp models.Provider,
 	preemptible map[NodeIdentifier]bool,
-) (
-	map[NodeIdentifier]float64,
-	map[nodeIdentifierNoProviderID]string,
-) {
-
+) (map[NodeIdentifier]float64, map[nodeIdentifierNoProviderID]string) {
 	ramCostMap := make(map[NodeIdentifier]float64)
 	clusterAndNameToType := make(map[nodeIdentifierNoProviderID]string)
 
@@ -183,10 +175,7 @@ func buildGPUCostMap(
 	gpuCountMap map[NodeIdentifier]float64,
 	cp models.Provider,
 	preemptible map[NodeIdentifier]bool,
-) (
-	map[NodeIdentifier]float64,
-	map[nodeIdentifierNoProviderID]string,
-) {
+) (map[NodeIdentifier]float64, map[nodeIdentifierNoProviderID]string) {
 
 	gpuCostMap := make(map[NodeIdentifier]float64)
 	clusterAndNameToType := make(map[nodeIdentifierNoProviderID]string)
@@ -259,10 +248,7 @@ func buildGPUCostMap(
 	return gpuCostMap, clusterAndNameToType
 }
 
-func buildGPUCountMap(
-	resNodeGPUCount []*source.QueryResult,
-) map[NodeIdentifier]float64 {
-
+func buildGPUCountMap(resNodeGPUCount []*source.QueryResult) map[NodeIdentifier]float64 {
 	gpuCountMap := make(map[NodeIdentifier]float64)
 
 	for _, result := range resNodeGPUCount {
@@ -291,10 +277,7 @@ func buildGPUCountMap(
 	return gpuCountMap
 }
 
-func buildCPUCoresMap(
-	resNodeCPUCores []*source.QueryResult,
-) map[nodeIdentifierNoProviderID]float64 {
-
+func buildCPUCoresMap(resNodeCPUCores []*source.QueryResult) map[nodeIdentifierNoProviderID]float64 {
 	m := make(map[nodeIdentifierNoProviderID]float64)
 
 	for _, result := range resNodeCPUCores {
@@ -322,7 +305,6 @@ func buildCPUCoresMap(
 }
 
 func buildRAMBytesMap(resNodeRAMBytes []*source.QueryResult) map[nodeIdentifierNoProviderID]float64 {
-
 	m := make(map[nodeIdentifierNoProviderID]float64)
 
 	for _, result := range resNodeRAMBytes {
@@ -351,7 +333,6 @@ func buildRAMBytesMap(resNodeRAMBytes []*source.QueryResult) map[nodeIdentifierN
 
 // Mapping of cluster/node=cpu for computing resource efficiency
 func buildCPUBreakdownMap(resNodeCPUModeTotal []*source.QueryResult) map[nodeIdentifierNoProviderID]*ClusterCostsBreakdown {
-
 	cpuBreakdownMap := make(map[nodeIdentifierNoProviderID]*ClusterCostsBreakdown)
 
 	// Mapping of cluster/node=cpu for computing resource efficiency
@@ -528,38 +509,87 @@ type activeData struct {
 	minutes float64
 }
 
-func buildActiveDataMap(resActiveMins []*source.QueryResult, resolution time.Duration, window opencost.Window) map[NodeIdentifier]activeData {
+// cluster management key gen
+func clusterManagementKeyGen(result *source.QueryResult) (ClusterManagementIdentifier, bool) {
+	cluster, err := result.GetCluster()
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
 
-	m := make(map[NodeIdentifier]activeData)
+	provisionerName, _ := result.GetString("provisioner_name")
+	return ClusterManagementIdentifier{
+		Cluster:     cluster,
+		Provisioner: provisionerName,
+	}, true
+}
 
-	for _, result := range resActiveMins {
-		cluster, err := result.GetCluster()
-		if err != nil {
-			cluster = env.GetClusterID()
-		}
+// node key gen
+func nodeKeyGen(result *source.QueryResult) (NodeIdentifier, bool) {
+	cluster, err := result.GetCluster()
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
 
-		name, err := result.GetNode()
-		if err != nil {
-			log.Warnf("ClusterNodes: active mins missing node")
-			continue
-		}
+	name, err := result.GetNode()
+	if err != nil {
+		log.Warnf("ClusterNodes: active mins missing node")
+		return NodeIdentifier{}, false
+	}
 
-		providerID, _ := result.GetProviderID()
+	providerID, _ := result.GetProviderID()
 
-		key := NodeIdentifier{
-			Cluster:    cluster,
-			Name:       name,
-			ProviderID: provider.ParseID(providerID),
-		}
+	return NodeIdentifier{
+		Cluster:    cluster,
+		Name:       name,
+		ProviderID: provider.ParseID(providerID),
+	}, true
+}
+
+func loadBalancerKeyGen(result *source.QueryResult) (LoadBalancerIdentifier, bool) {
+	cluster, err := result.GetCluster()
+	if err != nil {
+		cluster = env.GetClusterID()
+	}
+
+	namespace, err := result.GetNamespace()
+	if err != nil {
+		log.Warnf("ClusterLoadBalancers: LB cost data missing namespace")
+		return LoadBalancerIdentifier{}, false
+	}
+
+	name, err := result.GetString("service_name")
+	if err != nil {
+		log.Warnf("ClusterLoadBalancers: LB cost data missing service_name")
+		return LoadBalancerIdentifier{}, false
+	}
+
+	ingressIp, err := result.GetString("ingress_ip")
+	if err != nil {
+		log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
+		// only update asset cost when an actual IP was returned
+		return LoadBalancerIdentifier{}, false
+	}
+
+	return LoadBalancerIdentifier{
+		Cluster:   cluster,
+		Namespace: namespace,
+		Name:      name,
+		IngressIP: ingressIp,
+	}, true
+}
 
-		if len(result.Values) == 0 {
+func buildActiveDataMap[T comparable](results []*source.QueryResult, keyGen func(*source.QueryResult) (T, bool), resolution time.Duration, window opencost.Window) map[T]activeData {
+	m := make(map[T]activeData)
+
+	for _, result := range results {
+		key, ok := keyGen(result)
+		if !ok || len(result.Values) == 0 {
 			continue
 		}
 
 		s, e := calculateStartAndEnd(result, resolution, window)
 		mins := e.Sub(s).Minutes()
 
-		// TODO niko/assets if mins >= threshold, interpolate for missing data?
 		m[key] = activeData{
 			start:   s,
 			end:     e,
@@ -579,27 +609,14 @@ func buildPreemptibleMap(
 	m := make(map[NodeIdentifier]bool)
 
 	for _, result := range resIsSpot {
-		nodeName, err := result.GetNode()
-		if err != nil {
+		key, ok := nodeKeyGen(result)
+		if !ok {
 			continue
 		}
 
 		// GCP preemptible label
 		pre := result.Values[0].Value
 
-		cluster, err := result.GetCluster()
-		if err != nil {
-			cluster = env.GetClusterID()
-		}
-
-		providerID, _ := result.GetProviderID()
-
-		key := NodeIdentifier{
-			Cluster:    cluster,
-			Name:       nodeName,
-			ProviderID: provider.ParseID(providerID),
-		}
-
 		// TODO(michaelmdresser): check this condition at merge time?
 		// if node, ok := nodeMap[key]; pre > 0.0 && ok {
 		// 	node.Preemptible = true