Просмотр исходного кода

Update PVC cost queries to use single query tables for lookups rather than query using custom parameters.

Matt Bolt 6 лет назад
Родитель
Сommit
5e05577910
1 измененных файлов с 120 добавлено и 76 удалено
  1. 120 76
      costmodel/costmodel.go

+ 120 - 76
costmodel/costmodel.go

@@ -143,8 +143,8 @@ const (
 						* 
 						on (persistentvolumeclaim, namespace, cluster_id) group_right(storageclass, volumename) 
 				sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace, cluster_id)`
-	queryPVCAllocationStr     = `avg_over_time(pod_pvc_allocation{cluster_id="%s", namespace="%s", pod="%s"}[24h])`
-	queryPVHourlyCostStr      = `avg_over_time(pv_hourly_cost{cluster_id="%s", volumename="%s"}[24h])`
+	queryPVCAllocation        = `avg_over_time(pod_pvc_allocation[24h])`
+	queryPVHourlyCost         = `avg_over_time(pv_hourly_cost[24h])`
 	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryInternetNetworkUsage = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
@@ -1173,7 +1173,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	}
 
 	var wg sync.WaitGroup
-	wg.Add(11)
+	wg.Add(13)
 
 	var promErr error
 	var resultRAMRequests interface{}
@@ -1221,6 +1221,16 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		resultNetInternetRequests, promErr = QueryRange(cli, queryNetInternetRequests, start, end, window)
 		defer wg.Done()
 	}()
+	var pvPodAllocationResults interface{}
+	go func() {
+		pvPodAllocationResults, promErr = Query(cli, queryPVCAllocation)
+		defer wg.Done()
+	}()
+	var pvCostResults interface{}
+	go func() {
+		pvCostResults, promErr = Query(cli, queryPVHourlyCost)
+		defer wg.Done()
+	}()
 	var normalizationResult interface{}
 	go func() {
 		normalizationResult, promErr = Query(cli, normalization)
@@ -1283,6 +1293,19 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		}
 	}
 
+	pvCostMapping, err := getPVCostMetrics(pvCostResults)
+	if err != nil {
+		klog.V(1).Infof("Unable to get PV Hourly Cost Data: %s", err.Error())
+	}
+
+	pvAllocationMapping, err := getPVAllocationMetrics(pvPodAllocationResults)
+	if err != nil {
+		klog.V(1).Infof("Unable to get PV Allocation Cost Data: %s", err.Error())
+	}
+	if pvAllocationMapping != nil {
+		addMetricPVData(pvAllocationMapping, pvCostMapping, cp)
+	}
+
 	networkUsageMap, err := GetNetworkUsageData(resultNetZoneRequests, resultNetRegionRequests, resultNetInternetRequests, clusterID, true)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Network Cost Data: %s", err.Error())
@@ -1523,12 +1546,9 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 			// For PVC data, we'll need to find the claim mapping and cost data. Will need to append
 			// cost data since that was populated by cluster data previously. We do this with
 			// the pod_pvc_allocation metric
-			podPVs, err := findPVCData(cli, c.ClusterID, c.Namespace, c.PodName)
-			if err != nil {
-				klog.V(1).Infof("Failed to locate missing pod PV data: %s", err.Error())
-			}
-			if podPVs != nil {
-				addMetricPVData(cli, c.ClusterID, podPVs)
+			podPVs, ok := pvAllocationMapping[c.Namespace+","+c.PodName+","+c.ClusterID]
+			if !ok {
+				klog.V(3).Infof("Failed to locate pv allocation mapping for missing pod.")
 			}
 
 			// For network costs, we'll use existing map since it should still contain the
@@ -1586,17 +1606,25 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return containerNameCost, err
 }
 
-func findPVCData(cli prometheusClient.Client, clusterID string, ns string, pod string) ([]*PersistentVolumeClaimData, error) {
-	var toReturn []*PersistentVolumeClaimData
-	queryPVCData := fmt.Sprintf(queryPVCAllocationStr, clusterID, ns, pod)
-	pvcResult, err := Query(cli, queryPVCData)
-	if err != nil {
-		return toReturn, err
+func parseStringField(metricMap map[string]interface{}, field string) (string, error) {
+	f, ok := metricMap[field]
+	if !ok {
+		return "", fmt.Errorf("%s field does not exist in data result vector", field)
 	}
 
-	data, ok := pvcResult.(map[string]interface{})["data"]
+	strField, ok := f.(string)
+	if !ok {
+		return "", fmt.Errorf("%s field is improperly formatted", field)
+	}
+
+	return strField, nil
+}
+
+func getPVAllocationMetrics(queryResult interface{}) (map[string][]*PersistentVolumeClaimData, error) {
+	toReturn := make(map[string][]*PersistentVolumeClaimData)
+	data, ok := queryResult.(map[string]interface{})["data"]
 	if !ok {
-		e, err := wrapPrometheusError(pvcResult)
+		e, err := wrapPrometheusError(queryResult)
 		if err != nil {
 			return toReturn, err
 		}
@@ -1613,30 +1641,29 @@ func findPVCData(cli prometheusClient.Client, clusterID string, ns string, pod s
 			return toReturn, fmt.Errorf("Metric field is improperly formatted")
 		}
 
-		pvcField, ok := metricMap["persistentvolumeclaim"]
-		if !ok {
-			return toReturn, fmt.Errorf("persistentvolumeclaim field does not exist in data result vector")
+		clusterID, err := parseStringField(metricMap, "cluster_id")
+		if err != nil {
+			return toReturn, err
 		}
 
-		pvcName, ok := pvcField.(string)
-		if !ok {
-			return toReturn, fmt.Errorf("persistentvolumeclaim field is improperly formatted")
+		ns, err := parseStringField(metricMap, "namespace")
+		if err != nil {
+			return toReturn, err
 		}
 
-		// TODO: Could we use original pvClaimMapping to simplify this?
-		// pvClaimData, ok := pvClaimMapping[ns+pvcName+clusterID]
-		// if ok {
-		// 	toReturn = append(toReturn, pvClaimData)
-		// 	continue
-		// }
+		pod, err := parseStringField(metricMap, "pod")
+		if err != nil {
+			return toReturn, err
+		}
 
-		pvField, ok := metricMap["persistentvolume"]
-		if !ok {
-			return toReturn, fmt.Errorf("persistentvolume field does not exist in data result vector")
+		pvcName, err := parseStringField(metricMap, "persistentvolumeclaim")
+		if err != nil {
+			return toReturn, err
 		}
-		pvName, ok := pvField.(string)
-		if !ok {
-			return toReturn, fmt.Errorf("persistentvolume field is improperly formatted")
+
+		pvName, err := parseStringField(metricMap, "persistentvolume")
+		if err != nil {
+			return toReturn, err
 		}
 
 		dataPoint, ok := val.(map[string]interface{})["value"]
@@ -1656,6 +1683,7 @@ func findPVCData(cli prometheusClient.Client, clusterID string, ns string, pod s
 			Value:     v,
 		})
 
+		key := fmt.Sprintf("%s,%s,%s", ns, pod, clusterID)
 		pvcData := &PersistentVolumeClaimData{
 			Class:      "",
 			Claim:      pvcName,
@@ -1665,67 +1693,83 @@ func findPVCData(cli prometheusClient.Client, clusterID string, ns string, pod s
 			Values:     vectors,
 		}
 
-		toReturn = append(toReturn, pvcData)
+		toReturn[key] = append(toReturn[key], pvcData)
 	}
 
 	return toReturn, nil
 }
 
-func addMetricPVData(cli prometheusClient.Client, clusterID string, pvData []*PersistentVolumeClaimData) {
-	for _, pvcd := range pvData {
-		hourlyCost, err := getMetricPVCost(cli, clusterID, pvcd.VolumeName)
+func getPVCostMetrics(queryResult interface{}) (map[string]*costAnalyzerCloud.PV, error) {
+	toReturn := make(map[string]*costAnalyzerCloud.PV)
+	data, ok := queryResult.(map[string]interface{})["data"]
+	if !ok {
+		e, err := wrapPrometheusError(queryResult)
 		if err != nil {
-			klog.V(1).Infof("Failed to parse hourly cost metric for clusterID: %s, volume: %s, Error: %s", clusterID, pvcd.VolumeName, err.Error())
-			continue
+			return toReturn, err
 		}
+		return toReturn, fmt.Errorf(e)
+	}
 
-		pvcd.Volume = &costAnalyzerCloud.PV{
-			Cost: fmt.Sprintf("%f", hourlyCost),
+	for _, val := range data.(map[string]interface{})["result"].([]interface{}) {
+		metricInterface, ok := val.(map[string]interface{})["metric"]
+		if !ok {
+			return toReturn, fmt.Errorf("Metric field does not exist in data result vector")
+		}
+		metricMap, ok := metricInterface.(map[string]interface{})
+		if !ok {
+			return toReturn, fmt.Errorf("Metric field is improperly formatted")
 		}
-	}
-}
 
-func getMetricPVCost(cli prometheusClient.Client, clusterID string, volumeName string) (float64, error) {
-	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostStr, clusterID, volumeName)
-	pvCostResult, err := Query(cli, queryPVHourlyCost)
-	if err != nil {
-		return 0, err
-	}
+		clusterID, err := parseStringField(metricMap, "cluster_id")
+		if err != nil {
+			return toReturn, err
+		}
 
-	data, ok := pvCostResult.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(pvCostResult)
+		volumeName, err := parseStringField(metricMap, "volumename")
 		if err != nil {
-			return 0, err
+			return toReturn, err
 		}
-		return 0, fmt.Errorf(e)
-	}
 
-	resultArray, ok := data.(map[string]interface{})["result"].([]interface{})
-	if !ok {
-		return 0, fmt.Errorf("Result field not part of data")
-	}
+		dataPoint, ok := val.(map[string]interface{})["value"]
+		if !ok {
+			return toReturn, fmt.Errorf("Value field does not exist in data result vector")
+		}
+		value, ok := dataPoint.([]interface{})
+		if !ok || len(value) != 2 {
+			return toReturn, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+		}
 
-	if len(resultArray) == 0 {
-		return 0, fmt.Errorf("Result array was empty")
+		key := fmt.Sprintf("%s,%s", volumeName, clusterID)
+		toReturn[key] = &costAnalyzerCloud.PV{
+			Cost: value[1].(string),
+		}
 	}
 
-	result := resultArray[0]
-	dataPoint, ok := result.(map[string]interface{})["value"]
-	if !ok {
-		return 0, fmt.Errorf("Value field does not exist in data result vector")
-	}
-	value, ok := dataPoint.([]interface{})
-	if !ok || len(value) != 2 {
-		return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-	}
+	return toReturn, nil
+}
 
-	strVal, ok := value[1].(string)
-	if !ok {
-		return 0, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+func addMetricPVData(pvAllocationMap map[string][]*PersistentVolumeClaimData, pvCostMap map[string]*costAnalyzerCloud.PV, cp costAnalyzerCloud.Provider) {
+	cfg, err := cp.GetConfig()
+	if err != nil {
+		klog.V(1).Infof("Failed to get provider config while adding pv metrics data.")
+		return
 	}
 
-	return strconv.ParseFloat(strVal, 64)
+	for _, pvcDataArray := range pvAllocationMap {
+		for _, pvcData := range pvcDataArray {
+			costKey := fmt.Sprintf("%s,%s", pvcData.VolumeName, pvcData.ClusterID)
+
+			pvCost, ok := pvCostMap[costKey]
+			if !ok {
+				pvcData.Volume = &costAnalyzerCloud.PV{
+					Cost: cfg.Storage,
+				}
+				continue
+			}
+
+			pvcData.Volume = pvCost
+		}
+	}
 }
 
 func getNamespaceLabels(cache ClusterCache) (map[string]map[string]string, error) {