@@ -74,13 +74,15 @@ type ClusterCosts struct {
 
 // NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
 // the associated monthly rate data, and returns the Costs.
-func NewClusterCostsFromCumulative(cpu, ram, storage float64, window, offset string) (*ClusterCosts, error) {
+func NewClusterCostsFromCumulative(cpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
     start, end, err := util.ParseTimeRange(window, offset)
     if err != nil {
         return nil, err
     }
 
-    hours := end.Sub(*start).Hours()
+    if dataHours == 0 {
+        dataHours = end.Sub(*start).Hours()
+    }
 
     cc := &ClusterCosts{
         Start:             start,
@@ -89,9 +91,9 @@ func NewClusterCostsFromCumulative(cpu, ram, storage float64, window, offset str
         RAMCumulative:     ram,
         StorageCumulative: storage,
         TotalCumulative:   cpu + ram + storage,
-        CPUMonthly:        cpu / hours * (util.HoursPerDay * util.DaysPerMonth),
-        RAMMonthly:        ram / hours * (util.HoursPerDay * util.DaysPerMonth),
-        StorageMonthly:    storage / hours * (util.HoursPerDay * util.DaysPerMonth),
+        CPUMonthly:        cpu / dataHours * (util.HoursPerDay * util.DaysPerMonth),
+        RAMMonthly:        ram / dataHours * (util.HoursPerDay * util.DaysPerMonth),
+        StorageMonthly:    storage / dataHours * (util.HoursPerDay * util.DaysPerMonth),
     }
     cc.TotalMonthly = cc.CPUMonthly + cc.RAMMonthly + cc.StorageMonthly
 
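The new dataHours parameter decouples the extrapolation base from the window length: when the window contains fewer hours of actual data than it spans (for example, a cluster that only started reporting partway through), the monthly rate is projected from the hours observed rather than from the whole window. A minimal sketch of the conversion, assuming for illustration that util.HoursPerDay = 24 and util.DaysPerMonth = 30:

    // Illustrative values only: $6 of CPU spend observed over 12 hours of data
    // extrapolates to 6 / 12 * (24 * 30) = $360 per month, regardless of how
    // wide the requested window was.
    cpuCumulative, dataHours := 6.0, 12.0
    cpuMonthly := cpuCumulative / dataHours * (24 * 30)
    fmt.Println(cpuMonthly) // 360
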
@@ -100,13 +102,15 @@ func NewClusterCostsFromCumulative(cpu, ram, storage float64, window, offset str
 
 // NewClusterCostsFromMonthly takes monthly-rate cost data over a given time range, computes
 // the associated cumulative cost data, and returns the Costs.
-func NewClusterCostsFromMonthly(cpuMonthly, ramMonthly, storageMonthly float64, window, offset string) (*ClusterCosts, error) {
+func NewClusterCostsFromMonthly(cpuMonthly, ramMonthly, storageMonthly float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
     start, end, err := util.ParseTimeRange(window, offset)
     if err != nil {
         return nil, err
     }
 
-    hours := end.Sub(*start).Hours()
+    if dataHours == 0 {
+        dataHours = end.Sub(*start).Hours()
+    }
 
     cc := &ClusterCosts{
         Start:             start,
@@ -115,9 +119,9 @@ func NewClusterCostsFromMonthly(cpuMonthly, ramMonthly, storageMonthly float64,
         RAMMonthly:        ramMonthly,
         StorageMonthly:    storageMonthly,
         TotalMonthly:      cpuMonthly + ramMonthly + storageMonthly,
-        CPUCumulative:     cpuMonthly / util.HoursPerMonth * hours,
-        RAMCumulative:     ramMonthly / util.HoursPerMonth * hours,
-        StorageCumulative: storageMonthly / util.HoursPerMonth * hours,
+        CPUCumulative:     cpuMonthly / util.HoursPerMonth * dataHours,
+        RAMCumulative:     ramMonthly / util.HoursPerMonth * dataHours,
+        StorageCumulative: storageMonthly / util.HoursPerMonth * dataHours,
     }
     cc.TotalCumulative = cc.CPUCumulative + cc.RAMCumulative + cc.StorageCumulative
 
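NewClusterCostsFromMonthly is the inverse mapping: a monthly rate is accrued over the observed hours to recover cumulative spend. A quick round-trip sketch, assuming util.HoursPerMonth works out to 720 (24 * 30, an illustrative figure):

    // Illustrative round trip: the $360/month CPU rate from the previous example,
    // observed for 12 hours, accrues 360 / 720 * 12 = $6, matching the cumulative
    // figure we started with.
    cpuMonthly, dataHours := 360.0, 12.0
    cpuCumulative := cpuMonthly / 720.0 * dataHours
    fmt.Println(cpuCumulative) // 6
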
@@ -126,35 +130,49 @@ func NewClusterCostsFromMonthly(cpuMonthly, ramMonthly, storageMonthly float64,
 
 // ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
 func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*ClusterCosts, error) {
+    // Compute the number of minutes in the full interval, for use in interpolating missed scrapes or scaling missing data
+    start, end, err := util.ParseTimeRange(window, offset)
+    if err != nil {
+        return nil, err
+    }
+    mins := end.Sub(*start).Minutes()
+
+    // TODO revise use of 1m resolution
+
+    const fmtQueryDataCount = `max(count_over_time(kube_node_status_capacity_cpu_cores[%s:1m]%s))`
+
     const fmtQueryTotalCPU = `sum(
-        sum(sum_over_time(kube_node_status_capacity_cpu_cores[%s:1h]%s)) by (node, cluster_id) *
-        avg(avg_over_time(node_cpu_hourly_cost[%s:1h]%s)) by (node, cluster_id)
+        sum(sum_over_time(kube_node_status_capacity_cpu_cores[%s:1m]%s)) by (node, cluster_id) *
+        avg(avg_over_time(node_cpu_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
     )`
 
     const fmtQueryTotalRAM = `sum(
-        sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1h]%s) / 1024 / 1024 / 1024) by (node, cluster_id) *
-        avg(avg_over_time(node_ram_hourly_cost[%s:1h]%s)) by (node, cluster_id)
+        sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1m]%s) / 1024 / 1024 / 1024) by (node, cluster_id) *
+        avg(avg_over_time(node_ram_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
     )`
 
     const fmtQueryTotalStorage = `sum(
-        sum(sum_over_time(kube_persistentvolume_capacity_bytes[%s:1h]%s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024 *
-        avg(avg_over_time(pv_hourly_cost[%s:1h]%s)) by (persistentvolume, cluster_id)
-    )`
-
-    // TODO local storage
+        sum(sum_over_time(kube_persistentvolume_capacity_bytes[%s:1m]%s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024 *
+        avg(avg_over_time(pv_hourly_cost[%s:1m]%s)) by (persistentvolume, cluster_id) / 60
+    )%s`
 
-    // TODO norm for interpolating missed scrapes?
+    queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false)
+    if queryTotalLocalStorage != "" {
+        queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
+    }
 
     fmtOffset := ""
     if offset != "" {
         fmtOffset = fmt.Sprintf("offset %s", offset)
     }
 
+    queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, fmtOffset)
     queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
     queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
-    queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset)
-    numQueries := 3
+    queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset, queryTotalLocalStorage)
+    numQueries := 4
 
+    klog.V(4).Infof("[Debug] queryDataCount: %s", queryDataCount)
     klog.V(4).Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
     klog.V(4).Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
     klog.V(4).Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
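The switch from 1h to 1m subquery resolution, together with the trailing / 60, keeps the totals in dollars: capacity is now sampled once per minute, so sum_over_time returns minute-samples, and dividing the hourly price by 60 turns it into a per-minute price. queryDataCount then records how many of those one-minute samples actually exist, which is what later lets partial windows be scaled correctly. A rough sketch of the arithmetic, with illustrative numbers not taken from the diff:

    // 4 cores sampled every minute over a 2h window:
    coreMinuteSamples := 4.0 * 120.0           // what sum_over_time(...[2h:1m]) yields per node
    perMinuteCost := 0.03 / 60.0               // node_cpu_hourly_cost / 60
    fmt.Println(coreMinuteSamples * perMinuteCost) // 0.24, i.e. 4 cores * $0.03/h * 2h
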
@@ -165,6 +183,9 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
     ctx := PromQueryContext{client, &ec, &wg}
     ctx.wg.Add(numQueries)
 
+    chDataCount := make(chan []*PromQueryResult, 1)
+    go AsyncPromQuery(queryDataCount, chDataCount, ctx)
+
     chTotalCPU := make(chan []*PromQueryResult, 1)
     go AsyncPromQuery(queryTotalCPU, chTotalCPU, ctx)
 
@@ -177,6 +198,9 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
     // After queries complete, retrieve results
     wg.Wait()
 
+    resultsDataCount := <-chDataCount
+    close(chDataCount)
+
     resultsTotalCPU := <-chTotalCPU
     close(chTotalCPU)
 
@@ -186,6 +210,13 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
     resultsTotalStorage := <-chTotalStorage
     close(chTotalStorage)
 
+    dataMins := mins
+    if len(resultsDataCount) > 0 && len(resultsDataCount[0].Values) > 0 {
+        dataMins = resultsDataCount[0].Values[0].Value
+    } else {
+        klog.V(3).Infof("[Warning] cluster cost data count returned no results")
+    }
+
     // Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
     costData := make(map[string]map[string]float64)
     defaultClusterID := os.Getenv(clusterIDKey)
@@ -211,10 +242,12 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
     setCostsFromResults(costData, resultsTotalRAM, "ram")
     setCostsFromResults(costData, resultsTotalStorage, "storage")
 
+    // TODO consider using a heuristic threshold to interpolate for missed scrapes (e.g. within 5% of expected minutes, scale totals)
+
     // Convert intermediate structure to Costs instances
     costsByCluster := map[string]*ClusterCosts{}
     for id, cd := range costData {
-        costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["ram"], cd["storage"], window, offset)
+        costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["ram"], cd["storage"], window, offset, dataMins/util.MinsPerHour)
         if err != nil {
             klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
             return nil, err
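dataMins is the number of one-minute samples Prometheus actually returned, falling back to the nominal window length when the count query comes back empty, so dataMins/util.MinsPerHour hands NewClusterCostsFromCumulative the hours of real data rather than the requested window. A sketch of the effect, assuming util.MinsPerHour = 60:

    // Illustrative: a 24h window in which only 6h of samples exist, e.g. because
    // the agent was installed a quarter of the way through the day.
    mins := 1440.0    // requested window, in minutes
    dataMins := 360.0 // one-minute samples actually counted by queryDataCount
    dataHours := dataMins / 60.0
    fmt.Println(mins, dataHours) // 1440 6: monthly rates extrapolate from 6h of spend, not 24h
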
@@ -295,22 +328,20 @@ func resultToTotal(qr interface{}) (map[string][][]string, error) {
 }
 
 // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
-func ClusterCostsForAllClusters(cli prometheus.Client, cloud cloud.Provider, window, offset string) (map[string]*Totals, error) {
-    if offset != "" {
-        offset = fmt.Sprintf("offset %s", offset)
-    }
-
-    localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
-    if err != nil {
-        return nil, err
-    }
+func ClusterCostsForAllClusters(cli prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*Totals, error) {
+    localStorageQuery := provider.GetLocalStorageQuery(window, offset, true)
     if localStorageQuery != "" {
         localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
     }
 
-    qCores := fmt.Sprintf(queryClusterCores, window, offset, window, offset, window, offset)
-    qRAM := fmt.Sprintf(queryClusterRAM, window, offset, window, offset)
-    qStorage := fmt.Sprintf(queryStorage, window, offset, window, offset, localStorageQuery)
+    fmtOffset := ""
+    if offset != "" {
+        fmtOffset = fmt.Sprintf("offset %s", offset)
+    }
+
+    qCores := fmt.Sprintf(queryClusterCores, window, fmtOffset, window, fmtOffset, window, fmtOffset)
+    qRAM := fmt.Sprintf(queryClusterRAM, window, fmtOffset, window, fmtOffset)
+    qStorage := fmt.Sprintf(queryStorage, window, fmtOffset, window, fmtOffset, localStorageQuery)
 
     klog.V(4).Infof("Running query %s", qCores)
     resultClusterCores, err := Query(cli, qCores)
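The refactored call sites assume the provider's local-storage query helper changed from GetLocalStorageQuery(offset) returning (string, error) to a variant that also takes the window and a rate flag and returns only a string. Roughly the following shape, inferred from the call sites and not shown in this diff:

    // Hypothetical shape of the updated method; the real interface may differ.
    // rate presumably selects a monthly-rate query (true) versus a cumulative
    // query over the window (false), matching the true/false arguments above.
    type Provider interface {
        GetLocalStorageQuery(window, offset string, rate bool) string
    }
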
@@ -370,23 +401,21 @@ func ClusterCostsForAllClusters(cli prometheus.Client, cloud cloud.Provider, win
 
 // AverageClusterTotals gives the current full cluster costs averaged over a window of time.
 // Used to be ClusterCosts, but has been deprecated for that use.
-func AverageClusterTotals(cli prometheus.Client, cloud cloud.Provider, windowString, offset string) (*Totals, error) {
+func AverageClusterTotals(cli prometheus.Client, provider cloud.Provider, windowString, offset string) (*Totals, error) {
     // turn offsets of the format "[0-9]+h" into the format "offset [0-9]+h" for use in query templates
+    fmtOffset := ""
     if offset != "" {
-        offset = fmt.Sprintf("offset %s", offset)
+        fmtOffset = fmt.Sprintf("offset %s", offset)
     }
 
-    localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
-    if err != nil {
-        return nil, err
-    }
+    localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true)
     if localStorageQuery != "" {
         localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
     }
 
-    qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
-    qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
-    qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
+    qCores := fmt.Sprintf(queryClusterCores, windowString, fmtOffset, windowString, fmtOffset, windowString, fmtOffset)
+    qRAM := fmt.Sprintf(queryClusterRAM, windowString, fmtOffset, windowString, fmtOffset)
+    qStorage := fmt.Sprintf(queryStorage, windowString, fmtOffset, windowString, fmtOffset, localStorageQuery)
     qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 
     resultClusterCores, err := Query(cli, qCores)
@@ -439,12 +468,8 @@ func AverageClusterTotals(cli prometheus.Client, cloud cloud.Provider, windowStr
 }
 
 // ClusterCostsOverTime gives the full cluster costs over time
-func ClusterCostsOverTime(cli prometheus.Client, cloud cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
-
-    localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
-    if err != nil {
-        return nil, err
-    }
+func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
+    localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true)
     if localStorageQuery != "" {
         localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
     }