Просмотр исходного кода

Merge pull request #237 from kubecost/Bolt-add-requests-usage-back

Efficiency Fixes for Cost Model
Matt Bolt 6 лет назад
Родитель
Commit
af70aeca63
2 измененных файлов с 223 добавлено и 14 удалено
  1. 77 12
      costmodel/aggregations.go
  2. 146 2
      costmodel/costmodel.go

+ 77 - 12
costmodel/aggregations.go

@@ -50,6 +50,24 @@ type Aggregation struct {
 	TotalCost            float64   `json:"totalCost"`
 }
 
// VectorJoinOp is an operation func that accepts a result vector pointer
// for a specific timestamp and two float64 pointers representing the
// input vectors for that timestamp. x or y inputs can be nil, but not
// both. The op should use the x and y values to set Value on the result
// ptr. If a result could not be generated, the op should return false,
// which will omit the vector for the specific timestamp. Otherwise,
// return true denoting a successful op.
type VectorJoinOp func(result *Vector, x *float64, y *float64) bool
+
// VectorValue converts a (value, ok) pair into an optional float64:
// it yields a pointer to v when ok is true, and nil otherwise.
func VectorValue(v float64, ok bool) *float64 {
	if ok {
		return &v
	}

	return nil
}
+
 func (a *Aggregation) GetDataCount() int {
 	length := 0
 
@@ -174,6 +192,24 @@ type AggregationOptions struct {
 	SharedResourceInfo *SharedResourceInfo
 }
 
+// Helper method to test request/usgae values against allocation averages for efficiency scores. Generate a warning log if
+// clamp is required
+func clampAverage(requestsAvg float64, usedAverage float64, allocationAvg float64, resource string) (float64, float64) {
+	rAvg := requestsAvg
+	if rAvg > allocationAvg {
+		klog.V(3).Infof("Warning: Average %s Requested (%f) > Average %s Allocated (%f). Clamping.", resource, rAvg, resource, allocationAvg)
+		rAvg = allocationAvg
+	}
+
+	uAvg := usedAverage
+	if uAvg > allocationAvg {
+		klog.V(3).Infof("Warning: Average %s Used (%f) > Average %s Allocated (%f). Clamping.", resource, uAvg, resource, allocationAvg)
+		uAvg = allocationAvg
+	}
+
+	return rAvg, uAvg
+}
+
 // AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
 // must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
 // See AggregationOptions for optional parameters.
@@ -244,7 +280,7 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 		}
 	}
 
-	for _, agg := range aggregations {
+	for key, agg := range aggregations {
 		agg.CPUCost = totalVectors(agg.CPUCostVector)
 		agg.RAMCost = totalVectors(agg.RAMCostVector)
 		agg.GPUCost = totalVectors(agg.GPUCostVector)
@@ -267,6 +303,13 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 
 		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.NetworkCost + agg.SharedCost
 
+		// Evicted and Completed Pods can still show up here, but have 0 cost.
+		// Filter these by default. Any reason to keep them?
+		if agg.TotalCost == 0 {
+			delete(aggregations, key)
+			continue
+		}
+
 		agg.CPUAllocationAverage = averageVectors(agg.CPUAllocationVectors)
 		agg.GPUAllocationAverage = averageVectors(agg.GPUAllocationVectors)
 		agg.RAMAllocationAverage = averageVectors(agg.RAMAllocationVectors)
@@ -288,6 +331,10 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 			if agg.CPUAllocationAverage > 0.0 {
 				avgCPURequested := averageVectors(agg.CPURequestedVectors)
 				avgCPUUsed := averageVectors(agg.CPUUsedVectors)
+
+				// Clamp averages, log range violations
+				avgCPURequested, avgCPUUsed = clampAverage(avgCPURequested, avgCPUUsed, agg.CPUAllocationAverage, "CPU")
+
 				CPUIdle = ((avgCPURequested - avgCPUUsed) / agg.CPUAllocationAverage)
 				agg.CPUEfficiency = 1.0 - CPUIdle
 			}
@@ -297,6 +344,10 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 			if agg.RAMAllocationAverage > 0.0 {
 				avgRAMRequested := averageVectors(agg.RAMRequestedVectors)
 				avgRAMUsed := averageVectors(agg.RAMUsedVectors)
+
+				// Clamp averages, log range violations
+				avgRAMRequested, avgRAMUsed = clampAverage(avgRAMRequested, avgRAMUsed, agg.RAMAllocationAverage, "RAM")
+
 				RAMIdle = ((avgRAMRequested - avgRAMUsed) / agg.RAMAllocationAverage)
 				agg.RAMEfficiency = 1.0 - RAMIdle
 			}
@@ -567,6 +618,24 @@ func NormalizeVectorByVector(xvs []*Vector, yvs []*Vector) []*Vector {
 // Matching Vectors are summed, while unmatched Vectors are passed through.
 // e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
 func addVectors(xvs []*Vector, yvs []*Vector) []*Vector {
+	sumOp := func(result *Vector, x *float64, y *float64) bool {
+		if x != nil && y != nil {
+			result.Value = *x + *y
+		} else if y != nil {
+			result.Value = *y
+		} else if x != nil {
+			result.Value = *x
+		}
+
+		return true
+	}
+
+	return ApplyVectorOp(xvs, yvs, sumOp)
+}
+
+// ApplyVectorOp accepts two vectors, synchronizes timestamps, and executes an operation
+// on each vector. See VectorJoinOp for details.
+func ApplyVectorOp(xvs []*Vector, yvs []*Vector, op VectorJoinOp) []*Vector {
 	// round all non-zero timestamps to the nearest 10 second mark
 	for _, yv := range yvs {
 		if yv.Timestamp != 0 {
@@ -589,8 +658,8 @@ func addVectors(xvs []*Vector, yvs []*Vector) []*Vector {
 		return xvs
 	}
 
-	// sum stores the sum of the vector slices xvs and yvs
-	var sum []*Vector
+	// result contains the final vector slice after joining xvs and yvs
+	var result []*Vector
 
 	// timestamps stores all timestamps present in both vector slices
 	// without duplicates
@@ -618,21 +687,17 @@ func addVectors(xvs []*Vector, yvs []*Vector) []*Vector {
 		}
 	}
 
-	// iterate over each timestamp to produce a final summed vector slice
+	// iterate over each timestamp to produce a final op vector slice
 	sort.Float64s(timestamps)
 	for _, t := range timestamps {
 		x, okX := xMap[t]
 		y, okY := yMap[t]
 		sv := &Vector{Timestamp: t}
-		if okX && okY {
-			sv.Value = x + y
-		} else if okX {
-			sv.Value = x
-		} else if okY {
-			sv.Value = y
+
+		if op(sv, VectorValue(x, okX), VectorValue(y, okY)) {
+			result = append(result, sv)
 		}
-		sum = append(sum, sv)
 	}
 
-	return sum
+	return result
 }

+ 146 - 2
costmodel/costmodel.go

@@ -1365,6 +1365,10 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 
 func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
 	startString, endString, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
+	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
+	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
+	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
+	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
 	queryRAMAlloc := fmt.Sprintf(queryRAMAllocation, windowString, "", windowString, "")
 	queryCPUAlloc := fmt.Sprintf(queryCPUAllocation, windowString, "", windowString, "")
 	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "")
@@ -1402,9 +1406,33 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	}
 
 	var wg sync.WaitGroup
-	wg.Add(15)
+	wg.Add(19)
 
 	var promErr error
+	var resultRAMRequests interface{}
+	go func() {
+		defer wg.Done()
+
+		resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
+	}()
+	var resultRAMUsage interface{}
+	go func() {
+		defer wg.Done()
+
+		resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
+	}()
+	var resultCPURequests interface{}
+	go func() {
+		defer wg.Done()
+
+		resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
+	}()
+	var resultCPUUsage interface{}
+	go func() {
+		defer wg.Done()
+
+		resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
+	}()
 	var resultRAMAllocations interface{}
 	go func() {
 		defer wg.Done()
@@ -1606,6 +1634,36 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	containers := make(map[string]bool)
 	otherClusterPVRecorded := make(map[string]bool)
 
+	RAMReqMap, err := GetContainerMetricVectors(resultRAMRequests, true, normalizationValue, clusterID)
+	if err != nil {
+		return nil, err
+	}
+	for key := range RAMReqMap {
+		containers[key] = true
+	}
+	RAMUsedMap, err := GetContainerMetricVectors(resultRAMUsage, true, normalizationValue, clusterID)
+	if err != nil {
+		return nil, err
+	}
+	for key := range RAMUsedMap {
+		containers[key] = true
+	}
+
+	CPUReqMap, err := GetContainerMetricVectors(resultCPURequests, true, normalizationValue, clusterID)
+	if err != nil {
+		return nil, err
+	}
+	for key := range CPUReqMap {
+		containers[key] = true
+	}
+	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, normalizationValue, clusterID) // No need to normalize here, as this comes from a counter
+	if err != nil {
+		return nil, err
+	}
+	for key := range CPUUsedMap {
+		containers[key] = true
+	}
+
 	RAMAllocMap, err := GetContainerMetricVectors(resultRAMAllocations, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
@@ -1628,6 +1686,12 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		containers[key] = true
 	}
 
+	// Request metrics can show up after pod eviction and completion.
+	// This method synchronizes requests to allocations such that when
+	// allocation is 0, so are requests
+	applyAllocationToRequests(RAMAllocMap, RAMReqMap)
+	applyAllocationToRequests(CPUAllocMap, CPUReqMap)
+
 	currentContainers := make(map[string]v1.Pod)
 	for _, pod := range podlist {
 		if pod.Status.Phase != v1.PodRunning {
@@ -1713,6 +1777,26 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 				containerName := container.Name
 
 				newKey := newContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName, clusterID).Key()
+				RAMReqV, ok := RAMReqMap[newKey]
+				if !ok {
+					klog.V(4).Info("no RAM requests for " + newKey)
+					RAMReqV = []*Vector{}
+				}
+				RAMUsedV, ok := RAMUsedMap[newKey]
+				if !ok {
+					klog.V(4).Info("no RAM usage for " + newKey)
+					RAMUsedV = []*Vector{}
+				}
+				CPUReqV, ok := CPUReqMap[newKey]
+				if !ok {
+					klog.V(4).Info("no CPU requests for " + newKey)
+					CPUReqV = []*Vector{}
+				}
+				CPUUsedV, ok := CPUUsedMap[newKey]
+				if !ok {
+					klog.V(4).Info("no CPU usage for " + newKey)
+					CPUUsedV = []*Vector{}
+				}
 				RAMAllocsV, ok := RAMAllocMap[newKey]
 				if !ok {
 					klog.V(4).Info("no RAM allocation for " + newKey)
@@ -1747,6 +1831,10 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 					Jobs:            getJobsOfPod(pod),
 					Statefulsets:    getStatefulSetsOfPod(pod),
 					NodeData:        nodeData,
+					RAMReq:          RAMReqV,
+					RAMUsed:         RAMUsedV,
+					CPUReq:          CPUReqV,
+					CPUUsed:         CPUUsedV,
 					RAMAllocation:   RAMAllocsV,
 					CPUAllocation:   CPUAllocsV,
 					GPUReq:          GPUReqV,
@@ -1767,7 +1855,26 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			// Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
 			klog.V(4).Info("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
 			c, _ := NewContainerMetricFromKey(key)
-
+			RAMReqV, ok := RAMReqMap[key]
+			if !ok {
+				klog.V(4).Info("no RAM requests for " + key)
+				RAMReqV = []*Vector{}
+			}
+			RAMUsedV, ok := RAMUsedMap[key]
+			if !ok {
+				klog.V(4).Info("no RAM usage for " + key)
+				RAMUsedV = []*Vector{}
+			}
+			CPUReqV, ok := CPUReqMap[key]
+			if !ok {
+				klog.V(4).Info("no CPU requests for " + key)
+				CPUReqV = []*Vector{}
+			}
+			CPUUsedV, ok := CPUUsedMap[key]
+			if !ok {
+				klog.V(4).Info("no CPU usage for " + key)
+				CPUUsedV = []*Vector{}
+			}
 			RAMAllocsV, ok := RAMAllocMap[key]
 			if !ok {
 				klog.V(4).Info("no RAM allocation for " + key)
@@ -1870,6 +1977,10 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 				Namespace:       c.Namespace,
 				Services:        podServices,
 				Deployments:     podDeployments,
+				RAMReq:          RAMReqV,
+				RAMUsed:         RAMUsedV,
+				CPUReq:          CPUReqV,
+				CPUUsed:         CPUUsedV,
 				RAMAllocation:   RAMAllocsV,
 				CPUAllocation:   CPUAllocsV,
 				GPUReq:          GPUReqV,
@@ -1900,6 +2011,39 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	return containerNameCost, err
 }
 
+func applyAllocationToRequests(allocationMap map[string][]*Vector, requestMap map[string][]*Vector) {
+	// The result of the normalize operation will be a new []*Vector to replace the requests
+	normalizeOp := func(r *Vector, x *float64, y *float64) bool {
+		// Omit data (return false) if both x and y inputs don't exist
+		if x == nil || y == nil {
+			return false
+		}
+
+		// If the allocation value is 0, 0 out request value
+		if *x == 0 {
+			r.Value = 0
+		} else {
+			r.Value = *y
+		}
+
+		return true
+	}
+
+	// Run normalization on all request vectors in the mapping
+	for k, requests := range requestMap {
+
+		// Only run normalization where there are valid allocations
+		allocations, ok := allocationMap[k]
+		if !ok {
+			delete(requestMap, k)
+			continue
+		}
+
+		// Replace request map with normalized
+		requestMap[k] = ApplyVectorOp(allocations, requests, normalizeOp)
+	}
+}
+
 func addMetricPVData(pvAllocationMap map[string][]*PersistentVolumeClaimData, pvCostMap map[string]*costAnalyzerCloud.PV, cp costAnalyzerCloud.Provider) {
 	cfg, err := cp.GetConfig()
 	if err != nil {