Explorar o código

multicluster fixes

AjayTripathy hai 6 anos
pai
achega
0cbc7c689d
Modificáronse 3 ficheiros con 30 adicións e 46 borrados
  1. 9 3
      costmodel/aggregations.go
  2. 16 39
      costmodel/cluster.go
  3. 5 4
      costmodel/router.go

+ 9 - 3
costmodel/aggregations.go

@@ -82,7 +82,9 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 	if err != nil {
 		return nil, err
 	}
-	aggregateContainerCosts := AggregateCostData(cp, costData, 0, "cluster", []string{}, "", false, discount, 1, nil)
+	tempCoefficient := make(map[string]float64)
+
+	aggregateContainerCosts := AggregateCostData(cp, costData, 0, "cluster", []string{}, "", false, discount, tempCoefficient, nil)
 	allTotals, err := ClusterCostsForAllClusters(cli, cp, windowString, offset)
 	if err != nil {
 		return nil, err
@@ -125,7 +127,7 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 
 // AggregateCostData reduces the dimensions of raw cost data by field and, optionally, by time. The field parameter determines the field
 // by which to group data, with an optional subfield, e.g. for groupings like field="label" and subfield="app" for grouping by "label.app".
-func AggregateCostData(cp cloud.Provider, costData map[string]*CostData, dataCount int64, field string, subfields []string, rate string, timeSeries bool, discount float64, idleCoefficient float64, sr *SharedResourceInfo) map[string]*Aggregation {
+func AggregateCostData(cp cloud.Provider, costData map[string]*CostData, dataCount int64, field string, subfields []string, rate string, timeSeries bool, discount float64, idleCoefficients map[string]float64, sr *SharedResourceInfo) map[string]*Aggregation {
 	// aggregations collects key-value pairs of resource group-to-aggregated data
 	// e.g. namespace-to-data or label-value-to-data
 	aggregations := make(map[string]*Aggregation)
@@ -135,6 +137,10 @@ func AggregateCostData(cp cloud.Provider, costData map[string]*CostData, dataCou
 	sharedResourceCost := 0.0
 
 	for _, costDatum := range costData {
+		idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
+		if !ok {
+			idleCoefficient = 1.0
+		}
 		if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
 			cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
 			sharedResourceCost += totalVector(cpuv)
@@ -171,7 +177,7 @@ func AggregateCostData(cp cloud.Provider, costData map[string]*CostData, dataCou
 					}
 				}
 			} else if field == "pod" {
-				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.PodName, discount, idleCoefficient)
+				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, idleCoefficient)
 			}
 		}
 	}

+ 16 - 39
costmodel/cluster.go

@@ -18,7 +18,7 @@ const (
 	  ) by (cluster_id)`
 
 	queryClusterRAM = `sum(
-		avg(kube_node_status_capacity_memory_bytes %s) by (node) / 1024 / 1024 / 1024 * avg(node_ram_hourly_cost %s) by (node, cluster_id) * 730
+		avg(kube_node_status_capacity_memory_bytes %s) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(node_ram_hourly_cost %s) by (node, cluster_id) * 730
 	  ) by (cluster_id)`
 
 	queryStorage = `sum(
@@ -89,7 +89,7 @@ func resultToTotal(qr interface{}) (map[string][][]string, error) {
 		if err != nil {
 			return nil, err
 		}
-		return nil, fmt.Errorf(e)
+		return nil, fmt.Errorf("Prometheus query error: %s", e)
 	}
 	r, ok := data.(map[string]interface{})["result"]
 	if !ok {
@@ -143,48 +143,34 @@ func resultToTotal(qr interface{}) (map[string][][]string, error) {
 
 // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
 func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (map[string]*Totals, error) {
-	localStorageQuery, err := cloud.GetLocalStorageQuery()
-	if err != nil {
-		return nil, err
-	}
-	if localStorageQuery != "" {
-		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
-	}
 
-	// turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
 	if offset != "" {
-		offset = fmt.Sprintf("offset %s", offset)
+		offset = fmt.Sprintf("offset 3h") // Set offset to 3h for block sync
 	}
 
 	qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
 	qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
-	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
-	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
+	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, "")
 
 	resultClusterCores, err := Query(cli, qCores)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
 	}
 	resultClusterRAM, err := Query(cli, qRAM)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
 	}
 
 	resultStorage, err := Query(cli, qStorage)
 	if err != nil {
-		return nil, err
-	}
-
-	resultTotal, err := Query(cli, qTotal)
-	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
 	}
 
 	toReturn := make(map[string]*Totals)
 
 	coreTotal, err := resultToTotal(resultClusterCores)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
 	}
 	for clusterID, total := range coreTotal {
 		if _, ok := toReturn[clusterID]; !ok {
@@ -195,7 +181,7 @@ func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerC
 
 	ramTotal, err := resultToTotal(resultClusterRAM)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
 	}
 	for clusterID, total := range ramTotal {
 		if _, ok := toReturn[clusterID]; !ok {
@@ -206,7 +192,7 @@ func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerC
 
 	storageTotal, err := resultToTotal(resultStorage)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
 	}
 	for clusterID, total := range storageTotal {
 		if _, ok := toReturn[clusterID]; !ok {
@@ -215,17 +201,6 @@ func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerC
 		toReturn[clusterID].StorageCost = total
 	}
 
-	clusterTotal, err := resultToTotal(resultTotal)
-	if err != nil {
-		return nil, err
-	}
-	for clusterID, total := range clusterTotal {
-		if _, ok := toReturn[clusterID]; !ok {
-			toReturn[clusterID] = &Totals{}
-		}
-		toReturn[clusterID].TotalCost = total
-	}
-
 	return toReturn, nil
 }
 
@@ -289,11 +264,13 @@ func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider,
 		return nil, err
 	}
 
+	defaultClusterID := os.Getenv(clusterIDKey)
+
 	return &Totals{
-		TotalCost:   clusterTotal,
-		CPUCost:     coreTotal,
-		MemCost:     ramTotal,
-		StorageCost: storageTotal,
+		TotalCost:   clusterTotal[defaultClusterID],
+		CPUCost:     coreTotal[defaultClusterID],
+		MemCost:     ramTotal[defaultClusterID],
+		StorageCost: storageTotal[defaultClusterID],
 	}, nil
 }
 

+ 5 - 4
costmodel/router.go

@@ -211,8 +211,8 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 		// This assumes hourly data, incremented by one to capture the 0th data point.
 		dataCount := int64(dur.Hours()) + 1
 		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
-
-		agg := AggregateCostData(a.Cloud, data, dataCount, aggregationField, subfields, "", false, discount, 1.0, nil)
+		defaultIdleCoefficientMap := make(map[string]float64)
+		agg := AggregateCostData(a.Cloud, data, dataCount, aggregationField, subfields, "", false, discount, defaultIdleCoefficientMap, nil)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -417,7 +417,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	}
 	discount = discount * 0.01
 
-	idleCoefficient := 1.0
+	idleCoefficient := make(map[string]float64)
 	if allocateIdle {
 		windowStr := fmt.Sprintf("%dh", int(dur.Hours()))
 		if a.ThanosClient != nil {
@@ -527,7 +527,8 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		dataCount := (int64(dur.Hours()) / windowHrs) + 1
 		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
 
-		agg := AggregateCostData(a.Cloud, data, dataCount, aggregationField, subfields, "", false, discount, 1.0, nil)
+		defaultIdleCoefficientMap := make(map[string]float64)
+		agg := AggregateCostData(a.Cloud, data, dataCount, aggregationField, subfields, "", false, discount, defaultIdleCoefficientMap, nil)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {