Просмотр исходного кода

Merge pull request #233 from kubecost/Bolt-fix-allocation-bug

Fixes for pod CPU and RAM allocation calculation
Matt Bolt 6 лет назад
Родитель
Commit
132fcb3f27
1 изменённый файл: 70 добавлений и 166 удалений
  1. 70 166
      costmodel/costmodel.go

+ 70 - 166
costmodel/costmodel.go

@@ -158,6 +158,28 @@ const (
 						* 
 						on (persistentvolumeclaim, namespace, cluster_id) group_right(storageclass, volumename) 
 				sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace, cluster_id)`
+	queryRAMAllocation = `avg(
+		label_replace(
+			label_replace(
+				avg(
+					count_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s] %s) 
+					*  
+					avg_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s] %s)
+				) by (namespace,container,pod,node,cluster_id) , "container_name","$1","container","(.+)"
+			), "pod_name","$1","pod","(.+)"
+		) 
+	) by (namespace,container_name,pod_name,node,cluster_id)`
+	queryCPUAllocation = `avg(
+		label_replace(
+			label_replace(
+				avg(
+					count_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s] %s) 
+					*  
+					avg_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s] %s)
+				) by (namespace,container,pod,node,cluster_id) , "container_name","$1","container","(.+)"
+			), "pod_name","$1","pod","(.+)"
+		) 
+	) by (namespace,container_name,pod_name,node,cluster_id)`
 	queryPVCAllocation        = `avg_over_time(pod_pvc_allocation[%s])`
 	queryPVHourlyCost         = `avg_over_time(pv_hourly_cost[%s])`
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
@@ -1343,10 +1365,8 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 
 func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
 	startString, endString, windowString string, filterNamespace string, filterCluster string, remoteEnabled bool) (map[string]*CostData, error) {
-	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
-	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
-	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
-	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
+	queryRAMAlloc := fmt.Sprintf(queryRAMAllocation, windowString, "", windowString, "")
+	queryCPUAlloc := fmt.Sprintf(queryCPUAllocation, windowString, "", windowString, "")
 	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "")
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
 	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
@@ -1382,32 +1402,20 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	}
 
 	var wg sync.WaitGroup
-	wg.Add(17)
+	wg.Add(15)
 
 	var promErr error
-	var resultRAMRequests interface{}
+	var resultRAMAllocations interface{}
 	go func() {
 		defer wg.Done()
 
-		resultRAMRequests, promErr = QueryRange(cli, queryRAMRequests, start, end, window)
+		resultRAMAllocations, promErr = QueryRange(cli, queryRAMAlloc, start, end, window)
 	}()
-	var resultRAMUsage interface{}
+	var resultCPUAllocations interface{}
 	go func() {
 		defer wg.Done()
 
-		resultRAMUsage, promErr = QueryRange(cli, queryRAMUsage, start, end, window)
-	}()
-	var resultCPURequests interface{}
-	go func() {
-		defer wg.Done()
-
-		resultCPURequests, promErr = QueryRange(cli, queryCPURequests, start, end, window)
-	}()
-	var resultCPUUsage interface{}
-	go func() {
-		defer wg.Done()
-
-		resultCPUUsage, promErr = QueryRange(cli, queryCPUUsage, start, end, window)
+		resultCPUAllocations, promErr = QueryRange(cli, queryCPUAlloc, start, end, window)
 	}()
 	var resultGPURequests interface{}
 	go func() {
@@ -1597,26 +1605,18 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	containers := make(map[string]bool)
 	otherClusterPVRecorded := make(map[string]bool)
 
-	RAMReqMap, err := GetContainerMetricVectors(resultRAMRequests, true, normalizationValue, clusterID)
+	RAMAllocMap, err := GetContainerMetricVectors(resultRAMAllocations, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
-	for key := range RAMReqMap {
+	for key := range RAMAllocMap {
 		containers[key] = true
 	}
-
-	RAMUsedMap, err := GetContainerMetricVectors(resultRAMUsage, true, normalizationValue, clusterID)
+	CPUAllocMap, err := GetContainerMetricVectors(resultCPUAllocations, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
-	for key := range RAMUsedMap {
-		containers[key] = true
-	}
-	CPUReqMap, err := GetContainerMetricVectors(resultCPURequests, true, normalizationValue, clusterID)
-	if err != nil {
-		return nil, err
-	}
-	for key := range CPUReqMap {
+	for key := range CPUAllocMap {
 		containers[key] = true
 	}
 	GPUReqMap, err := GetContainerMetricVectors(resultGPURequests, true, normalizationValue, clusterID)
@@ -1626,13 +1626,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	for key := range GPUReqMap {
 		containers[key] = true
 	}
-	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, normalizationValue, clusterID) // No need to normalize here, as this comes from a counter
-	if err != nil {
-		return nil, err
-	}
-	for key := range CPUUsedMap {
-		containers[key] = true
-	}
+
 	currentContainers := make(map[string]v1.Pod)
 	for _, pod := range podlist {
 		if pod.Status.Phase != v1.PodRunning {
@@ -1718,32 +1712,21 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 				containerName := container.Name
 
 				newKey := newContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName, clusterID).Key()
-
-				RAMReqV, ok := RAMReqMap[newKey]
+				RAMAllocsV, ok := RAMAllocMap[newKey]
 				if !ok {
-					klog.V(4).Info("no RAM requests for " + newKey)
-					RAMReqV = []*Vector{}
+					klog.V(4).Info("no RAM allocation for " + newKey)
+					RAMAllocsV = []*Vector{}
 				}
-				RAMUsedV, ok := RAMUsedMap[newKey]
+				CPUAllocsV, ok := CPUAllocMap[newKey]
 				if !ok {
-					klog.V(4).Info("no RAM usage for " + newKey)
-					RAMUsedV = []*Vector{}
-				}
-				CPUReqV, ok := CPUReqMap[newKey]
-				if !ok {
-					klog.V(4).Info("no CPU requests for " + newKey)
-					CPUReqV = []*Vector{}
+					klog.V(4).Info("no CPU allocation for " + newKey)
+					CPUAllocsV = []*Vector{}
 				}
 				GPUReqV, ok := GPUReqMap[newKey]
 				if !ok {
 					klog.V(4).Info("no GPU requests for " + newKey)
 					GPUReqV = []*Vector{}
 				}
-				CPUUsedV, ok := CPUUsedMap[newKey]
-				if !ok {
-					klog.V(4).Info("no CPU usage for " + newKey)
-					CPUUsedV = []*Vector{}
-				}
 
 				var pvReq []*PersistentVolumeClaimData
 				var netReq []*Vector
@@ -1763,10 +1746,8 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 					Jobs:            getJobsOfPod(pod),
 					Statefulsets:    getStatefulSetsOfPod(pod),
 					NodeData:        nodeData,
-					RAMReq:          RAMReqV,
-					RAMUsed:         RAMUsedV,
-					CPUReq:          CPUReqV,
-					CPUUsed:         CPUUsedV,
+					RAMAllocation:   RAMAllocsV,
+					CPUAllocation:   CPUAllocsV,
 					GPUReq:          GPUReqV,
 					PVCData:         pvReq,
 					Labels:          podLabels,
@@ -1774,8 +1755,6 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 					NamespaceLabels: nsLabels,
 					ClusterID:       clusterID,
 				}
-				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
-				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
 
 				if costDataPassesFilters(costs, filterNamespace, filterCluster) {
 					containerNameCost[newKey] = costs
@@ -1787,31 +1766,22 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			// Not all information is sent to prometheus via ksm, so fill out what we can without k8s api
 			klog.V(4).Info("The container " + key + " has been deleted. Calculating allocation but resulting object will be missing data.")
 			c, _ := NewContainerMetricFromKey(key)
-			RAMReqV, ok := RAMReqMap[key]
-			if !ok {
-				klog.V(4).Info("no RAM requests for " + key)
-				RAMReqV = []*Vector{}
-			}
-			RAMUsedV, ok := RAMUsedMap[key]
+
+			RAMAllocsV, ok := RAMAllocMap[key]
 			if !ok {
-				klog.V(4).Info("no RAM usage for " + key)
-				RAMUsedV = []*Vector{}
+				klog.V(4).Info("no RAM allocation for " + key)
+				RAMAllocsV = []*Vector{}
 			}
-			CPUReqV, ok := CPUReqMap[key]
+			CPUAllocsV, ok := CPUAllocMap[key]
 			if !ok {
-				klog.V(4).Info("no CPU requests for " + key)
-				CPUReqV = []*Vector{}
+				klog.V(4).Info("no CPU allocation for " + key)
+				CPUAllocsV = []*Vector{}
 			}
 			GPUReqV, ok := GPUReqMap[key]
 			if !ok {
 				klog.V(4).Info("no GPU requests for " + key)
 				GPUReqV = []*Vector{}
 			}
-			CPUUsedV, ok := CPUUsedMap[key]
-			if !ok {
-				klog.V(4).Info("no CPU usage for " + key)
-				CPUUsedV = []*Vector{}
-			}
 
 			node, ok := nodes[c.NodeName]
 			if !ok {
@@ -1899,10 +1869,8 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 				Namespace:       c.Namespace,
 				Services:        podServices,
 				Deployments:     podDeployments,
-				RAMReq:          RAMReqV,
-				RAMUsed:         RAMUsedV,
-				CPUReq:          CPUReqV,
-				CPUUsed:         CPUUsedV,
+				RAMAllocation:   RAMAllocsV,
+				CPUAllocation:   CPUAllocsV,
 				GPUReq:          GPUReqV,
 				Labels:          pLabels,
 				NamespaceLabels: namespaceLabels,
@@ -1910,8 +1878,6 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 				NetworkData:     podNetCosts,
 				ClusterID:       c.ClusterID,
 			}
-			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
-			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
 
 			if costDataPassesFilters(costs, filterNamespace, filterCluster) {
 				containerNameCost[key] = costs
@@ -2515,104 +2481,42 @@ func NewKeyTuple(key string) (*KeyTuple, error) {
 }
 
 func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*Vector, error) {
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
-		if err != nil {
-			return nil, err
-		}
-		return nil, fmt.Errorf(e)
-	}
-	r, ok := data.(map[string]interface{})["result"]
-	if !ok {
-		return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
-	}
-	results, ok := r.([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
+	result, err := NewQueryResults(qr)
+	if err != nil {
+		return nil, err
 	}
+
 	containerData := make(map[string][]*Vector)
-	for _, val := range results {
-		metric, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Prometheus vector does not have metric labels")
-		}
-		containerMetric, err := newContainerMetricFromPrometheus(metric, defaultClusterID)
+	for _, val := range result {
+		containerMetric, err := newContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}
-		value, ok := val.(map[string]interface{})["value"]
-		if !ok {
-			return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
-		}
-		dataPoint, ok := value.([]interface{})
-		if !ok || len(dataPoint) != 2 {
-			return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-		}
-		strVal := dataPoint[1].(string)
-		v, _ := strconv.ParseFloat(strVal, 64)
+
 		if normalize && normalizationValue != 0 {
-			v = v / normalizationValue
-		}
-		toReturn := &Vector{
-			Timestamp: dataPoint[0].(float64),
-			Value:     v,
+			for _, v := range val.Values {
+				v.Value = v.Value / normalizationValue
+			}
 		}
-		klog.V(4).Info("key: " + containerMetric.Key())
-		containerData[containerMetric.Key()] = []*Vector{toReturn}
+		containerData[containerMetric.Key()] = val.Values
 	}
 	return containerData, nil
 }
 
 func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValues []*Vector, defaultClusterID string) (map[string][]*Vector, error) {
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
-		if err != nil {
-			return nil, err
-		}
-		return nil, fmt.Errorf(e)
-	}
-	r, ok := data.(map[string]interface{})["result"]
-	if !ok {
-		return nil, fmt.Errorf("Improperly formatted data from prometheus, data has no result field")
-	}
-	results, ok := r.([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Improperly formatted results from prometheus, result field is not a slice")
+	result, err := NewQueryResults(qr)
+	if err != nil {
+		return nil, err
 	}
+
 	containerData := make(map[string][]*Vector)
-	for _, val := range results {
-		metric, ok := val.(map[string]interface{})["metric"].(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Prometheus vector does not have metric labels")
-		}
-		containerMetric, err := newContainerMetricFromPrometheus(metric, defaultClusterID)
+	for _, val := range result {
+		containerMetric, err := newContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}
-		vs, ok := val.(map[string]interface{})["values"]
-		if !ok {
-			return nil, fmt.Errorf("Improperly formatted results from prometheus, values is not a field in the vector")
-		}
-		values, ok := vs.([]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Improperly formatted results from prometheus, values is not a slice")
-		}
-		var vectors []*Vector
-		for _, value := range values {
-			dataPoint, ok := value.([]interface{})
-			if !ok || len(dataPoint) != 2 {
-				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-			}
-			strVal := dataPoint[1].(string)
-			v, _ := strconv.ParseFloat(strVal, 64)
-			vectors = append(vectors, &Vector{
-				Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
-				Value:     v,
-			})
-		}
-		normalizedVectors := NormalizeVectorByVector(vectors, normalizationValues)
+
+		normalizedVectors := NormalizeVectorByVector(val.Values, normalizationValues)
 		containerData[containerMetric.Key()] = normalizedVectors
 	}
 	return containerData, nil