Ver código fonte

CostModel.ComputeAllocation WIP

Niko Kovacevic 5 anos atrás
pai
commit
73d02052ff
3 arquivos alterados com 37 adições e 17 exclusões
  1. 27 14
      pkg/costmodel/costmodel.go
  2. 8 1
      pkg/costmodel/router.go
  3. 2 2
      test/cloud_test.go

+ 27 - 14
pkg/costmodel/costmodel.go

@@ -55,7 +55,7 @@ type CostModel struct {
 	pricingMetadata *costAnalyzerCloud.PricingMatchMetadata
 	// TODO niko/cdmr both, or just one?
 	PrometheusClient prometheus.Client
-	ThanosClient     prometheus.Client
+	// ThanosClient     prometheus.Client
 }
 
 func NewCostModel(client prometheus.Client, cache clustercache.ClusterCache, clusterMap clusters.ClusterMap, scrapeInterval time.Duration) *CostModel {
@@ -1564,7 +1564,8 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 	// If using Thanos, increase offset to 3 hours, reducing the duration by
 	// equal measure to maintain the same starting point.
 	thanosDur := thanos.OffsetDuration()
-	if offset < thanosDur && cm.ThanosClient != nil {
+	// TODO niko/cdmr confirm that this flag works interchangeably with ThanosClient != nil
+	if offset < thanosDur && env.IsThanosEnabled() {
 		diff := thanosDur - offset
 		offset += diff
 		duration -= diff
@@ -1593,12 +1594,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 	resStr := "1m"
 	// resPerHr := 60
 
-	var ctx *prom.Context
-	if cm.ThanosClient != nil {
-		ctx = prom.NewContext(cm.ThanosClient)
-	} else {
-		ctx = prom.NewContext(cm.PrometheusClient)
-	}
+	ctx := prom.NewContext(cm.PrometheusClient)
 
 	// TODO niko/cdmr retries? (That should probably go into the Store.)
 
@@ -1617,12 +1613,12 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 	// 	) by (container, pod, namespace, node, cluster_id) / %f
 	// `, durStr, resStr, offStr, resPerHr)
 
-	queryRAMAlloc := fmt.Sprintf(`
+	queryRAMBytesAllocated := fmt.Sprintf(`
 		avg(
 			avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s:%s]%s)
 		) by (container, pod, namespace, node, cluster_id)
 	`, durStr, resStr, offStr)
-	resChRAMAlloc := ctx.Query(queryRAMAlloc)
+	resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
 
 	// queryRAMRequests := fmt.Sprintf()
 
@@ -1636,12 +1632,12 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 	// 	) by (container, pod, namespace, node, cluster_id) / %f
 	// `, durStr, resStr, offStr, resPerHr)
 
-	queryCPUAlloc := fmt.Sprintf(`
+	queryCPUCoresAllocated := fmt.Sprintf(`
 		avg(
 			avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s:%s]%s)
 		) by (container, pod, namespace, node, cluster_id)
 	`, durStr, resStr, offStr)
-	resChCPUAlloc := ctx.Query(queryCPUAlloc)
+	resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
 
 	// queryCPURequests := fmt.Sprintf()
 
@@ -1661,13 +1657,26 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 	// ) by (namespace,container_name,pod_name,node,cluster_id)
 	// * on (pod_name, namespace, cluster_id) group_left(container) label_replace(avg(avg_over_time(kube_pod_status_phase{phase="Running"}[%s] %s)) by (pod,namespace,cluster_id), "pod_name","$1","pod","(.+)")
 
-	queryGPURequests := fmt.Sprintf(`
+	// TODO niko/cdmr find an env with GPUs to test this (generate one?)
+	queryGPUsRequested := fmt.Sprintf(`
 		avg(
 			avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s:%s]%s)
 		) by (container, pod, namespace, node, cluster_id)
-	`)
+	`, durStr, resStr, offStr) // fill the [%s:%s]%s range, resolution and offset, as the RAM/CPU queries do
+	resChGPUsRequested := ctx.Query(queryGPUsRequested)
+
+	// avg(avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id, kubernetes_node)
+	// *
+	// on (persistentvolumeclaim, namespace, cluster_id, kubernetes_node) group_right(storageclass, volumename)
+	// sum(kube_persistentvolumeclaim_resource_requests_storage_bytes{}) by (persistentvolumeclaim, namespace, cluster_id, kubernetes_node, kubernetes_name)) by (persistentvolumeclaim, storageclass, namespace, cluster_id, volumename, kubernetes_node)
+	queryPVCs := fmt.Sprintf(`
+		avg(
+			avg_over_time(kube_persistentvolumeclaim_info{volumename != ""}[%s:%s]%s)
+		) by (persistentvolumeclaim, storageclass, volumename, namespace, kubernetes_node, cluster_id)
+	`, durStr, resStr, offStr) // fill the [%s:%s]%s range, resolution and offset, as the RAM/CPU queries do
+	resChPVCs := ctx.Query(queryPVCs)
 
-	// queryPVRequests := fmt.Sprintf()
+	// sum(kube_persistentvolumeclaim_resource_requests_storage_bytes{}) by (persistentvolumeclaim, namespace, cluster_id, kubernetes_node, kubernetes_name)
 
 	// queryPVCAllocation := fmt.Sprintf()
 
@@ -1679,6 +1688,10 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time) (*kubecost.Allocati
 
 	// queryNetInternetRequests := fmt.Sprintf()
 
+	resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
+	resRAMBytesAllocated, _ := resChRAMBytesAllocated.Await()
+	resGPUsRequested, _ := resChGPUsRequested.Await()
+	resPVCs, _ := resChPVCs.Await()
 	resMinutes, _ := resChMinutes.Await()
 
 	// Build out a map of allocations, starting with (start, end) so that we

+ 8 - 1
pkg/costmodel/router.go

@@ -28,6 +28,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/thanos"
+	prometheus "github.com/prometheus/client_golang/api"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
@@ -1013,7 +1014,13 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		30 * day: maxCacheMinutes30d * time.Minute,
 	}
 
-	costModel := NewCostModel(k8sCache, clusterMap, scrapeInterval)
+	var pc prometheus.Client
+	if thanosClient != nil {
+		pc = thanosClient
+	} else {
+		pc = promCli
+	}
+	costModel := NewCostModel(pc, k8sCache, clusterMap, scrapeInterval)
 	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, costModel)
 
 	a := &Accesses{

+ 2 - 2
test/cloud_test.go

@@ -279,7 +279,7 @@ func TestNodePriceFromCSVWithBadConfig(t *testing.T) {
 	fm := FakeClusterMap{}
 	d, _ := time.ParseDuration("1m")
 
-	model := costmodel.NewCostModel(fc, fm, d)
+	model := costmodel.NewCostModel(nil, fc, fm, d)
 
 	_, err := model.GetNodeCost(c)
 	if err != nil {
@@ -333,7 +333,7 @@ func TestSourceMatchesFromCSV(t *testing.T) {
 	fm := FakeClusterMap{}
 	d, _ := time.ParseDuration("1m")
 
-	model := costmodel.NewCostModel(fc, fm, d)
+	model := costmodel.NewCostModel(nil, fc, fm, d)
 
 	_, err = model.GetNodeCost(c)
 	if err != nil {