Sfoglia il codice sorgente

Fix ComputeClusterCosts when cluster has no PVs

Niko Kovacevic 6 anni fa
parent
commit
6df6b85e6e
1 ha cambiato i file con 29 aggiunte e 87 eliminazioni
  1. 29 87
      pkg/costmodel/cluster.go

+ 29 - 87
pkg/costmodel/cluster.go

@@ -7,6 +7,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 	"k8s.io/klog"
@@ -152,7 +153,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 	const fmtQueryTotalStorage = `sum(
 	const fmtQueryTotalStorage = `sum(
 		sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, cluster_id)[%s:1m]%s) / 1024 / 1024 / 1024 *
 		sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, cluster_id)[%s:1m]%s) / 1024 / 1024 / 1024 *
 		avg(avg_over_time(pv_hourly_cost[%s:1m]%s)) by (persistentvolume, cluster_id) / 60
 		avg(avg_over_time(pv_hourly_cost[%s:1m]%s)) by (persistentvolume, cluster_id) / 60
-	) by (cluster_id) %s`
+	) by (cluster_id)`
 
 
 	const fmtQueryCPUModePct = `sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
 	const fmtQueryCPUModePct = `sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
 	group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)`
 	group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)`
@@ -183,90 +184,30 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, fmtOffset)
 	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, fmtOffset)
 	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
 	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
 	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
 	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
-	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset, queryTotalLocalStorage)
+	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset)
 	queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
 	queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
 	queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, fmtOffset, window, fmtOffset)
 	queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, fmtOffset, window, fmtOffset)
 	queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, fmtOffset, window, fmtOffset)
 	queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, fmtOffset, window, fmtOffset)
 
 
-	numQueries := 9
-
-	klog.V(4).Infof("[Debug] queryDataCount: %s", queryDataCount)
-	klog.V(4).Infof("[Debug] queryTotalGPU: %s", queryTotalGPU)
-	klog.V(4).Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
-	klog.V(4).Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
-	klog.V(4).Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
-	klog.V(4).Infof("[Debug] queryCPUModePct: %s", queryCPUModePct)
-	klog.V(4).Infof("[Debug] queryRAMSystemPct: %s", queryRAMSystemPct)
-	klog.V(4).Infof("[Debug] queryRAMUserPct: %s", queryRAMUserPct)
-	klog.V(4).Infof("[Debug] queryUsedLocalStorage: %s", queryUsedLocalStorage)
-
-	// Submit queries to Prometheus asynchronously
-	var ec util.ErrorCollector
-	var wg sync.WaitGroup
-	ctx := PromQueryContext{client, &ec, &wg}
-	ctx.WaitGroup.Add(numQueries)
-
-	chDataCount := make(chan []*PromQueryResult, 1)
-	go AsyncPromQuery(queryDataCount, chDataCount, ctx)
-
-	chTotalGPU := make(chan []*PromQueryResult, 1)
-	go AsyncPromQuery(queryTotalGPU, chTotalGPU, ctx)
-
-	chTotalCPU := make(chan []*PromQueryResult, 1)
-	go AsyncPromQuery(queryTotalCPU, chTotalCPU, ctx)
-
-	chTotalRAM := make(chan []*PromQueryResult, 1)
-	go AsyncPromQuery(queryTotalRAM, chTotalRAM, ctx)
-
-	chTotalStorage := make(chan []*PromQueryResult, 1)
-	go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
-
-	chCPUModePct := make(chan []*PromQueryResult, 1)
-	go AsyncPromQuery(queryCPUModePct, chCPUModePct, ctx)
-
-	chRAMSystemPct := make(chan []*PromQueryResult, 1)
-	go AsyncPromQuery(queryRAMSystemPct, chRAMSystemPct, ctx)
-
-	chRAMUserPct := make(chan []*PromQueryResult, 1)
-	go AsyncPromQuery(queryRAMUserPct, chRAMUserPct, ctx)
-
-	chUsedLocalStorage := make(chan []*PromQueryResult, 1)
-	go AsyncPromQuery(queryUsedLocalStorage, chUsedLocalStorage, ctx)
-
-	// After queries complete, retrieve results
-	wg.Wait()
-
-	resultsDataCount := <-chDataCount
-	close(chDataCount)
-
-	resultsTotalGPU := <-chTotalGPU
-	close(chTotalGPU)
-
-	resultsTotalCPU := <-chTotalCPU
-	close(chTotalCPU)
-
-	resultsTotalRAM := <-chTotalRAM
-	close(chTotalRAM)
-
-	resultsTotalStorage := <-chTotalStorage
-	close(chTotalStorage)
-
-	resultsCPUModePct := <-chCPUModePct
-	close(chCPUModePct)
-
-	resultsRAMSystemPct := <-chRAMSystemPct
-	close(chRAMSystemPct)
-
-	resultsRAMUserPct := <-chRAMUserPct
-	close(chRAMUserPct)
-
-	resultsUsedLocalStorage := <-chUsedLocalStorage
-	close(chUsedLocalStorage)
+	ctx := prom.NewContext(client)
+
+	resChs := ctx.QueryAll(
+		queryDataCount,
+		queryTotalGPU,
+		queryTotalCPU,
+		queryTotalRAM,
+		queryTotalStorage,
+		queryTotalLocalStorage,
+		queryCPUModePct,
+		queryRAMSystemPct,
+		queryRAMUserPct,
+		queryUsedLocalStorage,
+	)
 
 
 	defaultClusterID := os.Getenv(clusterIDKey)
 	defaultClusterID := os.Getenv(clusterIDKey)
 
 
 	dataMinsByCluster := map[string]float64{}
 	dataMinsByCluster := map[string]float64{}
-	for _, result := range resultsDataCount {
+	for _, result := range resChs[0].Await() {
 		clusterID, _ := result.GetString("cluster_id")
 		clusterID, _ := result.GetString("cluster_id")
 		if clusterID == "" {
 		if clusterID == "" {
 			clusterID = defaultClusterID
 			clusterID = defaultClusterID
@@ -299,7 +240,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 
 
 	// Helper function to iterate over Prom query results, parsing the raw values into
 	// Helper function to iterate over Prom query results, parsing the raw values into
 	// the intermediate costData structure.
 	// the intermediate costData structure.
-	setCostsFromResults := func(costData map[string]map[string]float64, results []*PromQueryResult, name string, discount float64, customDiscount float64) {
+	setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
 		for _, result := range results {
 		for _, result := range results {
 			clusterID, _ := result.GetString("cluster_id")
 			clusterID, _ := result.GetString("cluster_id")
 			if clusterID == "" {
 			if clusterID == "" {
@@ -315,14 +256,15 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		}
 		}
 	}
 	}
 	// Apply both sustained use and custom discounts to RAM and CPU
 	// Apply both sustained use and custom discounts to RAM and CPU
-	setCostsFromResults(costData, resultsTotalCPU, "cpu", discount, customDiscount)
-	setCostsFromResults(costData, resultsTotalRAM, "ram", discount, customDiscount)
+	setCostsFromResults(costData, resChs[2].Await(), "cpu", discount, customDiscount)
+	setCostsFromResults(costData, resChs[3].Await(), "ram", discount, customDiscount)
 	// Apply only custom discount to GPU and storage
 	// Apply only custom discount to GPU and storage
-	setCostsFromResults(costData, resultsTotalGPU, "gpu", 0.0, customDiscount)
-	setCostsFromResults(costData, resultsTotalStorage, "storage", 0.0, customDiscount)
+	setCostsFromResults(costData, resChs[1].Await(), "gpu", 0.0, customDiscount)
+	setCostsFromResults(costData, resChs[4].Await(), "storage", 0.0, customDiscount)
+	setCostsFromResults(costData, resChs[5].Await(), "localstorage", 0.0, customDiscount)
 
 
 	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
 	cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
-	for _, result := range resultsCPUModePct {
+	for _, result := range resChs[6].Await() {
 		clusterID, _ := result.GetString("cluster_id")
 		clusterID, _ := result.GetString("cluster_id")
 		if clusterID == "" {
 		if clusterID == "" {
 			clusterID = defaultClusterID
 			clusterID = defaultClusterID
@@ -351,7 +293,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 	}
 	}
 
 
 	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
 	ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
-	for _, result := range resultsRAMSystemPct {
+	for _, result := range resChs[7].Await() {
 		clusterID, _ := result.GetString("cluster_id")
 		clusterID, _ := result.GetString("cluster_id")
 		if clusterID == "" {
 		if clusterID == "" {
 			clusterID = defaultClusterID
 			clusterID = defaultClusterID
@@ -362,7 +304,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		ramBD := ramBreakdownMap[clusterID]
 		ramBD := ramBreakdownMap[clusterID]
 		ramBD.System += result.Values[0].Value
 		ramBD.System += result.Values[0].Value
 	}
 	}
-	for _, result := range resultsRAMUserPct {
+	for _, result := range resChs[8].Await() {
 		clusterID, _ := result.GetString("cluster_id")
 		clusterID, _ := result.GetString("cluster_id")
 		if clusterID == "" {
 		if clusterID == "" {
 			clusterID = defaultClusterID
 			clusterID = defaultClusterID
@@ -382,7 +324,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 	}
 	}
 
 
 	pvUsedCostMap := map[string]float64{}
 	pvUsedCostMap := map[string]float64{}
-	for _, result := range resultsUsedLocalStorage {
+	for _, result := range resChs[9].Await() {
 		clusterID, _ := result.GetString("cluster_id")
 		clusterID, _ := result.GetString("cluster_id")
 		if clusterID == "" {
 		if clusterID == "" {
 			clusterID = defaultClusterID
 			clusterID = defaultClusterID
@@ -398,7 +340,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 			dataMins = mins
 			dataMins = mins
 			klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
 			klog.V(3).Infof("[Warning] cluster cost data count not found for cluster %s", id)
 		}
 		}
-		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"], window, offset, dataMins/util.MinsPerHour)
+		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"]+cd["localstorage"], window, offset, dataMins/util.MinsPerHour)
 		if err != nil {
 		if err != nil {
 			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
 			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
 			return nil, err
 			return nil, err