2
0
Эх сурвалжийг харах

Merge pull request #570 from kubecost/niko/assets

Simplify CPU mode query & propagate query in errors
Niko Kovacevic 5 жил өмнө
parent
commit
3007e3e355

+ 50 - 20
pkg/costmodel/cluster.go

@@ -424,7 +424,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost * %d.0 / 60.0) by (cluster_id, node, provider_id))[%s:%dm]%s)`, minsPerResolution, durationStr, minsPerResolution, offsetStr)
 	queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryNodeCPUModePct := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryActiveMins := fmt.Sprintf(`node_total_hourly_cost[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
@@ -435,7 +435,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resChNodeRAMBytes := ctx.Query(queryNodeRAMBytes)
 	resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
 	resChNodeLabels := ctx.Query(queryNodeLabels)
-	resChNodeCPUModePct := ctx.Query(queryNodeCPUModePct)
+	resChNodeCPUModeTotal := ctx.Query(queryNodeCPUModeTotal)
 	resChNodeRAMSystemPct := ctx.Query(queryNodeRAMSystemPct)
 	resChNodeRAMUserPct := ctx.Query(queryNodeRAMUserPct)
 	resChActiveMins := ctx.Query(queryActiveMins)
@@ -446,7 +446,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resNodeRAMCost, _ := resChNodeRAMCost.Await()
 	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
 	resNodeLabels, _ := resChNodeLabels.Await()
-	resNodeCPUModePct, _ := resChNodeCPUModePct.Await()
+	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
 	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
 	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
 	resActiveMins, _ := resChActiveMins.Await()
@@ -621,13 +621,20 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		}
 	}
 
-	for _, result := range resNodeCPUModePct {
+	// Mapping of cluster/node=cpu for computing resource efficiency
+	clusterNodeCPUTotal := map[string]float64{}
+	// Mapping of cluster/node:mode=cpu for computing resource efficiency
+	clusterNodeModeCPUTotal := map[string]map[string]float64{}
+
+	// Build intermediate structures for CPU usage by (cluster, node) and by
+	// (cluster, node, mode) for computing resource efficiency
+	for _, result := range resNodeCPUModeTotal {
 		cluster, err := result.GetString("cluster_id")
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
 
-		name, err := result.GetString("kubernetes_node")
+		node, err := result.GetString("kubernetes_node")
 		if err != nil {
 			log.DedupedWarningf(5, "ClusterNodes: CPU mode data missing node")
 			continue
@@ -639,23 +646,45 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 			mode = "other"
 		}
 
-		pct := result.Values[0].Value
+		key := fmt.Sprintf("%s/%s", cluster, node)
 
-		key := fmt.Sprintf("%s/%s", cluster, name)
-		if _, ok := nodeMap[key]; !ok {
-			log.Warningf("ClusterNodes: CPU mode data for unidentified node")
-			continue
+		total := result.Values[0].Value
+
+		// Increment total
+		clusterNodeCPUTotal[key] += total
+
+		// Increment mode
+		if _, ok := clusterNodeModeCPUTotal[key]; !ok {
+			clusterNodeModeCPUTotal[key] = map[string]float64{}
 		}
+		clusterNodeModeCPUTotal[key][mode] += total
+	}
 
-		switch mode {
-		case "idle":
-			nodeMap[key].CPUBreakdown.Idle += pct
-		case "system":
-			nodeMap[key].CPUBreakdown.System += pct
-		case "user":
-			nodeMap[key].CPUBreakdown.User += pct
-		default:
-			nodeMap[key].CPUBreakdown.Other += pct
+	// Compute resource efficiency from intermediate structures
+	for key, total := range clusterNodeCPUTotal {
+		if modeTotals, ok := clusterNodeModeCPUTotal[key]; ok {
+			for mode, subtotal := range modeTotals {
+				// Compute percentage for the current cluster, node, mode
+				pct := subtotal / total
+
+				if _, ok := nodeMap[key]; !ok {
+					log.Warningf("ClusterNodes: CPU mode data for unidentified node")
+					continue
+				}
+
+				switch mode {
+				case "idle":
+					nodeMap[key].CPUBreakdown.Idle += pct
+				case "system":
+					nodeMap[key].CPUBreakdown.System += pct
+				case "user":
+					nodeMap[key].CPUBreakdown.User += pct
+				default:
+					nodeMap[key].CPUBreakdown.Other += pct
+				}
+
+				log.Infof("ClusterNodes: %s:%s=%.3f", key, mode, pct)
+			}
 		}
 	}
 
@@ -781,7 +810,8 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		// TODO take RI into account
 		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
 
-		// Apply all remaining RAM to Idle
+		// Apply all remaining resources to Idle
+		node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
 		node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
 	}
 

+ 3 - 3
pkg/costmodel/promparsers.go

@@ -392,10 +392,10 @@ func getCost(qrs []*prom.QueryResult) (map[string][]*util.Vector, error) {
 // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
 func getNormalization(qrs []*prom.QueryResult) (float64, error) {
 	if len(qrs) == 0 {
-		return 0.0, prom.NoDataErr
+		return 0.0, prom.NoDataErr("getNormalization")
 	}
 	if len(qrs[0].Values) == 0 {
-		return 0.0, prom.NoDataErr
+		return 0.0, prom.NoDataErr("getNormalization")
 	}
 	return qrs[0].Values[0].Value, nil
 }
@@ -404,7 +404,7 @@ func getNormalization(qrs []*prom.QueryResult) (float64, error) {
 // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
 func getNormalizations(qrs []*prom.QueryResult) ([]*util.Vector, error) {
 	if len(qrs) == 0 {
-		return nil, prom.NoDataErr
+		return nil, prom.NoDataErr("getNormalizations")
 	}
 
 	return qrs[0].Values, nil

+ 5 - 6
pkg/prom/query.go

@@ -13,7 +13,6 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
-	"k8s.io/klog"
 )
 
 const (
@@ -152,20 +151,20 @@ func (ctx *Context) query(query string) (interface{}, error) {
 
 	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
 	for _, w := range warnings {
-		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
+		log.Warningf("fetching query '%s': %s", query, w)
 	}
 	if err != nil {
 		if resp == nil {
-			return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+			return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
 		}
 
-		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("query error %d: '%s' fetching query '%s'", resp.StatusCode, err.Error(), query)
 	}
 
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+		return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
 	}
 
 	return toReturn, nil
@@ -243,7 +242,7 @@ func (ctx *Context) queryRange(query string, start, end time.Time, step time.Dur
 
 	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
 	for _, w := range warnings {
-		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
+		log.Warningf("fetching query '%s': %s", query, w)
 	}
 	if err != nil {
 		if resp == nil {

+ 64 - 31
pkg/prom/result.go

@@ -1,7 +1,6 @@
 package prom
 
 import (
-	"errors"
 	"fmt"
 	"math"
 	"strconv"
@@ -15,22 +14,56 @@ var (
 	// Static Warnings for data point parsing
 	InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
 	NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
-
-	// Static Errors for query result parsing
-	DataFieldFormatErr         error = errors.New("Data field improperly formatted in prometheus repsonse")
-	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
-	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
-	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
-	NoDataErr                  error = errors.New("No data")
-	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
-	QueryResultNilErr          error = NewCommError("nil queryResult")
-	ResultFieldDoesNotExistErr error = errors.New("Result field not does not exist in prometheus response")
-	ResultFieldFormatErr       error = errors.New("Result field improperly formatted in prometheus response")
-	ResultFormatErr            error = errors.New("Result is improperly formatted")
-	ValueFieldDoesNotExistErr  error = errors.New("Value field does not exist in data result vector")
-	ValueFieldFormatErr        error = errors.New("Values field is improperly formatted")
 )
 
+func DataFieldFormatErr(query string) error {
+	return fmt.Errorf("Data field improperly formatted in prometheus repsonse fetching query '%s'", query)
+}
+
+func DataPointFormatErr(query string) error {
+	return fmt.Errorf("Improperly formatted datapoint from Prometheus fetching query '%s'", query)
+}
+
+func MetricFieldDoesNotExistErr(query string) error {
+	return fmt.Errorf("Metric field does not exist in data result vector fetching query '%s'", query)
+}
+
+func MetricFieldFormatErr(query string) error {
+	return fmt.Errorf("Metric field is improperly formatted fetching query '%s'", query)
+}
+
+func NoDataErr(query string) error {
+	return fmt.Errorf("No data fetching query '%s'", query)
+}
+
+func PromUnexpectedResponseErr(query string) error {
+	return fmt.Errorf("Unexpected response from Prometheus fetching query '%s'", query)
+}
+
+func QueryResultNilErr(query string) error {
+	return NewCommError(fmt.Sprintf("nil queryResult fetching query '%s'", query))
+}
+
+func ResultFieldDoesNotExistErr(query string) error {
+	return fmt.Errorf("Result field not does not exist in prometheus response fetching query '%s'", query)
+}
+
+func ResultFieldFormatErr(query string) error {
+	return fmt.Errorf("Result field improperly formatted in prometheus response fetching query '%s'", query)
+}
+
+func ResultFormatErr(query string) error {
+	return fmt.Errorf("Result is improperly formatted fetching query '%s'", query)
+}
+
+func ValueFieldDoesNotExistErr(query string) error {
+	return fmt.Errorf("Value field does not exist in data result vector fetching query '%s'", query)
+}
+
+func ValueFieldFormatErr(query string) error {
+	return fmt.Errorf("Values field is improperly formatted fetching query '%s'", query)
+}
+
 // QueryResultsChan is a channel of query results
 type QueryResultsChan chan *QueryResults
 
@@ -67,13 +100,13 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 	qrs := &QueryResults{Query: query}
 
 	if queryResult == nil {
-		qrs.Error = QueryResultNilErr
+		qrs.Error = QueryResultNilErr(query)
 		return qrs
 	}
 
 	data, ok := queryResult.(map[string]interface{})["data"]
 	if !ok {
-		e, err := wrapPrometheusError(queryResult)
+		e, err := wrapPrometheusError(query, queryResult)
 		if err != nil {
 			qrs.Error = err
 			return qrs
@@ -85,17 +118,17 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 	// Deep Check for proper formatting
 	d, ok := data.(map[string]interface{})
 	if !ok {
-		qrs.Error = DataFieldFormatErr
+		qrs.Error = DataFieldFormatErr(query)
 		return qrs
 	}
 	resultData, ok := d["result"]
 	if !ok {
-		qrs.Error = ResultFieldDoesNotExistErr
+		qrs.Error = ResultFieldDoesNotExistErr(query)
 		return qrs
 	}
 	resultsData, ok := resultData.([]interface{})
 	if !ok {
-		qrs.Error = ResultFieldFormatErr
+		qrs.Error = ResultFieldFormatErr(query)
 		return qrs
 	}
 
@@ -106,18 +139,18 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 	for _, val := range resultsData {
 		resultInterface, ok := val.(map[string]interface{})
 		if !ok {
-			qrs.Error = ResultFormatErr
+			qrs.Error = ResultFormatErr(query)
 			return qrs
 		}
 
 		metricInterface, ok := resultInterface["metric"]
 		if !ok {
-			qrs.Error = MetricFieldDoesNotExistErr
+			qrs.Error = MetricFieldDoesNotExistErr(query)
 			return qrs
 		}
 		metricMap, ok := metricInterface.(map[string]interface{})
 		if !ok {
-			qrs.Error = MetricFieldFormatErr
+			qrs.Error = MetricFieldFormatErr(query)
 			return qrs
 		}
 
@@ -132,12 +165,12 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 		if !isRange {
 			dataPoint, ok := resultInterface["value"]
 			if !ok {
-				qrs.Error = ValueFieldDoesNotExistErr
+				qrs.Error = ValueFieldDoesNotExistErr(query)
 				return qrs
 			}
 
 			// Append new data point, log warnings
-			v, warn, err := parseDataPoint(dataPoint)
+			v, warn, err := parseDataPoint(query, dataPoint)
 			if err != nil {
 				qrs.Error = err
 				return qrs
@@ -156,7 +189,7 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 
 			// Append new data points, log warnings
 			for _, value := range values {
-				v, warn, err := parseDataPoint(value)
+				v, warn, err := parseDataPoint(query, value)
 				if err != nil {
 					qrs.Error = err
 					return qrs
@@ -222,12 +255,12 @@ func (qr *QueryResult) GetLabels() map[string]string {
 
 // parseDataPoint parses a data point from raw prometheus query results and returns
 // a new Vector instance containing the parsed data along with any warnings or errors.
-func parseDataPoint(dataPoint interface{}) (*util.Vector, warning, error) {
+func parseDataPoint(query string, dataPoint interface{}) (*util.Vector, warning, error) {
 	var w warning = nil
 
 	value, ok := dataPoint.([]interface{})
 	if !ok || len(value) != 2 {
-		return nil, w, DataPointFormatErr
+		return nil, w, DataPointFormatErr(query)
 	}
 
 	strVal := value[1].(string)
@@ -260,11 +293,11 @@ func labelsForMetric(metricMap map[string]interface{}) string {
 	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
 }
 
-func wrapPrometheusError(qr interface{}) (string, error) {
+func wrapPrometheusError(query string, qr interface{}) (string, error) {
 	e, ok := qr.(map[string]interface{})["error"]
 	if !ok {
-		return "", PromUnexpectedResponseErr
+		return "", PromUnexpectedResponseErr(query)
 	}
 	eStr, ok := e.(string)
-	return eStr, nil
+	return fmt.Sprintf("'%s' parsing query '%s'", eStr, query), nil
 }