Просмотр исходного кода

Merge pull request #425 from kubecost/develop

Merge develop into master
Ajay Tripathy 6 лет назад
Родитель
Сommit
8e43d03a76
5 измененных файлов с 326 добавлено и 10 удалено
  1. 2 0
      PROMETHEUS.md
  2. 8 6
      pkg/cloud/awsprovider.go
  3. 4 4
      pkg/costmodel/costmodel.go
  4. 112 0
      pkg/prom/query.go
  5. 200 0
      pkg/prom/result.go

+ 2 - 0
PROMETHEUS.md

@@ -58,5 +58,7 @@ sum(node_total_hourly_cost) * 730
 | node_ram_hourly_cost   | Hourly cost per Gb of memory on this node                       |
 | node_total_hourly_cost   | Total node cost per hour                       |
 | container_cpu_allocation   | Average number of CPUs requested/used over last 1m                      |
+| container_gpu_allocation   | Average number of GPUs requested over last 1m                      |
 | container_memory_allocation_bytes   | Average bytes of RAM requested/used over last 1m                 |
+| pod_pvc_allocation   | Bytes provisioned for a PVC attached to a pod                      |
 | pv_hourly_cost   | Hourly cost per GB on a persistent volume                 |

+ 8 - 6
pkg/cloud/awsprovider.go

@@ -324,7 +324,9 @@ func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			}
 
 			c.ServiceKeyName = a.ServiceKeyName
-			c.ServiceKeySecret = a.ServiceKeySecret
+			if a.ServiceKeySecret != "" {
+				c.ServiceKeySecret = a.ServiceKeySecret
+			}
 			c.SpotDataPrefix = a.Prefix
 			c.SpotDataBucket = a.BucketName
 			c.ProjectID = a.AccountID
@@ -343,7 +345,9 @@ func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			c.AthenaDatabase = a.AthenaDatabase
 			c.AthenaTable = a.AthenaTable
 			c.ServiceKeyName = a.ServiceKeyName
-			c.ServiceKeySecret = a.ServiceKeySecret
+			if a.ServiceKeySecret != "" {
+				c.ServiceKeySecret = a.ServiceKeySecret
+			}
 			c.AthenaProjectID = a.AccountID
 		} else {
 			a := make(map[string]interface{})
@@ -1590,8 +1594,7 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 			return nil, err
 		}
 		if len(op.ResultSet.Rows) > 1 {
-			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
-
+			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows))] {
 				cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
 				if err != nil {
 					return nil, err
@@ -1627,8 +1630,7 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 		}
 		oocAllocs = append(oocAllocs, gcpOOC...)
 	}
-
-	return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
+	return oocAllocs, nil
 }
 
 // QuerySQL can query a properly configured Athena database.

+ 4 - 4
pkg/costmodel/costmodel.go

@@ -208,10 +208,10 @@ const (
 		) 
 	) by (namespace,container_name,pod_name,node,cluster_id) 
 	* on (pod_name, namespace, cluster_id) group_left(container) label_replace(avg(avg_over_time(kube_pod_status_phase{phase="Running"}[%s] %s)) by (pod,namespace,cluster_id), "pod_name","$1","pod","(.+)")`
-	queryPVRequestsStr = `avg(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id) 
-						* 
-						on (persistentvolumeclaim, namespace, cluster_id) group_right(storageclass, volumename) 
-				sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace, cluster_id)`
+	queryPVRequestsStr = `avg(avg(kube_persistentvolumeclaim_info) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id) 
+	* 
+	on (persistentvolumeclaim, namespace, cluster_id) group_right(storageclass, volumename) 
+	sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace, cluster_id, kubernetes_name)) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id)`
 	// queryRAMAllocationByteHours yields the total byte-hour RAM allocation over the given
 	// window, aggregated by container.
 	//  [line 3]     sum(all byte measurements) = [byte*scrape] by metric

+ 112 - 0
pkg/prom/query.go

@@ -0,0 +1,112 @@
+package prom
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"github.com/kubecost/cost-model/pkg/util"
+	prometheus "github.com/prometheus/client_golang/api"
+	"k8s.io/klog"
+)
+
+const (
+	apiPrefix = "/api/v1"            // path prefix shared by the endpoints declared below
+	epQuery   = apiPrefix + "/query" // endpoint path handed to Client.URL by the query method
+)
+
+// Context wraps a Prometheus client and provides methods for running queries,
+// parsing their responses, and collecting the errors they produce.
+type Context struct {
+	Client         prometheus.Client    // client used to build endpoint URLs and execute requests
+	ErrorCollector *util.ErrorCollector // receives the request and parse errors reported by Query
+	semaphore      *util.Semaphore      // acquired around every request issued by the query method
+}
+
+// NewContext creates a new Promethues querying context from the given client
+func NewContext(client prometheus.Client) *Context {
+	var ec util.ErrorCollector
+
+	// By deafult, allow 20 concurrent queries, which is the Prometheus default
+	sem := util.NewSemaphore(20)
+
+	return &Context{
+		Client:         client,
+		ErrorCollector: &ec,
+		semaphore:      sem,
+	}
+}
+
+// Errors returns whatever the Context's ErrorCollector currently reports via
+// its own Errors method.
+func (ctx *Context) Errors() []error {
+	collected := ctx.ErrorCollector.Errors()
+	return collected
+}
+
+// TODO SetMaxConcurrency
+
+// QueryAll starts every given query via Query and returns the resulting
+// channels in the same order as the queries; i.e. the response to queries[1]
+// will be sent on channel resChs[1]. With no queries it returns an empty,
+// non-nil slice.
+func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
+	resChs := make([]QueryResultsChan, len(queries))
+
+	for i := range queries {
+		resChs[i] = ctx.Query(queries[i])
+	}
+
+	return resChs
+}
+
+// Query starts the given query in a new goroutine and immediately returns the
+// unbuffered channel on which the parsed results will be sent. Both the
+// request error and the parse error are passed to the Context's
+// ErrorCollector. The receiver is responsible for closing the channel,
+// preferably using the Await method.
+func (ctx *Context) Query(query string) QueryResultsChan {
+	ch := make(QueryResultsChan)
+
+	go func() {
+		payload, reqErr := ctx.query(query)
+		ctx.ErrorCollector.Report(reqErr)
+
+		parsed, parseErr := NewQueryResults(payload)
+		ctx.ErrorCollector.Report(parseErr)
+
+		// Unbuffered send: blocks until the receiver reads the results.
+		ch <- parsed
+	}()
+
+	return ch
+}
+
+// query executes a single query against the client's /api/v1/query endpoint
+// and returns the JSON-decoded response body. The Context's semaphore is held
+// for the duration of the call. Warnings returned by the client are logged at
+// verbosity 3. Request, transport and decoding failures are returned as errors
+// that include the query text.
+func (ctx *Context) query(query string) (interface{}, error) {
+	ctx.semaphore.Acquire()
+	defer ctx.semaphore.Return()
+
+	// The query expression travels as a URL parameter; the request has no body.
+	endpoint := ctx.Client.URL(epQuery, nil)
+	params := endpoint.Query()
+	params.Set("query", query)
+	endpoint.RawQuery = params.Encode()
+
+	req, err := http.NewRequest(http.MethodPost, endpoint.String(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
+	for i := range warnings {
+		klog.V(3).Infof("Warning '%s' fetching query '%s'", warnings[i], query)
+	}
+	switch {
+	case err != nil && resp == nil:
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+	case err != nil:
+		// A response is available, so include its status code in the message.
+		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+	}
+
+	var decoded interface{}
+	if err := json.Unmarshal(body, &decoded); err != nil {
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+	}
+	return decoded, nil
+}

+ 200 - 0
pkg/prom/result.go

@@ -0,0 +1,200 @@
+package prom
+
+import (
+	"fmt"
+	"math"
+	"strconv"
+	"strings"
+
+	"github.com/kubecost/cost-model/pkg/util"
+	"k8s.io/klog"
+)
+
+// QueryResultsChan is a channel on which the slice of results of one query is delivered
+type QueryResultsChan chan []*QueryResult
+
+// Await blocks until query results arrive on the channel, closes the channel,
+// and returns the received results.
+func (qrc QueryResultsChan) Await() []*QueryResult {
+	results := <-qrc
+	close(qrc)
+	return results
+}
+
+// QueryResult contains a single result from a prometheus query. It's common
+// to refer to query results as a slice of QueryResult
+type QueryResult struct {
+	Metric map[string]interface{} // label set of the result, taken from its "metric" field
+	Values []*util.Vector         // parsed data points; exactly one for a non-range result
+}
+
+// NewQueryResults accepts the raw, JSON-decoded prometheus query response and
+// returns one QueryResult per entry of its data.result array.
+//
+// Each entry must carry a "metric" object and either a "values" array (ranged
+// data set) or a single "value" data point. A nil response, a response that is
+// not a JSON object, a response without a "data" field, or any improperly
+// formatted field yields a nil slice and an error. When "data" is missing the
+// returned error carries the message from the response's "error" field, if
+// one is present.
+func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
+	if queryResult == nil {
+		return nil, fmt.Errorf("[Error] nil result from prometheus, has it gone down?")
+	}
+
+	// Checked assertion: a response that decoded to something other than a
+	// JSON object (array, string, number) must produce an error, not a panic.
+	response, ok := queryResult.(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Unexpected response from Prometheus")
+	}
+
+	data, ok := response["data"]
+	if !ok {
+		e, err := wrapPrometheusError(queryResult)
+		if err != nil {
+			return nil, err
+		}
+		// Pass the message as an argument, never as the format string, so a
+		// '%' inside the Prometheus error text is reproduced verbatim.
+		return nil, fmt.Errorf("%s", e)
+	}
+
+	// Deep Check for proper formatting
+	d, ok := data.(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
+	}
+	resultData, ok := d["result"]
+	if !ok {
+		return nil, fmt.Errorf("Result field not present in prometheus response")
+	}
+	resultsData, ok := resultData.([]interface{})
+	if !ok {
+		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
+	}
+
+	// Scan Results
+	var result []*QueryResult
+	for _, val := range resultsData {
+		resultInterface, ok := val.(map[string]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Result is improperly formatted")
+		}
+
+		metricInterface, ok := resultInterface["metric"]
+		if !ok {
+			return nil, fmt.Errorf("Metric field does not exist in data result vector")
+		}
+		metricMap, ok := metricInterface.(map[string]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Metric field is improperly formatted")
+		}
+
+		// Wrap execution of this lazily in case the data is not used
+		labels := func() string { return labelsForMetric(metricMap) }
+
+		// Determine if the result is a ranged data set or single value
+		rangeData, isRange := resultInterface["values"]
+
+		var vectors []*util.Vector
+		if !isRange {
+			dataPoint, ok := resultInterface["value"]
+			if !ok {
+				return nil, fmt.Errorf("Value field does not exist in data result vector")
+			}
+
+			v, err := parseDataPoint(dataPoint, labels)
+			if err != nil {
+				return nil, err
+			}
+			vectors = append(vectors, v)
+		} else {
+			values, ok := rangeData.([]interface{})
+			if !ok {
+				return nil, fmt.Errorf("Values field is improperly formatted")
+			}
+
+			for _, value := range values {
+				v, err := parseDataPoint(value, labels)
+				if err != nil {
+					return nil, err
+				}
+
+				vectors = append(vectors, v)
+			}
+		}
+
+		result = append(result, &QueryResult{
+			Metric: metricMap,
+			Values: vectors,
+		})
+	}
+
+	return result, nil
+}
+
+// GetString looks up field in the result's Metric map and returns its value
+// as a string. It returns an error when the field is absent or when its value
+// is not a string.
+func (qr *QueryResult) GetString(field string) (string, error) {
+	raw, found := qr.Metric[field]
+	if !found {
+		return "", fmt.Errorf("%s field does not exist in data result vector", field)
+	}
+
+	if text, isString := raw.(string); isString {
+		return text, nil
+	}
+
+	return "", fmt.Errorf("%s field is improperly formatted", field)
+}
+
+// GetLabels returns all labels and their values from the query result
+func (qr *QueryResult) GetLabels() map[string]string {
+	result := make(map[string]string)
+
+	// Find All keys with prefix label_, remove prefix, add to labels
+	for k, v := range qr.Metric {
+		if !strings.HasPrefix(k, "label_") {
+			continue
+		}
+
+		label := k[6:]
+		value, ok := v.(string)
+		if !ok {
+			klog.V(3).Infof("Failed to parse label value for label: %s", label)
+			continue
+		}
+
+		result[label] = value
+	}
+
+	return result
+}
+
+// parseDataPoint converts one raw data point, expected to be a two-element
+// array of [timestamp (float64), value (string)], into a util.Vector.
+//
+// The value string is parsed as a float64; +Inf, -Inf and NaN are logged at
+// verbosity 1 together with labels() and replaced by 0. The timestamp is
+// rounded to the nearest multiple of 10. labels is only invoked when a
+// warning is logged. Any element of the wrong type or an unparsable value
+// yields an error rather than a panic.
+func parseDataPoint(dataPoint interface{}, labels func() string) (*util.Vector, error) {
+	value, ok := dataPoint.([]interface{})
+	if !ok || len(value) != 2 {
+		return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+	}
+
+	// Checked assertions: malformed elements must surface as errors.
+	timestamp, ok := value[0].(float64)
+	if !ok {
+		return nil, fmt.Errorf("Improperly formatted datapoint timestamp from Prometheus")
+	}
+	strVal, ok := value[1].(string)
+	if !ok {
+		return nil, fmt.Errorf("Improperly formatted datapoint value from Prometheus")
+	}
+
+	v, err := strconv.ParseFloat(strVal, 64)
+	if err != nil {
+		return nil, err
+	}
+
+	// Test for +Inf and -Inf (sign: 0), Test for NaN
+	if math.IsInf(v, 0) {
+		klog.V(1).Infof("[Warning] Found Inf value parsing vector data point for metric: %s", labels())
+		v = 0.0
+	} else if math.IsNaN(v) {
+		klog.V(1).Infof("[Warning] Found NaN value parsing vector data point for metric: %s", labels())
+		v = 0.0
+	}
+
+	return &util.Vector{
+		Timestamp: math.Round(timestamp/10) * 10,
+		Value:     v,
+	}, nil
+}
+
+// labelsForMetric renders the metric map as "{key: value, key: value}". The
+// pairs appear in map iteration order, which Go does not define.
+func labelsForMetric(metricMap map[string]interface{}) string {
+	var rendered []string
+	for name, val := range metricMap {
+		rendered = append(rendered, fmt.Sprintf("%s: %+v", name, val))
+	}
+
+	joined := strings.Join(rendered, ", ")
+	return "{" + joined + "}"
+}
+
+// wrapPrometheusError extracts the message stored under the "error" key of a
+// JSON-decoded Prometheus response. It returns an error when the response is
+// not a JSON object, has no "error" field, or carries an "error" field that
+// is not a string.
+func wrapPrometheusError(qr interface{}) (string, error) {
+	// Checked assertion: a non-object response must not panic.
+	response, ok := qr.(map[string]interface{})
+	if !ok {
+		return "", fmt.Errorf("Unexpected response from Prometheus")
+	}
+	e, ok := response["error"]
+	if !ok {
+		return "", fmt.Errorf("Unexpected response from Prometheus")
+	}
+	eStr, ok := e.(string)
+	if !ok {
+		// Previously this case returned ("", nil), which made the caller
+		// build an error with an empty message.
+		return "", fmt.Errorf("Error field improperly formatted in prometheus response")
+	}
+	return eStr, nil
+}