|
|
@@ -123,7 +123,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
|
|
|
// minsPerResolution determines accuracy and resource use for the following
|
|
|
// queries. Smaller values (higher resolution) result in better accuracy,
|
|
|
// but more expensive queries, and vice versa.
|
|
|
- minsPerResolution := 1
|
|
|
+ minsPerResolution := 5
|
|
|
|
|
|
// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
|
|
|
// value, converts it to a cumulative value; i.e.
|
|
|
@@ -144,10 +144,10 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
|
|
|
resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
|
|
|
resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
|
|
|
|
|
|
- resPVCost := resChPVCost.Await()
|
|
|
- resPVSize := resChPVSize.Await()
|
|
|
- resLocalStorageCost := resChLocalStorageCost.Await()
|
|
|
- resLocalStorageBytes := resChLocalStorageBytes.Await()
|
|
|
+ resPVCost, _ := resChPVCost.Await()
|
|
|
+ resPVSize, _ := resChPVSize.Await()
|
|
|
+ resLocalStorageCost, _ := resChLocalStorageCost.Await()
|
|
|
+ resLocalStorageBytes, _ := resChLocalStorageBytes.Await()
|
|
|
if ctx.ErrorCollector.IsError() {
|
|
|
return nil, ctx.Errors()
|
|
|
}
|
|
|
@@ -176,7 +176,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
|
|
|
Name: name,
|
|
|
}
|
|
|
}
|
|
|
- diskMap[key].Cost = cost
|
|
|
+ diskMap[key].Cost += cost
|
|
|
}
|
|
|
|
|
|
for _, result := range resPVSize {
|
|
|
@@ -227,7 +227,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
|
|
|
Local: true,
|
|
|
}
|
|
|
}
|
|
|
- diskMap[key].Cost = cost
|
|
|
+ diskMap[key].Cost += cost
|
|
|
}
|
|
|
|
|
|
for _, result := range resLocalStorageBytes {
|
|
|
@@ -283,7 +283,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
|
|
|
// minsPerResolution determines accuracy and resource use for the following
|
|
|
// queries. Smaller values (higher resolution) result in better accuracy,
|
|
|
// but more expensive queries, and vice versa.
|
|
|
- minsPerResolution := 1
|
|
|
+ minsPerResolution := 5
|
|
|
|
|
|
// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
|
|
|
// value, converts it to a cumulative value; i.e.
|
|
|
@@ -291,11 +291,11 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
|
|
|
hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
|
|
|
|
|
|
ctx := prom.NewContext(client)
|
|
|
- queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
|
|
|
+ queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
|
|
|
queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
|
|
|
- queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
|
|
|
+ queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
|
|
|
queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
|
|
|
- queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
|
|
|
+ queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost) by (cluster_id, node, provider_id))[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
|
|
|
queryNodeLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
|
|
|
|
|
|
resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
|
|
|
@@ -305,12 +305,12 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
|
|
|
resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
|
|
|
resChNodeLabels := ctx.Query(queryNodeLabels)
|
|
|
|
|
|
- resNodeCPUCost := resChNodeCPUCost.Await()
|
|
|
- resNodeCPUCores := resChNodeCPUCores.Await()
|
|
|
- resNodeGPUCost := resChNodeGPUCost.Await()
|
|
|
- resNodeRAMCost := resChNodeRAMCost.Await()
|
|
|
- resNodeRAMBytes := resChNodeRAMBytes.Await()
|
|
|
- resNodeLabels := resChNodeLabels.Await()
|
|
|
+ resNodeCPUCost, _ := resChNodeCPUCost.Await()
|
|
|
+ resNodeCPUCores, _ := resChNodeCPUCores.Await()
|
|
|
+ resNodeGPUCost, _ := resChNodeGPUCost.Await()
|
|
|
+ resNodeRAMCost, _ := resChNodeRAMCost.Await()
|
|
|
+ resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
|
|
|
+ resNodeLabels, _ := resChNodeLabels.Await()
|
|
|
if ctx.ErrorCollector.IsError() {
|
|
|
return nil, ctx.Errors()
|
|
|
}
|
|
|
@@ -329,22 +329,21 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- nodeType, err := result.GetString("instance_type")
|
|
|
- if err != nil {
|
|
|
- log.Warningf("ClusterNodes: CPU cost data missing node type")
|
|
|
- }
|
|
|
+ nodeType, _ := result.GetString("instance_type")
|
|
|
+ providerID, _ := result.GetString("provider_id")
|
|
|
|
|
|
cpuCost := result.Values[0].Value
|
|
|
|
|
|
key := fmt.Sprintf("%s/%s", cluster, name)
|
|
|
if _, ok := nodeMap[key]; !ok {
|
|
|
nodeMap[key] = &Node{
|
|
|
- Cluster: cluster,
|
|
|
- Name: name,
|
|
|
- NodeType: nodeType,
|
|
|
+ Cluster: cluster,
|
|
|
+ Name: name,
|
|
|
+ NodeType: nodeType,
|
|
|
+ ProviderID: cp.ParseID(providerID),
|
|
|
}
|
|
|
}
|
|
|
- nodeMap[key].CPUCost = cpuCost
|
|
|
+ nodeMap[key].CPUCost += cpuCost
|
|
|
nodeMap[key].NodeType = nodeType
|
|
|
}
|
|
|
|
|
|
@@ -384,22 +383,21 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- nodeType, err := result.GetString("instance_type")
|
|
|
- if err != nil {
|
|
|
- log.Warningf("ClusterNodes: RAM cost data missing node type")
|
|
|
- }
|
|
|
+ nodeType, _ := result.GetString("instance_type")
|
|
|
+ providerID, _ := result.GetString("provider_id")
|
|
|
|
|
|
ramCost := result.Values[0].Value
|
|
|
|
|
|
key := fmt.Sprintf("%s/%s", cluster, name)
|
|
|
if _, ok := nodeMap[key]; !ok {
|
|
|
nodeMap[key] = &Node{
|
|
|
- Cluster: cluster,
|
|
|
- Name: name,
|
|
|
- NodeType: nodeType,
|
|
|
+ Cluster: cluster,
|
|
|
+ Name: name,
|
|
|
+ NodeType: nodeType,
|
|
|
+ ProviderID: cp.ParseID(providerID),
|
|
|
}
|
|
|
}
|
|
|
- nodeMap[key].RAMCost = ramCost
|
|
|
+ nodeMap[key].RAMCost += ramCost
|
|
|
nodeMap[key].NodeType = nodeType
|
|
|
}
|
|
|
|
|
|
@@ -439,19 +437,24 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
+ nodeType, _ := result.GetString("instance_type")
|
|
|
+ providerID, _ := result.GetString("provider_id")
|
|
|
+
|
|
|
gpuCost := result.Values[0].Value
|
|
|
|
|
|
key := fmt.Sprintf("%s/%s", cluster, name)
|
|
|
if _, ok := nodeMap[key]; !ok {
|
|
|
nodeMap[key] = &Node{
|
|
|
- Cluster: cluster,
|
|
|
- Name: name,
|
|
|
+ Cluster: cluster,
|
|
|
+ Name: name,
|
|
|
+ NodeType: nodeType,
|
|
|
+ ProviderID: cp.ParseID(providerID),
|
|
|
}
|
|
|
}
|
|
|
- nodeMap[key].GPUCost = gpuCost
|
|
|
+ nodeMap[key].GPUCost += gpuCost
|
|
|
}
|
|
|
|
|
|
- // node_labels label_cloud_google_com_gke_preemptible
|
|
|
+ // Determine preemptibility with node labels
|
|
|
for _, result := range resNodeLabels {
|
|
|
nodeName, err := result.GetString("node")
|
|
|
if err != nil {
|
|
|
@@ -465,6 +468,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
|
|
|
}
|
|
|
|
|
|
// TODO AWS preemptible
|
|
|
+
|
|
|
// TODO Azure preemptible
|
|
|
}
|
|
|
|
|
|
@@ -472,24 +476,20 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
|
|
|
if err != nil {
|
|
|
return nil, []error{err}
|
|
|
}
|
|
|
+
|
|
|
discount, err := ParsePercentString(c.Discount)
|
|
|
if err != nil {
|
|
|
return nil, []error{err}
|
|
|
}
|
|
|
+
|
|
|
negotiatedDiscount, err := ParsePercentString(c.NegotiatedDiscount)
|
|
|
if err != nil {
|
|
|
return nil, []error{err}
|
|
|
}
|
|
|
|
|
|
for _, node := range nodeMap {
|
|
|
- if !node.Preemptible {
|
|
|
- // TODO determine discount(s) based on:
|
|
|
- // - custom settings
|
|
|
- // - node RI data
|
|
|
- // - provider-specific rules, e.g.
|
|
|
- // cp.GetDiscount(instanceType string) float64
|
|
|
- node.Discount = (1.0 - (1.0-discount)*(1.0-negotiatedDiscount))
|
|
|
- }
|
|
|
+ // TODO take RI into account
|
|
|
+ node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
|
|
|
}
|
|
|
|
|
|
return nodeMap, nil
|
|
|
@@ -624,10 +624,19 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
|
|
|
resChs = append(resChs, bdResChs...)
|
|
|
}
|
|
|
|
|
|
+ resDataCount, _ := resChs[0].Await()
|
|
|
+ resTotalGPU, _ := resChs[1].Await()
|
|
|
+ resTotalCPU, _ := resChs[2].Await()
|
|
|
+ resTotalRAM, _ := resChs[3].Await()
|
|
|
+ resTotalStorage, _ := resChs[4].Await()
|
|
|
+ if ctx.HasErrors() {
|
|
|
+ return nil, ctx.Errors()[0]
|
|
|
+ }
|
|
|
+
|
|
|
defaultClusterID := env.GetClusterID()
|
|
|
|
|
|
dataMinsByCluster := map[string]float64{}
|
|
|
- for _, result := range resChs[0].Await() {
|
|
|
+ for _, result := range resDataCount {
|
|
|
clusterID, _ := result.GetString("cluster_id")
|
|
|
if clusterID == "" {
|
|
|
clusterID = defaultClusterID
|
|
|
@@ -676,20 +685,31 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
|
|
|
}
|
|
|
}
|
|
|
// Apply both sustained use and custom discounts to RAM and CPU
|
|
|
- setCostsFromResults(costData, resChs[2].Await(), "cpu", discount, customDiscount)
|
|
|
- setCostsFromResults(costData, resChs[3].Await(), "ram", discount, customDiscount)
|
|
|
+ setCostsFromResults(costData, resTotalCPU, "cpu", discount, customDiscount)
|
|
|
+ setCostsFromResults(costData, resTotalRAM, "ram", discount, customDiscount)
|
|
|
// Apply only custom discount to GPU and storage
|
|
|
- setCostsFromResults(costData, resChs[1].Await(), "gpu", 0.0, customDiscount)
|
|
|
- setCostsFromResults(costData, resChs[4].Await(), "storage", 0.0, customDiscount)
|
|
|
+ setCostsFromResults(costData, resTotalGPU, "gpu", 0.0, customDiscount)
|
|
|
+ setCostsFromResults(costData, resTotalStorage, "storage", 0.0, customDiscount)
|
|
|
if queryTotalLocalStorage != "" {
|
|
|
- setCostsFromResults(costData, resChs[5].Await(), "localstorage", 0.0, customDiscount)
|
|
|
+ resTotalLocalStorage, err := resChs[5].Await()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ setCostsFromResults(costData, resTotalLocalStorage, "localstorage", 0.0, customDiscount)
|
|
|
}
|
|
|
|
|
|
cpuBreakdownMap := map[string]*ClusterCostsBreakdown{}
|
|
|
ramBreakdownMap := map[string]*ClusterCostsBreakdown{}
|
|
|
pvUsedCostMap := map[string]float64{}
|
|
|
if withBreakdown {
|
|
|
- for _, result := range resChs[6].Await() {
|
|
|
+ resCPUModePct, _ := resChs[6].Await()
|
|
|
+ resRAMSystemPct, _ := resChs[7].Await()
|
|
|
+ resRAMUserPct, _ := resChs[8].Await()
|
|
|
+ if ctx.HasErrors() {
|
|
|
+ return nil, ctx.Errors()[0]
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, result := range resCPUModePct {
|
|
|
clusterID, _ := result.GetString("cluster_id")
|
|
|
if clusterID == "" {
|
|
|
clusterID = defaultClusterID
|
|
|
@@ -717,7 +737,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- for _, result := range resChs[7].Await() {
|
|
|
+ for _, result := range resRAMSystemPct {
|
|
|
clusterID, _ := result.GetString("cluster_id")
|
|
|
if clusterID == "" {
|
|
|
clusterID = defaultClusterID
|
|
|
@@ -728,7 +748,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
|
|
|
ramBD := ramBreakdownMap[clusterID]
|
|
|
ramBD.System += result.Values[0].Value
|
|
|
}
|
|
|
- for _, result := range resChs[8].Await() {
|
|
|
+ for _, result := range resRAMUserPct {
|
|
|
clusterID, _ := result.GetString("cluster_id")
|
|
|
if clusterID == "" {
|
|
|
clusterID = defaultClusterID
|
|
|
@@ -748,7 +768,11 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
|
|
|
}
|
|
|
|
|
|
if queryUsedLocalStorage != "" {
|
|
|
- for _, result := range resChs[9].Await() {
|
|
|
+ resUsedLocalStorage, err := resChs[9].Await()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ for _, result := range resUsedLocalStorage {
|
|
|
clusterID, _ := result.GetString("cluster_id")
|
|
|
if clusterID == "" {
|
|
|
clusterID = defaultClusterID
|
|
|
@@ -804,20 +828,12 @@ type Totals struct {
|
|
|
StorageCost [][]string `json:"storageCost"`
|
|
|
}
|
|
|
|
|
|
-func resultToTotals(qr interface{}) ([][]string, error) {
|
|
|
- // TODO: Provide an actual query instead of resultToTotals
|
|
|
- qResults, err := prom.NewQueryResults("resultToTotals", qr)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
-
|
|
|
- results := qResults.Results
|
|
|
-
|
|
|
- if len(results) == 0 {
|
|
|
+func resultToTotals(qrs []*prom.QueryResult) ([][]string, error) {
|
|
|
+ if len(qrs) == 0 {
|
|
|
return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
|
|
|
}
|
|
|
|
|
|
- result := results[0]
|
|
|
+ result := qrs[0]
|
|
|
totals := [][]string{}
|
|
|
for _, value := range result.Values {
|
|
|
d0 := fmt.Sprintf("%f", value.Timestamp)
|
|
|
@@ -866,22 +882,28 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
|
|
|
qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
|
|
|
qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
|
|
|
|
|
|
- resultClusterCores, err := QueryRange(cli, qCores, start, end, window)
|
|
|
+ ctx := prom.NewContext(cli)
|
|
|
+ resChClusterCores := ctx.QueryRange(qCores, start, end, window)
|
|
|
+ resChClusterRAM := ctx.QueryRange(qRAM, start, end, window)
|
|
|
+ resChStorage := ctx.QueryRange(qStorage, start, end, window)
|
|
|
+ resChTotal := ctx.QueryRange(qTotal, start, end, window)
|
|
|
+
|
|
|
+ resultClusterCores, err := resChClusterCores.Await()
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- resultClusterRAM, err := QueryRange(cli, qRAM, start, end, window)
|
|
|
+ resultClusterRAM, err := resChClusterRAM.Await()
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- resultStorage, err := QueryRange(cli, qStorage, start, end, window)
|
|
|
+ resultStorage, err := resChStorage.Await()
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- resultTotal, err := QueryRange(cli, qTotal, start, end, window)
|
|
|
+ resultTotal, err := resChTotal.Await()
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -910,7 +932,7 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
|
|
|
// If that fails, return an error because something is actually wrong.
|
|
|
qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
|
|
|
|
|
|
- resultNodes, err := QueryRange(cli, qNodes, start, end, window)
|
|
|
+ resultNodes, err := ctx.QueryRangeSync(qNodes, start, end, window)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|