Parcourir la source

Merge pull request #186 from kubecost/nikovacevic-caching

Add rate data to aggregated cost model
Niko Kovacevic il y a 6 ans
Parent
commit
7c43e66e91

+ 84 - 41
costmodel/aggregations.go

@@ -12,26 +12,32 @@ import (
 )
 
 type Aggregation struct {
-	Aggregator         string    `json:"aggregation"`
-	AggregatorSubField string    `json:"aggregationSubfield"`
-	Environment        string    `json:"environment"`
-	Cluster            string    `json:"cluster"`
-	CPUAllocation      []*Vector `json:"-"`
-	CPUCostVector      []*Vector `json:"cpuCostVector,omitempty"`
-	RAMAllocation      []*Vector `json:"-"`
-	RAMCostVector      []*Vector `json:"ramCostVector,omitempty"`
-	PVCostVector       []*Vector `json:"pvCostVector,omitempty"`
-	GPUAllocation      []*Vector `json:"-"`
-	GPUCostVector      []*Vector `json:"gpuCostVector,omitempty"`
-	CPUCost            float64   `json:"cpuCost"`
-	RAMCost            float64   `json:"ramCost"`
-	GPUCost            float64   `json:"gpuCost"`
-	PVCost             float64   `json:"pvCost"`
-	NetworkCost        float64   `json:"networkCost"`
-	SharedCost         float64   `json:"sharedCost"`
-	TotalCost          float64   `json:"totalCost"`
+	Aggregator        string    `json:"aggregation"`
+	Subfields         []string  `json:"subfields"`
+	Environment       string    `json:"environment"`
+	Cluster           string    `json:"cluster"`
+	CPUAllocation     []*Vector `json:"-"`
+	CPUCostVector     []*Vector `json:"cpuCostVector,omitempty"`
+	RAMAllocation     []*Vector `json:"-"`
+	RAMCostVector     []*Vector `json:"ramCostVector,omitempty"`
+	PVCostVector      []*Vector `json:"pvCostVector,omitempty"`
+	GPUAllocation     []*Vector `json:"-"`
+	GPUCostVector     []*Vector `json:"gpuCostVector,omitempty"`
+	NetworkCostVector []*Vector `json:"networkCostVector,omitempty"`
+	CPUCost           float64   `json:"cpuCost"`
+	RAMCost           float64   `json:"ramCost"`
+	GPUCost           float64   `json:"gpuCost"`
+	PVCost            float64   `json:"pvCost"`
+	NetworkCost       float64   `json:"networkCost"`
+	SharedCost        float64   `json:"sharedCost"`
+	TotalCost         float64   `json:"totalCost"`
 }
 
+const (
+	hoursPerDay   = 24.0
+	hoursPerMonth = 730.0
+)
+
 type SharedResourceInfo struct {
 	ShareResources  bool
 	SharedNamespace map[string]bool
@@ -84,10 +90,11 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 	totalClusterCostOverWindow := (totalClusterCost / 730) * windowDuration.Hours() * (1 - discount)
 	totalContainerCost := 0.0
 	for _, costDatum := range costData {
-		cpuv, ramv, gpuv, pvvs := getPriceVectors(cp, costDatum, discount, 1)
+		cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, "", discount, 1)
 		totalContainerCost += totalVector(cpuv)
 		totalContainerCost += totalVector(ramv)
 		totalContainerCost += totalVector(gpuv)
+		totalContainerCost += totalVector(netv)
 		for _, pv := range pvvs {
 			totalContainerCost += totalVector(pv)
 		}
@@ -96,9 +103,9 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 	return (totalContainerCost / totalClusterCostOverWindow), nil
 }
 
-// AggregateCostModel reduces the dimensions of raw cost data by field and, optionally, by time. The field parameter determines the field
+// AggregateCostData reduces the dimensions of raw cost data by field and, optionally, by time. The field parameter determines the field
 // by which to group data, with an optional subfield, e.g. for groupings like field="label" and subfield="app" for grouping by "label.app".
-func AggregateCostModel(cp cloud.Provider, costData map[string]*CostData, field string, subfield string, timeSeries bool, discount float64, idleCoefficient float64, sr *SharedResourceInfo) map[string]*Aggregation {
+func AggregateCostData(cp cloud.Provider, costData map[string]*CostData, dataCount int64, field string, subfields []string, rate string, timeSeries bool, discount float64, idleCoefficient float64, sr *SharedResourceInfo) map[string]*Aggregation {
 	// aggregations collects key-value pairs of resource group-to-aggregated data
 	// e.g. namespace-to-data or label-value-to-data
 	aggregations := make(map[string]*Aggregation)
@@ -109,30 +116,34 @@ func AggregateCostModel(cp cloud.Provider, costData map[string]*CostData, field
 
 	for _, costDatum := range costData {
 		if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
-			cpuv, ramv, gpuv, pvvs := getPriceVectors(cp, costDatum, discount, idleCoefficient)
+			cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
 			sharedResourceCost += totalVector(cpuv)
 			sharedResourceCost += totalVector(ramv)
 			sharedResourceCost += totalVector(gpuv)
+			sharedResourceCost += totalVector(netv)
 			for _, pv := range pvvs {
 				sharedResourceCost += totalVector(pv)
 			}
 		} else {
 			if field == "cluster" {
-				aggregateDatum(cp, aggregations, costDatum, field, subfield, costDatum.ClusterID, discount, idleCoefficient)
+				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.ClusterID, discount, idleCoefficient)
 			} else if field == "namespace" {
-				aggregateDatum(cp, aggregations, costDatum, field, subfield, costDatum.Namespace, discount, idleCoefficient)
+				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, idleCoefficient)
 			} else if field == "service" {
 				if len(costDatum.Services) > 0 {
-					aggregateDatum(cp, aggregations, costDatum, field, subfield, costDatum.Services[0], discount, idleCoefficient)
+					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Services[0], discount, idleCoefficient)
 				}
 			} else if field == "deployment" {
 				if len(costDatum.Deployments) > 0 {
-					aggregateDatum(cp, aggregations, costDatum, field, subfield, costDatum.Deployments[0], discount, idleCoefficient)
+					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Deployments[0], discount, idleCoefficient)
 				}
 			} else if field == "label" {
 				if costDatum.Labels != nil {
-					if subfieldName, ok := costDatum.Labels[subfield]; ok {
-						aggregateDatum(cp, aggregations, costDatum, field, subfield, subfieldName, discount, idleCoefficient)
+					for _, sf := range subfields {
+						if subfieldName, ok := costDatum.Labels[sf]; ok {
+							aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, subfieldName, discount, idleCoefficient)
+							break
+						}
 					}
 				}
 			}
@@ -144,50 +155,67 @@ func AggregateCostModel(cp cloud.Provider, costData map[string]*CostData, field
 		agg.RAMCost = totalVector(agg.RAMCostVector)
 		agg.GPUCost = totalVector(agg.GPUCostVector)
 		agg.PVCost = totalVector(agg.PVCostVector)
+		agg.NetworkCost = totalVector(agg.NetworkCostVector)
 		agg.SharedCost = sharedResourceCost / float64(len(aggregations))
-		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.SharedCost
+
+		if rate != "" {
+			klog.V(1).Infof("scaling '%s' costs to '%s' rate by %d", agg.Environment, rate, dataCount)
+
+			if dataCount > 0 {
+				agg.CPUCost /= float64(dataCount)
+				agg.RAMCost /= float64(dataCount)
+				agg.GPUCost /= float64(dataCount)
+				agg.PVCost /= float64(dataCount)
+				agg.NetworkCost /= float64(dataCount)
+				agg.SharedCost /= float64(dataCount)
+			}
+		}
+
+		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
 
 		// remove time series data if it is not explicitly requested
 		if !timeSeries {
 			agg.CPUCostVector = nil
 			agg.RAMCostVector = nil
-			agg.PVCostVector = nil
 			agg.GPUCostVector = nil
+			agg.PVCostVector = nil
+			agg.NetworkCostVector = nil
 		}
 	}
 
 	return aggregations
 }
 
-func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfield string, key string, discount float64, idleCoefficient float64) {
+func aggregateDatum(cp cloud.Provider, aggregations map[string]*Aggregation, costDatum *CostData, field string, subfields []string, rate string, key string, discount float64, idleCoefficient float64) {
 	// add new entry to aggregation results if a new
 	if _, ok := aggregations[key]; !ok {
 		agg := &Aggregation{}
 		agg.Aggregator = field
-		agg.AggregatorSubField = subfield
+		agg.Subfields = subfields
 		agg.Environment = key
 		agg.Cluster = costDatum.ClusterID
 		aggregations[key] = agg
 	}
 
-	mergeVectors(cp, costDatum, aggregations[key], discount, idleCoefficient)
+	mergeVectors(cp, costDatum, aggregations[key], rate, discount, idleCoefficient)
 }
 
-func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregation, discount float64, idleCoefficient float64) {
+func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregation, rate string, discount float64, idleCoefficient float64) {
 	aggregation.CPUAllocation = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocation)
 	aggregation.RAMAllocation = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocation)
 	aggregation.GPUAllocation = addVectors(costDatum.GPUReq, aggregation.GPUAllocation)
 
-	cpuv, ramv, gpuv, pvvs := getPriceVectors(cp, costDatum, discount, idleCoefficient)
+	cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
 	aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
 	aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
 	aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
+	aggregation.NetworkCostVector = addVectors(netv, aggregation.NetworkCostVector)
 	for _, vectorList := range pvvs {
 		aggregation.PVCostVector = addVectors(aggregation.PVCostVector, vectorList)
 	}
 }
 
-func getPriceVectors(cp cloud.Provider, costDatum *CostData, discount float64, idleCoefficient float64) ([]*Vector, []*Vector, []*Vector, [][]*Vector) {
+func getPriceVectors(cp cloud.Provider, costDatum *CostData, rate string, discount float64, idleCoefficient float64) ([]*Vector, []*Vector, []*Vector, [][]*Vector, []*Vector) {
 	cpuCostStr := costDatum.NodeData.VCPUCost
 	ramCostStr := costDatum.NodeData.RAMCost
 	gpuCostStr := costDatum.NodeData.GPUCost
@@ -217,11 +245,24 @@ func getPriceVectors(cp cloud.Provider, costDatum *CostData, discount float64, i
 	gpuCost, _ := strconv.ParseFloat(gpuCostStr, 64)
 	pvCost, _ := strconv.ParseFloat(pvCostStr, 64)
 
+	// rateCoeff scales the individual time series data values by the appropriate
+	// number. Each value is, by default, the daily value, so the scales convert
+	// from daily to the target rate.
+	rateCoeff := 1.0
+	switch rate {
+	case "daily":
+		rateCoeff = hoursPerDay
+	case "monthly":
+		rateCoeff = hoursPerMonth
+	case "hourly":
+	default:
+	}
+
 	cpuv := make([]*Vector, 0, len(costDatum.CPUAllocation))
 	for _, val := range costDatum.CPUAllocation {
 		cpuv = append(cpuv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     val.Value * cpuCost * (1 - discount) * 1 / idleCoefficient,
+			Value:     (val.Value * cpuCost * (1 - discount) / idleCoefficient) * rateCoeff,
 		})
 	}
 
@@ -229,7 +270,7 @@ func getPriceVectors(cp cloud.Provider, costDatum *CostData, discount float64, i
 	for _, val := range costDatum.RAMAllocation {
 		ramv = append(ramv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     (val.Value / 1024 / 1024 / 1024) * ramCost * (1 - discount) * 1 / idleCoefficient,
+			Value:     ((val.Value / 1024 / 1024 / 1024) * ramCost * (1 - discount) / idleCoefficient) * rateCoeff,
 		})
 	}
 
@@ -237,7 +278,7 @@ func getPriceVectors(cp cloud.Provider, costDatum *CostData, discount float64, i
 	for _, val := range costDatum.GPUReq {
 		gpuv = append(gpuv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     val.Value * gpuCost * (1 - discount) * 1 / idleCoefficient,
+			Value:     (val.Value * gpuCost * (1 - discount) / idleCoefficient) * rateCoeff,
 		})
 	}
 
@@ -255,14 +296,16 @@ func getPriceVectors(cp cloud.Provider, costDatum *CostData, discount float64, i
 			for _, val := range pvcData.Values {
 				pvv = append(pvv, &Vector{
 					Timestamp: math.Round(val.Timestamp/10) * 10,
-					Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount) * 1 / idleCoefficient,
+					Value:     ((val.Value / 1024 / 1024 / 1024) * cost / idleCoefficient) * rateCoeff,
 				})
 			}
 			pvvs = append(pvvs, pvv)
 		}
 	}
 
-	return cpuv, ramv, gpuv, pvvs
+	netv := costDatum.NetworkData
+
+	return cpuv, ramv, gpuv, pvvs, netv
 }
 
 func totalVector(vectors []*Vector) float64 {

+ 133 - 44
costmodel/router.go

@@ -180,7 +180,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
 	aggregationField := r.URL.Query().Get("aggregation")
-	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
+	subfields := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
 
 	if offset != "" {
 		offset = "offset " + offset
@@ -191,13 +191,28 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 		c, err := a.Cloud.GetConfig()
 		if err != nil {
 			w.Write(wrapData(nil, err))
+			return
 		}
+
 		discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
 		if err != nil {
 			w.Write(wrapData(nil, err))
+			return
 		}
 		discount = discount * 0.01
-		agg := AggregateCostModel(a.Cloud, data, aggregationField, aggregationSubField, false, discount, 1.0, nil)
+
+		dur, err := time.ParseDuration(window)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+			return
+		}
+		// dataCount is the number of time series data expected for the given interval,
+		// which we compute because Prometheus time series vectors omit zero values.
+		// This assumes hourly data, incremented by one to capture the 0th data point.
+		dataCount := int64(dur.Hours()) + 1
+		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
+
+		agg := AggregateCostData(a.Cloud, data, dataCount, aggregationField, subfields, "", false, discount, 1.0, nil)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -242,8 +257,8 @@ func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request,
 }
 
 // AggregateCostModel handles HTTP requests to the aggregated cost model API, which can be parametrized
-// by time period using window and offset, aggregation field using field and subfield (in cases like
-// field=label, subfield=app for grouping by label.app), and filtered by namespace.
+// by time period using window and offset, aggregation field and subfield (e.g. grouping by label.app
+// using aggregation=label, aggregationSubfield=app), and filtered by namespace and cluster.
 func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -253,8 +268,9 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	namespace := r.URL.Query().Get("namespace")
 	cluster := r.URL.Query().Get("cluster")
 	field := r.URL.Query().Get("aggregation")
-	subfield := r.URL.Query().Get("aggregationSubfield")
-	allocateIdle := r.URL.Query().Get("allocateIdle")
+	subfields := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
+	rate := r.URL.Query().Get("rate")
+	allocateIdle := r.URL.Query().Get("allocateIdle") == "true"
 	sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
 	sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
 	sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
@@ -272,53 +288,92 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	// then recompute and cache the requested data
 	clearCache := r.URL.Query().Get("clearCache") == "true"
 
-	// aggregation field is required
-	if field == "" {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(wrapData(nil, fmt.Errorf("Missing aggregation field parameter")))
-		return
-	}
+	// time window must be defined, whether by window and offset or by manually
+	// setting the start and end times as ISO time strings
+	var start, end string
+	var dur time.Duration
+	layout := "2006-01-02T15:04:05.000Z"
+	if window == "" {
+		start = r.URL.Query().Get("start")
+		startTime, err := time.Parse(layout, start)
+		if err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			w.Write(wrapData(nil, fmt.Errorf("Invalid start parameter: %s", start)))
+			return
+		}
 
-	// aggregation subfield is required when aggregation field is "label"
-	if field == "label" && subfield == "" {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(wrapData(nil, fmt.Errorf("Missing aggregation subfield parameter for aggregation by label")))
-		return
-	}
+		end = r.URL.Query().Get("end")
+		endTime, err := time.Parse(layout, end)
+		if err != nil {
+			w.WriteHeader(http.StatusBadRequest)
+			w.Write(wrapData(nil, fmt.Errorf("Invalid end parameter: %s", end)))
+			return
+		}
 
-	// endTime defaults to the current time, unless an offset is explicity declared,
-	// in which case it shifts endTime back by given duration
-	endTime := time.Now()
-	if offset != "" {
-		o, err := time.ParseDuration(offset)
+		dur = endTime.Sub(startTime)
+	} else {
+		// endTime defaults to the current time, unless an offset is explicity declared,
+		// in which case it shifts endTime back by given duration
+		endTime := time.Now()
+		if offset != "" {
+			o, err := time.ParseDuration(offset)
+			if err != nil {
+				klog.V(1).Infof("error parsing offset: %s", err)
+				w.Write(wrapData(nil, err))
+				return
+			}
+			endTime = endTime.Add(-1 * o)
+		}
+
+		// if window is defined in terms of days, convert to hours
+		// e.g. convert "2d" to "48h"
+		window, err := normalizeTimeParam(window)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+			return
+		}
+
+		// convert time window into start and end times, formatted
+		// as ISO datetime strings
+		dur, err = time.ParseDuration(window)
 		if err != nil {
 			w.Write(wrapData(nil, err))
 			return
 		}
+		startTime := endTime.Add(-1 * dur)
+
+		start = startTime.Format(layout)
+		end = endTime.Format(layout)
 
-		endTime = endTime.Add(-1 * o)
+		klog.V(1).Infof("start: %s, end: %s", start, end)
 	}
 
-	// if window is defined in terms of days, convert to hours
-	// e.g. convert "2d" to "48h"
-	window, err := normalizeTimeParam(window)
-	if err != nil {
-		w.Write(wrapData(nil, err))
+	// dataCount is the number of time series data expected for the given interval,
+	// which we compute because Prometheus time series vectors omit zero values.
+	// This assumes hourly data, incremented by one to capture the 0th data point.
+	dataCount := int64(dur.Hours()) + 1
+	klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
+
+	// aggregation field is required
+	if field == "" {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write(wrapData(nil, fmt.Errorf("Missing aggregation field parameter")))
 		return
 	}
 
-	// convert time window into start and end times, formatted
-	// as ISO datetime strings
-	d, err := time.ParseDuration(window)
-	if err != nil {
-		w.Write(wrapData(nil, err))
+	// aggregation subfield is required when aggregation field is "label"
+	if field == "label" && len(subfields) == 0 {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write(wrapData(nil, fmt.Errorf("Missing aggregation subfield parameter for aggregation by label")))
 		return
 	}
 
-	startTime := endTime.Add(-1 * d)
-	layout := "2006-01-02T15:04:05.000Z"
-	start := startTime.Format(layout)
-	end := endTime.Format(layout)
+	// enforce one of four available rate options
+	if rate != "" && rate != "hourly" && rate != "daily" && rate != "monthly" {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write(wrapData(nil, fmt.Errorf("If set, rate parameter must be one of: 'hourly', 'daily', 'monthly'")))
+		return
+	}
 
 	// clear cache prior to checking the cache so that a clearCache=true
 	// request always returns a freshly computed value
@@ -326,7 +381,9 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		a.Cache.Flush()
 	}
 
-	aggKey := fmt.Sprintf("aggregate:%s:%s:%s:%s:%s:%s:%t", window, offset, namespace, cluster, field, subfield, timeSeries)
+	// parametrize cache key by all request parameters
+	aggKey := fmt.Sprintf("aggregate:%s:%s:%s:%s:%s:%s:%s:%t:%t",
+		window, offset, namespace, cluster, field, strings.Join(subfields, ","), rate, timeSeries, allocateIdle)
 
 	// check the cache for aggregated response; if cache is hit and not disabled, return response
 	if result, found := a.Cache.Get(aggKey); found && !disableCache {
@@ -351,6 +408,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 
 	data, err := a.Model.ComputeCostDataRange(pClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace, cluster, remoteEnabled)
 	if err != nil {
+		klog.V(1).Infof("error computing cost data range: start=%s, end=%s, err=%s", start, end, err)
 		w.Write(wrapData(nil, err))
 		return
 	}
@@ -368,10 +426,13 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	discount = discount * 0.01
 
 	idleCoefficient := 1.0
-	if allocateIdle == "true" {
-		idleCoefficient, err = ComputeIdleCoefficient(data, a.PrometheusClient, a.Cloud, discount, fmt.Sprintf("%dh", int(d.Hours())), offset)
+	if allocateIdle {
+		windowStr := fmt.Sprintf("%dh", int(dur.Hours()))
+		idleCoefficient, err = ComputeIdleCoefficient(data, a.PrometheusClient, a.Cloud, discount, windowStr, offset)
 		if err != nil {
+			klog.V(1).Infof("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
 			w.Write(wrapData(nil, err))
+			return
 		}
 	}
 
@@ -395,7 +456,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	}
 
 	// aggregate cost model data by given fields and cache the result for the default expiration
-	result := AggregateCostModel(a.Cloud, data, field, subfield, timeSeries, discount, idleCoefficient, sr)
+	result := AggregateCostData(a.Cloud, data, dataCount, field, subfields, rate, timeSeries, discount, idleCoefficient, sr)
 	a.Cache.Set(aggKey, result, cache.DefaultExpiration)
 
 	w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache miss: %s", aggKey)))
@@ -412,7 +473,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	namespace := r.URL.Query().Get("namespace")
 	cluster := r.URL.Query().Get("cluster")
 	aggregationField := r.URL.Query().Get("aggregation")
-	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
+	subfields := strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
 	remote := r.URL.Query().Get("remote")
 
 	remoteAvailable := os.Getenv(remoteEnabled)
@@ -437,13 +498,41 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		c, err := a.Cloud.GetConfig()
 		if err != nil {
 			w.Write(wrapData(nil, err))
+			return
 		}
+
 		discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
 		if err != nil {
 			w.Write(wrapData(nil, err))
 		}
 		discount = discount * 0.01
-		agg := AggregateCostModel(a.Cloud, data, aggregationField, aggregationSubField, false, discount, 1.0, nil)
+
+		layout := "2006-01-02T15:04:05.000Z"
+		startTime, err := time.Parse(layout, start)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+			return
+		}
+		endTime, err := time.Parse(layout, end)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+			return
+		}
+
+		dur := endTime.Sub(startTime)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+			return
+		}
+		windowHrs, err := strconv.ParseInt(window[:len(window)-1], 10, 64)
+
+		// dataCount is the number of time series data expected for the given interval,
+		// which we compute because Prometheus time series vectors omit zero values.
+		// This assumes hourly data, incremented by one to capture the 0th data point.
+		dataCount := (int64(dur.Hours()) / windowHrs) + 1
+		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
+
+		agg := AggregateCostData(a.Cloud, data, dataCount, aggregationField, subfields, "", false, discount, 1.0, nil)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {

+ 10 - 1
test/aggregation_test.go

@@ -11,6 +11,8 @@ import (
 )
 
 func TestAggregation(t *testing.T) {
+	cp := &cloud.CustomProvider{}
+
 	cd1 := &costModel.CostData{
 		Namespace: "test1",
 		NodeName:  "testnode",
@@ -101,7 +103,14 @@ func TestAggregation(t *testing.T) {
 	costData := make(map[string]*costModel.CostData)
 	costData["test1,foo,nginx,testnode"] = cd1
 	costData["test1,bar,nginx,testnode"] = cd2
-	agg := costModel.AggregateCostModel(costData, "namespace", "", false, 0.0, 1.0, nil)
+
+	dataCount := int64(1)
+
+	field := "namespace"
+	subfields := []string{""}
+	rate := ""
+
+	agg := costModel.AggregateCostData(cp, costData, dataCount, field, subfields, rate, false, 0.0, 1.0, nil)
 	log.Printf("agg: %+v", agg["test1"])
 	assert.Equal(t, agg["test1"].TotalCost, 8.0)
 }

+ 6 - 3
test/historical_pod_test.go

@@ -226,11 +226,13 @@ func TestPodUpDown(t *testing.T) {
 	log.Printf("Starting at %s \n", startStr)
 	log.Printf("Ending at %s \n", endStr)
 	provider.DownloadPricingData()
+
 	data, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1m", "", "", false)
 	if err != nil {
 		panic(err)
 	}
-	agg := costModel.AggregateCostModel(data, "namespace", "", false, 0.0, 1.0, nil)
+
+	agg := costModel.AggregateCostData(provider, data, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
 	_, ok := agg["test"]
 	assert.Assert(t, ok)
 
@@ -238,11 +240,12 @@ func TestPodUpDown(t *testing.T) {
 	if err != nil {
 		panic(err)
 	}
-	agg2 := costModel.AggregateCostModel(data2, "namespace", "", false, 0.0, 1.0, nil)
+
+	agg2 := costModel.AggregateCostData(provider, data2, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
 	_, ok2 := agg2["test"]
 	assert.Assert(t, ok2)
 
-	agg3 := costModel.AggregateCostModel(data, "label", "testaggregation", false, 0.0, 1.0, nil)
+	agg3 := costModel.AggregateCostData(provider, data, 1, "label", []string{"testaggregation"}, "", false, 0.0, 1.0, nil)
 	_, ok3 := agg3["foo"]
 	assert.Assert(t, ok3)
 }

+ 2 - 2
test/remote_cluster_test.go

@@ -68,8 +68,8 @@ func TestClusterConvergence(t *testing.T) {
 		panic(err)
 	}
 
-	agg := costModel.AggregateCostModel(data, "namespace", "", false, 0.0, 1.0, nil)
-	agg2 := costModel.AggregateCostModel(data2, "namespace", "", false, 0.0, 1.0, nil)
+	agg := costModel.AggregateCostData(provider, data, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
+	agg2 := costModel.AggregateCostData(provider, data2, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
 
 	assert.Equal(t, agg["kubecost"].TotalCost, agg2["kubecost"].TotalCost)