AjayTripathy hace 6 años
padre
commit
714b667074
Se han modificado 4 ficheros con 137 adiciones y 25 borrados
  1. 73 2
      costmodel/aggregations.go
  2. 9 9
      costmodel/cluster.go
  3. 46 11
      costmodel/costmodel.go
  4. 9 3
      costmodel/router.go

+ 73 - 2
costmodel/aggregations.go

@@ -251,7 +251,7 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 			// It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
 			// It is not possible to score < 0% efficiency.
 
-			klog.V(1).Infof("\n\tlen(CPU allocation): %d\n\tlen(CPU requested): %d\n\tlen(CPU used): %d",
+			klog.V(4).Infof("\n\tlen(CPU allocation): %d\n\tlen(CPU requested): %d\n\tlen(CPU used): %d",
 				len(agg.CPUAllocationVectors),
 				len(agg.CPURequestedVectors),
 				len(agg.CPUUsedVectors))
@@ -266,7 +266,7 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 				agg.CPUEfficiency = 1.0 - CPUIdle
 			}
 
-			klog.V(1).Infof("\n\tlen(RAM allocation): %d\n\tlen(RAM requested): %d\n\tlen(RAM used): %d",
+			klog.V(4).Infof("\n\tlen(RAM allocation): %d\n\tlen(RAM requested): %d\n\tlen(RAM used): %d",
 				len(agg.RAMAllocationVectors),
 				len(agg.RAMRequestedVectors),
 				len(agg.RAMUsedVectors))
@@ -453,6 +453,77 @@ func roundTimestamp(ts float64, precision float64) float64 {
 	return math.Round(ts/precision) * precision
 }
 
+// NormalizeVectorByVector divides the values of xvs by the values of yvs at
+// matching timestamps and returns the result ordered by timestamp.
+//
+// Non-zero timestamps in both inputs are first rounded, in place, to the
+// nearest ten seconds so that samples within that tolerance line up. Vectors
+// whose timestamp is zero are ignored when matching. For each distinct
+// timestamp the resulting value is:
+//   - x / y when both slices have a sample and y is non-zero;
+//   - x when only xvs has a sample, or the matching y is zero;
+//   - 0 when only yvs has a sample.
+//
+// If either slice is empty there is nothing to normalize (or nothing to
+// normalize by), and xvs is returned unchanged apart from the rounding.
+func NormalizeVectorByVector(xvs []*Vector, yvs []*Vector) []*Vector {
+	// round all non-zero timestamps to the nearest 10 second mark
+	for _, yv := range yvs {
+		if yv.Timestamp != 0 {
+			yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
+		}
+	}
+	for _, xv := range xvs {
+		if xv.Timestamp != 0 {
+			xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
+		}
+	}
+
+	// With no xvs there is no data to normalize; with no yvs there is no
+	// divisor. In both cases hand xvs back rather than leaking the
+	// normalization factors out as if they were metric data.
+	if len(xvs) == 0 || len(yvs) == 0 {
+		return xvs
+	}
+
+	// timestamps stores every timestamp present in either vector slice,
+	// without duplicates
+	timestamps := make([]float64, 0, len(xvs)+len(yvs))
+
+	// turn each vector slice into a map of timestamp-to-value so that
+	// values at equal timestamps can be lined up and divided
+	xMap := make(map[float64]float64, len(xvs))
+	for _, xv := range xvs {
+		if xv.Timestamp == 0 {
+			continue
+		}
+		// two samples may round to the same mark; record the timestamp once
+		// and let the later sample win, as the map assignment always did
+		if _, seen := xMap[xv.Timestamp]; !seen {
+			timestamps = append(timestamps, xv.Timestamp)
+		}
+		xMap[xv.Timestamp] = xv.Value
+	}
+	yMap := make(map[float64]float64, len(yvs))
+	for _, yv := range yvs {
+		if yv.Timestamp == 0 {
+			continue
+		}
+		_, inX := xMap[yv.Timestamp]
+		_, inY := yMap[yv.Timestamp]
+		if !inX && !inY {
+			timestamps = append(timestamps, yv.Timestamp)
+		}
+		yMap[yv.Timestamp] = yv.Value
+	}
+
+	// iterate over each timestamp in order to produce the normalized slice
+	sort.Float64s(timestamps)
+	var normalized []*Vector
+	for _, t := range timestamps {
+		x, okX := xMap[t]
+		y, okY := yMap[t]
+		nv := &Vector{Timestamp: t}
+		switch {
+		case okX && okY && y != 0:
+			nv.Value = x / y
+		case okX:
+			// no usable divisor at this timestamp: pass the value through
+			nv.Value = x
+		default:
+			// only yvs has a sample here, so the normalized value is zero
+			nv.Value = 0
+		}
+		normalized = append(normalized, nv)
+	}
+
+	return normalized
+}
+
 // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
 // nearest ten seconds to allow matching of Vectors within a delta allowance.
 // Matching Vectors are summed, while unmatched Vectors are passed through.

+ 9 - 9
costmodel/cluster.go

@@ -13,12 +13,12 @@ import (
 
 const (
 	queryClusterCores = `sum(
-		avg(kube_node_status_capacity_cpu_cores %s) by (node, cluster_id) * avg(node_cpu_hourly_cost %s) by (node, cluster_id) * 730 +
-		avg(node_gpu_hourly_cost %s) by (node, cluster_id) * 730
+		avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
+		avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
 	  ) by (cluster_id)`
 
 	queryClusterRAM = `sum(
-		avg(kube_node_status_capacity_memory_bytes %s) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(node_ram_hourly_cost %s) by (node, cluster_id) * 730
+		avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
 	  ) by (cluster_id)`
 
 	queryStorage = `sum(
@@ -148,8 +148,8 @@ func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerC
 		offset = fmt.Sprintf("offset %s", offset)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
+	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
+	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, "")
 
 	klog.V(4).Infof("Running query %s", qCores)
@@ -222,8 +222,8 @@ func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider,
 		offset = fmt.Sprintf("offset %s", offset)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
+	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
+	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 
@@ -310,8 +310,8 @@ func ClusterCostsOverTime(cli prometheusClient.Client, cloud costAnalyzerCloud.P
 		offset = fmt.Sprintf("offset %s", offset)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
+	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
+	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 

+ 46 - 11
costmodel/costmodel.go

@@ -165,7 +165,7 @@ type PrometheusMetadata struct {
 
 // ValidatePrometheus tells the model what data prometheus has on it.
 func ValidatePrometheus(cli prometheusClient.Client) (*PrometheusMetadata, error) {
-	data, err := Query(cli, "up")
+	data, err := Query(cli, "up offset 3h")
 	if err != nil {
 		return &PrometheusMetadata{
 			Running:            false,
@@ -1238,9 +1238,9 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		pvCostResults, promErr = Query(cli, queryPVHourlyCost)
 		defer wg.Done()
 	}()
-	var normalizationResult interface{}
+	var normalizationResults interface{}
 	go func() {
-		normalizationResult, promErr = Query(cli, normalization)
+		normalizationResults, promErr = QueryRange(cli, normalization, start, end, window)
 		defer wg.Done()
 	}()
 
@@ -1277,7 +1277,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
 	}
 
-	normalizationValue, err := getNormalization(normalizationResult)
+	normalizationValue, err := getNormalizations(normalizationResults)
 	if err != nil {
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
@@ -1352,7 +1352,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	for key := range GPUReqMap {
 		containers[key] = true
 	}
-	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, 0, clusterID) // No need to normalize here, as this comes from a counter
+	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, normalizationValue, clusterID) // No need to normalize here, as this comes from a counter
 	if err != nil {
 		return nil, err
 	}
@@ -1539,7 +1539,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 
 			node, ok := nodes[c.NodeName]
 			if !ok {
-				klog.V(2).Infof("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
+				klog.V(4).Infof("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
 				if n, ok := missingNodes[c.NodeName]; ok {
 					node = n
 				} else {
@@ -2150,6 +2150,43 @@ func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 	return toReturn, nil
 }
 
+// getNormalizations converts the decoded JSON response of the normalization
+// range query into a flat slice of Vectors. It expects the response to carry
+// data.result[], where every result has a "values" list of [timestamp, "value"]
+// pairs; the samples of all results are appended in order, with timestamps
+// rounded to the nearest ten seconds.
+//
+// An error is returned when the response reports a Prometheus error, when any
+// level of the response does not have the expected shape, when a sample value
+// cannot be parsed as a float, or when the result set is empty.
+//
+// TODO: don't cast, implement unmarshaler interface
+func getNormalizations(qr interface{}) ([]*Vector, error) {
+	response, ok := qr.(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Normalization response improperly formatted, aborting")
+	}
+	data, ok := response["data"]
+	if !ok {
+		e, err := wrapPrometheusError(qr)
+		if err != nil {
+			return nil, err
+		}
+		// e is data, not a format string: never pass it as the format
+		return nil, fmt.Errorf("%s", e)
+	}
+	dataMap, ok := data.(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Data field improperly formatted in normalization response, aborting")
+	}
+	results, ok := dataMap["result"].([]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Result field not found in normalization response, aborting")
+	}
+	if len(results) == 0 {
+		return nil, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
+	}
+
+	vectors := []*Vector{}
+	for _, result := range results {
+		// full result dump is debugging output; keep it out of default logs
+		klog.V(4).Infof("%+v", result)
+		series, ok := result.(map[string]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Improperly formatted result from Prometheus")
+		}
+		values, ok := series["values"].([]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Values field not found in normalization response, aborting")
+		}
+		for _, d := range values {
+			dataPoint, ok := d.([]interface{})
+			if !ok || len(dataPoint) != 2 {
+				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+			}
+			ts, okTS := dataPoint[0].(float64)
+			strVal, okVal := dataPoint[1].(string)
+			if !okTS || !okVal {
+				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+			}
+			v, err := strconv.ParseFloat(strVal, 64)
+			if err != nil {
+				return nil, fmt.Errorf("Improperly formatted datapoint value from Prometheus: %s", err.Error())
+			}
+			vectors = append(vectors, &Vector{
+				Timestamp: math.Round(ts/10) * 10,
+				Value:     v,
+			})
+		}
+	}
+	return vectors, nil
+}
+
 //todo: don't cast, implement unmarshaler interface
 func getNormalization(qr interface{}) (float64, error) {
 	data, ok := qr.(map[string]interface{})["data"]
@@ -2332,7 +2369,7 @@ func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue
 	return containerData, nil
 }
 
-func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*Vector, error) {
+func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValues []*Vector, defaultClusterID string) (map[string][]*Vector, error) {
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(qr)
@@ -2375,15 +2412,13 @@ func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValu
 			}
 			strVal := dataPoint[1].(string)
 			v, _ := strconv.ParseFloat(strVal, 64)
-			if normalize && normalizationValue != 0 {
-				v = v / normalizationValue
-			}
 			vectors = append(vectors, &Vector{
 				Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
 				Value:     v,
 			})
 		}
-		containerData[containerMetric.Key()] = vectors
+		normalizedVectors := NormalizeVectorByVector(vectors, normalizationValues)
+		containerData[containerMetric.Key()] = normalizedVectors
 	}
 	return containerData, nil
 }

+ 9 - 3
costmodel/router.go

@@ -330,6 +330,11 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 			endTime = endTime.Add(-1 * o)
 		}
 
+		if endTime.After(time.Now().Add(-3 * time.Hour)) {
+			klog.Infof("Setting end time backwards to first present data")
+			endTime = time.Now().Add(-3 * time.Hour)
+		}
+
 		// if window is defined in terms of days, convert to hours
 		// e.g. convert "2d" to "48h"
 		window, err := normalizeTimeParam(window)
@@ -396,9 +401,9 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		return
 	}
 
-	remoteAvailable := os.Getenv(remoteEnabled)
+	remoteAvailable := os.Getenv(remoteEnabled) == "true"
 	remoteEnabled := false
-	if remoteAvailable == "true" && remote != "false" {
+	if remoteAvailable && remote != "false" {
 		remoteEnabled = true
 	}
 
@@ -432,7 +437,8 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	idleCoefficients := make(map[string]float64)
 	if allocateIdle {
 		windowStr := fmt.Sprintf("%dh", int(dur.Hours()))
-		if a.ThanosClient != nil && remoteEnabled {
+		if a.ThanosClient != nil {
+			klog.Infof("Setting offset to 3h")
 			offset = "3h"
 		}
 		idleCoefficients, err = ComputeIdleCoefficient(data, pClient, a.Cloud, discount, windowStr, offset)