Browse Source

Refactor AggregateCostData to take optional parameters to un-pollute the function signature and provide simpler default behavior and testability; refactor calls to AggregateCostData; refactor addVector to better document behavior; WIP efficiency data: add request vectors to Aggregation

Niko Kovacevic 6 years ago
parent
commit
393d093e44

+ 91 - 45
costmodel/aggregations.go

@@ -18,9 +18,12 @@ type Aggregation struct {
 	Cluster           string    `json:"cluster,omitempty"`
 	CPUAllocation     []*Vector `json:"-"`
 	CPUCostVector     []*Vector `json:"cpuCostVector,omitempty"`
+	CPURequestVector  []*Vector `json:"cpuRequestVector,omitempty"`
 	RAMAllocation     []*Vector `json:"-"`
 	RAMCostVector     []*Vector `json:"ramCostVector,omitempty"`
+	RAMRequestVector  []*Vector `json:"ramRequestVector,omitempty"`
 	PVCostVector      []*Vector `json:"pvCostVector,omitempty"`
+	PVRequestVector   []*Vector `json:"pvRequestVector,omitempty"`
 	GPUAllocation     []*Vector `json:"-"`
 	GPUCostVector     []*Vector `json:"gpuCostVector,omitempty"`
 	NetworkCostVector []*Vector `json:"networkCostVector,omitempty"`
@@ -114,9 +117,29 @@ func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.
 	return (totalContainerCost / totalClusterCostOverWindow), nil
 }
 
-// AggregateCostData reduces the dimensions of raw cost data by field and, optionally, by time. The field parameter determines the field
-// by which to group data, with an optional subfield, e.g. for groupings like field="label" and subfield="app" for grouping by "label.app".
-func AggregateCostData(cp cloud.Provider, costData map[string]*CostData, dataCount int64, field string, subfields []string, rate string, timeSeries bool, discount float64, idleCoefficient float64, sr *SharedResourceInfo) map[string]*Aggregation {
+// AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
+type AggregationOptions struct {
+	DataCount          int64   // number of cost data points expected; ensures proper rate calculation if data is incomplete
+	Discount           float64 // percent by which to discount CPU, RAM, and GPU cost
+	IdleCoefficient    float64 // scales costs by amount of idle resources
+	IncludeTimeSeries  bool    // set to true to receive time series data
+	Rate               string  // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
+	SharedResourceInfo *SharedResourceInfo
+}
+
+// AggregateCostData aggregates raw cost data by field; e.g. namespace, cluster, service, or label. In the case of label, callers
+// must pass a slice of subfields indicating the labels by which to group. Provider is used to define custom resource pricing.
+// See AggregationOptions for optional parameters. NOTE(review): opts is dereferenced
+// unconditionally below, but several callers (e.g. the tests) pass nil — consider
+// substituting a default &AggregationOptions{IdleCoefficient: 1.0} when opts == nil
+// to avoid a nil-pointer panic.
+// TODO: Can we restructure custom pricing code to allow that to be optional? Having to pass an entire Provider instance is way
+// overkill and tightly couples this code to the cloud package.
+func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp cloud.Provider, opts *AggregationOptions) map[string]*Aggregation {
+	dataCount := opts.DataCount
+	discount := opts.Discount
+	idleCoefficient := opts.IdleCoefficient
+	timeSeries := opts.IncludeTimeSeries
+	rate := opts.Rate
+	sr := opts.SharedResourceInfo
+
 	// aggregations collects key-value pairs of resource group-to-aggregated data
 	// e.g. namespace-to-data or label-value-to-data
 	aggregations := make(map[string]*Aggregation)
@@ -215,6 +238,10 @@ func mergeVectors(cp cloud.Provider, costDatum *CostData, aggregation *Aggregati
 	aggregation.RAMAllocation = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocation)
 	aggregation.GPUAllocation = addVectors(costDatum.GPUReq, aggregation.GPUAllocation)
 
+	aggregation.CPURequestVector = addVectors(costDatum.CPUReq, aggregation.CPURequestVector)
+	aggregation.RAMRequestVector = addVectors(costDatum.RAMReq, aggregation.RAMRequestVector)
+	// TODO nikovacevic-agg-efficiency
+
 	cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
 	aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
 	aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
@@ -326,65 +353,84 @@ func totalVector(vectors []*Vector) float64 {
 	return total
 }
 
-func addVectors(req []*Vector, used []*Vector) []*Vector {
-	if req == nil || len(req) == 0 {
-		for _, usedV := range used {
-			if usedV.Timestamp == 0 {
-				continue
-			}
-			usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
+// roundTimestamp rounds the given timestamp to the given precision; e.g. a
+// timestamp given in seconds, rounded to precision 10, will be rounded
+// to the nearest value divisible by 10 (24 goes to 20, but 25 goes to 30).
+func roundTimestamp(ts float64, precision float64) float64 {
+	return math.Round(ts/precision) * precision
+}
+
+// addVectors adds two slices of Vectors. Vector timestamps are rounded to the
+// nearest ten seconds to allow matching of Vectors within a delta allowance.
+// Matching Vectors are summed, while unmatched Vectors are passed through.
+// e.g. [(t=1, 1), (t=2, 2)] + [(t=2, 2), (t=3, 3)] = [(t=1, 1), (t=2, 4), (t=3, 3)]
+func addVectors(xvs []*Vector, yvs []*Vector) []*Vector {
+	// round all non-zero timestamps to the nearest 10 second mark
+	for _, yv := range yvs {
+		if yv.Timestamp != 0 {
+			yv.Timestamp = roundTimestamp(yv.Timestamp, 10.0)
 		}
-		return used
 	}
-	if used == nil || len(used) == 0 {
-		for _, reqV := range req {
-			if reqV.Timestamp == 0 {
-				continue
-			}
-			reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
+	for _, xv := range xvs {
+		if xv.Timestamp != 0 {
+			xv.Timestamp = roundTimestamp(xv.Timestamp, 10.0)
 		}
-		return req
 	}
-	var allocation []*Vector
 
+	// if xvs is empty, return yvs
+	if xvs == nil || len(xvs) == 0 {
+		return yvs
+	}
+
+	// if yvs is empty, return xvs
+	if yvs == nil || len(yvs) == 0 {
+		return xvs
+	}
+
+	// sum stores the sum of the vector slices xvs and yvs
+	var sum []*Vector
+
+	// timestamps stores all timestamps present in both vector slices
+	// without duplicates
 	var timestamps []float64
-	reqMap := make(map[float64]float64)
-	for _, reqV := range req {
-		if reqV.Timestamp == 0 {
+
+	// turn each vector slice into a map of timestamp-to-value so that
+	// values at equal timestamps can be lined-up and summed
+	xMap := make(map[float64]float64)
+	for _, xv := range xvs {
+		if xv.Timestamp == 0 {
 			continue
 		}
-		reqV.Timestamp = math.Round(reqV.Timestamp/10) * 10
-		reqMap[reqV.Timestamp] = reqV.Value
-		timestamps = append(timestamps, reqV.Timestamp)
+		xMap[xv.Timestamp] = xv.Value
+		timestamps = append(timestamps, xv.Timestamp)
 	}
-	usedMap := make(map[float64]float64)
-	for _, usedV := range used {
-		if usedV.Timestamp == 0 {
+	yMap := make(map[float64]float64)
+	for _, yv := range yvs {
+		if yv.Timestamp == 0 {
 			continue
 		}
-		usedV.Timestamp = math.Round(usedV.Timestamp/10) * 10
-		usedMap[usedV.Timestamp] = usedV.Value
-		if _, ok := reqMap[usedV.Timestamp]; !ok { // no need to double add, since we'll range over sorted timestamps and check.
-			timestamps = append(timestamps, usedV.Timestamp)
+		yMap[yv.Timestamp] = yv.Value
+		if _, ok := xMap[yv.Timestamp]; !ok {
+			// no need to double add, since we'll range over sorted timestamps and check.
+			timestamps = append(timestamps, yv.Timestamp)
 		}
 	}
 
+	// iterate over each timestamp to produce a final summed vector slice
 	sort.Float64s(timestamps)
 	for _, t := range timestamps {
-		rv, okR := reqMap[t]
-		uv, okU := usedMap[t]
-		allocationVector := &Vector{
-			Timestamp: t,
-		}
-		if okR && okU {
-			allocationVector.Value = rv + uv
-		} else if okR {
-			allocationVector.Value = rv
-		} else if okU {
-			allocationVector.Value = uv
+		x, okX := xMap[t]
+		y, okY := yMap[t]
+		sv := &Vector{Timestamp: t}
+		if okX && okY {
+			sv.Value = x + y
+		} else if okX {
+			sv.Value = x
+		} else if okY {
+			sv.Value = y
 		}
-		allocation = append(allocation, allocationVector)
+		sum = append(sum, sv)
 	}
 
-	return allocation
+	return sum
 }

+ 3 - 3
costmodel/costmodel.go

@@ -74,9 +74,9 @@ type CostData struct {
 	Jobs            []string                     `json:"jobs,omitempty"`
 	RAMReq          []*Vector                    `json:"ramreq,omitempty"`
 	RAMUsed         []*Vector                    `json:"ramused,omitempty"`
+	RAMAllocation   []*Vector                    `json:"ramallocated,omitempty"`
 	CPUReq          []*Vector                    `json:"cpureq,omitempty"`
 	CPUUsed         []*Vector                    `json:"cpuused,omitempty"`
-	RAMAllocation   []*Vector                    `json:"ramallocated,omitempty"`
 	CPUAllocation   []*Vector                    `json:"cpuallocated,omitempty"`
 	GPUReq          []*Vector                    `json:"gpureq,omitempty"`
 	PVCData         []*PersistentVolumeClaimData `json:"pvcData,omitempty"`
@@ -1873,7 +1873,7 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 		klog.V(3).Infof("%s", w)
 	}
 	if err != nil {
-		return nil, fmt.Errorf("%s Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
 	}
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
@@ -1903,7 +1903,7 @@ func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 			return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
 		}
 
-		return nil, fmt.Errorf("%s Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
 	}
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)

+ 21 - 3
costmodel/router.go

@@ -212,7 +212,12 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 		dataCount := int64(dur.Hours()) + 1
 		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
 
-		agg := AggregateCostData(a.Cloud, data, dataCount, aggregationField, subfields, "", false, discount, 1.0, nil)
+		opts := &AggregationOptions{
+			DataCount:       dataCount,
+			Discount:        discount,
+			IdleCoefficient: 1.0,
+		}
+		agg := AggregateCostData(data, aggregationField, subfields, a.Cloud, opts)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -448,7 +453,15 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	}
 
 	// aggregate cost model data by given fields and cache the result for the default expiration
-	result := AggregateCostData(a.Cloud, data, dataCount, field, subfields, rate, timeSeries, discount, idleCoefficient, sr)
+	opts := &AggregationOptions{
+		DataCount:          dataCount,
+		Discount:           discount,
+		IdleCoefficient:    idleCoefficient,
+		IncludeTimeSeries:  timeSeries,
+		Rate:               rate,
+		SharedResourceInfo: sr,
+	}
+	result := AggregateCostData(data, field, subfields, a.Cloud, opts)
 	a.Cache.Set(aggKey, result, cache.DefaultExpiration)
 
 	w.Write(wrapDataWithMessage(result, nil, fmt.Sprintf("cache miss: %s", aggKey)))
@@ -524,7 +537,12 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		dataCount := (int64(dur.Hours()) / windowHrs) + 1
 		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
 
-		agg := AggregateCostData(a.Cloud, data, dataCount, aggregationField, subfields, "", false, discount, 1.0, nil)
+		opts := &AggregationOptions{
+			DataCount:       dataCount,
+			Discount:        discount,
+			IdleCoefficient: 1.0,
+		}
+		agg := AggregateCostData(data, aggregationField, subfields, a.Cloud, opts)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {

+ 1 - 4
test/aggregation_test.go

@@ -104,13 +104,10 @@ func TestAggregation(t *testing.T) {
 	costData["test1,foo,nginx,testnode"] = cd1
 	costData["test1,bar,nginx,testnode"] = cd2
 
-	dataCount := int64(1)
-
 	field := "namespace"
 	subfields := []string{""}
-	rate := ""
 
-	agg := costModel.AggregateCostData(cp, costData, dataCount, field, subfields, rate, false, 0.0, 1.0, nil)
+	agg := costModel.AggregateCostData(costData, field, subfields, cp, nil)
 	log.Printf("agg: %+v", agg["test1"])
 	assert.Equal(t, agg["test1"].TotalCost, 8.0)
 }

+ 3 - 3
test/historical_pod_test.go

@@ -232,7 +232,7 @@ func TestPodUpDown(t *testing.T) {
 		panic(err)
 	}
 
-	agg := costModel.AggregateCostData(provider, data, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
+	agg := costModel.AggregateCostData(data, "namespace", []string{""}, provider, nil)
 	_, ok := agg["test"]
 	assert.Assert(t, ok)
 
@@ -241,11 +241,11 @@ func TestPodUpDown(t *testing.T) {
 		panic(err)
 	}
 
-	agg2 := costModel.AggregateCostData(provider, data2, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
+	agg2 := costModel.AggregateCostData(data2, "namespace", []string{""}, provider, nil)
 	_, ok2 := agg2["test"]
 	assert.Assert(t, ok2)
 
-	agg3 := costModel.AggregateCostData(provider, data, 1, "label", []string{"testaggregation"}, "", false, 0.0, 1.0, nil)
+	agg3 := costModel.AggregateCostData(data, "label", []string{"testaggregation"}, provider, nil)
 	_, ok3 := agg3["foo"]
 	assert.Assert(t, ok3)
 }

+ 2 - 2
test/remote_cluster_test.go

@@ -68,8 +68,8 @@ func TestClusterConvergence(t *testing.T) {
 		panic(err)
 	}
 
-	agg := costModel.AggregateCostData(provider, data, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
-	agg2 := costModel.AggregateCostData(provider, data2, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
+	agg := costModel.AggregateCostData(data, "namespace", []string{""}, provider, nil)
+	agg2 := costModel.AggregateCostData(data2, "namespace", []string{""}, provider, nil)
 
 	assert.Equal(t, agg["kubecost"].TotalCost, agg2["kubecost"].TotalCost)