Преглед изворни кода

Merge pull request #199 from kubecost/Bolt-pvc-multi-fix

PV Metrics for Multi-Cluster
Ajay Tripathy пре 6 година
родитељ
комит
3edbc92497
1 измењених фајлова са 216 додато и 1 уклоњено
  1. 216 1
      costmodel/costmodel.go

+ 216 - 1
costmodel/costmodel.go

@@ -150,6 +150,8 @@ const (
 						* 
 						on (persistentvolumeclaim, namespace, cluster_id) group_right(storageclass, volumename) 
 				sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace, cluster_id)`
+	queryPVCAllocation        = `avg_over_time(pod_pvc_allocation[24h])`
+	queryPVHourlyCost         = `avg_over_time(pv_hourly_cost[24h])`
 	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryInternetNetworkUsage = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
@@ -626,6 +628,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 			if !ok {
 				klog.V(3).Infof("Missing data for namespace %s", c.Namespace)
 			}
+
 			costs := &CostData{
 				Name:            c.ContainerName,
 				PodName:         c.PodName,
@@ -1177,7 +1180,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	}
 
 	var wg sync.WaitGroup
-	wg.Add(11)
+	wg.Add(13)
 
 	var promErr error
 	var resultRAMRequests interface{}
@@ -1225,6 +1228,16 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		resultNetInternetRequests, promErr = QueryRange(cli, queryNetInternetRequests, start, end, window)
 		defer wg.Done()
 	}()
+	var pvPodAllocationResults interface{}
+	go func() {
+		pvPodAllocationResults, promErr = Query(cli, queryPVCAllocation)
+		defer wg.Done()
+	}()
+	var pvCostResults interface{}
+	go func() {
+		pvCostResults, promErr = Query(cli, queryPVHourlyCost)
+		defer wg.Done()
+	}()
 	var normalizationResult interface{}
 	go func() {
 		normalizationResult, promErr = Query(cli, normalization)
@@ -1287,6 +1300,19 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		}
 	}
 
+	pvCostMapping, err := getPVCostMetrics(pvCostResults)
+	if err != nil {
+		klog.V(1).Infof("Unable to get PV Hourly Cost Data: %s", err.Error())
+	}
+
+	pvAllocationMapping, err := getPVAllocationMetrics(pvPodAllocationResults)
+	if err != nil {
+		klog.V(1).Infof("Unable to get PV Allocation Cost Data: %s", err.Error())
+	}
+	if pvAllocationMapping != nil {
+		addMetricPVData(pvAllocationMapping, pvCostMapping, cp)
+	}
+
 	networkUsageMap, err := GetNetworkUsageData(resultNetZoneRequests, resultNetRegionRequests, resultNetInternetRequests, clusterID, true)
 	if err != nil {
 		klog.V(1).Infof("Unable to get Network Cost Data: %s", err.Error())
@@ -1523,6 +1549,27 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 			if !ok {
 				klog.V(3).Infof("Missing data for namespace %s", c.Namespace)
 			}
+
+			// For PVC data, we'll need to find the claim mapping and cost data. Will need to append
+			// cost data since that was populated by cluster data previously. We do this with
+			// the pod_pvc_allocation metric
+			podPVs, ok := pvAllocationMapping[c.Namespace+","+c.PodName+","+c.ClusterID]
+			if !ok {
+				klog.V(3).Infof("Failed to locate pv allocation mapping for missing pod.")
+			}
+
+			// For network costs, we'll use existing map since it should still contain the
+			// correct data.
+			var podNetCosts []*Vector
+			if usage, ok := networkUsageMap[c.Namespace+","+c.PodName+","+c.ClusterID]; ok {
+				netCosts, err := GetNetworkCost(usage, cp)
+				if err != nil {
+					klog.V(3).Infof("Error pulling network costs: %s", err.Error())
+				} else {
+					podNetCosts = netCosts
+				}
+			}
+
 			costs := &CostData{
 				Name:            c.ContainerName,
 				PodName:         c.PodName,
@@ -1535,6 +1582,8 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
+				PVCData:         podPVs,
+				NetworkData:     podNetCosts,
 				ClusterID:       c.ClusterID,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
@@ -1564,6 +1613,172 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	return containerNameCost, err
 }
 
+func parseStringField(metricMap map[string]interface{}, field string) (string, error) {
+	f, ok := metricMap[field]
+	if !ok {
+		return "", fmt.Errorf("%s field does not exist in data result vector", field)
+	}
+
+	strField, ok := f.(string)
+	if !ok {
+		return "", fmt.Errorf("%s field is improperly formatted", field)
+	}
+
+	return strField, nil
+}
+
+func getPVAllocationMetrics(queryResult interface{}) (map[string][]*PersistentVolumeClaimData, error) {
+	toReturn := make(map[string][]*PersistentVolumeClaimData)
+	data, ok := queryResult.(map[string]interface{})["data"]
+	if !ok {
+		e, err := wrapPrometheusError(queryResult)
+		if err != nil {
+			return toReturn, err
+		}
+		return toReturn, fmt.Errorf(e)
+	}
+
+	for _, val := range data.(map[string]interface{})["result"].([]interface{}) {
+		metricInterface, ok := val.(map[string]interface{})["metric"]
+		if !ok {
+			return toReturn, fmt.Errorf("Metric field does not exist in data result vector")
+		}
+		metricMap, ok := metricInterface.(map[string]interface{})
+		if !ok {
+			return toReturn, fmt.Errorf("Metric field is improperly formatted")
+		}
+
+		clusterID, err := parseStringField(metricMap, "cluster_id")
+		if err != nil {
+			return toReturn, err
+		}
+
+		ns, err := parseStringField(metricMap, "namespace")
+		if err != nil {
+			return toReturn, err
+		}
+
+		pod, err := parseStringField(metricMap, "pod")
+		if err != nil {
+			return toReturn, err
+		}
+
+		pvcName, err := parseStringField(metricMap, "persistentvolumeclaim")
+		if err != nil {
+			return toReturn, err
+		}
+
+		pvName, err := parseStringField(metricMap, "persistentvolume")
+		if err != nil {
+			return toReturn, err
+		}
+
+		dataPoint, ok := val.(map[string]interface{})["value"]
+		if !ok {
+			return nil, fmt.Errorf("Value field does not exist in data result vector")
+		}
+		value, ok := dataPoint.([]interface{})
+		if !ok || len(value) != 2 {
+			return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+		}
+		var vectors []*Vector
+		strVal := value[1].(string)
+		v, _ := strconv.ParseFloat(strVal, 64)
+
+		vectors = append(vectors, &Vector{
+			Timestamp: value[0].(float64),
+			Value:     v,
+		})
+
+		key := fmt.Sprintf("%s,%s,%s", ns, pod, clusterID)
+		pvcData := &PersistentVolumeClaimData{
+			Class:      "",
+			Claim:      pvcName,
+			Namespace:  ns,
+			ClusterID:  clusterID,
+			VolumeName: pvName,
+			Values:     vectors,
+		}
+
+		toReturn[key] = append(toReturn[key], pvcData)
+	}
+
+	return toReturn, nil
+}
+
+func getPVCostMetrics(queryResult interface{}) (map[string]*costAnalyzerCloud.PV, error) {
+	toReturn := make(map[string]*costAnalyzerCloud.PV)
+	data, ok := queryResult.(map[string]interface{})["data"]
+	if !ok {
+		e, err := wrapPrometheusError(queryResult)
+		if err != nil {
+			return toReturn, err
+		}
+		return toReturn, fmt.Errorf(e)
+	}
+
+	for _, val := range data.(map[string]interface{})["result"].([]interface{}) {
+		metricInterface, ok := val.(map[string]interface{})["metric"]
+		if !ok {
+			return toReturn, fmt.Errorf("Metric field does not exist in data result vector")
+		}
+		metricMap, ok := metricInterface.(map[string]interface{})
+		if !ok {
+			return toReturn, fmt.Errorf("Metric field is improperly formatted")
+		}
+
+		clusterID, err := parseStringField(metricMap, "cluster_id")
+		if err != nil {
+			return toReturn, err
+		}
+
+		volumeName, err := parseStringField(metricMap, "volumename")
+		if err != nil {
+			return toReturn, err
+		}
+
+		dataPoint, ok := val.(map[string]interface{})["value"]
+		if !ok {
+			return toReturn, fmt.Errorf("Value field does not exist in data result vector")
+		}
+		value, ok := dataPoint.([]interface{})
+		if !ok || len(value) != 2 {
+			return toReturn, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+		}
+
+		key := fmt.Sprintf("%s,%s", volumeName, clusterID)
+		toReturn[key] = &costAnalyzerCloud.PV{
+			Cost: value[1].(string),
+		}
+	}
+
+	return toReturn, nil
+}
+
+func addMetricPVData(pvAllocationMap map[string][]*PersistentVolumeClaimData, pvCostMap map[string]*costAnalyzerCloud.PV, cp costAnalyzerCloud.Provider) {
+	cfg, err := cp.GetConfig()
+	if err != nil {
+		klog.V(1).Infof("Failed to get provider config while adding pv metrics data.")
+		return
+	}
+
+	for _, pvcDataArray := range pvAllocationMap {
+		for _, pvcData := range pvcDataArray {
+			costKey := fmt.Sprintf("%s,%s", pvcData.VolumeName, pvcData.ClusterID)
+
+			pvCost, ok := pvCostMap[costKey]
+			if !ok {
+				pvcData.Volume = &costAnalyzerCloud.PV{
+					Cost: cfg.Storage,
+				}
+				continue
+			}
+
+			pvcData.Volume = pvCost
+		}
+	}
+}
+
 func getNamespaceLabels(cache ClusterCache) (map[string]map[string]string, error) {
 	nsToLabels := make(map[string]map[string]string)
 	nss := cache.GetAllNamespaces()