Merge branch 'master' of github.com:kubecost/cost-model

AjayTripathy, 6 years ago
532af972a7
4 changed files with 251 additions and 300 deletions
  1. costmodel/aggregations.go (+97 -47)
  2. costmodel/costmodel.go (+149 -4)
  3. costmodel/router.go (+3 -249)
  4. main.go (+2 -0)

costmodel/aggregations.go (+97 -47)

@@ -50,6 +50,24 @@ type Aggregation struct {
 	TotalCost            float64   `json:"totalCost"`
 }
 
+// VectorJoinOp is an operation func that accepts a result vector pointer
+// for a specific timestamp and two float64 pointers representing the
+// input vectors for that timestamp. x or y inputs can be nil, but not
+// both. The op should use x and y values to set the Value on the result
+// ptr. If a result could not be generated, the op should return false,
+// which will omit the vector for the specific timestamp. Otherwise,
+// return true denoting a successful op.
+type VectorJoinOp func(result *Vector, x *float64, y *float64) bool
+
+// VectorValue returns a nil pointer or a pointer to v, based on the ok bool
+func VectorValue(v float64, ok bool) *float64 {
+	if !ok {
+		return nil
+	}
+
+	return &v
+}
+
 func (a *Aggregation) GetDataCount() int {
 	length := 0
 
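For illustration only (not part of this commit): a minimal sketch of how a caller could build another join on top of the new VectorJoinOp/VectorValue API and ApplyVectorOp (introduced further down in this file). maxJoin is hypothetical and assumes the standard "math" import:

	// maxJoin joins two cost vector slices, keeping the larger value at each timestamp.
	func maxJoin(xvs, yvs []*Vector) []*Vector {
		maxOp := func(result *Vector, x *float64, y *float64) bool {
			if x == nil && y == nil {
				return false // no data on either side; omit this timestamp
			}
			if x == nil {
				result.Value = *y
			} else if y == nil {
				result.Value = *x
			} else {
				result.Value = math.Max(*x, *y)
			}
			return true
		}
		return ApplyVectorOp(xvs, yvs, maxOp)
	}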
@@ -107,7 +125,7 @@ func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, label
 	return sr
 }
 
-func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset, resolution string) (map[string]float64, error) {
+func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset string) (map[string]float64, error) {
 	coefficients := make(map[string]float64)
 
 	windowDuration, err := time.ParseDuration(windowString)
@@ -115,12 +133,6 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 		return nil, err
 	}
 
-	resolutionDuration, err := ParseDuration(resolution)
-	resolutionCoefficient := resolutionDuration.Hours()
-	if resolutionCoefficient < 1 {
-		resolutionCoefficient = 1 // just use 1 hour here, for numbers less than 1.
-	}
-
 	allTotals, err := ClusterCostsForAllClusters(cli, cp, windowString, offset)
 	if err != nil {
 		return nil, err
@@ -153,11 +165,11 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 		for _, costDatum := range costData {
 			if costDatum.ClusterID == cid {
 				cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, 1)
-				totalContainerCost += totalVectors(cpuv) * resolutionCoefficient
-				totalContainerCost += totalVectors(ramv) * resolutionCoefficient
-				totalContainerCost += totalVectors(gpuv) * resolutionCoefficient
+				totalContainerCost += totalVectors(cpuv)
+				totalContainerCost += totalVectors(ramv)
+				totalContainerCost += totalVectors(gpuv)
 				for _, pv := range pvvs {
-					totalContainerCost += totalVectors(pv) * resolutionCoefficient
+					totalContainerCost += totalVectors(pv)
 				}
 			}
 
@@ -171,14 +183,31 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 
 // AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
 type AggregationOptions struct {
-	DataCount             int                // number of cost data points expected; ensures proper rate calculation if data is incomplete
-	Discount              float64            // percent by which to discount CPU, RAM, and GPU cost
-	IdleCoefficients      map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
-	IncludeEfficiency     bool               // set to true to receive efficiency/usage data
-	IncludeTimeSeries     bool               // set to true to receive time series data
-	Rate                  string             // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
-	ResolutionCoefficient float64            // coefficient for converting hourly costs to per-resolution cost; e.g. 6 for a 6h resolution
-	SharedResourceInfo    *SharedResourceInfo
+	DataCount          int                // number of cost data points expected; ensures proper rate calculation if data is incomplete
+	Discount           float64            // percent by which to discount CPU, RAM, and GPU cost
+	IdleCoefficients   map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
+	IncludeEfficiency  bool               // set to true to receive efficiency/usage data
+	IncludeTimeSeries  bool               // set to true to receive time series data
+	Rate               string             // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
+	SharedResourceInfo *SharedResourceInfo
+}
+
+// clampAverage tests request/usage values against allocation averages for efficiency scores,
+// clamping them to the allocation average and logging a warning when a clamp is required.
+func clampAverage(requestsAvg float64, usedAverage float64, allocationAvg float64, resource string) (float64, float64) {
+	rAvg := requestsAvg
+	if rAvg > allocationAvg {
+		klog.V(3).Infof("Warning: Average %s Requested (%f) > Average %s Allocated (%f). Clamping.", resource, rAvg, resource, allocationAvg)
+		rAvg = allocationAvg
+	}
+
+	uAvg := usedAverage
+	if uAvg > allocationAvg {
+		klog.V(3).Infof("Warning: Average %s Used (%f) > Average %s Allocated (%f). Clamping.", resource, uAvg, resource, allocationAvg)
+		uAvg = allocationAvg
+	}
+
+	return rAvg, uAvg
 }
 
 // AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
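A quick illustration of the clamp with made-up numbers: clampAverage(3.0, 2.5, 2.0, "CPU") logs two warnings and returns (2.0, 2.0), so the requested and used averages fed into the efficiency calculations below never exceed the allocation average.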
@@ -197,14 +226,6 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 		idleCoefficients = make(map[string]float64)
 	}
 
-	// resolution coefficient compensates for less-frequent-than-hourly samples by multiplying
-	// cumulative values by the hours between samples. does not apply to rate data and defaults
-	// to 1.0, which matches hourly sampling of hourly data.
-	resolutionCoefficient := opts.ResolutionCoefficient
-	if resolutionCoefficient == 0.0 || rate != "" {
-		resolutionCoefficient = 1.0
-	}
-
 	// aggregations collects key-value pairs of resource group-to-aggregated data
 	// e.g. namespace-to-data or label-value-to-data
 	aggregations := make(map[string]*Aggregation)
@@ -220,12 +241,12 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 		}
 		if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
 			cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
-			sharedResourceCost += totalVectors(cpuv) * resolutionCoefficient
-			sharedResourceCost += totalVectors(ramv) * resolutionCoefficient
-			sharedResourceCost += totalVectors(gpuv) * resolutionCoefficient
+			sharedResourceCost += totalVectors(cpuv)
+			sharedResourceCost += totalVectors(ramv)
+			sharedResourceCost += totalVectors(gpuv)
 			sharedResourceCost += totalVectors(netv)
 			for _, pv := range pvvs {
-				sharedResourceCost += totalVectors(pv) * resolutionCoefficient
+				sharedResourceCost += totalVectors(pv)
 			}
 		} else {
 			if field == "cluster" {
@@ -259,11 +280,11 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 		}
 	}
 
-	for _, agg := range aggregations {
-		agg.CPUCost = totalVectors(agg.CPUCostVector) * resolutionCoefficient
-		agg.RAMCost = totalVectors(agg.RAMCostVector) * resolutionCoefficient
-		agg.GPUCost = totalVectors(agg.GPUCostVector) * resolutionCoefficient
-		agg.PVCost = totalVectors(agg.PVCostVector) * resolutionCoefficient
+	for key, agg := range aggregations {
+		agg.CPUCost = totalVectors(agg.CPUCostVector)
+		agg.RAMCost = totalVectors(agg.RAMCostVector)
+		agg.GPUCost = totalVectors(agg.GPUCostVector)
+		agg.PVCost = totalVectors(agg.PVCostVector)
 		agg.NetworkCost = totalVectors(agg.NetworkCostVector)
 		agg.SharedCost = sharedResourceCost / float64(len(aggregations))
 
@@ -282,6 +303,13 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 
 		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
 
+		// Evicted and Completed Pods can still show up here, but have 0 cost.
+		// Filter these by default. Any reason to keep them?
+		if agg.TotalCost == 0 {
+			delete(aggregations, key)
+			continue
+		}
+
 		agg.CPUAllocationAverage = averageVectors(agg.CPUAllocationVectors)
 		agg.GPUAllocationAverage = averageVectors(agg.GPUAllocationVectors)
 		agg.RAMAllocationAverage = averageVectors(agg.RAMAllocationVectors)
@@ -303,6 +331,10 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 			if agg.CPUAllocationAverage > 0.0 {
 				avgCPURequested := averageVectors(agg.CPURequestedVectors)
 				avgCPUUsed := averageVectors(agg.CPUUsedVectors)
+
+				// Clamp averages, log range violations
+				avgCPURequested, avgCPUUsed = clampAverage(avgCPURequested, avgCPUUsed, agg.CPUAllocationAverage, "CPU")
+
 				CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationAverage)
 				agg.CPUEfficiency = 1.0 - CPUIdle
 			}
@@ -312,6 +344,10 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 			if agg.RAMAllocationAverage > 0.0 {
 				avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
 				avgRAMUsed := averageVectors(agg.RAMUsedVectors)
+
+				// Clamp averages, log range violations
+				avgRAMRequested, avgRAMUsed = clampAverage(avgRAMRequested, avgRAMUsed, agg.RAMAllocationAverage, "RAM")
+
 				RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationAverage)
 				agg.RAMEfficiency = 1.0 - RAMIdle
 			}
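Worked example of the efficiency math with illustrative numbers: an aggregate averaging 2.0 CPU requested, 1.5 CPU used, and 4.0 CPU allocated gives CPUIdle = (2.0 - 1.5) / 4.0 = 0.125 and CPUEfficiency = 1 - 0.125 = 0.875; the RAM branch computes the same way from the RAM averages.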
@@ -582,6 +618,24 @@ func NormalizeVectorByVector(xvs []*Vector, yvs []*Vector) []*Vector {
 // Matching Vectors are summed, while unmatched Vectors are passed through.
 // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
 func addVectors(xvs []*Vector, yvs []*Vector) []*Vector {
+	sumOp := func(result *Vector, x *float64, y *float64) bool {
+		if x != nil && y != nil {
+			result.Value = *x + *y
+		} else if y != nil {
+			result.Value = *y
+		} else if x != nil {
+			result.Value = *x
+		}
+
+		return true
+	}
+
+	return ApplyVectorOp(xvs, yvs, sumOp)
+}
+
+// ApplyVectorOp accepts two vector slices, synchronizes their timestamps, and executes the
+// given operation at each timestamp. See VectorJoinOp for details.
+func ApplyVectorOp(xvs []*Vector, yvs []*Vector, op VectorJoinOp) []*Vector {
 	// round all non-zero timestamps to the nearest 10 second mark
 	for _, yv := range yvs {
 		if yv.Timestamp != 0 {
@@ -604,8 +658,8 @@ func addVectors(xvs []*Vector, yvs []*Vector) []*Vector {
 		return xvs
 	}
 
-	// sum stores the sum of the vector slices xvs and yvs
-	var sum []*Vector
+	// result contains the final vector slice after joining xvs and yvs
+	var result []*Vector
 
 	// timestamps stores all timestamps present in both vector slices
 	// without duplicates
@@ -633,21 +687,17 @@ func addVectors(xvs []*Vector, yvs []*Vector) []*Vector {
 		}
 	}
 
-	// iterate over each timestamp to produce a final summed vector slice
+	// iterate over each timestamp to produce a final op vector slice
 	sort.Float64s(timestamps)
 	for _, t := range timestamps {
 		x, okX := xMap[t]
 		y, okY := yMap[t]
 		sv := &Vector{Timestamp: t}
-		if okX && okY {
-			sv.Value = x + y
-		} else if okX {
-			sv.Value = x
-		} else if okY {
-			sv.Value = y
+
+		if op(sv, VectorValue(x, okX), VectorValue(y, okY)) {
+			result = append(result, sv)
 		}
-		sum = append(sum, sv)
 	}
 
-	return sum
+	return result
 }
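A usage sketch (sample data invented here) showing that the refactored addVectors, now sumOp applied via ApplyVectorOp, still behaves as the doc comment above describes; the timestamps are multiples of 10 so the 10-second rounding leaves them unchanged:

	xvs := []*Vector{{Timestamp: 10, Value: 1}, {Timestamp: 20, Value: 2}}
	yvs := []*Vector{{Timestamp: 20, Value: 2}, {Timestamp: 30, Value: 3}}

	// Matching timestamps are summed, unmatched ones pass through:
	// [(t=10, 1), (t=20, 4), (t=30, 3)]
	summed := addVectors(xvs, yvs)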

costmodel/costmodel.go (+149 -4)

@@ -1365,6 +1365,10 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 
 func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
 	startString, endString, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
+	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
+	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
+	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
+	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
 	queryRAMAlloc := fmt.Sprintf(queryRAMAllocation, windowString, "", windowString, "")
 	queryCPUAlloc := fmt.Sprintf(queryCPUAllocation, windowString, "", windowString, "")
 	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "")
@@ -1402,9 +1406,33 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	}
 
 	var wg sync.WaitGroup
-	wg.Add(15)
+	wg.Add(19)
 
 	var promErr error
+	var resultRAMRequests interface{}
+	go func() {
+		defer wg.Done()
+
+		resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
+	}()
+	var resultRAMUsage interface{}
+	go func() {
+		defer wg.Done()
+
+		resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
+	}()
+	var resultCPURequests interface{}
+	go func() {
+		defer wg.Done()
+
+		resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
+	}()
+	var resultCPUUsage interface{}
+	go func() {
+		defer wg.Done()
+
+		resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
+	}()
 	var resultRAMAllocations interface{}
 	go func() {
 		defer wg.Done()
@@ -1526,7 +1554,8 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	normalizationValue, err := getNormalizations(normalizationResults)
 	if err != nil {
-		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
+		return nil, fmt.Errorf("error computing normalization for start=%s, end=%s, window=%s: %s",
+			start, end, window, err.Error())
 	}
 
 	nodes, err := getNodeCost(cm.Cache, cp)
@@ -1605,6 +1634,36 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	containers := make(map[string]bool)
 	otherClusterPVRecorded := make(map[string]bool)
 
+	RAMReqMap, err := GetContainerMetricVectors(resultRAMRequests, true, normalizationValue, clusterID)
+	if err != nil {
+		return nil, err
+	}
+	for key := range RAMReqMap {
+		containers[key] = true
+	}
+	RAMUsedMap, err := GetContainerMetricVectors(resultRAMUsage, true, normalizationValue, clusterID)
+	if err != nil {
+		return nil, err
+	}
+	for key := range RAMUsedMap {
+		containers[key] = true
+	}
+
+	CPUReqMap, err := GetContainerMetricVectors(resultCPURequests, true, normalizationValue, clusterID)
+	if err != nil {
+		return nil, err
+	}
+	for key := range CPUReqMap {
+		containers[key] = true
+	}
+	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, normalizationValue, clusterID) // No need to normalize here, as this comes from a counter
+	if err != nil {
+		return nil, err
+	}
+	for key := range CPUUsedMap {
+		containers[key] = true
+	}
+
 	RAMAllocMap, err := GetContainerMetricVectors(resultRAMAllocations, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
@@ -1627,6 +1686,12 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		containers[key] = true
 	}
 
+	// Request metrics can show up after pod eviction and completion.
+	// applyAllocationToRequests synchronizes requests to allocations so that
+	// when allocation is 0, the corresponding requests are zeroed as well.
+	applyAllocationToRequests(RAMAllocMap, RAMReqMap)
+	applyAllocationToRequests(CPUAllocMap, CPUReqMap)
+
 	currentContainers := make(map[string]v1.Pod)
 	for _, pod := range podlist {
 		if pod.Status.Phase != v1.PodRunning {
@@ -1712,6 +1777,26 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 				containerName := container.Name
 
 				newKey := newContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName, clusterID).Key()
+				RAMReqV, ok := RAMReqMap[newKey]
+				if !ok {
+					klog.V(4).Info("no RAM requests for " + newKey)
+					RAMReqV = []*Vector{}
+				}
+				RAMUsedV, ok := RAMUsedMap[newKey]
+				if !ok {
+					klog.V(4).Info("no RAM usage for " + newKey)
+					RAMUsedV = []*Vector{}
+				}
+				CPUReqV, ok := CPUReqMap[newKey]
+				if !ok {
+					klog.V(4).Info("no CPU requests for " + newKey)
+					CPUReqV = []*Vector{}
+				}
+				CPUUsedV, ok := CPUUsedMap[newKey]
+				if !ok {
+					klog.V(4).Info("no CPU usage for " + newKey)
+					CPUUsedV = []*Vector{}
+				}
 				RAMAllocsV, ok := RAMAllocMap[newKey]
 				if !ok {
 					klog.V(4).Info("no RAM allocation for " + newKey)
@@ -1746,6 +1831,10 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 					Jobs:            getJobsOfPod(pod),
 					Statefulsets:    getStatefulSetsOfPod(pod),
 					NodeData:        nodeData,
+					RAMReq:          RAMReqV,
+					RAMUsed:         RAMUsedV,
+					CPUReq:          CPUReqV,
+					CPUUsed:         CPUUsedV,
 					RAMAllocation:   RAMAllocsV,
 					CPUAllocation:   CPUAllocsV,
 					GPUReq:          GPUReqV,
@@ -1766,7 +1855,26 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			// Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
 			klog.V(4).Info("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
 			c, _ := NewContainerMetricFromKey(key)
-
+			RAMReqV, ok := RAMReqMap[key]
+			if !ok {
+				klog.V(4).Info("no RAM requests for " + key)
+				RAMReqV = []*Vector{}
+			}
+			RAMUsedV, ok := RAMUsedMap[key]
+			if !ok {
+				klog.V(4).Info("no RAM usage for " + key)
+				RAMUsedV = []*Vector{}
+			}
+			CPUReqV, ok := CPUReqMap[key]
+			if !ok {
+				klog.V(4).Info("no CPU requests for " + key)
+				CPUReqV = []*Vector{}
+			}
+			CPUUsedV, ok := CPUUsedMap[key]
+			if !ok {
+				klog.V(4).Info("no CPU usage for " + key)
+				CPUUsedV = []*Vector{}
+			}
 			RAMAllocsV, ok := RAMAllocMap[key]
 			if !ok {
 				klog.V(4).Info("no RAM allocation for " + key)
@@ -1869,6 +1977,10 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 				Namespace:       c.Namespace,
 				Services:        podServices,
 				Deployments:     podDeployments,
+				RAMReq:          RAMReqV,
+				RAMUsed:         RAMUsedV,
+				CPUReq:          CPUReqV,
+				CPUUsed:         CPUUsedV,
 				RAMAllocation:   RAMAllocsV,
 				CPUAllocation:   CPUAllocsV,
 				GPUReq:          GPUReqV,
@@ -1899,6 +2011,39 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	return containerNameCost, err
 }
 
+func applyAllocationToRequests(allocationMap map[string][]*Vector, requestMap map[string][]*Vector) {
+	// The result of the normalize operation will be a new []*Vector to replace the requests
+	normalizeOp := func(r *Vector, x *float64, y *float64) bool {
+		// Omit the data point (return false) unless both the x and y inputs exist
+		if x == nil || y == nil {
+			return false
+		}
+
+		// If the allocation value is 0, zero out the request value
+		if *x == 0 {
+			r.Value = 0
+		} else {
+			r.Value = *y
+		}
+
+		return true
+	}
+
+	// Run normalization on all request vectors in the mapping
+	for k, requests := range requestMap {
+
+		// Only run normalization where there are valid allocations
+		allocations, ok := allocationMap[k]
+		if !ok {
+			delete(requestMap, k)
+			continue
+		}
+
+		// Replace request map with normalized
+		requestMap[k] = ApplyVectorOp(allocations, requests, normalizeOp)
+	}
+}
+
 func addMetricPVData(pvAllocationMap map[string][]*PersistentVolumeClaimData, pvCostMap map[string]*costAnalyzerCloud.PV, cp costAnalyzerCloud.Provider) {
 	cfg, err := cp.GetConfig()
 	if err != nil {
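A sketch of what applyAllocationToRequests achieves; the map keys and values below are invented and do not reflect the real container-key format:

	alloc := map[string][]*Vector{
		"pod-a/container-1": {{Timestamp: 10, Value: 1}, {Timestamp: 20, Value: 0}},
	}
	reqs := map[string][]*Vector{
		"pod-a/container-1": {{Timestamp: 10, Value: 0.5}, {Timestamp: 20, Value: 0.5}},
		"pod-b/no-alloc":    {{Timestamp: 10, Value: 0.25}},
	}

	// After the call, the t=20 request is zeroed to match the zero allocation,
	// and the entry with no matching allocation key is deleted from reqs.
	applyAllocationToRequests(alloc, reqs)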
@@ -2327,7 +2472,7 @@ func getNormalizations(qr interface{}) ([]*Vector, error) {
 		}
 		return vectors, nil
 	}
-	return nil, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
+	return nil, fmt.Errorf("normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running")
 }
 
 //todo: don't cast, implement unmarshaler interface

costmodel/router.go (+3 -249)

@@ -62,7 +62,6 @@ type Accesses struct {
 	ServiceSelectorRecorder       *prometheus.GaugeVec
 	DeploymentSelectorRecorder    *prometheus.GaugeVec
 	Model                         *CostModel
-	AggregateCache                *cache.Cache
 	CostDataCache                 *cache.Cache
 	OutOfClusterCache             *cache.Cache
 	SettingsCache                 *cache.Cache
@@ -182,11 +181,11 @@ func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
 	// in which case it shifts endTime back by given duration
 	endTime := time.Now()
 	if offset != "" {
-		o, err := time.ParseDuration(offset)
+		o, err := ParseDuration(offset)
 		if err != nil {
 			return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
 		}
-		endTime = endTime.Add(-1 * o)
+		endTime = endTime.Add(-1 * *o)
 	}
 
 	// if duration is defined in terms of days, convert to hours
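The offset now goes through the package's own ParseDuration rather than time.ParseDuration; a rough sketch of the effect, assuming ParseDuration accepts standard Go duration strings and returns a *time.Duration as its use here implies:

	endTime := time.Now()
	if o, err := ParseDuration("24h"); err == nil {
		// an offset of 24h moves the end of the queried window back one day
		endTime = endTime.Add(-1 * *o)
	}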
@@ -379,249 +378,6 @@ func (a *Accesses) CustomPricingHasChanged() bool {
 	return true
 }
 
-// AggregateCostModel handles HTTP requests to the aggregated cost model API, which can be parametrized
-// by time period using window and offset, aggregation field and subfield (e.g. grouping by label.app
-// using aggregation=label, aggregationSubfield=app), and filtered by namespace and cluster.
-func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
-
-	w.Header().Set("Content-Type", "application/json")
-	w.Header().Set("Access-Control-Allow-Origin", "*")
-
-	duration := r.URL.Query().Get("window")
-	offset := r.URL.Query().Get("offset")
-	namespace := r.URL.Query().Get("namespace")
-	cluster := r.URL.Query().Get("cluster")
-	field := r.URL.Query().Get("aggregation")
-	subfieldStr := r.URL.Query().Get("aggregationSubfield")
-	rate := r.URL.Query().Get("rate")
-	allocateIdle := r.URL.Query().Get("allocateIdle") == "true"
-	sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
-	sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
-	sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
-	remote := r.URL.Query().Get("remote") != "false"
-
-	subfields := []string{}
-	if len(subfieldStr) > 0 {
-		subfields = strings.Split(r.URL.Query().Get("aggregationSubfield"), ",")
-	}
-
-	// timeSeries == true maintains the time series dimension of the data,
-	// which by default gets summed over the entire interval
-	includeTimeSeries := r.URL.Query().Get("timeSeries") == "true"
-
-	// efficiency == true aggregates and returns usage and efficiency data
-	includeEfficiency := r.URL.Query().Get("efficiency") == "true"
-
-	// disableCache, if set to "true", tells this function to recompute and
-	// cache the requested data
-	disableCache := r.URL.Query().Get("disableCache") == "true"
-
-	// clearCache, if set to "true", tells this function to flush the cache,
-	// then recompute and cache the requested data
-	clearCache := r.URL.Query().Get("clearCache") == "true"
-
-	// aggregation field is required
-	if field == "" {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("Missing aggregation field parameter")))
-		return
-	}
-
-	// aggregation subfield is required when aggregation field is "label"
-	if field == "label" && len(subfields) == 0 {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("Missing aggregation subfield parameter for aggregation by label")))
-		return
-	}
-
-	// enforce one of four available rate options
-	if rate != "" && rate != "hourly" && rate != "daily" && rate != "monthly" {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("If set, rate parameter must be one of: 'hourly', 'daily', 'monthly'")))
-		return
-	}
-
-	// if custom pricing has changed, then clear the cache and recompute data
-	if A.CustomPricingHasChanged() {
-		clearCache = true
-	}
-
-	// clear cache prior to checking the cache so that a clearCache=true
-	// request always returns a freshly computed value
-	if clearCache {
-		A.AggregateCache.Flush()
-		A.CostDataCache.Flush()
-	}
-
-	// parametrize cache key by all request parameters
-	aggKey := fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%t:%t:%t",
-		duration, offset, namespace, cluster, field, strings.Join(subfields, ","), rate,
-		allocateIdle, includeTimeSeries, includeEfficiency)
-
-	// check the cache for aggregated response; if cache is hit and not disabled, return response
-	if result, found := A.AggregateCache.Get(aggKey); found && !disableCache {
-		w.Write(WrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache hit: %s", aggKey)))
-		return
-	}
-
-	// enable remote if it is available and not disabled
-	remoteAvailable := os.Getenv(remoteEnabled) == "true"
-	remoteEnabled := remote && remoteAvailable
-
-	// Use Thanos Client if it exists (enabled) and remote flag set
-	var pClient prometheusClient.Client
-	if remote && A.ThanosClient != nil {
-		pClient = A.ThanosClient
-	} else {
-		pClient = A.PrometheusClient
-	}
-
-	// convert duration and offset to start and end times
-	startTime, endTime, err := ParseTimeRange(duration, offset)
-	if err != nil {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("Error parsing duration (%s) and offset (%s)", duration, offset)))
-		return
-	}
-	durationHours := endTime.Sub(*startTime).Hours()
-
-	threeHoursAgo := time.Now().Add(-3 * time.Hour)
-	if A.ThanosClient != nil && endTime.After(threeHoursAgo) {
-		klog.Infof("Setting end time backwards to first present data")
-		*endTime = time.Now().Add(-3 * time.Hour)
-	}
-
-	// determine resolution by size of duration
-	resolution := duration
-	if durationHours >= 2160 {
-		// 90 days
-		resolution = "72h"
-	} else if durationHours >= 720 {
-		// 30 days
-		resolution = "24h"
-	} else if durationHours >= 168 {
-		// 7 days
-		resolution = "6h"
-	} else if durationHours >= 48 {
-		// 2 days
-		resolution = "2h"
-	} else if durationHours > 1 {
-		resolution = "1h"
-	}
-	resolutionDuration, err := ParseDuration(resolution)
-	resolutionHours := resolutionDuration.Hours()
-	if resolutionHours < 1 {
-		resolutionHours = 1
-	}
-	if err != nil {
-		w.WriteHeader(http.StatusBadRequest)
-		w.Write(WrapData(nil, fmt.Errorf("Error parsing resolution (%s)", resolution)))
-		return
-	}
-
-	// exclude the last window of the time frame to match Prometheus definitions of range, offset, and resolution
-	//   e.g. requesting duration=2d, offset=1d, resolution=1h on Jan 4 12:00:00 should provide data for Jan 1 12:00 - Jan 3 12:00
-	//        which has the equivalent start and end times of Jan 1 1:00 and Jan 3 12:00, respectively.
-	*startTime = startTime.Add(1 * *resolutionDuration)
-
-	// attempt to retrieve cost data from cache
-	var costData map[string]*CostData
-	key := fmt.Sprintf(`%s:%s:%s:%t`, duration, offset, resolution, remoteEnabled)
-	cacheData, found := A.CostDataCache.Get(key)
-	if found && !disableCache {
-		ok := false
-		costData, ok = cacheData.(map[string]*CostData)
-		if !ok {
-			klog.Errorf("caching error: failed to cast cost data to struct: %s", key)
-		}
-	} else {
-		start := startTime.Format(RFC3339Milli)
-		end := endTime.Format(RFC3339Milli)
-		costData, err = A.Model.ComputeCostDataRange(pClient, A.KubeClientSet, A.Cloud, start, end, resolution, "", "", remoteEnabled)
-		if err != nil {
-			w.Write(WrapData(nil, err))
-			return
-		}
-
-		A.CostDataCache.Set(key, costData, cache.DefaultExpiration)
-	}
-
-	c, err := A.Cloud.GetConfig()
-	if err != nil {
-		w.Write(WrapData(nil, err))
-		return
-	}
-	discount, err := ParsePercentString(c.Discount)
-	if err != nil {
-		w.Write(WrapData(nil, err))
-		return
-	}
-
-	idleCoefficients := make(map[string]float64)
-
-	if allocateIdle {
-		idleDurationCalcHours := durationHours
-		if durationHours < 1 {
-			idleDurationCalcHours = 1
-		}
-		windowStr := fmt.Sprintf("%dh", int(idleDurationCalcHours))
-		if A.ThanosClient != nil {
-			klog.Infof("Setting offset to 3h")
-			offset = "3h"
-		}
-		idleCoefficients, err = ComputeIdleCoefficient(costData, pClient, A.Cloud, discount, windowStr, offset, resolution)
-		if err != nil {
-			klog.Errorf("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
-			w.Write(WrapData(nil, err))
-			return
-		}
-	}
-
-	sn := []string{}
-	sln := []string{}
-	slv := []string{}
-	if sharedNamespaces != "" {
-		sn = strings.Split(sharedNamespaces, ",")
-	}
-	if sharedLabelNames != "" {
-		sln = strings.Split(sharedLabelNames, ",")
-		slv = strings.Split(sharedLabelValues, ",")
-		if len(sln) != len(slv) || slv[0] == "" {
-			w.Write(WrapData(nil, fmt.Errorf("Supply exacly one label value per label name")))
-			return
-		}
-	}
-	var sr *SharedResourceInfo
-	if len(sn) > 0 || len(sln) > 0 {
-		sr = NewSharedResourceInfo(true, sn, sln, slv)
-	}
-
-	for cid, idleCoefficient := range idleCoefficients {
-		klog.Infof("Idle Coeff: %s: %f", cid, idleCoefficient)
-	}
-
-	// filter cost data by namespace and cluster after caching for maximal cache hits
-	costData = FilterCostData(costData, namespace, cluster)
-
-	dataCount := int(durationHours / resolutionHours)
-
-	// aggregate cost model data by given fields and cache the result for the default expiration
-	opts := &AggregationOptions{
-		DataCount:             dataCount,
-		Discount:              discount,
-		IdleCoefficients:      idleCoefficients,
-		IncludeEfficiency:     includeEfficiency,
-		IncludeTimeSeries:     includeTimeSeries,
-		Rate:                  rate,
-		ResolutionCoefficient: resolutionHours,
-		SharedResourceInfo:    sr,
-	}
-	result := AggregateCostData(costData, field, subfields, A.Cloud, opts)
-	A.AggregateCache.Set(aggKey, result, cache.DefaultExpiration)
-
-	w.Write(WrapDataWithMessage(result, nil, fmt.Sprintf("aggregate cache miss: %s", aggKey)))
-}
-
 func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -1070,7 +826,7 @@ func (a *Accesses) recordPrices() {
 	}()
 }
 
-func init() {
+func Initialize() {
 	klog.InitFlags(nil)
 	flag.Set("v", "3")
 	flag.Parse()
@@ -1205,7 +961,6 @@ func init() {
 	})
 
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
-	aggregateCache := cache.New(time.Minute*5, time.Minute*10)
 	costDataCache := cache.New(time.Minute*5, time.Minute*10)
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
 	settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
@@ -1228,7 +983,6 @@ func init() {
 		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
 		PersistentVolumePriceRecorder: pvGv,
 		Model:                         NewCostModel(kubeClientset),
-		AggregateCache:                aggregateCache,
 		CostDataCache:                 costDataCache,
 		OutOfClusterCache:             outOfClusterCache,
 		SettingsCache:                 settingsCache,

main.go (+2 -0)

@@ -16,6 +16,8 @@ func Healthz(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 }
 
 func main() {
+	costmodel.Initialize()
+
 	rootMux := http.NewServeMux()
 	costmodel.Router.GET("/healthz", Healthz)
 	rootMux.Handle("/", costmodel.Router)