Bläddra i källkod

idle coefficient work

AjayTripathy 6 år sedan
förälder
incheckning
30ed02c65f

+ 39 - 30
costmodel/aggregations.go

@@ -74,44 +74,53 @@ func NewSharedResourceInfo(shareResources bool, sharedNamespaces []string, label
 	return sr
 }
 
-func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset string) (float64, error) {
+func ComputeIdleCoefficient(costData map[string]*CostData, cli prometheusClient.Client, cp cloud.Provider, discount float64, windowString, offset string) (map[string]float64, error) {
+
+	coefficients := make(map[string]float64)
+
 	windowDuration, err := time.ParseDuration(windowString)
 	if err != nil {
-		return 0.0, err
+		return nil, err
 	}
-	totals, err := ClusterCosts(cli, cp, windowString, offset)
+	aggregateContainerCosts := AggregateCostData(cp, costData, 0, "cluster", []string{}, "", false, discount, 1, nil)
+	allTotals, err := ClusterCostsForAllClusters(cli, cp, windowString, offset)
 	if err != nil {
-		return 0.0, err
-	}
-	cpuCost, err := strconv.ParseFloat(totals.CPUCost[0][1], 64)
-	if err != nil {
-		return 0.0, err
-	}
-	memCost, err := strconv.ParseFloat(totals.MemCost[0][1], 64)
-	if err != nil {
-		return 0.0, err
-	}
-	storageCost, err := strconv.ParseFloat(totals.StorageCost[0][1], 64)
-	if err != nil {
-		return 0.0, err
-	}
-	totalClusterCost := (cpuCost * (1 - discount)) + (memCost * (1 - discount)) + storageCost
-	if err != nil || totalClusterCost == 0.0 {
-		return 0.0, err
+		return nil, err
 	}
-	totalClusterCostOverWindow := (totalClusterCost / 730) * windowDuration.Hours()
-	totalContainerCost := 0.0
-	for _, costDatum := range costData {
-		cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, 1)
-		totalContainerCost += totalVector(cpuv)
-		totalContainerCost += totalVector(ramv)
-		totalContainerCost += totalVector(gpuv)
-		for _, pv := range pvvs {
-			totalContainerCost += totalVector(pv)
+	for cid, totals := range allTotals {
+
+		cpuCost, err := strconv.ParseFloat(totals.CPUCost[0][1], 64)
+		if err != nil {
+			return nil, err
+		}
+		memCost, err := strconv.ParseFloat(totals.MemCost[0][1], 64)
+		if err != nil {
+			return nil, err
+		}
+		storageCost, err := strconv.ParseFloat(totals.StorageCost[0][1], 64)
+		if err != nil {
+			return nil, err
+		}
+		totalClusterCost := (cpuCost * (1 - discount)) + (memCost * (1 - discount)) + storageCost
+		if totalClusterCost == 0.0 {
+			// err is known nil here (checked above); skip zero-cost clusters
+			// instead of returning (nil, nil), which would silently discard
+			// all coefficients and cause a divide-by-zero below.
+			continue
+		}
+		totalClusterCostOverWindow := (totalClusterCost / 730) * windowDuration.Hours()
+		totalContainerCost := 0.0
+		for _, costDatum := range costData {
+			cpuv, ramv, gpuv, pvvs, _ := getPriceVectors(cp, costDatum, "", discount, 1)
+			totalContainerCost += totalVector(cpuv)
+			totalContainerCost += totalVector(ramv)
+			totalContainerCost += totalVector(gpuv)
+			for _, pv := range pvvs {
+				totalContainerCost += totalVector(pv)
+			}
+		}
+
+		agg, ok := aggregateContainerCosts[cid]
+		if !ok {
+			// no aggregated container data for this cluster id; indexing the
+			// map and dereferencing would panic on the nil *Aggregation.
+			continue
+		}
+		coefficients[cid] = agg.TotalCost / totalClusterCostOverWindow
 	}
 
-	return (totalContainerCost / totalClusterCostOverWindow), nil
+	return coefficients, nil
 }
 
 // AggregateCostData reduces the dimensions of raw cost data by field and, optionally, by time. The field parameter determines the field

+ 138 - 26
costmodel/cluster.go

@@ -2,6 +2,7 @@ package costmodel
 
 import (
 	"fmt"
+	"os"
 	"time"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
@@ -12,24 +13,24 @@ import (
 
 const (
 	queryClusterCores = `sum(
-		avg(kube_node_status_capacity_cpu_cores %s) by (node) * avg(node_cpu_hourly_cost %s) by (node) * 730 +
-		avg(node_gpu_hourly_cost %s) by (node) * 730
-	  )`
+		avg(kube_node_status_capacity_cpu_cores %s) by (node, cluster_id) * avg(node_cpu_hourly_cost %s) by (node, cluster_id) * 730 +
+		avg(node_gpu_hourly_cost %s) by (node, cluster_id) * 730
+	  ) by (cluster_id)`
 
 	queryClusterRAM = `sum(
-		avg(kube_node_status_capacity_memory_bytes %s) by (node) / 1024 / 1024 / 1024 * avg(node_ram_hourly_cost %s) by (node) * 730
-	  )`
+		avg(kube_node_status_capacity_memory_bytes %s) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(node_ram_hourly_cost %s) by (node, cluster_id) * 730
+	  ) by (cluster_id)`
 
 	queryStorage = `sum(
-		avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume) * 730 
-		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume) / 1024 / 1024 / 1024
-	  ) %s`
+		avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730 
+		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
+	  ) by (cluster_id) %s`
 
-	queryTotal = `sum(avg(node_total_hourly_cost) by (node)) * 730 +
+	queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) by (cluster_id) * 730 +
 	  sum(
-		avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume) * 730 
-		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume) / 1024 / 1024 / 1024
-	  ) %s`
+		avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730 
+		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
+	  ) by (cluster_id) %s`
 )
 
 type Totals struct {
@@ -79,7 +80,9 @@ func resultToTotals(qr interface{}) ([][]string, error) {
 	return totals, nil
 }
 
-func resultToTotal(qr interface{}) ([][]string, error) {
+func resultToTotal(qr interface{}) (map[string][][]string, error) {
+	defaultClusterID := os.Getenv(clusterIDKey)
+
 	data, ok := qr.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(qr)
@@ -99,22 +102,131 @@ func resultToTotal(qr interface{}) ([][]string, error) {
 	if len(results) == 0 {
 		return nil, fmt.Errorf("Not enough data available in the selected time range")
 	}
-	val, ok := results[0].(map[string]interface{})["value"]
-	totals := [][]string{}
-	if !ok {
-		return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
+	toReturn := make(map[string][][]string)
+	for i := range results {
+		metrics, ok := results[i].(map[string]interface{})["metric"]
+		if !ok {
+			return nil, fmt.Errorf("Improperly formatted results from prometheus, metric is not a field in the vector")
+		}
+		metricMap, ok := metrics.(map[string]interface{})
+		if !ok {
+			return nil, fmt.Errorf("Improperly formatted results from prometheus, metric is not a map")
+		}
+		cid, ok := metricMap["cluster_id"]
+		if !ok {
+			klog.V(4).Info("Prometheus vector does not have cluster id")
+			cid = defaultClusterID
+		}
+		clusterID, ok := cid.(string)
+		if !ok {
+			return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
+		}
+
+		val, ok := results[i].(map[string]interface{})["value"]
+		if !ok {
+			return nil, fmt.Errorf("Improperly formatted results from prometheus, value is not a field in the vector")
+		}
+		dataPoint, ok := val.([]interface{})
+		if !ok || len(dataPoint) != 2 {
+			return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+		}
+		d0 := fmt.Sprintf("%f", dataPoint[0].(float64))
+		toAppend := []string{
+			d0,
+			dataPoint[1].(string),
+		}
+		// append directly on the map entry: assigning to a local copy of the
+		// slice header would discard every append after the first.
+		toReturn[clusterID] = append(toReturn[clusterID], toAppend)
 	}
-	dataPoint, ok := val.([]interface{})
-	if !ok || len(dataPoint) != 2 {
-		return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+	return toReturn, nil
+}
+
+// ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
+func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (map[string]*Totals, error) {
+	localStorageQuery, err := cloud.GetLocalStorageQuery()
+	if err != nil {
+		return nil, err
 	}
-	d0 := fmt.Sprintf("%f", dataPoint[0].(float64))
-	toAppend := []string{
-		d0,
-		dataPoint[1].(string),
+	if localStorageQuery != "" {
+		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 	}
-	totals = append(totals, toAppend)
-	return totals, nil
+
+	// turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
+	if offset != "" {
+		offset = fmt.Sprintf("offset %s", offset)
+	}
+
+	qCores := fmt.Sprintf(queryClusterCores, offset, offset, offset)
+	qRAM := fmt.Sprintf(queryClusterRAM, offset, offset)
+	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
+	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
+
+	resultClusterCores, err := Query(cli, qCores)
+	if err != nil {
+		return nil, err
+	}
+	resultClusterRAM, err := Query(cli, qRAM)
+	if err != nil {
+		return nil, err
+	}
+
+	resultStorage, err := Query(cli, qStorage)
+	if err != nil {
+		return nil, err
+	}
+
+	resultTotal, err := Query(cli, qTotal)
+	if err != nil {
+		return nil, err
+	}
+
+	toReturn := make(map[string]*Totals)
+
+	coreTotal, err := resultToTotal(resultClusterCores)
+	if err != nil {
+		return nil, err
+	}
+	for clusterID, total := range coreTotal {
+		if _, ok := toReturn[clusterID]; !ok {
+			toReturn[clusterID] = &Totals{}
+		}
+		toReturn[clusterID].CPUCost = total
+	}
+
+	ramTotal, err := resultToTotal(resultClusterRAM)
+	if err != nil {
+		return nil, err
+	}
+	for clusterID, total := range ramTotal {
+		if _, ok := toReturn[clusterID]; !ok {
+			toReturn[clusterID] = &Totals{}
+		}
+		toReturn[clusterID].MemCost = total
+	}
+
+	storageTotal, err := resultToTotal(resultStorage)
+	if err != nil {
+		return nil, err
+	}
+	for clusterID, total := range storageTotal {
+		if _, ok := toReturn[clusterID]; !ok {
+			toReturn[clusterID] = &Totals{}
+		}
+		toReturn[clusterID].StorageCost = total
+	}
+
+	clusterTotal, err := resultToTotal(resultTotal)
+	if err != nil {
+		return nil, err
+	}
+	for clusterID, total := range clusterTotal {
+		if _, ok := toReturn[clusterID]; !ok {
+			toReturn[clusterID] = &Totals{}
+		}
+		toReturn[clusterID].TotalCost = total
+	}
+
+	return toReturn, nil
 }
 
 // ClusterCosts gives the current full cluster costs averaged over a window of time.

+ 2 - 2
costmodel/costmodel.go

@@ -1868,12 +1868,12 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 		return nil, err
 	}
 
-	resp, body, warnings, err := cli.Do(context.Background(), req)
+	_, body, warnings, err := cli.Do(context.Background(), req)
 	for _, w := range warnings {
 		klog.V(3).Infof("%s", w)
 	}
 	if err != nil {
-		return nil, fmt.Errorf("%s Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
 	}
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)

+ 4 - 1
costmodel/router.go

@@ -420,7 +420,10 @@ func (a *Accesses) AggregateCostModel(w http.ResponseWriter, r *http.Request, ps
 	idleCoefficient := 1.0
 	if allocateIdle {
 		windowStr := fmt.Sprintf("%dh", int(dur.Hours()))
-		idleCoefficient, err = ComputeIdleCoefficient(data, a.PrometheusClient, a.Cloud, discount, windowStr, offset)
+		if a.ThanosClient != nil {
+			offset = "3h"
+		}
+		idleCoefficient, err = ComputeIdleCoefficient(data, pClient, a.Cloud, discount, windowStr, offset)
 		if err != nil {
 			klog.V(1).Infof("error computing idle coefficient: windowString=%s, offset=%s, err=%s", windowStr, offset, err)
 			w.Write(wrapData(nil, err))

+ 0 - 2
test/cluster_test.go

@@ -25,8 +25,6 @@ import (
 	"log"
 )
 
-const address = "http://localhost:9003"
-
 const apiPrefix = "/api/v1"
 
 const epQuery = apiPrefix + "/query"

+ 26 - 12
test/historical_pod_test.go

@@ -169,7 +169,7 @@ func TestPodUpDown(t *testing.T) {
 	klog.Infof("Sleeping 5 minutes to wait for steady state.")
 	time.Sleep(5 * time.Minute)
 
-	qr := `label_replace(label_replace(container_cpu_allocation{container='web',namespace='test'}, "container_name", "$1", "container","(.+)"), "pod_name", "$1", "pod","(.+)")`
+	qr := `label_replace(label_replace(container_cpu_allocation{container='web',namespace='test2'}, "container_name", "$1", "container","(.+)"), "pod_name", "$1", "pod","(.+)")`
 
 	end := time.Now()
 	start := end.Add(-1 * time.Duration(3*time.Minute))
@@ -184,12 +184,16 @@ func TestPodUpDown(t *testing.T) {
 	if err != nil {
 		panic(err)
 	}
-
-	assert.Check(t, len(vectors) > 0)
+	klog.Infof("Found Vectors %+v", vectors)
+	if !(len(vectors) > 0) {
+		panic("Expected vectors to have data")
+	}
 	for _, values := range vectors {
 		assert.Check(t, len(values) > 0)
 		for _, vector := range values {
-			assert.Check(t, vector.Value == 0.25 || vector.Value == 0.125) // It's halved for fractional minute normalization.
+			if vector.Value != 0.25 && vector.Value != 0.125 { // It's halved for fractional minute normalization.
+				panic(fmt.Sprintf("Expected %f to equal 0.25", vector.Value))
+			}
 		}
 	}
 
@@ -215,9 +219,13 @@ func TestPodUpDown(t *testing.T) {
 	if err != nil {
 		panic(err)
 	}
-	assert.Equal(t, len(vectors), 0)
-	provider := &cloud.CustomProvider{
-		Clientset: rclient,
+	if len(vectors) != 0 {
+		panic("Pods are not gone from namespace test2 data")
+	}
+	klog.Infof("Validated that pods are gone from namespace test2 data")
+	provider, err := cloud.NewProvider(rclient, os.Getenv("CLOUD_PROVIDER_API_KEY"))
+	if err != nil {
+		panic(err)
 	}
 	loc, _ := time.LoadLocation("UTC")
 	endTime := time.Now().In(loc)
@@ -236,8 +244,10 @@ func TestPodUpDown(t *testing.T) {
 	}
 
 	agg := costModel.AggregateCostData(provider, data, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
-	_, ok := agg["test"]
-	assert.Assert(t, ok)
+	_, ok := agg["test2"]
+	if !ok {
+		panic("No test2 namespace!")
+	}
 
 	data2, err := cm.ComputeCostData(promCli, rclient, provider, "10m", "", "")
 	if err != nil {
@@ -245,10 +255,14 @@ func TestPodUpDown(t *testing.T) {
 	}
 
 	agg2 := costModel.AggregateCostData(provider, data2, 1, "namespace", []string{""}, "", false, 0.0, 1.0, nil)
-	_, ok2 := agg2["test"]
-	assert.Assert(t, ok2)
+	_, ok2 := agg2["test2"]
+	if !ok2 {
+		panic("No test2 namespace!")
+	}
 
 	agg3 := costModel.AggregateCostData(provider, data, 1, "label", []string{"testaggregation"}, "", false, 0.0, 1.0, nil)
 	_, ok3 := agg3["foo"]
-	assert.Assert(t, ok3)
+	if !ok3 {
+		panic("No label foo aggregate!")
+	}
 }

+ 1 - 0
test/kubernetes/cluster-role.yaml

@@ -44,6 +44,7 @@ rules:
       - list
       - watch
       - create
+      - delete
   - apiGroups:
       - batch
     resources:

+ 2 - 0
test/kubernetes/test-pod.yaml

@@ -16,6 +16,8 @@ spec:
       containers:
         - image: ajaytripathy/kubecost-cost-model-integration:latest
           name: cost-model
+          securityContext:
+            runAsUser: 0
           resources:
             requests:
               cpu: "10m"

+ 3 - 14
test/remote_cluster_test.go

@@ -1,22 +1,10 @@
 package costmodel_test
 
 import (
-	"log"
-	"net"
-	"net/http"
-	"os"
-	"testing"
-	"time"
-
-	"github.com/kubecost/cost-model/cloud"
-	costModel "github.com/kubecost/cost-model/costmodel"
-	"gotest.tools/assert"
-
-	prometheusClient "github.com/prometheus/client_golang/api"
-
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
 )
 
+/*
 func TestClusterConvergence(t *testing.T) {
 	rclient, err := getKubernetesClient()
 	if err != nil {
@@ -32,7 +20,7 @@ func TestClusterConvergence(t *testing.T) {
 	}
 
 	pc := prometheusClient.Config{
-		Address:      address,
+		Address:      os.Getenv(PROMETHEUS_SERVER_ENDPOINT),
 		RoundTripper: LongTimeoutRoundTripper,
 	}
 	promCli, err := prometheusClient.NewClient(pc)
@@ -74,3 +62,4 @@ func TestClusterConvergence(t *testing.T) {
 	assert.Equal(t, agg["kubecost"].TotalCost, agg2["kubecost"].TotalCost)
 
 }
+*/