Просмотр исходного кода

Merge pull request #177 from kubecost/AjayTripathy-idle-coefficient

Ajay tripathy idle coefficient
Ajay Tripathy 6 лет назад
Родитель
Сommit
862fa3e73a

+ 9 - 0
cloud/awsprovider.go

@@ -815,10 +815,18 @@ func (aws *AWS) NodePricing(k Key) (*Node, error) {
 func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 	defaultClusterName := "AWS Cluster #1"
 	c, err := awsProvider.GetConfig()
+	remote := os.Getenv(remoteEnabled)
+	remoteEnabled := false
+	if os.Getenv(remote) == "true" {
+		remoteEnabled = true
+	}
+
 	if c.ClusterName != "" {
 		m := make(map[string]string)
 		m["name"] = c.ClusterName
 		m["provider"] = "AWS"
+		m["id"] = os.Getenv(KC_CLUSTER_ID)
+		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 		return m, nil
 	}
 	makeStructure := func(clusterName string) (map[string]string, error) {
@@ -827,6 +835,7 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		m["name"] = clusterName
 		m["provider"] = "AWS"
 		m["id"] = os.Getenv(KC_CLUSTER_ID)
+		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 		return m, nil
 	}
 

+ 15 - 1
cloud/azureprovider.go

@@ -326,7 +326,8 @@ func (az *Azure) DownloadPricingData() error {
 	containerServiceClient := containerservice.NewContainerServicesClient(config.AzureSubscriptionID)
 	containerServiceClient.Authorizer = authorizer
 
-	rateCardFilter := "OfferDurableId eq 'MS-AZR-0003p' and Currency eq 'USD' and Locale eq 'en-US' and RegionInfo eq 'US'"
+	rateCardFilter := fmt.Sprintf("OfferDurableId eq 'MS-AZR-0003p' and Currency eq '%s' and Locale eq 'en-US' and RegionInfo eq '%s'", config.CurrencyCode, config.AzureBillingRegion)
+	klog.Infof("Using ratecard query %s", rateCardFilter)
 	result, err := rcClient.Get(context.TODO(), rateCardFilter)
 	if err != nil {
 		return err
@@ -488,6 +489,12 @@ func (*Azure) GetDisks() ([]byte, error) {
 }
 
 func (az *Azure) ClusterInfo() (map[string]string, error) {
+	remote := os.Getenv(remoteEnabled)
+	remoteEnabled := false
+	if os.Getenv(remote) == "true" {
+		remoteEnabled = true
+	}
+
 	m := make(map[string]string)
 	m["name"] = "Azure Cluster #1"
 	c, err := az.GetConfig()
@@ -498,6 +505,7 @@ func (az *Azure) ClusterInfo() (map[string]string, error) {
 		m["name"] = c.ClusterName
 	}
 	m["provider"] = "azure"
+	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 	m["id"] = os.Getenv(KC_CLUSTER_ID)
 	return m, nil
 
@@ -554,6 +562,12 @@ func (az *Azure) GetConfig() (*CustomPricing, error) {
 	if c.Discount == "" {
 		c.Discount = "0%"
 	}
+	if c.CurrencyCode == "" {
+		c.CurrencyCode = "USD"
+	}
+	if c.AzureBillingRegion == "" {
+		c.AzureBillingRegion = "US"
+	}
 	if err != nil {
 		return nil, err
 	}

+ 6 - 6
cloud/gcpprovider.go

@@ -242,6 +242,11 @@ func (gcp *GCP) QuerySQL(query string) ([]*OutOfClusterAllocation, error) {
 
 // ClusterName returns the name of a GKE cluster, as provided by metadata.
 func (gcp *GCP) ClusterInfo() (map[string]string, error) {
+	remote := os.Getenv(remoteEnabled)
+	remoteEnabled := false
+	if os.Getenv(remote) == "true" {
+		remoteEnabled = true
+	}
 	metadataClient := metadata.NewClient(&http.Client{Transport: userAgentTransport{
 		userAgent: "kubecost",
 		base:      http.DefaultTransport,
@@ -264,6 +269,7 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	m["name"] = attribute
 	m["provider"] = "GCP"
 	m["id"] = os.Getenv(KC_CLUSTER_ID)
+	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 	return m, nil
 }
 
@@ -615,12 +621,6 @@ func (gcp *GCP) parsePages(inputKeys map[string]Key, pvKeys map[string]PVKey) (m
 		return nil, err
 	}
 	returnPages := make(map[string]*GCPPricing)
-	for _, page := range pages {
-		klog.V(1).Infof("Page: %s : %+v", page)
-		for k, v := range page {
-			klog.V(1).Infof("Unmerged Page: %s : %+v", k, v)
-		}
-	}
 	for _, page := range pages {
 		for k, v := range page {
 			if val, ok := returnPages[k]; ok { //keys may need to be merged

+ 1 - 0
cloud/provider.go

@@ -205,6 +205,7 @@ type CustomPricing struct {
 	AzureClientID         string `json:"azureClientID"`
 	AzureClientSecret     string `json:"azureClientSecret"`
 	AzureTenantID         string `json:"azureTenantID"`
+	AzureBillingRegion    string `json:"azureBillingRegion"`
 	CurrencyCode          string `json:"currencyCode"`
 	Discount              string `json:"discount"`
 	ClusterName           string `json:"clusterName"`

+ 108 - 27
costmodel/aggregations.go

@@ -4,6 +4,10 @@ import (
 	"math"
 	"sort"
 	"strconv"
+	"time"
+
+	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
+	prometheusClient "github.com/prometheus/client_golang/api"
 )
 
 type Aggregation struct {
@@ -23,28 +27,104 @@ type Aggregation struct {
 	GPUCost            float64   `json:"gpuCost"`
 	PVCost             float64   `json:"pvCost"`
 	NetworkCost        float64   `json:"networkCost"`
+	SharedCost         float64   `json:"sharedCost"`
 	TotalCost          float64   `json:"totalCost"`
 }
 
-func AggregateCostModel(costData map[string]*CostData, discount float64, aggregationField string, aggregationSubField string) map[string]*Aggregation {
+type SharedResourceInfo struct {
+	ShareResources  bool
+	SharedNamespace map[string]bool
+	LabelSelectors  map[string]string
+}
+
+func (s *SharedResourceInfo) IsSharedResource(costDatum *CostData) bool {
+	if _, ok := s.SharedNamespace[costDatum.Namespace]; ok {
+		return true
+	}
+	for labelName, labelValue := range s.LabelSelectors {
+		if val, ok := costDatum.Labels[labelName]; ok {
+			if val == labelValue {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, labelnames []string, labelvalues []string) *SharedResourceInfo {
+	sr := &SharedResourceInfo{
+		ShareResources:  shareResources,
+		SharedNamespace: make(map[string]bool),
+		LabelSelectors:  make(map[string]string),
+	}
+	for _, ns := range sharedNamespaces {
+		sr.SharedNamespace[ns] = true
+	}
+	sr.SharedNamespace["kube-system"] = true // kube-system should be split by default
+	for i := range labelnames {
+		sr.LabelSelectors[labelnames[i]] = labelvalues[i]
+	}
+	return sr
+}
+
+func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, discount float64, windowString, offset string) (float64, error) {
+	windowDuration, err := time.ParseDuration(windowString)
+	if err != nil {
+		return 0.0, err
+	}
+	totals, err := ClusterCosts(cli, cloud, windowString, offset)
+	if err != nil {
+		return 0.0, err
+	}
+	totalClusterCost, err := strconv.ParseFloat(totals.TotalCost[0][1], 64)
+	if err != nil || totalClusterCost == 0.0 {
+		return 0.0, err
+	}
+	totalClusterCostOverWindow := (totalClusterCost / 730) * windowDuration.Hours() * (1 - discount)
+	totalContainerCost := 0.0
+	for _, costDatum := range costData {
+		cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum, discount, 1)
+		totalContainerCost += totalVector(cpuv)
+		totalContainerCost += totalVector(ramv)
+		totalContainerCost += totalVector(gpuv)
+		for _, pv := range pvvs {
+			totalContainerCost += totalVector(pv)
+		}
+	}
+
+	return (totalContainerCost / totalClusterCostOverWindow), nil
+}
+
+func AggregateCostModel(costData map[string]*CostData, discount float64, idleCoefficient float64, sr *SharedResourceInfo, aggregationField string, aggregationSubField string) map[string]*Aggregation {
 	aggregations := make(map[string]*Aggregation)
+	sharedResourceCost := 0.0
 	for _, costDatum := range costData {
-		if aggregationField == "cluster" {
-			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.ClusterID, aggregations, discount)
-		} else if aggregationField == "namespace" {
-			aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Namespace, aggregations, discount)
-		} else if aggregationField == "service" {
-			if len(costDatum.Services) > 0 {
-				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Services[0], aggregations, discount)
-			}
-		} else if aggregationField == "deployment" {
-			if len(costDatum.Deployments) > 0 {
-				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Deployments[0], aggregations, discount)
+		if sr != nil && sr.ShareResources && sr.IsSharedResource(costDatum) {
+			cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum, discount, idleCoefficient)
+			sharedResourceCost += totalVector(cpuv)
+			sharedResourceCost += totalVector(ramv)
+			sharedResourceCost += totalVector(gpuv)
+			for _, pv := range pvvs {
+				sharedResourceCost += totalVector(pv)
 			}
-		} else if aggregationField == "label" {
-			if costDatum.Labels != nil {
-				if subfieldName, ok := costDatum.Labels[aggregationSubField]; ok {
-					aggregationHelper(costDatum, aggregationField, aggregationSubField, subfieldName, aggregations, discount)
+		} else {
+			if aggregationField == "cluster" {
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.ClusterID, aggregations, discount, idleCoefficient)
+			} else if aggregationField == "namespace" {
+				aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Namespace, aggregations, discount, idleCoefficient)
+			} else if aggregationField == "service" {
+				if len(costDatum.Services) > 0 {
+					aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Services[0], aggregations, discount, idleCoefficient)
+				}
+			} else if aggregationField == "deployment" {
+				if len(costDatum.Deployments) > 0 {
+					aggregationHelper(costDatum, aggregationField, aggregationSubField, costDatum.Deployments[0], aggregations, discount, idleCoefficient)
+				}
+			} else if aggregationField == "label" {
+				if costDatum.Labels != nil {
+					if subfieldName, ok := costDatum.Labels[aggregationSubField]; ok {
+						aggregationHelper(costDatum, aggregationField, aggregationSubField, subfieldName, aggregations, discount, idleCoefficient)
+					}
 				}
 			}
 		}
@@ -54,12 +134,13 @@ func AggregateCostModel(costData map[string]*CostData, discount float64, aggrega
 		agg.RAMCost = totalVector(agg.RAMCostVector)
 		agg.GPUCost = totalVector(agg.GPUCostVector)
 		agg.PVCost = totalVector(agg.PVCostVector)
-		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost
+		agg.SharedCost = sharedResourceCost / float64(len(aggregations))
+		agg.TotalCost = agg.CPUCost + agg.RAMCost + agg.GPUCost + agg.PVCost + agg.SharedCost
 	}
 	return aggregations
 }
 
-func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubField string, key string, aggregations map[string]*Aggregation, discount float64) {
+func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubField string, key string, aggregations map[string]*Aggregation, discount float64, idleCoefficient float64) {
 	if _, ok := aggregations[key]; !ok {
 		agg := &Aggregation{}
 		agg.Aggregator = aggregator
@@ -68,15 +149,15 @@ func aggregationHelper(costDatum *CostData, aggregator string, aggregatorSubFiel
 		agg.Cluster = costDatum.ClusterID
 		aggregations[key] = agg
 	}
-	mergeVectors(costDatum, aggregations[key], discount)
+	mergeVectors(costDatum, aggregations[key], discount, idleCoefficient)
 }
 
-func mergeVectors(costDatum *CostData, aggregation *Aggregation, discount float64) {
+func mergeVectors(costDatum *CostData, aggregation *Aggregation, discount float64, idleCoefficient float64) {
 	aggregation.CPUAllocation = addVectors(costDatum.CPUAllocation, aggregation.CPUAllocation)
 	aggregation.RAMAllocation = addVectors(costDatum.RAMAllocation, aggregation.RAMAllocation)
 	aggregation.GPUAllocation = addVectors(costDatum.GPUReq, aggregation.GPUAllocation)
 
-	cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum, discount)
+	cpuv, ramv, gpuv, pvvs := getPriceVectors(costDatum, discount, idleCoefficient)
 	aggregation.CPUCostVector = addVectors(cpuv, aggregation.CPUCostVector)
 	aggregation.RAMCostVector = addVectors(ramv, aggregation.RAMCostVector)
 	aggregation.GPUCostVector = addVectors(gpuv, aggregation.GPUCostVector)
@@ -85,13 +166,13 @@ func mergeVectors(costDatum *CostData, aggregation *Aggregation, discount float6
 	}
 }
 
-func getPriceVectors(costDatum *CostData, discount float64) ([]*Vector, []*Vector, []*Vector, [][]*Vector) {
+func getPriceVectors(costDatum *CostData, discount float64, idleCoefficient float64) ([]*Vector, []*Vector, []*Vector, [][]*Vector) {
 	cpuv := make([]*Vector, 0, len(costDatum.CPUAllocation))
 	for _, val := range costDatum.CPUAllocation {
 		cost, _ := strconv.ParseFloat(costDatum.NodeData.VCPUCost, 64)
 		cpuv = append(cpuv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     val.Value * cost * (1 - discount),
+			Value:     val.Value * cost * (1 - discount) * 1 / idleCoefficient,
 		})
 	}
 	ramv := make([]*Vector, 0, len(costDatum.RAMAllocation))
@@ -99,7 +180,7 @@ func getPriceVectors(costDatum *CostData, discount float64) ([]*Vector, []*Vecto
 		cost, _ := strconv.ParseFloat(costDatum.NodeData.RAMCost, 64)
 		ramv = append(ramv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount),
+			Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount) * 1 / idleCoefficient,
 		})
 	}
 	gpuv := make([]*Vector, 0, len(costDatum.GPUReq))
@@ -107,7 +188,7 @@ func getPriceVectors(costDatum *CostData, discount float64) ([]*Vector, []*Vecto
 		cost, _ := strconv.ParseFloat(costDatum.NodeData.GPUCost, 64)
 		gpuv = append(gpuv, &Vector{
 			Timestamp: math.Round(val.Timestamp/10) * 10,
-			Value:     val.Value * cost * (1 - discount),
+			Value:     val.Value * cost * (1 - discount) * 1 / idleCoefficient,
 		})
 	}
 	pvvs := make([][]*Vector, 0, len(costDatum.PVCData))
@@ -118,7 +199,7 @@ func getPriceVectors(costDatum *CostData, discount float64) ([]*Vector, []*Vecto
 			for _, val := range pvcData.Values {
 				pvv = append(pvv, &Vector{
 					Timestamp: math.Round(val.Timestamp/10) * 10,
-					Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount),
+					Value:     (val.Value / 1024 / 1024 / 1024) * cost * (1 - discount) * 1 / idleCoefficient,
 				})
 			}
 			pvvs = append(pvvs, pvv)
@@ -196,4 +277,4 @@ func addVectors(req []*Vector, used []*Vector) []*Vector {
 	}
 
 	return allocation
-}
+}

+ 10 - 2
costmodel/cluster.go

@@ -42,7 +42,11 @@ type Totals struct {
 func resultToTotals(qr interface{}) ([][]string, error) {
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
-		return nil, fmt.Errorf("Improperly formatted response from prometheus, response %+v has no data field", data)
+		e, err := wrapPrometheusError(qr)
+		if err != nil {
+			return nil, err
+		}
+		return nil, fmt.Errorf(e)
 	}
 	r, ok := data.(map[string]interface{})["result"]
 	if !ok {
@@ -78,7 +82,11 @@ func resultToTotals(qr interface{}) ([][]string, error) {
 func resultToTotal(qr interface{}) ([][]string, error) {
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
-		return nil, fmt.Errorf("Improperly formatted response from prometheus, response %+v has no data field", data)
+		e, err := wrapPrometheusError(qr)
+		if err != nil {
+			return nil, err
+		}
+		return nil, fmt.Errorf(e)
 	}
 	r, ok := data.(map[string]interface{})["result"]
 	if !ok {

+ 43 - 34
costmodel/costmodel.go

@@ -646,30 +646,30 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	err = findDeletedNodeInfo(cli, missingNodes, window)
 
 	if err != nil {
-		return nil, err
+		klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
 	}
 	err = findDeletedPodInfo(cli, missingContainers, window)
 	if err != nil {
-		return nil, err
+		klog.V(1).Infof("Error fetching historical pod data: %s", err.Error())
 	}
 	return containerNameCost, err
 }
 
 func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[string]*CostData, window string) error {
 	if len(missingContainers) > 0 {
-		q := make([]string, 0, len(missingContainers))
-		for key := range missingContainers {
-			cm, _ := NewContainerMetricFromKey(key)
-			q = append(q, cm.PodName)
-		}
-		l := strings.Join(q, "|")
-		queryHistoricalPodLabels := fmt.Sprintf(`kube_pod_labels{pod=~"%s"}[%s]`, l, window)
+		queryHistoricalPodLabels := fmt.Sprintf(`kube_pod_labels{}[%s]`, window)
 
 		podLabelsResult, err := Query(cli, queryHistoricalPodLabels)
 		if err != nil {
-			return fmt.Errorf("Error fetching historical pod labels: " + err.Error())
+			klog.V(1).Infof("Error parsing historical labels: %s", err.Error())
+		}
+		podLabels := make(map[string]map[string]string)
+		if podLabelsResult != nil {
+			podLabels, err = labelsFromPrometheusQuery(podLabelsResult)
+			if err != nil {
+				klog.V(1).Infof("Error parsing historical labels: %s", err.Error())
+			}
 		}
-		podLabels, err := labelsFromPrometheusQuery(podLabelsResult)
 		for key, costData := range missingContainers {
 			cm, _ := NewContainerMetricFromKey(key)
 			labels, ok := podLabels[cm.PodName]
@@ -691,29 +691,37 @@ func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[strin
 
 func labelsFromPrometheusQuery(qr interface{}) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	for _, val := range qr.(map[string]interface{})["data"].(map[string]interface{})["result"].([]interface{}) {
+	data, ok := qr.(map[string]interface{})["data"]
+	if !ok {
+		e, err := wrapPrometheusError(qr)
+		if err != nil {
+			return toReturn, err
+		}
+		return toReturn, fmt.Errorf(e)
+	}
+	for _, val := range data.(map[string]interface{})["result"].([]interface{}) {
 		metricInterface, ok := val.(map[string]interface{})["metric"]
 		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
+			return toReturn, fmt.Errorf("Metric field does not exist in data result vector")
 		}
 		metricMap, ok := metricInterface.(map[string]interface{})
 		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
+			return toReturn, fmt.Errorf("Metric field is improperly formatted")
 		}
 		pod, ok := metricMap["pod"]
 		if !ok {
-			return nil, fmt.Errorf("pod field does not exist in data result vector")
+			return toReturn, fmt.Errorf("pod field does not exist in data result vector")
 		}
 		podName, ok := pod.(string)
 		if !ok {
-			return nil, fmt.Errorf("pod field is improperly formatted")
+			return toReturn, fmt.Errorf("pod field is improperly formatted")
 		}
 
 		for labelName, labelValue := range metricMap {
 			parsedLabelName := labelName
 			parsedLv, ok := labelValue.(string)
 			if !ok {
-				return nil, fmt.Errorf("label value is improperly formatted")
+				return toReturn, fmt.Errorf("label value is improperly formatted")
 			}
 			if strings.HasPrefix(parsedLabelName, "label_") {
 				l := strings.Replace(parsedLabelName, "label_", "", 1)
@@ -1103,7 +1111,7 @@ func getPodDeployments(cache ClusterCache, podList []*v1.Pod) (map[string]map[st
 }
 
 func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset kubernetes.Interface, cloud costAnalyzerCloud.Provider,
-	startString, endString, windowString string, filterNamespace string) (map[string]*CostData, error) {
+	startString, endString, windowString string, filterNamespace string, remoteEnabled bool) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
 	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
@@ -1133,8 +1141,7 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		return nil, err
 	}
 	clustID := os.Getenv(CLUSTER_ID)
-	remoteEnabled := os.Getenv(remoteEnabled)
-	if remoteEnabled == "true" {
+	if remoteEnabled == true {
 		remoteLayout := "2006-01-02T15:04:05Z"
 		remoteStartStr := start.Format(remoteLayout)
 		remoteEndStr := end.Format(remoteLayout)
@@ -1524,12 +1531,11 @@ func (cm *CostModel) ComputeCostDataRange(cli prometheusClient.Client, clientset
 		wStr := fmt.Sprintf("%dm", int(w.Minutes()))
 		err = findDeletedNodeInfo(cli, missingNodes, wStr)
 		if err != nil {
-			return nil, err
+			klog.V(1).Infof("Error fetching historical node data: %s", err.Error())
 		}
-		klog.Infof("Finding deleted pod info from range query:")
 		err = findDeletedPodInfo(cli, missingContainers, wStr)
 		if err != nil {
-			return nil, err
+			klog.V(1).Infof("Error fetching historical pod data: %s", err.Error())
 		}
 	}
 
@@ -1820,22 +1826,22 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 	q.Set("step", strconv.FormatFloat(step.Seconds(), 'f', 3, 64))
 	u.RawQuery = q.Encode()
 
-	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
+	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 	if err != nil {
 		return nil, err
 	}
 
-	_, body, _, err := cli.Do(context.Background(), req)
-	if err != nil {
-		klog.V(1).Infof("ERROR" + err.Error())
+	resp, body, warnings, err := cli.Do(context.Background(), req)
+	for _, w := range warnings {
+		klog.V(3).Infof("%s", w)
 	}
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("%s Error %s fetching query %s", resp.StatusCode, err.Error(), query)
 	}
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		klog.V(1).Infof("ERROR" + err.Error())
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
 	}
 	return toReturn, err
 }
@@ -1846,21 +1852,24 @@ func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 	q.Set("query", query)
 	u.RawQuery = q.Encode()
 
-	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
+	req, err := http.NewRequest(http.MethodPost, u.String(), nil)
 	if err != nil {
 		return nil, err
 	}
 
-	_, body, _, err := cli.Do(context.Background(), req)
+	resp, body, warnings, err := cli.Do(context.Background(), req)
+	for _, w := range warnings {
+		klog.V(3).Infof("%s", w)
+	}
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("%s Error %s fetching query %s", resp.StatusCode, err.Error(), query)
 	}
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		klog.V(1).Infof("ERROR" + err.Error())
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
 	}
-	return toReturn, err
+	return toReturn, nil
 }
 
 //todo: don't cast, implement unmarshaler interface

+ 0 - 5
costmodel/watchcontroller.go

@@ -7,7 +7,6 @@ import (
 
 	"k8s.io/klog"
 
-	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	rt "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/runtime"
@@ -129,14 +128,10 @@ func (c *CachingWatchController) handle(key string) error {
 	}
 
 	if !exists {
-		klog.V(3).Infof("Removed %s for key: %s\n", c.resourceType, key)
-
 		if c.removeHandler != nil {
 			c.removeHandler(key)
 		}
 	} else {
-		klog.V(3).Infof("Updated %s: %s\n", c.resourceType, obj.(v1.Object).GetName())
-
 		if c.updateHandler != nil {
 			c.updateHandler(obj)
 		}

+ 80 - 28
main.go

@@ -41,6 +41,8 @@ var (
 	gitCommit string
 )
 
+var Router = httprouter.New()
+
 type Accesses struct {
 	PrometheusClient              prometheusClient.Client
 	KubeClientSet                 kubernetes.Interface
@@ -154,8 +156,8 @@ func (a *Accesses) CostDataModel(w http.ResponseWriter, r *http.Request, ps http
 		if err != nil {
 			w.Write(wrapData(nil, err))
 		}
-
-		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
+		discount = discount * 0.01
+		agg := costModel.AggregateCostModel(data, discount, 1.0, nil, aggregation, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -207,6 +209,10 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	aggregation := r.URL.Query().Get("aggregation")
 	namespace := r.URL.Query().Get("namespace")
 	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
+	allocateIdle := r.URL.Query().Get("allocateIdle")
+	sharedNamespaces := r.URL.Query().Get("sharedNamespaces")
+	sharedLabelNames := r.URL.Query().Get("sharedLabelNames")
+	sharedLabelValues := r.URL.Query().Get("sharedLabelValues")
 
 	endTime := time.Now()
 	if offset != "" {
@@ -235,25 +241,66 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 		w.Write(wrapData(nil, err))
 		return
 	}
+
 	startTime := endTime.Add(-1 * d)
 	layout := "2006-01-02T15:04:05.000Z"
 	start := startTime.Format(layout)
 	end := endTime.Format(layout)
-	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace)
+
+	remote := r.URL.Query().Get("remote")
+
+	remoteAvailable := os.Getenv(remoteEnabled)
+	remoteEnabled := false
+	if remoteAvailable == "true" && remote != "false" {
+		remoteEnabled = true
+	}
+	klog.Infof("REMOTE ENABLED: %t", remoteEnabled)
+	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, "1h", namespace, remoteEnabled)
 	if err != nil {
 		w.Write(wrapData(nil, err))
 		return
 	}
+
 	c, err := a.Cloud.GetConfig()
 	if err != nil {
 		w.Write(wrapData(nil, err))
+		return
 	}
 	discount, err := strconv.ParseFloat(c.Discount[:len(c.Discount)-1], 64)
 	if err != nil {
 		w.Write(wrapData(nil, err))
+		return
+	}
+	discount = discount * 0.01
+
+	idleCoefficient := 1.0
+	if allocateIdle == "true" {
+		idleCoefficient, err = costModel.ComputeIdleCoefficient(data, a.PrometheusClient, a.Cloud, discount, fmt.Sprintf("%dh", int(d.Hours())), offset)
+		if err != nil {
+			w.Write(wrapData(nil, err))
+		}
 	}
+
 	if aggregation != "" {
-		agg := costModel.AggregateCostModel(data, discount*0.01, aggregation, aggregationSubField)
+		sn := []string{}
+		sln := []string{}
+		slv := []string{}
+		if sharedNamespaces != "" {
+			sn = strings.Split(sharedNamespaces, ",")
+		}
+		if sharedLabelNames != "" {
+			sln = strings.Split(sharedLabelNames, ",")
+			slv = strings.Split(sharedLabelValues, ",")
+			if len(sln) != len(slv) || slv[0] == "" {
+				w.Write(wrapData(nil, fmt.Errorf("Supply exacly one label value per label name")))
+				return
+			}
+		}
+		var s *costModel.SharedResourceInfo
+		if len(sn) > 0 || len(sln) > 0 {
+			s = costModel.NewSharedResourceInfo(true, sn, sln, slv)
+		}
+		agg := costModel.AggregateCostModel(data, discount, idleCoefficient, s, aggregation, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	}
 }
@@ -269,8 +316,14 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	namespace := r.URL.Query().Get("namespace")
 	aggregation := r.URL.Query().Get("aggregation")
 	aggregationSubField := r.URL.Query().Get("aggregationSubfield")
+	remote := r.URL.Query().Get("remote")
 
-	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace)
+	remoteAvailable := os.Getenv(remoteEnabled)
+	remoteEnabled := false
+	if remoteAvailable == "true" && remote != "false" {
+		remoteEnabled = true
+	}
+	data, err := a.Model.ComputeCostDataRange(a.PrometheusClient, a.KubeClientSet, a.Cloud, start, end, window, namespace, remoteEnabled)
 	if err != nil {
 		w.Write(wrapData(nil, err))
 	}
@@ -283,7 +336,8 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 		if err != nil {
 			w.Write(wrapData(nil, err))
 		}
-		agg := costModel.AggregateCostModel(data, discount, aggregation, aggregationSubField)
+		discount = discount * 0.01
+		agg := costModel.AggregateCostModel(data, discount, 1.0, nil, aggregation, aggregationSubField)
 		w.Write(wrapData(agg, nil))
 	} else {
 		if fields != "" {
@@ -556,7 +610,6 @@ func (a *Accesses) recordPrices() {
 				if podStatus[podName] == v1.PodRunning { // Only report data for current pods
 					containerSeen[labelKey] = true
 				} else {
-					klog.Infof("Container %s not running", labelKey)
 					containerSeen[labelKey] = false
 				}
 
@@ -811,29 +864,28 @@ func main() {
 
 	a.recordPrices()
 
-	router := httprouter.New()
-	router.GET("/costDataModel", a.CostDataModel)
-	router.GET("/costDataModelRange", a.CostDataModelRange)
-	router.GET("/costDataModelRangeLarge", a.CostDataModelRangeLarge)
-	router.GET("/outOfClusterCosts", a.OutofClusterCosts)
-	router.GET("/allNodePricing", a.GetAllNodePricing)
-	router.GET("/healthz", Healthz)
-	router.GET("/getConfigs", a.GetConfigs)
-	router.POST("/refreshPricing", a.RefreshPricingData)
-	router.POST("/updateSpotInfoConfigs", a.UpdateSpotInfoConfigs)
-	router.POST("/updateAthenaInfoConfigs", a.UpdateAthenaInfoConfigs)
-	router.POST("/updateBigQueryInfoConfigs", a.UpdateBigQueryInfoConfigs)
-	router.POST("/updateConfigByKey", a.UpdateConfigByKey)
-	router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
-	router.GET("/clusterCosts", a.ClusterCosts)
-	router.GET("/validatePrometheus", a.GetPrometheusMetadata)
-	router.GET("/managementPlatform", a.ManagementPlatform)
-	router.GET("/clusterInfo", a.ClusterInfo)
-	router.GET("/containerUptimes", a.ContainerUptimes)
-	router.GET("/aggregatedCostModel", a.AggregateCostModel)
+	Router.GET("/costDataModel", a.CostDataModel)
+	Router.GET("/costDataModelRange", a.CostDataModelRange)
+	Router.GET("/costDataModelRangeLarge", a.CostDataModelRangeLarge)
+	Router.GET("/outOfClusterCosts", a.OutofClusterCosts)
+	Router.GET("/allNodePricing", a.GetAllNodePricing)
+	Router.GET("/healthz", Healthz)
+	Router.GET("/getConfigs", a.GetConfigs)
+	Router.POST("/refreshPricing", a.RefreshPricingData)
+	Router.POST("/updateSpotInfoConfigs", a.UpdateSpotInfoConfigs)
+	Router.POST("/updateAthenaInfoConfigs", a.UpdateAthenaInfoConfigs)
+	Router.POST("/updateBigQueryInfoConfigs", a.UpdateBigQueryInfoConfigs)
+	Router.POST("/updateConfigByKey", a.UpdateConfigByKey)
+	Router.GET("/clusterCostsOverTime", a.ClusterCostsOverTime)
+	Router.GET("/clusterCosts", a.ClusterCosts)
+	Router.GET("/validatePrometheus", a.GetPrometheusMetadata)
+	Router.GET("/managementPlatform", a.ManagementPlatform)
+	Router.GET("/clusterInfo", a.ClusterInfo)
+	Router.GET("/containerUptimes", a.ContainerUptimes)
+	Router.GET("/aggregatedCostModel", a.AggregateCostModel)
 
 	rootMux := http.NewServeMux()
-	rootMux.Handle("/", router)
+	rootMux.Handle("/", Router)
 	rootMux.Handle("/metrics", promhttp.Handler())
 
 	klog.Fatal(http.ListenAndServe(":9003", rootMux))

+ 107 - 0
test/aggregation_test.go

@@ -0,0 +1,107 @@
+package costmodel_test
+
+import (
+	"log"
+	"testing"
+
+	"gotest.tools/assert"
+
+	"github.com/kubecost/cost-model/cloud"
+	costModel "github.com/kubecost/cost-model/costmodel"
+)
+
+func TestAggregation(t *testing.T) {
+	cd1 := &costModel.CostData{
+		Namespace: "test1",
+		NodeName:  "testnode",
+		NodeData: &cloud.Node{
+			VCPUCost: "1.0",
+			RAMCost:  "1.0",
+		},
+		RAMAllocation: []*costModel.Vector{&costModel.Vector{
+			Timestamp: 10,
+			Value:     1073741824,
+		}},
+		CPUAllocation: []*costModel.Vector{&costModel.Vector{
+			Timestamp: 10,
+			Value:     1.0,
+		}},
+		GPUReq: []*costModel.Vector{&costModel.Vector{}},
+		PVCData: []*costModel.PersistentVolumeClaimData{
+			&costModel.PersistentVolumeClaimData{
+				Namespace:  "test1",
+				VolumeName: "foo",
+				Volume: &cloud.PV{
+					Cost: "1.0",
+					Size: "1073741824",
+				},
+				Values: []*costModel.Vector{&costModel.Vector{
+					Timestamp: 10,
+					Value:     1073741824,
+				}},
+			},
+			&costModel.PersistentVolumeClaimData{
+				Namespace:  "test1",
+				VolumeName: "bar",
+				Volume: &cloud.PV{
+					Cost: "1.0",
+					Size: "1073741824",
+				},
+				Values: []*costModel.Vector{&costModel.Vector{
+					Timestamp: 10,
+					Value:     1073741824,
+				}},
+			},
+		},
+	}
+	cd2 := &costModel.CostData{
+		Namespace: "test1",
+		NodeName:  "testnode",
+		NodeData: &cloud.Node{
+			VCPUCost: "1.0",
+			RAMCost:  "1.0",
+		},
+		RAMAllocation: []*costModel.Vector{&costModel.Vector{
+			Timestamp: 10,
+			Value:     1073741824,
+		}},
+		CPUAllocation: []*costModel.Vector{&costModel.Vector{
+			Timestamp: 10,
+			Value:     1.0,
+		}},
+		GPUReq: []*costModel.Vector{&costModel.Vector{}},
+		PVCData: []*costModel.PersistentVolumeClaimData{
+			&costModel.PersistentVolumeClaimData{
+				Namespace:  "test1",
+				VolumeName: "foo",
+				Volume: &cloud.PV{
+					Cost: "1.0",
+					Size: "1073741824",
+				},
+				Values: []*costModel.Vector{&costModel.Vector{
+					Timestamp: 10,
+					Value:     1073741824,
+				}},
+			},
+			&costModel.PersistentVolumeClaimData{
+				Namespace:  "test1",
+				VolumeName: "bar",
+				Volume: &cloud.PV{
+					Cost: "1.0",
+					Size: "1073741824",
+				},
+				Values: []*costModel.Vector{&costModel.Vector{
+					Timestamp: 10,
+					Value:     1073741824,
+				}},
+			},
+		},
+	}
+
+	costData := make(map[string]*costModel.CostData)
+	costData["test1,foo,nginx,testnode"] = cd1
+	costData["test1,bar,nginx,testnode"] = cd2
+	agg := costModel.AggregateCostModel(costData, 0.0, 1.0, nil, "namespace", "")
+	log.Printf("agg: %+v", agg["test1"])
+	assert.Equal(t, agg["test1"].TotalCost, 8.0)
+}

+ 4 - 4
test/historical_pod_test.go

@@ -190,11 +190,11 @@ func TestPodUpDown(t *testing.T) {
 	log.Printf("Starting at %s \n", startStr)
 	log.Printf("Ending at %s \n", endStr)
 	provider.DownloadPricingData()
-	data, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1m", "")
+	data, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1m", "", false)
 	if err != nil {
 		panic(err)
 	}
-	agg := costModel.AggregateCostModel(data, 0.0, "namespace", "")
+	agg := costModel.AggregateCostModel(data, 0.0, 1.0, nil, "namespace", "")
 	_, ok := agg["test"]
 	assert.Assert(t, ok)
 
@@ -202,11 +202,11 @@ func TestPodUpDown(t *testing.T) {
 	if err != nil {
 		panic(err)
 	}
-	agg2 := costModel.AggregateCostModel(data2, 0.0, "namespace", "")
+	agg2 := costModel.AggregateCostModel(data2, 0.0, 1.0, nil, "namespace", "")
 	_, ok2 := agg2["test"]
 	assert.Assert(t, ok2)
 
-	agg3 := costModel.AggregateCostModel(data, 0.0, "label", "testaggregation")
+	agg3 := costModel.AggregateCostModel(data, 0.0, 1.0, nil, "label", "testaggregation")
 	_, ok3 := agg3["foo"]
 	assert.Assert(t, ok3)
 }

+ 76 - 0
test/remote_cluster_test.go

@@ -0,0 +1,76 @@
+package costmodel_test
+
+import (
+	"log"
+	"net"
+	"net/http"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/kubecost/cost-model/cloud"
+	costModel "github.com/kubecost/cost-model/costmodel"
+	"gotest.tools/assert"
+
+	prometheusClient "github.com/prometheus/client_golang/api"
+
+	_ "k8s.io/client-go/plugin/pkg/client/auth"
+)
+
+func TestClusterConvergence(t *testing.T) {
+	rclient, err := getKubernetesClient()
+	if err != nil {
+		panic(err)
+	}
+	var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
+		Proxy: http.ProxyFromEnvironment,
+		DialContext: (&net.Dialer{
+			Timeout:   120 * time.Second,
+			KeepAlive: 120 * time.Second,
+		}).DialContext,
+		TLSHandshakeTimeout: 10 * time.Second,
+	}
+
+	pc := prometheusClient.Config{
+		Address:      address,
+		RoundTripper: LongTimeoutRoundTripper,
+	}
+	promCli, err := prometheusClient.NewClient(pc)
+	if err != nil {
+		panic(err)
+	}
+	cm := costModel.NewCostModel(rclient)
+
+	provider := &cloud.CustomProvider{
+		Clientset: rclient,
+	}
+	loc, _ := time.LoadLocation("UTC")
+	endTime := time.Now().In(loc)
+	d, _ := time.ParseDuration("24h")
+	startTime := endTime.Add(-1 * d)
+	layout := "2006-01-02T15:04:05.000Z"
+	startStr := startTime.Format(layout)
+	endStr := endTime.Format(layout)
+	log.Printf("Starting at %s \n", startStr)
+	log.Printf("Ending at %s \n", endStr)
+	provider.DownloadPricingData()
+
+	data, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1h", "", false)
+	if err != nil {
+		panic(err)
+	}
+
+	os.Setenv("SQL_ADDRESS", "ab5cfc235d64e11e9b8280265f54018f-778641917.us-east-2.elb.amazonaws.com")
+	os.Setenv("REMOTE_WRITE_PASSWORD", "savemoney123")
+
+	data2, err := cm.ComputeCostDataRange(promCli, rclient, provider, startStr, endStr, "1h", "", true)
+	if err != nil {
+		panic(err)
+	}
+
+	agg := costModel.AggregateCostModel(data, 0.0, 1.0, nil, "namespace", "")
+	agg2 := costModel.AggregateCostModel(data2, 0.0, 1.0, nil, "namespace", "")
+
+	assert.Equal(t, agg["kubecost"].TotalCost, agg2["kubecost"].TotalCost)
+
+}