Quellcode durchsuchen

WIP prom query refactor

Niko Kovacevic vor 5 Jahren
Ursprung
Commit
c3aa46418b
3 geänderte Dateien mit 84 neuen und 86 gelöschten Zeilen
  1. 46 81
      pkg/costmodel/costmodel.go
  2. 21 0
      pkg/costmodel/promparsers.go
  3. 17 5
      pkg/prom/query.go

+ 46 - 81
pkg/costmodel/costmodel.go

@@ -232,31 +232,45 @@ func ValidatePrometheus(cli prometheusClient.Client, isThanos bool) (*Prometheus
 	if isThanos {
 		q += thanos.QueryOffset()
 	}
-	data, err := Query(cli, q)
+
+	ctx := prom.NewContext(cli)
+
+	resUp, err := ctx.QuerySync(q)
 	if err != nil {
 		return &PrometheusMetadata{
 			Running:            false,
 			KubecostDataExists: false,
 		}, err
 	}
-	v, kcmetrics, err := getUptimeData(data)
-	if err != nil {
+
+	if len(resUp) == 0 {
 		return &PrometheusMetadata{
 			Running:            false,
 			KubecostDataExists: false,
-		}, err
+		}, fmt.Errorf("no running jobs on Prometheus at %s", ctx.QueryURL().Path)
 	}
-	if len(v) > 0 {
-		return &PrometheusMetadata{
-			Running:            true,
-			KubecostDataExists: kcmetrics,
-		}, nil
-	} else {
-		return &PrometheusMetadata{
-			Running:            false,
-			KubecostDataExists: false,
-		}, fmt.Errorf("No running jobs found on Prometheus at %s", cli.URL(epQuery, nil).Path)
+
+	for _, result := range resUp {
+		job, err := result.GetString("job")
+		if err != nil {
+			return &PrometheusMetadata{
+				Running:            false,
+				KubecostDataExists: false,
+			}, fmt.Errorf("up query does not have job names")
+		}
+
+		if job == "kubecost" {
+			return &PrometheusMetadata{
+				Running:            true,
+				KubecostDataExists: true,
+			}, err
+		}
 	}
+
+	return &PrometheusMetadata{
+		Running:            true,
+		KubecostDataExists: false,
+	}, nil
 }
 
 func getUptimeData(qr interface{}) ([]*util.Vector, bool, error) {
@@ -753,22 +767,22 @@ func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[strin
 	if len(missingContainers) > 0 {
 		queryHistoricalPodLabels := fmt.Sprintf(`kube_pod_labels{}[%s]`, window)
 
-		podLabelsResult, err := Query(cli, queryHistoricalPodLabels)
+		podLabelsResult, err := prom.NewContext(cli).QuerySync(queryHistoricalPodLabels)
 		if err != nil {
-			klog.V(1).Infof("Error parsing historical labels: %s", err.Error())
+			log.Errorf("failed to parse historical pod labels: %s", err.Error())
 		}
 		podLabels := make(map[string]map[string]string)
 		if podLabelsResult != nil {
-			podLabels, err = labelsFromPrometheusQuery(podLabelsResult)
+			podLabels, err = parsePodLabels(podLabelsResult)
 			if err != nil {
-				klog.V(1).Infof("Error parsing historical labels: %s", err.Error())
+				log.Errorf("failed to parse historical pod labels: %s", err.Error())
 			}
 		}
 		for key, costData := range missingContainers {
 			cm, _ := NewContainerMetricFromKey(key)
 			labels, ok := podLabels[cm.PodName]
 			if !ok {
-				klog.V(1).Infof("Unable to find historical data for pod '%s'", cm.PodName)
+				log.Errorf("unable to find historical data for pod '%s'", cm.PodName)
 				labels = make(map[string]string)
 			}
 			for k, v := range costData.NamespaceLabels {
@@ -781,54 +795,6 @@ func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[strin
 	return nil
 }
 
-func labelsFromPrometheusQuery(qr interface{}) (map[string]map[string]string, error) {
-	toReturn := make(map[string]map[string]string)
-	data, ok := qr.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(qr)
-		if err != nil {
-			return toReturn, err
-		}
-		return toReturn, fmt.Errorf(e)
-	}
-	for _, val := range data.(map[string]interface{})["result"].([]interface{}) {
-		metricInterface, ok := val.(map[string]interface{})["metric"]
-		if !ok {
-			return toReturn, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return toReturn, fmt.Errorf("Metric field is improperly formatted")
-		}
-		pod, ok := metricMap["pod"]
-		if !ok {
-			return toReturn, fmt.Errorf("pod field does not exist in data result vector")
-		}
-		podName, ok := pod.(string)
-		if !ok {
-			return toReturn, fmt.Errorf("pod field is improperly formatted")
-		}
-
-		for labelName, labelValue := range metricMap {
-			parsedLabelName := labelName
-			parsedLv, ok := labelValue.(string)
-			if !ok {
-				return toReturn, fmt.Errorf("label value is improperly formatted")
-			}
-			if strings.HasPrefix(parsedLabelName, "label_") {
-				l := strings.Replace(parsedLabelName, "label_", "", 1)
-				if podLabels, ok := toReturn[podName]; ok {
-					podLabels[l] = parsedLv
-				} else {
-					toReturn[podName] = make(map[string]string)
-					toReturn[podName][l] = parsedLv
-				}
-			}
-		}
-	}
-	return toReturn, nil
-}
-
 func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*costAnalyzerCloud.Node, window string) error {
 	if len(missingNodes) > 0 {
 		defer measureTime(time.Now(), profileThreshold, "Finding Deleted Node Info")
@@ -844,28 +810,27 @@ func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*c
 		queryHistoricalRAMCost := fmt.Sprintf(`avg_over_time(node_ram_hourly_cost{instance=~"%s"}[%s])`, l, window)
 		queryHistoricalGPUCost := fmt.Sprintf(`avg_over_time(node_gpu_hourly_cost{instance=~"%s"}[%s])`, l, window)
 
-		cpuCostResult, err := Query(cli, queryHistoricalCPUCost)
-		if err != nil {
-			return fmt.Errorf("Error fetching cpu cost data: " + err.Error())
-		}
-		ramCostResult, err := Query(cli, queryHistoricalRAMCost)
-		if err != nil {
-			return fmt.Errorf("Error fetching ram cost data: " + err.Error())
-		}
-		gpuCostResult, err := Query(cli, queryHistoricalGPUCost)
-		if err != nil {
-			return fmt.Errorf("Error fetching gpu cost data: " + err.Error())
+		ctx := prom.NewContext(cli)
+		cpuCostResCh := ctx.Query(queryHistoricalCPUCost)
+		ramCostResCh := ctx.Query(queryHistoricalRAMCost)
+		gpuCostResCh := ctx.Query(queryHistoricalGPUCost)
+
+		cpuCostRes, _ := cpuCostResCh.Await()
+		ramCostRes, _ := ramCostResCh.Await()
+		gpuCostRes, _ := gpuCostResCh.Await()
+		if ctx.HasErrors() {
+			return ctx.Errors()[0]
 		}
 
-		cpuCosts, err := getCost(cpuCostResult)
+		cpuCosts, err := getCost(cpuCostRes)
 		if err != nil {
 			return err
 		}
-		ramCosts, err := getCost(ramCostResult)
+		ramCosts, err := getCost(ramCostRes)
 		if err != nil {
 			return err
 		}
-		gpuCosts, err := getCost(gpuCostResult)
+		gpuCosts, err := getCost(gpuCostRes)
 		if err != nil {
 			return err
 		}

+ 21 - 0
pkg/costmodel/promparsers.go

@@ -1,6 +1,7 @@
 package costmodel
 
 import (
+	"errors"
 	"fmt"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
@@ -386,3 +387,23 @@ func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID s
 
 	return toReturn, nil
 }
+
+// parsePodLabels groups the labels of the given query results by the value
+// of each result's "pod" field.
+//
+// Results that share a pod name are merged into one label map instead of
+// replacing one another; when the same label name appears more than once for
+// a pod, the value from the later result wins. Every pod that appears in qrs
+// gets a non-nil (possibly empty) label map.
+//
+// If a result has no readable "pod" field, parsing stops and the labels
+// collected so far are returned together with a "missing pod field" error.
+func parsePodLabels(qrs []*prom.QueryResult) (map[string]map[string]string, error) {
+	podLabels := make(map[string]map[string]string)
+
+	for _, result := range qrs {
+		pod, err := result.GetString("pod")
+		if err != nil {
+			return podLabels, errors.New("missing pod field")
+		}
+
+		labels, ok := podLabels[pod]
+		if !ok {
+			labels = make(map[string]string)
+			podLabels[pod] = labels
+		}
+
+		// Merge rather than overwrite, so several results for one pod
+		// contribute to the same label set.
+		for name, value := range result.GetLabels() {
+			labels[name] = value
+		}
+	}
+
+	return podLabels, nil
+}

+ 17 - 5
pkg/prom/query.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"net/url"
 	"strconv"
 	"time"
 
@@ -15,8 +16,9 @@ import (
 )
 
 const (
-	apiPrefix = "/api/v1"
-	epQuery   = apiPrefix + "/query"
+	apiPrefix    = "/api/v1"
+	epQuery      = apiPrefix + "/query"
+	epQueryRange = apiPrefix + "/query_range"
 )
 
 // Context wraps a Prometheus client and provides methods for querying and
@@ -81,7 +83,7 @@ func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
 	return resChs
 }
 
-func (ctx *Context) QuerySync(query string) (*QueryResults, error) {
+func (ctx *Context) QuerySync(query string) ([]*QueryResult, error) {
 	raw, err := ctx.query(query)
 	if err != nil {
 		return nil, err
@@ -92,7 +94,12 @@ func (ctx *Context) QuerySync(query string) (*QueryResults, error) {
 		return nil, err
 	}
 
-	return results, nil
+	return results.Results, nil
+}
+
+// QueryURL asks the Context's client for the URL of the query endpoint
+// (epQuery) and returns it.
+func (ctx *Context) QueryURL() *url.URL {
+	endpoint := ctx.Client.URL(epQuery, nil)
+	return endpoint
 }
 
 func (ctx *Context) query(query string) (interface{}, error) {
@@ -159,8 +166,13 @@ func (ctx *Context) QueryRangeSync(query string, start, end time.Time, step time
 	return results, nil
 }
 
+// QueryRangeURL asks the Context's client for the URL of the query_range
+// endpoint (epQueryRange) and returns it.
+func (ctx *Context) QueryRangeURL() *url.URL {
+	endpoint := ctx.Client.URL(epQueryRange, nil)
+	return endpoint
+}
+
 func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, error) {
-	u := ctx.Client.URL(epQuery, nil)
+	u := ctx.Client.URL(epQueryRange, nil)
 	q := u.Query()
 	q.Set("query", query)
 	q.Set("start", start.Format(time.RFC3339Nano))