Explorar el Código

Merge pull request #208 from kubecost/AjayTripathy-fix-thanos

Ajay tripathy fix thanos
Ajay Tripathy hace 6 años
padre
commit
03f27851e9
Se han modificado 4 ficheros con 139 adiciones y 26 borrados
  1. 73 2
      costmodel/aggregations.go
  2. 12 10
      costmodel/cluster.go
  3. 46 11
      costmodel/costmodel.go
  4. 8 3
      costmodel/router.go

+ 73 - 2
costmodel/aggregations.go

@@ -251,7 +251,7 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 			// It is possible to score > 100% efficiency, which is meant to be interpreted as a red flag.
 			// It is not possible to score < 0% efficiency.
 
-			klog.V(1).Infof("\n\tlen(CPU allocation): %d\n\tlen(CPU requested): %d\n\tlen(CPU used): %d",
+			klog.V(4).Infof("\n\tlen(CPU allocation): %d\n\tlen(CPU requested): %d\n\tlen(CPU used): %d",
 				len(agg.CPUAllocationVectors),
 				len(agg.CPURequestedVectors),
 				len(agg.CPUUsedVectors))
@@ -266,7 +266,7 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 				agg.CPUEfficiency = 1.0 - CPUIdle
 			}
 
-			klog.V(1).Infof("\n\tlen(RAM allocation): %d\n\tlen(RAM requested): %d\n\tlen(RAM used): %d",
+			klog.V(4).Infof("\n\tlen(RAM allocation): %d\n\tlen(RAM requested): %d\n\tlen(RAM used): %d",
 				len(agg.RAMAllocationVectors),
 				len(agg.RAMRequestedVectors),
 				len(agg.RAMUsedVectors))
@@ -453,6 +453,77 @@ func roundTimestamp(ts float64, precision float64) float64 {
 	return math.Round(ts/precision) * precision
 }
 
+// NormalizeVectorByVector divides each Vector in xvs by the Vector in yvs that
+// shares its timestamp and returns the results sorted by ascending timestamp.
+//
+// Non-zero timestamps in both slices are first rounded, in place, to the
+// nearest ten seconds so that samples within that delta line up. Vectors with
+// a zero timestamp are ignored. Each distinct remaining timestamp yields
+// exactly one output Vector:
+//   - present in both slices with a non-zero divisor: Value is x / y
+//   - present in both slices with a zero divisor, or only in xvs: Value is x
+//   - present only in yvs: Value is 0
+//
+// If yvs is empty there is nothing to divide by and xvs is returned as-is
+// (with its timestamps rounded). When several Vectors in one slice round to
+// the same timestamp, the last one wins.
+func NormalizeVectorByVector(xvs []*Vector, yvs []*Vector) []*Vector {
+	// round all non-zero timestamps to the nearest 10 second mark
+	for _, yv := range yvs {
+		if yv.Timestamp != 0 {
+			yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
+		}
+	}
+	for _, xv := range xvs {
+		if xv.Timestamp != 0 {
+			xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
+		}
+	}
+
+	// Without divisors the values cannot be normalized; pass them through.
+	// An empty xvs deliberately gets no shortcut: returning yvs would hand the
+	// divisors back as data, so it takes the general path below like any
+	// other timestamp that only exists in yvs.
+	if len(yvs) == 0 {
+		return xvs
+	}
+
+	// timestamps stores every timestamp present in either slice, each exactly
+	// once, so that the output never contains duplicate entries.
+	var timestamps []float64
+
+	// turn each vector slice into a map of timestamp-to-value so that
+	// values at equal timestamps can be lined up and divided
+	xMap := make(map[float64]float64, len(xvs))
+	for _, xv := range xvs {
+		if xv.Timestamp == 0 {
+			continue
+		}
+		if _, seen := xMap[xv.Timestamp]; !seen {
+			timestamps = append(timestamps, xv.Timestamp)
+		}
+		xMap[xv.Timestamp] = xv.Value
+	}
+	yMap := make(map[float64]float64, len(yvs))
+	for _, yv := range yvs {
+		if yv.Timestamp == 0 {
+			continue
+		}
+		_, inX := xMap[yv.Timestamp]
+		_, inY := yMap[yv.Timestamp]
+		if !inX && !inY {
+			timestamps = append(timestamps, yv.Timestamp)
+		}
+		yMap[yv.Timestamp] = yv.Value
+	}
+
+	// iterate over each timestamp to produce the final normalized slice
+	sort.Float64s(timestamps)
+	var normalized []*Vector
+	for _, t := range timestamps {
+		x, okX := xMap[t]
+		y, okY := yMap[t]
+		nv := &Vector{Timestamp: t}
+		switch {
+		case okX && okY && y != 0:
+			nv.Value = x / y
+		case okX:
+			// no usable divisor at this timestamp: keep the raw value
+			nv.Value = x
+		default:
+			// only the divisor exists at this timestamp: Value stays 0
+		}
+		normalized = append(normalized, nv)
+	}
+
+	return normalized
+}
+
 // addVectors adds two slices of Vectors. Vector timestamps are rounded to the
 // nearest ten seconds to allow matching of Vectors within a delta allowance.
 // Matching Vectors are summed, while unmatched Vectors are passed through.

+ 12 - 10
costmodel/cluster.go

@@ -13,12 +13,12 @@ import (
 
 const (
 	queryClusterCores = `sum(
-		avg(kube_node_status_capacity_cpu_cores %s) by (node, cluster_id) * avg(node_cpu_hourly_cost %s) by (node, cluster_id) * 730 +
-		avg(node_gpu_hourly_cost %s) by (node, cluster_id) * 730
+		avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
+		avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
 	  ) by (cluster_id)`
 
 	queryClusterRAM = `sum(
-		avg(kube_node_status_capacity_memory_bytes %s) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(node_ram_hourly_cost %s) by (node, cluster_id) * 730
+		avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
 	  ) by (cluster_id)`
 
 	queryStorage = `sum(
@@ -144,10 +144,12 @@ func resultToTotal(qr interface{}) (map[string][][]string, error) {
 // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
 func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (map[string]*Totals, error) {
 
-	offset = fmt.Sprintf("offset 3h") // Set offset to 3h for block sync
+	if offset != "" {
+		offset = fmt.Sprintf("offset %s", offset)
+	}
 
-	qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
+	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
+	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, "")
 
 	klog.V(4).Infof("Running query %s", qCores)
@@ -220,8 +222,8 @@ func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider,
 		offset = fmt.Sprintf("offset %s", offset)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
+	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
+	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 
@@ -308,8 +310,8 @@ func ClusterCostsOverTime(cli prometheusClient.Client, cloud costAnalyzerCloud.P
 		offset = fmt.Sprintf("offset %s", offset)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
+	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
+	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
 	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 

+ 46 - 11
costmodel/costmodel.go

@@ -166,7 +166,7 @@ type PrometheusMetadata struct {
 
 // ValidatePrometheus tells the model what data prometheus has on it.
 func ValidatePrometheus(cli prometheusClient.Client) (*PrometheusMetadata, error) {
-	data, err := Query(cli, "up")
+	data, err := Query(cli, "up offset 3h")
 	if err != nil {
 		return &PrometheusMetadata{
 			Running:            false,
@@ -1244,9 +1244,9 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		nsLabelsResults, promErr = Query(cli, queryNSLabels)
 		defer wg.Done()
 	}()
-	var normalizationResult interface{}
+	var normalizationResults interface{}
 	go func() {
-		normalizationResult, promErr = Query(cli, normalization)
+		normalizationResults, promErr = QueryRange(cli, normalization, start, end, window)
 		defer wg.Done()
 	}()
 
@@ -1283,7 +1283,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		return nil, fmt.Errorf("Error querying the kubernetes api: %s", k8sErr.Error())
 	}
 
-	normalizationValue, err := getNormalization(normalizationResult)
+	normalizationValue, err := getNormalizations(normalizationResults)
 	if err != nil {
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
@@ -1366,7 +1366,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	for key := range GPUReqMap {
 		containers[key] = true
 	}
-	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, 0, clusterID) // No need to normalize here, as this comes from a counter
+	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, normalizationValue, clusterID) // No need to normalize here, as this comes from a counter
 	if err != nil {
 		return nil, err
 	}
@@ -1553,7 +1553,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 
 			node, ok := nodes[c.NodeName]
 			if !ok {
-				klog.V(2).Infof("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
+				klog.V(4).Infof("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
 				if n, ok := missingNodes[c.NodeName]; ok {
 					node = n
 				} else {
@@ -2231,6 +2231,43 @@ func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 	return toReturn, nil
 }
 
+// getNormalizations parses a Prometheus range-query response (the decoded
+// JSON object) into a flat slice of Vectors, one per datapoint across every
+// returned series. Timestamps are rounded to the nearest ten seconds.
+//
+// It returns an error, rather than panicking, when the response is not a JSON
+// object, carries no "data" field (in which case the error reported by
+// wrapPrometheusError is surfaced), has no "result" list, has an empty
+// "result" list, or contains a series or datapoint that is not shaped as
+// "values": [[<float timestamp>, "<float value>"], ...].
+//
+// todo: don't cast, implement unmarshaler interface
+func getNormalizations(qr interface{}) ([]*Vector, error) {
+	// Every type assertion below is the checked, two-value form: a failed
+	// query can leave qr nil, and an unchecked assertion would panic.
+	response, ok := qr.(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Improperly formatted normalization response from Prometheus: got %T", qr)
+	}
+	data, ok := response["data"]
+	if !ok {
+		e, err := wrapPrometheusError(qr)
+		if err != nil {
+			return nil, err
+		}
+		// e is server-provided text, never use it as a format string
+		return nil, fmt.Errorf("%s", e)
+	}
+
+	// Indexing a nil map is safe, so a non-object "data" simply fails the
+	// assertion on the next line.
+	dataFields, _ := data.(map[string]interface{})
+	results, ok := dataFields["result"].([]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Result field not found in normalization response, aborting")
+	}
+	if len(results) == 0 {
+		return nil, fmt.Errorf("Normalization data is empty, kube-state-metrics or node-exporter may not be running")
+	}
+
+	vectors := []*Vector{}
+	for _, result := range results {
+		// full series dump is debug output only
+		klog.V(4).Infof("%+v", result)
+
+		series, _ := result.(map[string]interface{})
+		values, ok := series["values"].([]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Values field not found in normalization response, aborting")
+		}
+		for _, d := range values {
+			dataPoint, ok := d.([]interface{})
+			if !ok || len(dataPoint) != 2 {
+				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+			}
+			ts, ok := dataPoint[0].(float64)
+			if !ok {
+				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+			}
+			strVal, ok := dataPoint[1].(string)
+			if !ok {
+				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+			}
+			v, err := strconv.ParseFloat(strVal, 64)
+			if err != nil {
+				return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus: %v", err)
+			}
+			vectors = append(vectors, &Vector{
+				Timestamp: math.Round(ts/10) * 10,
+				Value:     v,
+			})
+		}
+	}
+	return vectors, nil
+}
+
 //todo: don't cast, implement unmarshaler interface
 func getNormalization(qr interface{}) (float64, error) {
 	data, ok := qr.(map[string]interface{})["data"]
@@ -2413,7 +2450,7 @@ func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue
 	return containerData, nil
 }
 
-func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*Vector, error) {
+func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValues []*Vector, defaultClusterID string) (map[string][]*Vector, error) {
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(qr)
@@ -2456,15 +2493,13 @@ func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValu
 			}
 			strVal := dataPoint[1].(string)
 			v, _ := strconv.ParseFloat(strVal, 64)
-			if normalize && normalizationValue != 0 {
-				v = v / normalizationValue
-			}
 			vectors = append(vectors, &Vector{
 				Timestamp: math.Round(dataPoint[0].(float64)/10) * 10,
 				Value:     v,
 			})
 		}
-		containerData[containerMetric.Key()] = vectors
+		normalizedVectors := NormalizeVectorByVector(vectors, normalizationValues)
+		containerData[containerMetric.Key()] = normalizedVectors
 	}
 	return containerData, nil
 }

+ 8 - 3
costmodel/router.go

@@ -330,6 +330,11 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 			endTime = endTime.Add(-1 * o)
 		}
 
+		if endTime.After(time.Now().Add(-3 * time.Hour)) {
+			klog.Infof("Setting end time backwards to first present data")
+			endTime = time.Now().Add(-3 * time.Hour)
+		}
+
 		// if window is defined in terms of days, convert to hours
 		// e.g. convert "2d" to "48h"
 		window, err := normalizeTimeParam(window)
@@ -396,12 +401,11 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		return
 	}
 
-	remoteAvailable := os.Getenv(remoteEnabled)
+	remoteAvailable := os.Getenv(remoteEnabled) == "true"
 	remoteEnabled := false
-	if remoteAvailable == "true" && remote != "false" {
+	if remoteAvailable && remote != "false" {
 		remoteEnabled = true
 	}
-	klog.Infof("REMOTE ENABLED: %t", remoteEnabled)
 
 	// Use Thanos Client if it exists (enabled) and remote flag set
 	var pClient prometheusClient.Client
@@ -434,6 +438,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	if allocateIdle {
 		windowStr := fmt.Sprintf("%dh", int(dur.Hours()))
 		if a.ThanosClient != nil {
+			klog.Infof("Setting offset to 3h")
 			offset = "3h"
 		}
 		idleCoefficients, err = ComputeIdleCoefficient(data, pClient, a.Cloud, discount, windowStr, offset)