Browse Source

Resolve merge conflicts

Niko Kovacevic 6 years ago
parent
commit
8663c8616a

+ 63 - 39
costmodel/aggregations.go

@@ -81,54 +81,64 @@ func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, label
 	return sr
 }
 
-func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset string) (float64, error) {
+func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset string) (map[string]float64, error) {
+
+	coefficients := make(map[string]float64)
+
 	windowDuration, err := time.ParseDuration(windowString)
 	if err != nil {
-		return 0.0, err
-	}
-	totals, err := ClusterCosts(cli, cp, windowString, offset)
-	if err != nil {
-		return 0.0, err
-	}
-	cpuCost, err := strconv.ParseFloat(totals.CPUCost[0][1], 64)
-	if err != nil {
-		return 0.0, err
+		return nil, err
 	}
-	memCost, err := strconv.ParseFloat(totals.MemCost[0][1], 64)
-	if err != nil {
-		return 0.0, err
-	}
-	storageCost, err := strconv.ParseFloat(totals.StorageCost[0][1], 64)
+
+	aggregateContainerCosts := AggregateCostData(costData, "cluster", []string{}, cp, &AggregationOptions{Discount: discount})
+	allTotals, err := ClusterCostsForAllClusters(cli, cp, windowString, offset)
 	if err != nil {
-		return 0.0, err
+		return nil, err
 	}
-	totalClusterCost := (cpuCost * (1 - discount)) + (memCost * (1 - discount)) + storageCost
-	if err != nil || totalClusterCost == 0.0 {
-		return 0.0, err
-	}
-	totalClusterCostOverWindow := (totalClusterCost / 730) * windowDuration.Hours()
-	totalContainerCost := 0.0
-	for _, costDatum := range costData {
-		cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, 1)
-		totalContainerCost += totalVectors(cpuv)
-		totalContainerCost += totalVectors(ramv)
-		totalContainerCost += totalVectors(gpuv)
-		for _, pv := range pvvs {
-			totalContainerCost += totalVectors(pv)
+	for cid, totals := range allTotals {
+
+		cpuCost, err := strconv.ParseFloat(totals.CPUCost[0][1], 64)
+		if err != nil {
+			return nil, err
 		}
+		memCost, err := strconv.ParseFloat(totals.MemCost[0][1], 64)
+		if err != nil {
+			return nil, err
+		}
+		storageCost, err := strconv.ParseFloat(totals.StorageCost[0][1], 64)
+		if err != nil {
+			return nil, err
+		}
+		totalClusterCost := (cpuCost * (1 - discount)) + (memCost * (1 - discount)) + storageCost
+		if err != nil || totalClusterCost == 0.0 {
+			return nil, err
+		}
+		totalClusterCostOverWindow := (totalClusterCost / 730) * windowDuration.Hours()
+		totalContainerCost := 0.0
+		for _, costDatum := range costData {
+			cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, 1)
+			totalContainerCost += totalVectors(cpuv)
+			totalContainerCost += totalVectors(ramv)
+			totalContainerCost += totalVectors(gpuv)
+			for _, pv := range pvvs {
+				totalContainerCost += totalVectors(pv)
+			}
+		}
+
+		coefficients[cid] = aggregateContainerCosts[cid].TotalCost / totalClusterCostOverWindow
 	}
 
-	return (totalContainerCost / totalClusterCostOverWindow), nil
+	return coefficients, nil
 }
 
 // AggregationOptions provides optional parameters to AggregateCostData, allowing callers to perform more complex operations
 type AggregationOptions struct {
-	DataCount          int64   // number of cost data points expected; ensures proper rate calculation if data is incomplete
-	Discount           float64 // percent by which to discount CPU, RAM, and GPU cost
-	IdleCoefficient    float64 // scales costs by amount of idle resources
-	IncludeEfficiency  bool    // set to true to receive efficiency/usage data
-	IncludeTimeSeries  bool    // set to true to receive time series data
-	Rate               string  // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
+	DataCount          int64              // number of cost data points expected; ensures proper rate calculation if data is incomplete
+	Discount           float64            // percent by which to discount CPU, RAM, and GPU cost
+	IdleCoefficients   map[string]float64 // scales costs by amount of idle resources on a per-cluster basis
+	IncludeEfficiency  bool               // set to true to receive efficiency/usage data
+	IncludeTimeSeries  bool               // set to true to receive time series data
+	Rate               string             // set to "hourly", "daily", or "monthly" to receive cost rate, rather than cumulative cost
 	SharedResourceInfo *SharedResourceInfo
 }
 
@@ -140,12 +150,16 @@ type AggregationOptions struct {
 func AggregateCostData(costData map[string]*CostData, field string, subfields []string, cp cloud.Provider, opts *AggregationOptions) map[string]*Aggregation {
 	dataCount := opts.DataCount
 	discount := opts.Discount
-	idleCoefficient := opts.IdleCoefficient
+	idleCoefficients := opts.IdleCoefficients
 	includeTimeSeries := opts.IncludeTimeSeries
 	includeEfficiency := opts.IncludeEfficiency
 	rate := opts.Rate
 	sr := opts.SharedResourceInfo
 
+	if idleCoefficients == nil {
+		idleCoefficients = make(map[string]float64)
+	}
+
 	// aggregations collects key-value pairs of resource group-to-aggregated data
 	// e.g. namespace-to-data or label-value-to-data
 	aggregations := make(map[string]*Aggregation)
@@ -155,6 +169,10 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 	sharedResourceCost := 0.0
 
 	for _, costDatum := range costData {
+		idleCoefficient, ok := idleCoefficients[costDatum.ClusterID]
+		if !ok {
+			idleCoefficient = 1.0
+		}
 		if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
 			cpuv, ramv, gpuv, pvvs, netv := getPriceVectors(cp, costDatum, rate, discount, idleCoefficient)
 			sharedResourceCost += totalVectors(cpuv)
@@ -171,11 +189,15 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace, discount, idleCoefficient)
 			} else if field == "service" {
 				if len(costDatum.Services) > 0 {
-					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Services[0], discount, idleCoefficient)
+					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Services[0], discount, idleCoefficient)
 				}
 			} else if field == "deployment" {
 				if len(costDatum.Deployments) > 0 {
-					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Deployments[0], discount, idleCoefficient)
+					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Deployments[0], discount, idleCoefficient)
+				}
+			} else if field == "daemonset" {
+				if len(costDatum.Daemonsets) > 0 {
+					aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.Daemonsets[0], discount, idleCoefficient)
 				}
 			} else if field == "label" {
 				if costDatum.Labels != nil {
@@ -186,6 +208,8 @@ func AggregateCostData(costData map[string]*CostData, field string, subfields []
 						}
 					}
 				}
+			} else if field == "pod" {
+				aggregateDatum(cp, aggregations, costDatum, field, subfields, rate, costDatum.Namespace+"/"+costDatum.PodName, discount, idleCoefficient)
 			}
 		}
 	}

+ 118 - 31
costmodel/cluster.go

@@ -2,6 +2,7 @@ package costmodel
 
 import (
 	"fmt"
+	"os"
 	"time"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
@@ -12,24 +13,24 @@ import (
 
 const (
 	queryClusterCores = `sum(
-		avg(kube_node_status_capacity_cpu_cores %s) by (node) * avg(node_cpu_hourly_cost %s) by (node) * 730 +
-		avg(node_gpu_hourly_cost %s) by (node) * 730
-	  )`
+		avg(kube_node_status_capacity_cpu_cores %s) by (node, cluster_id) * avg(node_cpu_hourly_cost %s) by (node, cluster_id) * 730 +
+		avg(node_gpu_hourly_cost %s) by (node, cluster_id) * 730
+	  ) by (cluster_id)`
 
 	queryClusterRAM = `sum(
-		avg(kube_node_status_capacity_memory_bytes %s) by (node) / 1024 / 1024 / 1024 * avg(node_ram_hourly_cost %s) by (node) * 730
-	  )`
+		avg(kube_node_status_capacity_memory_bytes %s) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(node_ram_hourly_cost %s) by (node, cluster_id) * 730
+	  ) by (cluster_id)`
 
 	queryStorage = `sum(
-		avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume) * 730 
-		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume) / 1024 / 1024 / 1024
-	  ) %s`
+		avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730 
+		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
+	  ) by (cluster_id) %s`
 
-	queryTotal = `sum(avg(node_total_hourly_cost) by (node)) * 730 +
+	queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
 	  sum(
-		avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume) * 730 
-		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume) / 1024 / 1024 / 1024
-	  ) %s`
+		avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730 
+		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
+	  ) by (cluster_id) %s`
 )
 
 type Totals struct {
@@ -79,14 +80,16 @@ func resultToTotals(qr interface{}) ([][]string, error) {
 	return totals, nil
 }
 
-func resultToTotal(qr interface{}) ([][]string, error) {
+func resultToTotal(qr interface{}) (map[string][][]string, error) {
+	defaultClusterID := os.Getenv(clusterIDKey)
+
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(qr)
 		if err != nil {
 			return nil, err
 		}
-		return nil, fmt.Errorf(e)
+		return nil, fmt.Errorf("Prometheus query error: %s", e)
 	}
 	r, ok := data.(map[string]interface{})["result"]
 	if !ok {
@@ -99,22 +102,104 @@ func resultToTotal(qr interface{}) ([][]string, error) {
 	if len(results) == 0 {
 		return nil, fmt.Errorf("Not enough data available in the selected time range")
 	}
-	val, ok := results[0].(map[string]interface{})["value"]
-	totals := [][]string{}
-	if !ok {
-		return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
+	toReturn := make(map[string][][]string)
+	for i := range results {
+		metrics, ok := results[i].(map[string]interface{})["metric"]
+		if !ok {
+			return nil, fmt.Errorf("Improperly formatted results from prometheus, metric is not a field in the vector")
+		}
+		metricMap, ok := metrics.(map[string]interface{})
+		cid, ok := metricMap["cluster_id"]
+		if !ok {
+			klog.V(4).Info("Prometheus vector does not have cluster id")
+			cid = defaultClusterID
+		}
+		clusterID, ok := cid.(string)
+		if !ok {
+			return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
+		}
+
+		val, ok := results[i].(map[string]interface{})["value"]
+		if !ok {
+			return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
+		}
+		dataPoint, ok := val.([]interface{})
+		if !ok || len(dataPoint) != 2 {
+			return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+		}
+		d0 := fmt.Sprintf("%f", dataPoint[0].(float64))
+		toAppend := []string{
+			d0,
+			dataPoint[1].(string),
+		}
+		if t, ok := toReturn[clusterID]; ok {
+			t = append(t, toAppend)
+		} else {
+			toReturn[clusterID] = [][]string{toAppend}
+		}
 	}
-	dataPoint, ok := val.([]interface{})
-	if !ok || len(dataPoint) != 2 {
-		return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+	return toReturn, nil
+}
+
+// ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
+func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (map[string]*Totals, error) {
+
+	offset = fmt.Sprintf("offset 3h") // Set offset to 3h for block sync
+
+	qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
+	qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
+	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, "")
+
+	resultClusterCores, err := Query(cli, qCores)
+	if err != nil {
+		return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
 	}
-	d0 := fmt.Sprintf("%f", dataPoint[0].(float64))
-	toAppend := []string{
-		d0,
-		dataPoint[1].(string),
+	resultClusterRAM, err := Query(cli, qRAM)
+	if err != nil {
+		return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
 	}
-	totals = append(totals, toAppend)
-	return totals, nil
+
+	resultStorage, err := Query(cli, qStorage)
+	if err != nil {
+		return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
+	}
+
+	toReturn := make(map[string]*Totals)
+
+	coreTotal, err := resultToTotal(resultClusterCores)
+	if err != nil {
+		return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
+	}
+	for clusterID, total := range coreTotal {
+		if _, ok := toReturn[clusterID]; !ok {
+			toReturn[clusterID] = &Totals{}
+		}
+		toReturn[clusterID].CPUCost = total
+	}
+
+	ramTotal, err := resultToTotal(resultClusterRAM)
+	if err != nil {
+		return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
+	}
+	for clusterID, total := range ramTotal {
+		if _, ok := toReturn[clusterID]; !ok {
+			toReturn[clusterID] = &Totals{}
+		}
+		toReturn[clusterID].MemCost = total
+	}
+
+	storageTotal, err := resultToTotal(resultStorage)
+	if err != nil {
+		return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
+	}
+	for clusterID, total := range storageTotal {
+		if _, ok := toReturn[clusterID]; !ok {
+			toReturn[clusterID] = &Totals{}
+		}
+		toReturn[clusterID].StorageCost = total
+	}
+
+	return toReturn, nil
 }
 
 // ClusterCosts gives the current full cluster costs averaged over a window of time.
@@ -177,11 +262,13 @@ func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider,
 		return nil, err
 	}
 
+	defaultClusterID := os.Getenv(clusterIDKey)
+
 	return &Totals{
-		TotalCost:   clusterTotal,
-		CPUCost:     coreTotal,
-		MemCost:     ramTotal,
-		StorageCost: storageTotal,
+		TotalCost:   clusterTotal[defaultClusterID],
+		CPUCost:     coreTotal[defaultClusterID],
+		MemCost:     ramTotal[defaultClusterID],
+		StorageCost: storageTotal[defaultClusterID],
 	}, nil
 }
 

+ 19 - 19
costmodel/costmodel.go

@@ -246,7 +246,7 @@ func ComputeUptimes(cli prometheusClient.Client) (map[string]float64, error) {
 	if err != nil {
 		return nil, err
 	}
-	vectors, err := GetContainerMetricVector(res, false, 0)
+	vectors, err := GetContainerMetricVector(res, false, 0, os.Getenv(clusterIDKey))
 	if err != nil {
 		return nil, err
 	}
@@ -396,7 +396,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	containerNameCost := make(map[string]*CostData)
 	containers := make(map[string]bool)
 
-	RAMReqMap, err := GetContainerMetricVector(resultRAMRequests, true, normalizationValue)
+	RAMReqMap, err := GetContainerMetricVector(resultRAMRequests, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
@@ -404,28 +404,28 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		containers[key] = true
 	}
 
-	RAMUsedMap, err := GetContainerMetricVector(resultRAMUsage, true, normalizationValue)
+	RAMUsedMap, err := GetContainerMetricVector(resultRAMUsage, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
 	for key := range RAMUsedMap {
 		containers[key] = true
 	}
-	CPUReqMap, err := GetContainerMetricVector(resultCPURequests, true, normalizationValue)
+	CPUReqMap, err := GetContainerMetricVector(resultCPURequests, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
 	for key := range CPUReqMap {
 		containers[key] = true
 	}
-	GPUReqMap, err := GetContainerMetricVector(resultGPURequests, true, normalizationValue)
+	GPUReqMap, err := GetContainerMetricVector(resultGPURequests, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
 	for key := range GPUReqMap {
 		containers[key] = true
 	}
-	CPUUsedMap, err := GetContainerMetricVector(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
+	CPUUsedMap, err := GetContainerMetricVector(resultCPUUsage, false, 0, clusterID) // No need to normalize here, as this comes from a counter
 	if err != nil {
 		return nil, err
 	}
@@ -1296,7 +1296,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 	containerNameCost := make(map[string]*CostData)
 	containers := make(map[string]bool)
 
-	RAMReqMap, err := GetContainerMetricVectors(resultRAMRequests, true, normalizationValue)
+	RAMReqMap, err := GetContainerMetricVectors(resultRAMRequests, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
@@ -1304,28 +1304,28 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		containers[key] = true
 	}
 
-	RAMUsedMap, err := GetContainerMetricVectors(resultRAMUsage, true, normalizationValue)
+	RAMUsedMap, err := GetContainerMetricVectors(resultRAMUsage, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
 	for key := range RAMUsedMap {
 		containers[key] = true
 	}
-	CPUReqMap, err := GetContainerMetricVectors(resultCPURequests, true, normalizationValue)
+	CPUReqMap, err := GetContainerMetricVectors(resultCPURequests, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
 	for key := range CPUReqMap {
 		containers[key] = true
 	}
-	GPUReqMap, err := GetContainerMetricVectors(resultGPURequests, true, normalizationValue)
+	GPUReqMap, err := GetContainerMetricVectors(resultGPURequests, true, normalizationValue, clusterID)
 	if err != nil {
 		return nil, err
 	}
 	for key := range GPUReqMap {
 		containers[key] = true
 	}
-	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, 0) // No need to normalize here, as this comes from a counter
+	CPUUsedMap, err := GetContainerMetricVectors(resultCPUUsage, false, 0, clusterID) // No need to normalize here, as this comes from a counter
 	if err != nil {
 		return nil, err
 	}
@@ -1875,12 +1875,12 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 		return nil, err
 	}
 
-	resp, body, warnings, err := cli.Do(context.Background(), req)
+	_, body, warnings, err := cli.Do(context.Background(), req)
 	for _, w := range warnings {
 		klog.V(3).Infof("%s", w)
 	}
 	if err != nil {
-		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
 	}
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
@@ -2000,7 +2000,7 @@ func newContainerMetricsFromPod(pod v1.Pod, clusterID string) ([]*ContainerMetri
 	return cs, nil
 }
 
-func newContainerMetricFromPrometheus(metrics map[string]interface{}) (*ContainerMetric, error) {
+func newContainerMetricFromPrometheus(metrics map[string]interface{}, defaultClusterID string) (*ContainerMetric, error) {
 	cName, ok := metrics["container_name"]
 	if !ok {
 		return nil, fmt.Errorf("Prometheus vector does not have container name")
@@ -2037,7 +2037,7 @@ func newContainerMetricFromPrometheus(metrics map[string]interface{}) (*Containe
 	cid, ok := metrics["cluster_id"]
 	if !ok {
 		klog.V(4).Info("Prometheus vector does not have cluster id")
-		cid = ""
+		cid = defaultClusterID
 	}
 	clusterID, ok := cid.(string)
 	if !ok {
@@ -2052,7 +2052,7 @@ func newContainerMetricFromPrometheus(metrics map[string]interface{}) (*Containe
 	}, nil
 }
 
-func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
+func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*Vector, error) {
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(qr)
@@ -2075,7 +2075,7 @@ func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue
 		if !ok {
 			return nil, fmt.Errorf("Prometheus vector does not have metric labels")
 		}
-		containerMetric, err := newContainerMetricFromPrometheus(metric)
+		containerMetric, err := newContainerMetricFromPrometheus(metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}
@@ -2102,7 +2102,7 @@ func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue
 	return containerData, nil
 }
 
-func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValue float64) (map[string][]*Vector, error) {
+func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*Vector, error) {
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(qr)
@@ -2125,7 +2125,7 @@ func GetContainerMetricVectors(qr interface{}, normalize bool, normalizationValu
 		if !ok {
 			return nil, fmt.Errorf("Prometheus vector does not have metric labels")
 		}
-		containerMetric, err := newContainerMetricFromPrometheus(metric)
+		containerMetric, err := newContainerMetricFromPrometheus(metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}

+ 12 - 9
costmodel/router.go

@@ -213,9 +213,9 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
 
 		opts := &AggregationOptions{
-			DataCount:       dataCount,
-			Discount:        discount,
-			IdleCoefficient: 1.0,
+			DataCount:        dataCount,
+			Discount:         discount,
+			IdleCoefficients: make(map[string]float64),
 		}
 		agg := AggregateCostData(data, aggregationField, subfields, a.Cloud, opts)
 		w.Write(wrapData(agg, nil))
@@ -430,10 +430,13 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	}
 	discount = discount * 0.01
 
-	idleCoefficient := 1.0
+	idleCoefficients := make(map[string]float64)
 	if allocateIdle {
 		windowStr := fmt.Sprintf("%dh", int(dur.Hours()))
-		idleCoefficient, err = ComputeIdleCoefficient(data, a.PrometheusClient, a.Cloud, discount, windowStr, offset)
+		if a.ThanosClient != nil {
+			offset = "3h"
+		}
+		idleCoefficients, err = ComputeIdleCoefficient(data, pClient, a.Cloud, discount, windowStr, offset)
 		if err != nil {
 			klog.V(1).Infof("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
 			w.Write(wrapData(nil, err))
@@ -464,7 +467,7 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	opts := &AggregationOptions{
 		DataCount:          dataCount,
 		Discount:           discount,
-		IdleCoefficient:    idleCoefficient,
+		IdleCoefficients:   idleCoefficients,
 		IncludeEfficiency:  includeEfficiency,
 		IncludeTimeSeries:  includeTimeSeries,
 		Rate:               rate,
@@ -547,9 +550,9 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		klog.V(1).Infof("for duration %s dataCount = %d", dur.String(), dataCount)
 
 		opts := &AggregationOptions{
-			DataCount:       dataCount,
-			Discount:        discount,
-			IdleCoefficient: 1.0,
+			DataCount:        dataCount,
+			Discount:         discount,
+			IdleCoefficients: make(map[string]float64),
 		}
 		agg := AggregateCostData(data, aggregationField, subfields, a.Cloud, opts)
 		w.Write(wrapData(agg, nil))

+ 0 - 2
test/cluster_test.go

@@ -25,8 +25,6 @@ import (
 	"log"
 )
 
-const address = "http://localhost:9003"
-
 const apiPrefix = "/api/v1"
 
 const epQuery = apiPrefix + "/query"

+ 31 - 10
test/historical_pod_test.go

@@ -169,7 +169,7 @@ func TestPodUpDown(t *testing.T) {
 	klog.Infof("Sleeping 5 minutes to wait for steady state.")
 	time.Sleep(5 * time.Minute)
 
-	qr := `label_replace(label_replace(container_cpu_allocation{container='web',namespace='test'}, "container_name", "$1", "container","(.+)"), "pod_name", "$1", "pod","(.+)")`
+	qr := `label_replace(label_replace(container_cpu_allocation{container='web',namespace='test2'}, "container_name", "$1", "container","(.+)"), "pod_name", "$1", "pod","(.+)")`
 
 	end := time.Now()
 	start := end.Add(-1 * time.Duration(3*time.Minute))
@@ -180,16 +180,20 @@ func TestPodUpDown(t *testing.T) {
 		panic(err)
 	}
 
-	vectors, err := costModel.GetContainerMetricVectors(res, false, 0)
+	vectors, err := costModel.GetContainerMetricVectors(res, false, 0, "cluster-one")
 	if err != nil {
 		panic(err)
 	}
-
-	assert.Check(t, len(vectors) > 0)
+	klog.Infof("Found Vectors %+v", vectors)
+	if !(len(vectors) > 0) {
+		panic("Expected vectors to have data")
+	}
 	for _, values := range vectors {
 		assert.Check(t, len(values) > 0)
 		for _, vector := range values {
-			assert.Check(t, vector.Value == 0.25 || vector.Value == 0.125) // It's halved for fractional minute normalization.
+			if vector.Value != 0.25 && vector.Value != 0.125 { // It's halved for fractional minute normalization.
+				panic(fmt.Sprintf("Expected %f to equal 0.25", vector.Value))
+			}
 		}
 	}
 
@@ -197,10 +201,13 @@ func TestPodUpDown(t *testing.T) {
 	deleteOptions := &metav1.DeleteOptions{
 		PropagationPolicy: &deletePolicy,
 	}
+
+	klog.Infof("Deleting deployment in namespace test2")
 	if err := client.Resource(deploymentRes).Namespace("test2").Delete("demo-deployment", deleteOptions); err != nil {
 		panic(err)
 	}
 
+	klog.Infof("Sleeping 5 minutes to wait for steady state.")
 	time.Sleep(5 * time.Minute)
 
 	res, err = costModel.Query(promCli, qr)
@@ -208,13 +215,17 @@ func TestPodUpDown(t *testing.T) {
 		panic(err)
 	}
 
-	vectors, err = costModel.GetContainerMetricVector(res, false, 0)
+	vectors, err = costModel.GetContainerMetricVector(res, false, 0, "cluster-one")
 	if err != nil {
 		panic(err)
 	}
-	assert.Equal(t, len(vectors), 0)
-	provider := &cloud.CustomProvider{
-		Clientset: rclient,
+	if len(vectors) != 0 {
+		panic("Pods are not gone from namespace test2 data")
+	}
+	klog.Infof("Validated that pods are gone from namespace test2 data")
+	provider, err := cloud.NewProvider(rclient, os.Getenv("CLOUD_PROVIDER_API_KEY"))
+	if err != nil {
+		panic(err)
 	}
 	loc, _ := time.LoadLocation("UTC")
 	endTime := time.Now().In(loc)
@@ -235,6 +246,10 @@ func TestPodUpDown(t *testing.T) {
 	agg := costModel.AggregateCostData(data, "namespace", []string{""}, provider, nil)
 	_, ok := agg["test"]
 	assert.Assert(t, ok)
+	_, ok = agg["test2"]
+	if !ok {
+		panic("No test2 namespace!")
+	}
 
 	data2, err := cm.ComputeCostData(promCli, rclient, provider, "10m", "", "")
 	if err != nil {
@@ -244,8 +259,14 @@ func TestPodUpDown(t *testing.T) {
 	agg2 := costModel.AggregateCostData(data2, "namespace", []string{""}, provider, nil)
 	_, ok2 := agg2["test"]
 	assert.Assert(t, ok2)
+	_, ok2 = agg2["test2"]
+	if !ok2 {
+		panic("No test2 namespace!")
+	}
 
 	agg3 := costModel.AggregateCostData(data, "label", []string{"testaggregation"}, provider, nil)
 	_, ok3 := agg3["foo"]
-	assert.Assert(t, ok3)
+	if !ok3 {
+		panic("No label foo aggregate!")
+	}
 }

+ 12 - 0
test/kubernetes/cluster-role-binding.yaml

@@ -0,0 +1,12 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: cost-model
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: cost-model
+subjects:
+  - kind: ServiceAccount
+    name: cost-model
+    namespace: default

+ 80 - 0
test/kubernetes/cluster-role.yaml

@@ -0,0 +1,80 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: cost-model 
+rules:
+  - apiGroups:
+      - ''
+    resources:
+      - configmaps
+      - deployments
+      - nodes
+      - pods
+      - services
+      - resourcequotas
+      - replicationcontrollers
+      - limitranges
+      - persistentvolumeclaims
+      - persistentvolumes
+      - namespaces
+      - endpoints
+    verbs:
+      - get
+      - list
+      - watch
+      - create
+  - apiGroups:
+      - extensions
+    resources:
+      - daemonsets
+      - deployments
+      - replicasets
+    verbs:
+      - get
+      - list
+      - watch
+  - apiGroups:
+      - apps
+    resources:
+      - statefulsets
+      - deployments
+      - daemonsets
+      - replicasets
+    verbs:
+      - list
+      - watch
+      - create
+      - delete
+  - apiGroups:
+      - batch
+    resources:
+      - cronjobs
+      - jobs
+    verbs:
+      - get
+      - list
+      - watch
+  - apiGroups:
+      - autoscaling
+    resources:
+      - horizontalpodautoscalers
+    verbs:
+      - get
+      - list
+      - watch
+  - apiGroups:
+      - policy
+    resources:
+      - poddisruptionbudgets
+    verbs:
+      - get
+      - list
+      - watch
+  - apiGroups: 
+      - storage.k8s.io
+    resources: 
+      - storageclasses
+    verbs:
+      - get
+      - list
+      - watch

+ 4 - 0
test/kubernetes/service-account.yaml

@@ -0,0 +1,4 @@
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: cost-model

+ 30 - 0
test/kubernetes/test-pod.yaml

@@ -0,0 +1,30 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: cost-model-test
+  labels:
+    app: cost-model-test
+spec:
+  backoffLimit: 0
+  template:
+    metadata:
+      labels:
+        app: cost-model-test
+    spec:
+      restartPolicy: Never
+      serviceAccountName: cost-model
+      containers:
+        - image: ajaytripathy/kubecost-cost-model-integration:latest
+          name: cost-model
+          securityContext:
+            runAsUser: 0
+          resources:
+            requests:
+              cpu: "10m"
+              memory: "55M"
+          env:
+            - name: PROMETHEUS_SERVER_ENDPOINT
+              value: http://kubecost-prometheus-server.kubecost #The endpoint should have the form http://<service-name>.<namespace-name>.svc.cluster.local 
+            - name: CLOUD_PROVIDER_API_KEY
+              value: "AIzaSyDXQPG_MHUEy9neR7stolq6l0ujXmjJlvk" # The GCP Pricing API requires a key.
+          imagePullPolicy: Always

+ 3 - 14
test/remote_cluster_test.go

@@ -1,22 +1,10 @@
 package costmodel_test
 
 import (
-	"log"
-	"net"
-	"net/http"
-	"os"
-	"testing"
-	"time"
-
-	"github.com/kubecost/cost-model/cloud"
-	costModel "github.com/kubecost/cost-model/costmodel"
-	"gotest.tools/assert"
-
-	prometheusClient "github.com/prometheus/client_golang/api"
-
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
 )
 
+/*
 func TestClusterConvergence(t *testing.T) {
 	rclient, err := getKubernetesClient()
 	if err != nil {
@@ -32,7 +20,7 @@ func TestClusterConvergence(t *testing.T) {
 	}
 
 	pc := prometheusClient.Config{
-		Address:      address,
+		Address:      os.Getenv(PROMETHEUS_SERVER_ENDPOINT),
 		RoundTripper: LongTimeoutRoundTripper,
 	}
 	promCli, err := prometheusClient.NewClient(pc)
@@ -74,3 +62,4 @@ func TestClusterConvergence(t *testing.T) {
 	assert.Equal(t, agg["kubecost"].TotalCost, agg2["kubecost"].TotalCost)
 
 }
+*/