فهرست منبع

Issue #173: use cluster name, rather than clusterId from helm chart, when reporting by cluster; in aggregation API, provide optional boolean flag to send time series data.

Niko Kovacevic 6 سال پیش
والد
کامیت
d1a1079f69
4 فایلهای تغییر یافته به همراه 107 افزوده شده و 52 حذف شده
  1. 40 20
      costmodel/aggregations.go
  2. 8 8
      costmodel/costmodel.go
  3. 2 0
      go.mod
  4. 57 24
      main.go

+ 40 - 20
costmodel/aggregations.go

@@ -16,12 +16,12 @@ type Aggregation struct {
 	Environment        string    `json:"environment"`
 	Cluster            string    `json:"cluster"`
 	CPUAllocation      []*Vector `json:"-"`
-	CPUCostVector      []*Vector `json:"-"`
+	CPUCostVector      []*Vector `json:"cpuCostVector,omitempty"`
 	RAMAllocation      []*Vector `json:"-"`
-	RAMCostVector      []*Vector `json:"-"`
-	PVCostVector       []*Vector `json:"-"`
+	RAMCostVector      []*Vector `json:"ramCostVector,omitempty"`
+	PVCostVector       []*Vector `json:"pvCostVector,omitempty"`
 	GPUAllocation      []*Vector `json:"-"`
-	GPUCostVector      []*Vector `json:"-"`
+	GPUCostVector      []*Vector `json:"gpuCostVector,omitempty"`
 	CPUCost            float64   `json:"cpuCost"`
 	RAMCost            float64   `json:"ramCost"`
 	GPUCost            float64   `json:"gpuCost"`
@@ -95,9 +95,17 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 	return (totalContainerCost / totalClusterCostOverWindow), nil
 }
 
-func AggregateCostModel(costData map[string]*CostData, discount float64, idleCoefficient float64, sr *SharedResourceInfo, aggregationField string, aggregationSubField string) map[string]*Aggregation {
+// AggregateCostModel reduces the dimensions of raw cost data by field and, optionally, by time. The field parameter determines the field
+// by which to group data, with an optional subfield, e.g. for groupings like field="label" and subfield="app" for grouping by "label.app".
+func AggregateCostModel(costData map[string]*CostData, field string, subfield string, timeSeries bool, discount float64, idleCoefficient float64, sr *SharedResourceInfo) map[string]*Aggregation {
+	// aggregations collects key-value pairs of resource group-to-aggregated data
+	// e.g. namespace-to-data or label-value-to-data
 	aggregations := make(map[string]*Aggregation)
+
+	// sharedResourceCost is the running total cost of resources that should be reported
+	// as shared across all other resources, rather than reported as a stand-alone category
 	sharedResourceCost := 0.0
+
 	for _, costDatum := range costData {
 		if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
 			cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum, discount, idleCoefficient)
@@ -108,27 +116,28 @@ func AggregateCostModel(costData map[string]*CostData, discount float64, idleCoe
 				sharedResourceCost += totalVector(pv)
 			}
 		} else {
-			if aggregationField == "cluster" {
-				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.ClusterID, aggregations, discount, idleCoefficient)
-			} else if aggregationField == "namespace" {
-				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Namespace, aggregations, discount, idleCoefficient)
-			} else if aggregationField == "service" {
+			if field == "cluster" {
+				aggregateDatum(aggregations, costDatum, field, subfield, costDatum.ClusterID, discount, idleCoefficient)
+			} else if field == "namespace" {
+				aggregateDatum(aggregations, costDatum, field, subfield, costDatum.Namespace, discount, idleCoefficient)
+			} else if field == "service" {
 				if len(costDatum.Services) > 0 {
-					aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Services[0], aggregations, discount, idleCoefficient)
+					aggregateDatum(aggregations, costDatum, field, subfield, costDatum.Services[0], discount, idleCoefficient)
 				}
-			} else if aggregationField == "deployment" {
+			} else if field == "deployment" {
 				if len(costDatum.Deployments) > 0 {
-					aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Deployments[0], aggregations, discount, idleCoefficient)
+					aggregateDatum(aggregations, costDatum, field, subfield, costDatum.Deployments[0], discount, idleCoefficient)
 				}
-			} else if aggregationField == "label" {
+			} else if field == "label" {
 				if costDatum.Labels != nil {
-					if subfieldName, ok := costDatum.Labels[aggregationSubField]; ok {
-						aggregationHelper(costDatum, aggregationField, aggregationSubField, subfieldName, aggregations, discount, idleCoefficient)
+					if subfieldName, ok := costDatum.Labels[subfield]; ok {
+						aggregateDatum(aggregations, costDatum, field, subfield, subfieldName, discount, idleCoefficient)
 					}
 				}
 			}
 		}
 	}
+
 	for _, agg := range aggregations {
 		agg.CPUCost = totalVector(agg.CPUCostVector)
 		agg.RAMCost = totalVector(agg.RAMCostVector)
@@ -136,19 +145,30 @@ func AggregateCostModel(costData map[string]*CostData, discount float64, idleCoe
 		agg.PVCost = totalVector(agg.PVCostVector)
 		agg.SharedCost = sharedResourceCost / float64(len(aggregations))
 		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.SharedCost
+
+		// remove time series data if it is not explicitly requested
+		if !timeSeries {
+			agg.CPUCostVector = nil
+			agg.RAMCostVector = nil
+			agg.PVCostVector = nil
+			agg.GPUCostVector = nil
+		}
 	}
+
 	return aggregations
 }
 
-func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubField string, key string, aggregations map[string]*Aggregation, discount float64, idleCoefficient float64) {
+func aggregateDatum(aggregations map[string]*Aggregation, costDatum *CostData, field string, subfield string, key string, discount float64, idleCoefficient float64) {
+	// add a new entry to the aggregation results if this key has not been seen yet
 	if _, ok := aggregations[key]; !ok {
 		agg := &Aggregation{}
-		agg.Aggregator = aggregator
-		agg.AggregatorSubField = aggregatorSubField
+		agg.Aggregator = field
+		agg.AggregatorSubField = subfield
 		agg.Environment = key
 		agg.Cluster = costDatum.ClusterID
 		aggregations[key] = agg
 	}
+
 	mergeVectors(costDatum, aggregations[key], discount, idleCoefficient)
 }
 
@@ -277,4 +297,4 @@ func addVectors(req []*Vector, used []*Vector) []*Vector {
 	}
 
 	return allocation
-}
+}

+ 8 - 8
costmodel/costmodel.go

@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"math"
 	"net/http"
-	"os"
 	"sort"
 	"strconv"
 	"strings"
@@ -864,6 +863,7 @@ func getContainerAllocation(req []*Vector, used []*Vector) []*Vector {
 
 	return allocation
 }
+
 func addPVData(cache ClusterCache, pvClaimMapping map[string]*PersistentVolumeClaimData, cloud costAnalyzerCloud.Provider) error {
 	cfg, err := cloud.GetConfig()
 	if err != nil {
@@ -1112,7 +1112,7 @@ func getPodDeployments(cache ClusterCache, podList []*v1.Pod) (map[string]map[st
 	return podDeploymentsMapping, nil
 }
 
-func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider,
+func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider,
 	startString, endString, windowString string, filterNamespace string, remoteEnabled bool) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
@@ -1142,7 +1142,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
 		return nil, err
 	}
-	clustID := os.Getenv(clusterIDKey)
+	clusterName := cloud.ClusterName(cp)
 	if remoteEnabled == true {
 		remoteLayout := "2006-01-02T15:04:05Z"
 		remoteStartStr := start.Format(remoteLayout)
@@ -1244,7 +1244,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		return nil, fmt.Errorf("Error parsing normalization values: " + err.Error())
 	}
 
-	nodes, err := getNodeCost(cm.Cache, cloud)
+	nodes, err := getNodeCost(cm.Cache, cp)
 	if err != nil {
 		klog.V(1).Infof("Warning, no cost model available: " + err.Error())
 		return nil, err
@@ -1256,7 +1256,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		klog.Infof("Unable to get PV Data: %s", err.Error())
 	}
 	if pvClaimMapping != nil {
-		err = addPVData(cm.Cache, pvClaimMapping, cloud)
+		err = addPVData(cm.Cache, pvClaimMapping, cp)
 		if err != nil {
 			return nil, err
 		}
@@ -1358,7 +1358,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 
 			var podNetCosts []*Vector
 			if usage, ok := networkUsageMap[ns+","+podName]; ok {
-				netCosts, err := GetNetworkCost(usage, cloud)
+				netCosts, err := GetNetworkCost(usage, cp)
 				if err != nil {
 					klog.V(3).Infof("Error pulling network costs: %s", err.Error())
 				} else {
@@ -1446,7 +1446,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 					Labels:          podLabels,
 					NetworkData:     netReq,
 					NamespaceLabels: nsLabels,
-					ClusterID:       clustID,
+					ClusterID:       clusterName,
 				}
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)
@@ -1513,7 +1513,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
-				ClusterID:       clustID,
+				ClusterID:       clusterName,
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed)
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed)

+ 2 - 0
go.mod

@@ -40,3 +40,5 @@ require (
 	k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
 	k8s.io/klog v0.4.0
 )
+
+go 1.13

+ 57 - 24
main.go

@@ -72,6 +72,21 @@ type DataEnvelope struct {
 	Message string      `json:"message,omitempty"`
 }
 
+func normalizeTimeParam(param string) (string, error) {
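+	// convert days to hours; NOTE(review): an empty param (e.g. an omitted offset query value) makes the slice expression below panic — guard with len(param) > 0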
+	if param[len(param)-1:] == "d" {
+		count := param[:len(param)-1]
+		val, err := strconv.ParseInt(count, 10, 64)
+		if err != nil {
+			return "", err
+		}
+		val = val * 24
+		param = fmt.Sprintf("%dh", val)
+	}
+
+	return param, nil
+}
+
 func wrapDataWithMessage(data interface{}, err error, message string) []byte {
 	var resp []byte
 
@@ -183,7 +198,7 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 			w.Write(wrapData(nil, err))
 		}
 		discount = discount * 0.01
-		agg := costModel.AggregateCostModel(data, discount, 1.0, nil, aggregationField, aggregationSubField)
+		agg := costModel.AggregateCostModel(data, aggregationField, aggregationSubField, false, discount, 1.0, nil)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -227,6 +242,9 @@ func (a *Accesses) ClusterCostsOverTime(w http.ResponseWriter, r *http.Request,
 	w.Write(wrapData(data, err))
 }
 
+// AggregateCostModel handles HTTP requests to the aggregated cost model API, which can be parametrized
+// by time period using window and offset, aggregation field using field and subfield (in cases like
+// field=label, subfield=app for grouping by label.app), and filtered by namespace.
 func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -234,19 +252,37 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
 	namespace := r.URL.Query().Get("namespace")
-	aggregationField := r.URL.Query().Get("aggregation")
-	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
+	field := r.URL.Query().Get("aggregation")
+	subfield := r.URL.Query().Get("aggregationSubfield")
 	allocateIdle := r.URL.Query().Get("allocateIdle")
 	sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
 	sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
 	sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
+	remote := r.URL.Query().Get("remote")
 
+	// timeSeries == true maintains the time series dimension of the data,
+	// which by default gets summed over the entire interval
+	timeSeries := r.URL.Query().Get("timeSeries") == "true"
+
+	// disableCache, if set to "true", tells this function to recompute and
+	// cache the requested data
 	disableCache := r.URL.Query().Get("disableCache") == "true"
+
+	// clearCache, if set to "true", tells this function to flush the cache,
+	// then recompute and cache the requested data
 	clearCache := r.URL.Query().Get("clearCache") == "true"
 
-	if aggregationField == "" {
+	// aggregation field is required
+	if field == "" {
+		w.WriteHeader(http.StatusBadRequest)
+		w.Write(wrapData(nil, fmt.Errorf("Missing aggregation field parameter")))
+		return
+	}
+
+	// aggregation subfield is required when aggregation field is "label"
+	if field == "label" && subfield == "" {
 		w.WriteHeader(http.StatusBadRequest)
-		w.Write(wrapData(nil, fmt.Errorf("Missing aggregation parameter")))
+		w.Write(wrapData(nil, fmt.Errorf("Missing aggregation subfield parameter for aggregation by label")))
 		return
 	}
 
@@ -263,17 +299,18 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		endTime = endTime.Add(-1 * o)
 	}
 
-	// if window is defined in terms of days, convert it to hours
+	// if window or offset are defined in terms of days, convert to hours
 	// e.g. convert "2d" to "48h"
-	if window[len(window)-1:] == "d" {
-		count := window[:len(window)-1]
-		val, err := strconv.ParseInt(count, 10, 64)
-		if err != nil {
-			w.Write(wrapData(nil, err))
-			return
-		}
-		val = val * 24
-		window = fmt.Sprintf("%dh", val)
+	window, err := normalizeTimeParam(window)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
+	}
+
+	offset, err = normalizeTimeParam(offset)
+	if err != nil {
+		w.Write(wrapData(nil, err))
+		return
 	}
 
 	// convert time window into start and end times, formatted
@@ -295,18 +332,14 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		a.Cache.Flush()
 	}
 
-	aggKey := fmt.Sprintf("aggregate:%s:%s:%s:%s:%s", window, offset, namespace, aggregationField, aggregationSubField)
+	aggKey := fmt.Sprintf("aggregate:%s:%s:%s:%s:%s:%t", window, offset, namespace, field, subfield, timeSeries)
 
 	// check the cache for aggregated response; if cache is hit and not disabled, return response
 	if result, found := a.Cache.Get(aggKey); found && !disableCache {
-		// TODO send http.StatusNotModified when testing is complete
-		w.WriteHeader(http.StatusOK)
 		w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache hit: %s", aggKey)))
 		return
 	}
 
-	remote := r.URL.Query().Get("remote")
-
 	remoteAvailable := os.Getenv(remoteEnabled)
 	remoteEnabled := false
 	if remoteAvailable == "true" && remote != "false" {
@@ -354,13 +387,13 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 			return
 		}
 	}
-	var s *costModel.SharedResourceInfo
+	var sr *costModel.SharedResourceInfo
 	if len(sn) > 0 || len(sln) > 0 {
-		s = costModel.NewSharedResourceInfo(true, sn, sln, slv)
+		sr = costModel.NewSharedResourceInfo(true, sn, sln, slv)
 	}
 
 	// aggregate cost model data by given fields and cache the result for the default expiration
-	result := costModel.AggregateCostModel(data, discount, idleCoefficient, s, aggregationField, aggregationSubField)
+	result := costModel.AggregateCostModel(data, field, subfield, timeSeries, discount, idleCoefficient, sr)
 	a.Cache.Set(aggKey, result, cache.DefaultExpiration)
 
 	w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache miss: %s", aggKey)))
@@ -398,7 +431,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 			w.Write(wrapData(nil, err))
 		}
 		discount = discount * 0.01
-		agg := costModel.AggregateCostModel(data, discount, 1.0, nil, aggregationField, aggregationSubField)
+		agg := costModel.AggregateCostModel(data, aggregationField, aggregationSubField, false, discount, 1.0, nil)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {