|
|
@@ -3,6 +3,7 @@ package costmodel
|
|
|
import (
|
|
|
"fmt"
|
|
|
"os"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
|
|
|
costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
|
|
|
@@ -120,10 +121,6 @@ func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerC
|
|
|
qRAM := fmt.Sprintf(queryClusterRAM, window, offset, window, offset)
|
|
|
qStorage := fmt.Sprintf(queryStorage, window, offset, window, offset, localStorageQuery)
|
|
|
|
|
|
- klog.Infof("[Debug] qCores: %s", qCores)
|
|
|
- klog.Infof("[Debug] qRAM: %s", qRAM)
|
|
|
- klog.Infof("[Debug] qStorage: %s", qStorage)
|
|
|
-
|
|
|
klog.V(4).Infof("Running query %s", qCores)
|
|
|
resultClusterCores, err := Query(cli, qCores)
|
|
|
if err != nil {
|
|
|
@@ -180,8 +177,120 @@ func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerC
|
|
|
return toReturn, nil
|
|
|
}
|
|
|
|
|
|
-// ClusterCosts gives the current full cluster costs averaged over a window of time.
|
|
|
-func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (*Totals, error) {
|
|
|
+// TODO move this to a package-accessible helper struct
|
|
|
+type PromQueryContext struct {
|
|
|
+ client prometheusClient.Client
|
|
|
+ ec *ErrorCollector
|
|
|
+ wg *sync.WaitGroup
|
|
|
+}
|
|
|
+
|
|
|
+// TODO move this to a package-accessible helper function
|
|
|
+func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
|
|
|
+ if ctx.wg != nil {
|
|
|
+ defer ctx.wg.Done()
|
|
|
+ }
|
|
|
+
|
|
|
+ raw, promErr := Query(ctx.client, query)
|
|
|
+ ctx.ec.Report(promErr)
|
|
|
+
|
|
|
+ results, parseErr := NewQueryResults(raw)
|
|
|
+ ctx.ec.Report(parseErr)
|
|
|
+
|
|
|
+ resultCh <- results
|
|
|
+}
|
|
|
+
|
|
|
// ClusterCosts holds per-resource cost figures for a cluster together with a
// total. It serializes to JSON under the keys "cpu", "ram", "storage" and
// "total".
//
// NOTE(review): units/currency and whether Total is expected to equal
// CPU + RAM + Storage are not established by this type — confirm with the
// code that populates it.
type ClusterCosts struct {
	// CPU is the CPU cost component.
	CPU float64 `json:"cpu"`
	// RAM is the memory cost component.
	RAM float64 `json:"ram"`
	// Storage is the storage cost component.
	Storage float64 `json:"storage"`
	// Total is the overall cost figure.
	Total float64 `json:"total"`
}
|
|
|
+
|
|
|
+// CumulativeClusterCostsForAllClusters gives the cumulative cluster costs summed over a window of time for all clusters.
|
|
|
+func CumulativeClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, window, offset string) (map[string]ClusterCosts, error) {
|
|
|
+ const fmtQueryTotalCPU = `sum(
|
|
|
+ sum(sum_over_time(kube_node_status_capacity_cpu_cores[%s:1h]%s)) by (node, cluster_id) *
|
|
|
+ avg(avg_over_time(node_cpu_hourly_cost[%s:1h]%s)) by (node, cluster_id)
|
|
|
+ )`
|
|
|
+
|
|
|
+ const fmtQueryTotalRAM = `sum(
|
|
|
+ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1h]%s) / 1024 / 1024 / 1024) by (node, cluster_id) *
|
|
|
+ avg(avg_over_time(node_ram_hourly_cost[%s:1h]%s)) by (node, cluster_id)
|
|
|
+ )`
|
|
|
+
|
|
|
+ const fmtQueryTotalStorage = `sum(
|
|
|
+ sum(sum_over_time(kube_persistentvolume_capacity_bytes[%s:1h]%s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024 *
|
|
|
+ avg(avg_over_time(pv_hourly_cost[%s:1h]%s)) by (persistentvolume, cluster_id)
|
|
|
+ )`
|
|
|
+
|
|
|
+ // TODO local storage
|
|
|
+
|
|
|
+ if offset != "" {
|
|
|
+ offset = fmt.Sprintf("offset %s", offset)
|
|
|
+ }
|
|
|
+
|
|
|
+ queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, offset, window, offset)
|
|
|
+ queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, offset, window, offset)
|
|
|
+ queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, offset, window, offset)
|
|
|
+ numQueries := 3
|
|
|
+
|
|
|
+ var ec ErrorCollector
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ wg.Add(numQueries)
|
|
|
+ ctx := PromQueryContext{cli, &ec, &wg}
|
|
|
+
|
|
|
+ klog.Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
|
|
|
+ chTotalCPU := make(chan []*PromQueryResult)
|
|
|
+ go AsyncPromQuery(queryTotalCPU, chTotalCPU, ctx)
|
|
|
+
|
|
|
+ klog.Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
|
|
|
+ chTotalRAM := make(chan []*PromQueryResult)
|
|
|
+ go AsyncPromQuery(queryTotalRAM, chTotalRAM, ctx)
|
|
|
+
|
|
|
+ klog.Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
|
|
|
+ chTotalStorage := make(chan []*PromQueryResult)
|
|
|
+ go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
|
|
|
+
|
|
|
+ costsPerCluster := make(map[string]ClusterCosts)
|
|
|
+
|
|
|
+ // coreTotal, err := resultToTotal(resultClusterCores)
|
|
|
+ // if err != nil {
|
|
|
+ // return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
|
|
|
+ // }
|
|
|
+ // for clusterID, total := range coreTotal {
|
|
|
+ // if _, ok := toReturn[clusterID]; !ok {
|
|
|
+ // toReturn[clusterID] = &Totals{}
|
|
|
+ // }
|
|
|
+ // toReturn[clusterID].CPUCost = total
|
|
|
+ // }
|
|
|
+
|
|
|
+ // ramTotal, err := resultToTotal(resultClusterRAM)
|
|
|
+ // if err != nil {
|
|
|
+ // return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
|
|
|
+ // }
|
|
|
+ // for clusterID, total := range ramTotal {
|
|
|
+ // if _, ok := toReturn[clusterID]; !ok {
|
|
|
+ // toReturn[clusterID] = &Totals{}
|
|
|
+ // }
|
|
|
+ // toReturn[clusterID].MemCost = total
|
|
|
+ // }
|
|
|
+
|
|
|
+ // storageTotal, err := resultToTotal(resultStorage)
|
|
|
+ // if err != nil {
|
|
|
+ // return nil, fmt.Errorf("Error for query %s: %s", qStorage, err.Error())
|
|
|
+ // }
|
|
|
+ // for clusterID, total := range storageTotal {
|
|
|
+ // if _, ok := toReturn[clusterID]; !ok {
|
|
|
+ // toReturn[clusterID] = &Totals{}
|
|
|
+ // }
|
|
|
+ // toReturn[clusterID].StorageCost = total
|
|
|
+ // }
|
|
|
+
|
|
|
+ return costsPerCluster, nil
|
|
|
+}
|
|
|
+
|
|
|
+// ComputeClusterCosts gives the current full cluster costs averaged over a window of time.
|
|
|
+func ComputeClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (*Totals, error) {
|
|
|
// turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
|
|
|
if offset != "" {
|
|
|
offset = fmt.Sprintf("offset %s", offset)
|