Просмотр исходного кода

Merge pull request #811 from kubecost/develop

Merge develop into master
Ajay Tripathy 5 лет назад
Родитель
Коммит
69946c287f

+ 29 - 20
pkg/cloud/gcpprovider.go

@@ -141,18 +141,18 @@ func (gcp *GCP) GetLocalStorageQuery(window, offset string, rate bool, used bool
 
 	fmtCumulativeQuery := `sum(
 		sum_over_time(%s{device!="tmpfs", id="/"}[%s:1m]%s)
-	) by (cluster_id) / 60 / 730 / 1024 / 1024 / 1024 * %f`
+	) by (%s) / 60 / 730 / 1024 / 1024 / 1024 * %f`
 
 	fmtMonthlyQuery := `sum(
 		avg_over_time(%s{device!="tmpfs", id="/"}[%s:1m]%s)
-	) by (cluster_id) / 1024 / 1024 / 1024 * %f`
+	) by (%s) / 1024 / 1024 / 1024 * %f`
 
 	fmtQuery := fmtCumulativeQuery
 	if rate {
 		fmtQuery = fmtMonthlyQuery
 	}
 
-	return fmt.Sprintf(fmtQuery, baseMetric, window, fmtOffset, localStorageCost)
+	return fmt.Sprintf(fmtQuery, baseMetric, window, fmtOffset, env.GetPromClusterLabel(), localStorageCost)
 }
 
 func (gcp *GCP) GetConfig() (*CustomPricing, error) {
@@ -361,16 +361,17 @@ func (gcp *GCP) ExternalAllocations(start string, end string, aggregators []stri
 		s = append(s, gcpOOC...)
 		qerr = err
 		*/
-		queryString := fmt.Sprintf(`(
+		queryString := `(
 			SELECT
 				service.description as service,
 				TO_JSON_STRING(labels) as keys,
 				SUM(cost) as cost
-			FROM  %s
-			WHERE EXISTS (SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
+			FROM` +
+			fmt.Sprintf(" `%s` ", c.BillingDataDataset) +
+			fmt.Sprintf(`WHERE EXISTS (SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
 			AND usage_start_time >= "%s" AND usage_start_time < "%s"
 			GROUP BY service, keys
-		)`, c.BillingDataDataset, aggregator, start, end)
+		)`, aggregator, start, end)
 		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
 		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
 		s = append(s, gcpOOC...)
@@ -387,17 +388,18 @@ func (gcp *GCP) ExternalAllocations(start string, end string, aggregators []stri
 			}
 		}
 
-		queryString := fmt.Sprintf(`(
+		queryString := `(
 			SELECT
 				service.description as service,
 				TO_JSON_STRING(labels) as keys,
 				SUM(cost) as cost
-		  	FROM  %s
-		 	WHERE EXISTS (SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
+		  	FROM` +
+			fmt.Sprintf(" `%s` ", c.BillingDataDataset) +
+			fmt.Sprintf(`WHERE EXISTS (SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
 			AND EXISTS (SELECT * FROM UNNEST(labels) AS l WHERE l.key = "%s" AND l.value = "%s")
 			AND usage_start_time >= "%s" AND usage_start_time < "%s"
 			GROUP BY service, keys
-		)`, c.BillingDataDataset, aggregator, filterType, filterValue, start, end)
+		)`, aggregator, filterType, filterValue, start, end)
 		klog.V(4).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
 		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
 		s = append(s, gcpOOC...)
@@ -1365,17 +1367,24 @@ func (gcp *gcpKey) GPUType() string {
 
 // Features maps node labels to information needed to retrieve pricing data
 func (gcp *gcpKey) Features() string {
+	var instanceType string
 	it, _ := util.GetInstanceType(gcp.Labels)
-	instanceType := strings.ToLower(strings.Join(strings.Split(it, "-")[:2], ""))
-	if instanceType == "n1highmem" || instanceType == "n1highcpu" {
-		instanceType = "n1standard" // These are priced the same. TODO: support n1ultrahighmem
-	} else if instanceType == "n2highmem" || instanceType == "n2highcpu" {
-		instanceType = "n2standard"
-	} else if instanceType == "e2highmem" || instanceType == "e2highcpu" {
-		instanceType = "e2standard"
-	} else if strings.HasPrefix(instanceType, "custom") {
-		instanceType = "custom" // The suffix of custom does not matter
+	if it == "" {
+		log.DedupedErrorf(1, "Missing or Unknown 'node.kubernetes.io/instance-type' node label")
+		instanceType = "unknown"
+	} else {
+		instanceType = strings.ToLower(strings.Join(strings.Split(it, "-")[:2], ""))
+		if instanceType == "n1highmem" || instanceType == "n1highcpu" {
+			instanceType = "n1standard" // These are priced the same. TODO: support n1ultrahighmem
+		} else if instanceType == "n2highmem" || instanceType == "n2highcpu" {
+			instanceType = "n2standard"
+		} else if instanceType == "e2highmem" || instanceType == "e2highcpu" {
+			instanceType = "e2standard"
+		} else if strings.HasPrefix(instanceType, "custom") {
+			instanceType = "custom" // The suffix of custom does not matter
+		}
 	}
+
 	r, _ := util.GetRegion(gcp.Labels)
 	region := strings.ToLower(r)
 	var usageType string

+ 87 - 87
pkg/costmodel/allocation.go

@@ -17,14 +17,14 @@ import (
 )
 
 const (
-	queryFmtPods              = `avg(kube_pod_container_status_running{}) by (pod, namespace, cluster_id)[%s:%s]%s`
-	queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id, provider_id)`
-	queryFmtRAMRequests       = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
-	queryFmtRAMUsageAvg       = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
-	queryFmtRAMUsageMax       = `max(max_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
-	queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
-	queryFmtCPURequests       = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
-	queryFmtCPUUsageAvg       = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
+	queryFmtPods              = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]%s`
+	queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s, provider_id)`
+	queryFmtRAMRequests       = `avg(avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
+	queryFmtRAMUsageAvg       = `avg(avg_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, %s)`
+	queryFmtRAMUsageMax       = `max(max_over_time(container_memory_working_set_bytes{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, %s)`
+	queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
+	queryFmtCPURequests       = `avg(avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
+	queryFmtCPUUsageAvg       = `avg(rate(container_cpu_usage_seconds_total{container_name!="", container_name!="POD", instance!=""}[%s]%s)) by (container_name, pod_name, namespace, instance, %s)`
 
 	// This query could be written without the recording rule
 	// "kubecost_savings_container_cpu_usage_seconds", but we should
@@ -34,23 +34,23 @@ const (
 	//
 	// See PromQL subquery documentation for a rate example:
 	// https://prometheus.io/blog/2019/01/28/subquery-support/#examples
-	queryFmtCPUUsageMax           = `max(max_over_time(kubecost_savings_container_cpu_usage_seconds[%s]%s)) by (container_name, pod_name, namespace, instance, cluster_id)`
-	queryFmtGPUsRequested         = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, cluster_id)`
-	queryFmtNodeCostPerCPUHr      = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type, provider_id)`
-	queryFmtNodeCostPerRAMGiBHr   = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, cluster_id, instance_type, provider_id)`
-	queryFmtNodeCostPerGPUHr      = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, cluster_id, instance_type, provider_id)`
+	queryFmtCPUUsageMax           = `max(max_over_time(kubecost_savings_container_cpu_usage_seconds[%s]%s)) by (container_name, pod_name, namespace, instance, %s)`
+	queryFmtGPUsRequested         = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
+	queryFmtNodeCostPerCPUHr      = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
+	queryFmtNodeCostPerRAMGiBHr   = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
+	queryFmtNodeCostPerGPUHr      = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
 	queryFmtNodeIsSpot            = `avg_over_time(kubecost_node_is_spot[%s]%s)`
-	queryFmtPVCInfo               = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, cluster_id)[%s:%s]%s`
-	queryFmtPVBytes               = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, cluster_id)`
-	queryFmtPodPVCAllocation      = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, cluster_id)`
-	queryFmtPVCBytesRequested     = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, cluster_id)`
-	queryFmtPVCostPerGiBHour      = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, cluster_id)`
-	queryFmtNetZoneGiB            = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
-	queryFmtNetZoneCostPerGiB     = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (cluster_id)`
-	queryFmtNetRegionGiB          = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
-	queryFmtNetRegionCostPerGiB   = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (cluster_id)`
-	queryFmtNetInternetGiB        = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, cluster_id) / 1024 / 1024 / 1024`
-	queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (cluster_id)`
+	queryFmtPVCInfo               = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]%s`
+	queryFmtPVBytes               = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, %s)`
+	queryFmtPodPVCAllocation      = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
+	queryFmtPVCBytesRequested     = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, %s)`
+	queryFmtPVCostPerGiBHour      = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, %s)`
+	queryFmtNetZoneGiB            = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	queryFmtNetZoneCostPerGiB     = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (%s)`
+	queryFmtNetRegionGiB          = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	queryFmtNetRegionCostPerGiB   = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (%s)`
+	queryFmtNetInternetGiB        = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (%s)`
 	queryFmtNamespaceLabels       = `avg_over_time(kube_namespace_labels[%s]%s)`
 	queryFmtNamespaceAnnotations  = `avg_over_time(kube_namespace_annotations[%s]%s)`
 	queryFmtPodLabels             = `avg_over_time(kube_pod_labels[%s]%s)`
@@ -58,10 +58,10 @@ const (
 	queryFmtServiceLabels         = `avg_over_time(service_selector_labels[%s]%s)`
 	queryFmtDeploymentLabels      = `avg_over_time(deployment_match_labels[%s]%s)`
 	queryFmtStatefulSetLabels     = `avg_over_time(statefulSet_match_labels[%s]%s)`
-	queryFmtDaemonSetLabels       = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, cluster_id)`
-	queryFmtJobLabels             = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,cluster_id)`
-	queryFmtLBCostPerHr           = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, cluster_id)`
-	queryFmtLBActiveMins          = `count(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id)[%s:%s]%s`
+	queryFmtDaemonSetLabels       = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, %s)`
+	queryFmtJobLabels             = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
+	queryFmtLBCostPerHr           = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, %s)`
+	queryFmtLBActiveMins          = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]%s`
 )
 
 // ComputeAllocation uses the CostModel instance to compute an AllocationSet
@@ -112,76 +112,76 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 
 	ctx := prom.NewContext(cm.PrometheusClient)
 
-	queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr)
+	queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr, env.GetPromClusterLabel())
 	resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
 
-	queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr)
+	queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr, env.GetPromClusterLabel())
 	resChRAMRequests := ctx.Query(queryRAMRequests)
 
-	queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr)
+	queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr, env.GetPromClusterLabel())
 	resChRAMUsageAvg := ctx.Query(queryRAMUsageAvg)
 
-	queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr)
+	queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr, env.GetPromClusterLabel())
 	resChRAMUsageMax := ctx.Query(queryRAMUsageMax)
 
-	queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr)
+	queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr, env.GetPromClusterLabel())
 	resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
 
-	queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr)
+	queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr, env.GetPromClusterLabel())
 	resChCPURequests := ctx.Query(queryCPURequests)
 
-	queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr)
+	queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr, env.GetPromClusterLabel())
 	resChCPUUsageAvg := ctx.Query(queryCPUUsageAvg)
 
-	queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr)
+	queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr, env.GetPromClusterLabel())
 	resChCPUUsageMax := ctx.Query(queryCPUUsageMax)
 
-	queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr)
+	queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr, env.GetPromClusterLabel())
 	resChGPUsRequested := ctx.Query(queryGPUsRequested)
 
-	queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr)
+	queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr, env.GetPromClusterLabel())
 	resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
 
-	queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr)
+	queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr, env.GetPromClusterLabel())
 	resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
 
-	queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr)
+	queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr, env.GetPromClusterLabel())
 	resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
 
 	queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
 	resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
 
-	queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, durStr, resStr, offStr)
+	queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr, offStr)
 	resChPVCInfo := ctx.Query(queryPVCInfo)
 
-	queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr)
+	queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr, env.GetPromClusterLabel())
 	resChPVBytes := ctx.Query(queryPVBytes)
 
-	queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr)
+	queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr, env.GetPromClusterLabel())
 	resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
 
-	queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr)
+	queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr, env.GetPromClusterLabel())
 	resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
 
-	queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr)
+	queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr, env.GetPromClusterLabel())
 	resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
 
-	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr)
+	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr, env.GetPromClusterLabel())
 	resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
 
-	queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr)
+	queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
 	resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
 
-	queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr)
+	queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr, env.GetPromClusterLabel())
 	resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
 
-	queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr)
+	queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
 	resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
 
-	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr)
+	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr, env.GetPromClusterLabel())
 	resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
 
-	queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr)
+	queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
 	resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
 
 	queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
@@ -205,16 +205,16 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 	queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
 	resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
 
-	queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr)
+	queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr, env.GetPromClusterLabel())
 	resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
 
-	queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr)
+	queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr, env.GetPromClusterLabel())
 	resChJobLabels := ctx.Query(queryJobLabels)
 
-	queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr)
+	queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr, env.GetPromClusterLabel())
 	resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
 
-	queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, durStr, resStr, offStr)
+	queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr, offStr)
 	resChLBActiveMins := ctx.Query(queryLBActiveMins)
 
 	resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
@@ -456,7 +456,7 @@ func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSiz
 			}
 
 			// Submit and profile query
-			queryPods := fmt.Sprintf(queryFmtPods, durStr, resStr, offStr)
+			queryPods := fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr, offStr)
 			queryProfile := time.Now()
 			resPods, err = ctx.Query(queryPods).Await()
 			if err != nil {
@@ -485,7 +485,7 @@ func applyPodResults(window kubecost.Window, resolution time.Duration, podMap ma
 			continue
 		}
 
-		cluster, err := res.GetString("cluster_id")
+		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -594,7 +594,7 @@ func applyPodResults(window kubecost.Window, resolution time.Duration, podMap ma
 
 func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom.QueryResult) {
 	for _, res := range resCPUCoresAllocated {
-		key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
+		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
 			continue
@@ -630,7 +630,7 @@ func applyCPUCoresAllocated(podMap map[podKey]*Pod, resCPUCoresAllocated []*prom
 
 func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom.QueryResult) {
 	for _, res := range resCPUCoresRequested {
-		key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
+		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
 			continue
@@ -670,7 +670,7 @@ func applyCPUCoresRequested(podMap map[podKey]*Pod, resCPUCoresRequested []*prom
 
 func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.QueryResult) {
 	for _, res := range resCPUCoresUsedAvg {
-		key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
+		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
 			continue
@@ -697,7 +697,7 @@ func applyCPUCoresUsedAvg(podMap map[podKey]*Pod, resCPUCoresUsedAvg []*prom.Que
 
 func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.QueryResult) {
 	for _, res := range resCPUCoresUsedMax {
-		key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
+		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
 			continue
@@ -730,7 +730,7 @@ func applyCPUCoresUsedMax(podMap map[podKey]*Pod, resCPUCoresUsedMax []*prom.Que
 
 func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom.QueryResult) {
 	for _, res := range resRAMBytesAllocated {
-		key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
+		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
 			continue
@@ -766,7 +766,7 @@ func applyRAMBytesAllocated(podMap map[podKey]*Pod, resRAMBytesAllocated []*prom
 
 func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom.QueryResult) {
 	for _, res := range resRAMBytesRequested {
-		key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
+		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
 			continue
@@ -806,7 +806,7 @@ func applyRAMBytesRequested(podMap map[podKey]*Pod, resRAMBytesRequested []*prom
 
 func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.QueryResult) {
 	for _, res := range resRAMBytesUsedAvg {
-		key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
+		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
 			continue
@@ -833,7 +833,7 @@ func applyRAMBytesUsedAvg(podMap map[podKey]*Pod, resRAMBytesUsedAvg []*prom.Que
 
 func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.QueryResult) {
 	for _, res := range resRAMBytesUsedMax {
-		key, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
+		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
 			continue
@@ -866,7 +866,7 @@ func applyRAMBytesUsedMax(podMap map[podKey]*Pod, resRAMBytesUsedMax []*prom.Que
 
 func applyGPUsRequested(podMap map[podKey]*Pod, resGPUsRequested []*prom.QueryResult) {
 	for _, res := range resGPUsRequested {
-		key, err := resultPodKey(res, "cluster_id", "namespace", "pod")
+		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
 			continue
@@ -896,7 +896,7 @@ func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryR
 	costPerGiBByCluster := map[string]float64{}
 
 	for _, res := range resNetworkCostPerGiB {
-		cluster, err := res.GetString("cluster_id")
+		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -905,7 +905,7 @@ func applyNetworkAllocation(podMap map[podKey]*Pod, resNetworkGiB []*prom.QueryR
 	}
 
 	for _, res := range resNetworkGiB {
-		podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod_name")
+		podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod_name")
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
 			continue
@@ -928,7 +928,7 @@ func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceK
 	namespaceLabels := map[namespaceKey]map[string]string{}
 
 	for _, res := range resNamespaceLabels {
-		nsKey, err := resultNamespaceKey(res, "cluster_id", "namespace")
+		nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
 		if err != nil {
 			continue
 		}
@@ -949,7 +949,7 @@ func resToPodLabels(resPodLabels []*prom.QueryResult) map[podKey]map[string]stri
 	podLabels := map[podKey]map[string]string{}
 
 	for _, res := range resPodLabels {
-		podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
+		podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
 		if err != nil {
 			continue
 		}
@@ -991,7 +991,7 @@ func resToPodAnnotations(resPodAnnotations []*prom.QueryResult) map[podKey]map[s
 	podAnnotations := map[podKey]map[string]string{}
 
 	for _, res := range resPodAnnotations {
-		podKey, err := resultPodKey(res, "cluster_id", "namespace", "pod")
+		podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace", "pod")
 		if err != nil {
 			continue
 		}
@@ -1063,7 +1063,7 @@ func getServiceLabels(resServiceLabels []*prom.QueryResult) map[serviceKey]map[s
 	serviceLabels := map[serviceKey]map[string]string{}
 
 	for _, res := range resServiceLabels {
-		serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service")
+		serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
 		if err != nil {
 			continue
 		}
@@ -1096,7 +1096,7 @@ func resToDeploymentLabels(resDeploymentLabels []*prom.QueryResult) map[controll
 	deploymentLabels := map[controllerKey]map[string]string{}
 
 	for _, res := range resDeploymentLabels {
-		controllerKey, err := resultDeploymentKey(res, "cluster_id", "namespace", "deployment")
+		controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
 		if err != nil {
 			continue
 		}
@@ -1129,7 +1129,7 @@ func resToStatefulSetLabels(resStatefulSetLabels []*prom.QueryResult) map[contro
 	statefulSetLabels := map[controllerKey]map[string]string{}
 
 	for _, res := range resStatefulSetLabels {
-		controllerKey, err := resultStatefulSetKey(res, "cluster_id", "namespace", "statefulSet")
+		controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
 		if err != nil {
 			continue
 		}
@@ -1191,7 +1191,7 @@ func resToPodDaemonSetMap(resDaemonSetLabels []*prom.QueryResult) map[podKey]con
 	daemonSetLabels := map[podKey]controllerKey{}
 
 	for _, res := range resDaemonSetLabels {
-		controllerKey, err := resultDaemonSetKey(res, "cluster_id", "namespace", "owner_name")
+		controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
 		if err != nil {
 			continue
 		}
@@ -1213,7 +1213,7 @@ func resToPodJobMap(resJobLabels []*prom.QueryResult) map[podKey]controllerKey {
 	jobLabels := map[podKey]controllerKey{}
 
 	for _, res := range resJobLabels {
-		controllerKey, err := resultJobKey(res, "cluster_id", "namespace", "owner_name")
+		controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
 		if err != nil {
 			continue
 		}
@@ -1295,7 +1295,7 @@ func applyControllersToPods(podMap map[podKey]*Pod, podControllerMap map[podKey]
 func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr []*prom.QueryResult,
 	providerIDParser func(string) string) {
 	for _, res := range resNodeCostPerCPUHr {
-		cluster, err := res.GetString("cluster_id")
+		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -1334,7 +1334,7 @@ func applyNodeCostPerCPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerCPUHr
 func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRAMGiBHr []*prom.QueryResult,
 	providerIDParser func(string) string) {
 	for _, res := range resNodeCostPerRAMGiBHr {
-		cluster, err := res.GetString("cluster_id")
+		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -1373,7 +1373,7 @@ func applyNodeCostPerRAMGiBHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerRA
 func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr []*prom.QueryResult,
 	providerIDParser func(string) string) {
 	for _, res := range resNodeCostPerGPUHr {
-		cluster, err := res.GetString("cluster_id")
+		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -1411,7 +1411,7 @@ func applyNodeCostPerGPUHr(nodeMap map[nodeKey]*NodePricing, resNodeCostPerGPUHr
 
 func applyNodeSpot(nodeMap map[nodeKey]*NodePricing, resNodeIsSpot []*prom.QueryResult) {
 	for _, res := range resNodeIsSpot {
-		cluster, err := res.GetString("cluster_id")
+		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -1465,7 +1465,7 @@ func applyNodeDiscount(nodeMap map[nodeKey]*NodePricing, cm *CostModel) {
 
 func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
 	for _, res := range resPVCostPerGiBHour {
-		cluster, err := res.GetString("cluster_id")
+		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -1488,7 +1488,7 @@ func buildPVMap(pvMap map[pvKey]*PV, resPVCostPerGiBHour []*prom.QueryResult) {
 
 func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
 	for _, res := range resPVBytes {
-		key, err := resultPVKey(res, "cluster_id", "persistentvolume")
+		key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
 		if err != nil {
 			log.Warningf("CostModel.ComputeAllocation: PV bytes query result missing field: %s", err)
 			continue
@@ -1505,7 +1505,7 @@ func applyPVBytes(pvMap map[pvKey]*PV, resPVBytes []*prom.QueryResult) {
 
 func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey]*PV, resPVCInfo []*prom.QueryResult) {
 	for _, res := range resPVCInfo {
-		cluster, err := res.GetString("cluster_id")
+		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -1564,7 +1564,7 @@ func buildPVCMap(window kubecost.Window, pvcMap map[pvcKey]*PVC, pvMap map[pvKey
 
 func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom.QueryResult) {
 	for _, res := range resPVCBytesRequested {
-		key, err := resultPVCKey(res, "cluster_id", "namespace", "persistentvolumeclaim")
+		key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
 		if err != nil {
 			continue
 		}
@@ -1579,7 +1579,7 @@ func applyPVCBytesRequested(pvcMap map[pvcKey]*PVC, resPVCBytesRequested []*prom
 
 func buildPodPVCMap(podPVCMap map[podKey][]*PVC, pvMap map[pvKey]*PV, pvcMap map[pvcKey]*PVC, podMap map[podKey]*Pod, resPodPVCAllocation []*prom.QueryResult) {
 	for _, res := range resPodPVCAllocation {
-		cluster, err := res.GetString("cluster_id")
+		cluster, err := res.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -1752,14 +1752,14 @@ func getLoadBalancerCosts(resLBCost, resLBActiveMins []*prom.QueryResult, resolu
 	lbMap := make(map[serviceKey]*LB)
 	lbHourlyCosts := make(map[serviceKey]float64)
 	for _, res := range resLBCost {
-		serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service_name")
+		serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
 		if err != nil {
 			continue
 		}
 		lbHourlyCosts[serviceKey] = res.Values[0].Value
 	}
 	for _, res := range resLBActiveMins {
-		serviceKey, err := resultServiceKey(res, "cluster_id", "namespace", "service_name")
+		serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
 		if err != nil || len(res.Values) == 0 {
 			continue
 		}

+ 76 - 76
pkg/costmodel/cluster.go

@@ -16,26 +16,26 @@ import (
 
 const (
 	queryClusterCores = `sum(
-		avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, cluster_id) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730 +
-		avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, cluster_id) * 730
-	  ) by (cluster_id)`
+		avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s] %s)) by (node, %s) * avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, %s) * 730 +
+		avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, %s) * 730
+	  ) by (%s)`
 
 	queryClusterRAM = `sum(
-		avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, cluster_id) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, cluster_id) * 730
-	  ) by (cluster_id)`
+		avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s] %s)) by (node, %s) / 1024 / 1024 / 1024 * avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, %s) * 730
+	  ) by (%s)`
 
 	queryStorage = `sum(
-		avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, cluster_id) * 730
-		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
-	  ) by (cluster_id) %s`
+		avg(avg_over_time(pv_hourly_cost[%s] %s)) by (persistentvolume, %s) * 730
+		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s] %s)) by (persistentvolume, %s) / 1024 / 1024 / 1024
+	  ) by (%s) %s`
 
-	queryTotal = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 +
+	queryTotal = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 +
 	  sum(
-		avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, cluster_id) * 730
-		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024
-	  ) by (cluster_id) %s`
+		avg(avg_over_time(pv_hourly_cost[1h])) by (persistentvolume, %s) * 730
+		* avg(avg_over_time(kube_persistentvolume_capacity_bytes[1h])) by (persistentvolume, %s) / 1024 / 1024 / 1024
+	  ) by (%s) %s`
 
-	queryNodes = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 %s`
+	queryNodes = `sum(avg(node_total_hourly_cost) by (node, %s)) * 730 %s`
 )
 
 // Costs represents cumulative and monthly cluster costs over a given duration. Costs
@@ -139,14 +139,14 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	costPerGBHr := 0.04 / 730.0
 
 	ctx := prom.NewContext(client)
-	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (cluster_id, persistentvolume)  * on(cluster_id, persistentvolume) group_right avg(pv_hourly_cost) by (cluster_id, persistentvolume,provider_id))[%s:%dm]%s)/1024/1024/1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryPVCost := fmt.Sprintf(`sum_over_time((avg(kube_persistentvolume_capacity_bytes) by (%s, persistentvolume)  * on(%s, persistentvolume) group_right avg(pv_hourly_cost) by (%s, persistentvolume,provider_id))[%s:%dm]%s)/1024/1024/1024 * %f`, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryPVSize := fmt.Sprintf(`avg_over_time(kube_persistentvolume_capacity_bytes[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (cluster_id, persistentvolume)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
 
-	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
-	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
-	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, cluster_id)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (cluster_id, node)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s)`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
+	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
 
 	resChPVCost := ctx.Query(queryPVCost)
 	resChPVSize := ctx.Query(queryPVSize)
@@ -170,7 +170,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	diskMap := map[string]*Disk{}
 
 	for _, result := range resPVCost {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -200,7 +200,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	}
 
 	for _, result := range resPVSize {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -226,7 +226,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	}
 
 	for _, result := range resLocalStorageCost {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -251,7 +251,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	}
 
 	for _, result := range resLocalStorageUsedCost {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -276,7 +276,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	}
 
 	for _, result := range resLocalStorageBytes {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -301,7 +301,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	}
 
 	for _, result := range resActiveMins {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -334,7 +334,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	}
 
 	for _, result := range resLocalActiveMins {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -467,16 +467,16 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	requiredCtx := prom.NewContext(client)
 	optionalCtx := prom.NewContext(client)
 
-	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (cluster_id, node, instance_type, provider_id)`, durationStr, offsetStr)
-	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s]%s)) by (cluster_id, node)`, durationStr, offsetStr)
-	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (cluster_id, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durationStr, offsetStr)
-	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s]%s)) by (cluster_id, node)`, durationStr, offsetStr)
-	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s]%s)) by (cluster_id, node, provider_id)`, durationStr, offsetStr)
-	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (cluster_id, node, instance_type, provider_id)`, durationStr, offsetStr)
-	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
-	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
-	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
-	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, cluster_id, provider_id)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
+	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
+	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durationStr, offsetStr, env.GetPromClusterLabel())
+	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
+	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s]%s)) by (%s, node, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
+	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
+	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, %s, mode)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel())
+	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
 	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 
@@ -613,8 +613,8 @@ func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration,
 	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
 
 	ctx := prom.NewContext(client)
-	queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id, ingress_ip))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
-	queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id, ingress_ip)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+	queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip))[%s:%dm]%s) * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
 
 	resChLBCost := ctx.Query(queryLBCost)
 	resChActiveMins := ctx.Query(queryActiveMins)
@@ -629,7 +629,7 @@ func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration,
 	loadBalancerMap := map[string]*LoadBalancer{}
 
 	for _, result := range resLBCost {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -667,7 +667,7 @@ func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration,
 	}
 
 	for _, result := range resActiveMins {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -719,49 +719,49 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
 
 	const fmtQueryDataCount = `
-		count_over_time(sum(kube_node_status_capacity_cpu_cores) by (cluster_id)[%s:%dm]%s) * %d
+		count_over_time(sum(kube_node_status_capacity_cpu_cores) by (%s)[%s:%dm]%s) * %d
 	`
 
 	const fmtQueryTotalGPU = `
 		sum(
 			sum_over_time(node_gpu_hourly_cost[%s:%dm]%s) * %f
-		) by (cluster_id)
+		) by (%s)
 	`
 
 	const fmtQueryTotalCPU = `
 		sum(
-			sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, cluster_id)[%s:%dm]%s) *
-			avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
-		) by (cluster_id)
+			sum_over_time(avg(kube_node_status_capacity_cpu_cores) by (node, %s)[%s:%dm]%s) *
+			avg(avg_over_time(node_cpu_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
+		) by (%s)
 	`
 
 	const fmtQueryTotalRAM = `
 		sum(
-			sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
-			avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, cluster_id) * %f
-		) by (cluster_id)
+			sum_over_time(avg(kube_node_status_capacity_memory_bytes) by (node, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
+			avg(avg_over_time(node_ram_hourly_cost[%s:%dm]%s)) by (node, %s) * %f
+		) by (%s)
 	`
 
 	const fmtQueryTotalStorage = `
 		sum(
-			sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, cluster_id)[%s:%dm]%s) / 1024 / 1024 / 1024 *
-			avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, cluster_id) * %f
-		) by (cluster_id)
+			sum_over_time(avg(kube_persistentvolume_capacity_bytes) by (persistentvolume, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 *
+			avg(avg_over_time(pv_hourly_cost[%s:%dm]%s)) by (persistentvolume, %s) * %f
+		) by (%s)
 	`
 
 	const fmtQueryCPUModePct = `
-		sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id, mode) / ignoring(mode)
-		group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (cluster_id)
+		sum(rate(node_cpu_seconds_total[%s]%s)) by (%s, mode) / ignoring(mode)
+		group_left sum(rate(node_cpu_seconds_total[%s]%s)) by (%s)
 	`
 
 	const fmtQueryRAMSystemPct = `
-		sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (cluster_id)
-		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
+		sum(sum_over_time(container_memory_usage_bytes{container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (%s)
+		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
 	`
 
 	const fmtQueryRAMUserPct = `
-		sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (cluster_id)
-		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (cluster_id)
+		sum(sum_over_time(kubecost_cluster_memory_working_set_bytes[%s:%dm]%s)) by (%s)
+		/ sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (%s)
 	`
 
 	// TODO niko/clustercost metric "kubelet_volume_stats_used_bytes" was deprecated in 1.12, then seems to have come back in 1.17
@@ -780,11 +780,11 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 		fmtOffset = fmt.Sprintf("offset %s", offset)
 	}
 
-	queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, minsPerResolution, fmtOffset, minsPerResolution)
-	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative)
-	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
-	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
-	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, hourlyToCumulative)
+	queryDataCount := fmt.Sprintf(fmtQueryDataCount, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, minsPerResolution)
+	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, minsPerResolution, fmtOffset, hourlyToCumulative, env.GetPromClusterLabel())
+	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
+	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
+	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), hourlyToCumulative, env.GetPromClusterLabel())
 
 	ctx := prom.NewContext(client)
 
@@ -806,9 +806,9 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 	}
 
 	if withBreakdown {
-		queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, window, fmtOffset)
-		queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
-		queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, window, minsPerResolution, fmtOffset)
+		queryCPUModePct := fmt.Sprintf(fmtQueryCPUModePct, window, fmtOffset, env.GetPromClusterLabel(), window, fmtOffset, env.GetPromClusterLabel())
+		queryRAMSystemPct := fmt.Sprintf(fmtQueryRAMSystemPct, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
+		queryRAMUserPct := fmt.Sprintf(fmtQueryRAMUserPct, window, minsPerResolution, fmtOffset, env.GetPromClusterLabel(), window, minsPerResolution, fmtOffset, env.GetPromClusterLabel())
 
 		bdResChs := ctx.QueryAll(
 			queryCPUModePct,
@@ -841,7 +841,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 
 	dataMinsByCluster := map[string]float64{}
 	for _, result := range resDataCount {
-		clusterID, _ := result.GetString("cluster_id")
+		clusterID, _ := result.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -875,7 +875,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 	// the intermediate costData structure.
 	setCostsFromResults := func(costData map[string]map[string]float64, results []*prom.QueryResult, name string, discount float64, customDiscount float64) {
 		for _, result := range results {
-			clusterID, _ := result.GetString("cluster_id")
+			clusterID, _ := result.GetString(env.GetPromClusterLabel())
 			if clusterID == "" {
 				clusterID = defaultClusterID
 			}
@@ -914,7 +914,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 		}
 
 		for _, result := range resCPUModePct {
-			clusterID, _ := result.GetString("cluster_id")
+			clusterID, _ := result.GetString(env.GetPromClusterLabel())
 			if clusterID == "" {
 				clusterID = defaultClusterID
 			}
@@ -942,7 +942,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 		}
 
 		for _, result := range resRAMSystemPct {
-			clusterID, _ := result.GetString("cluster_id")
+			clusterID, _ := result.GetString(env.GetPromClusterLabel())
 			if clusterID == "" {
 				clusterID = defaultClusterID
 			}
@@ -953,7 +953,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 			ramBD.System += result.Values[0].Value
 		}
 		for _, result := range resRAMUserPct {
-			clusterID, _ := result.GetString("cluster_id")
+			clusterID, _ := result.GetString(env.GetPromClusterLabel())
 			if clusterID == "" {
 				clusterID = defaultClusterID
 			}
@@ -977,7 +977,7 @@ func (a *Accesses) ComputeClusterCosts(client prometheus.Client, provider cloud.
 				return nil, err
 			}
 			for _, result := range resUsedLocalStorage {
-				clusterID, _ := result.GetString("cluster_id")
+				clusterID, _ := result.GetString(env.GetPromClusterLabel())
 				if clusterID == "" {
 					clusterID = defaultClusterID
 				}
@@ -1081,10 +1081,10 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 		offset = fmt.Sprintf("offset %s", offset)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
-	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
-	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
+	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, env.GetPromClusterLabel(), windowString, offset, env.GetPromClusterLabel(), windowString, offset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, env.GetPromClusterLabel(), windowString, offset, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	qStorage := fmt.Sprintf(queryStorage, windowString, offset, env.GetPromClusterLabel(), windowString, offset, env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
+	qTotal := fmt.Sprintf(queryTotal, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), localStorageQuery)
 
 	ctx := prom.NewContext(cli)
 	resChClusterCores := ctx.QueryRange(qCores, start, end, window)
@@ -1134,7 +1134,7 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 		// If clusterTotal query failed, it's likely because there are no PVs, which
 		// causes the qTotal query to return no data. Instead, query only node costs.
 		// If that fails, return an error because something is actually wrong.
-		qNodes := fmt.Sprintf(queryNodes, localStorageQuery)
+		qNodes := fmt.Sprintf(queryNodes, env.GetPromClusterLabel(), localStorageQuery)
 
 		resultNodes, warnings, err := ctx.QueryRangeSync(qNodes, start, end, window)
 		for _, warning := range warnings {

+ 12 - 12
pkg/costmodel/cluster_helpers.go

@@ -37,7 +37,7 @@ func buildCPUCostMap(
 	clusterAndNameToType := make(map[nodeIdentifierNoProviderID]string)
 
 	for _, result := range resNodeCPUCost {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -83,7 +83,7 @@ func buildRAMCostMap(
 	clusterAndNameToType := make(map[nodeIdentifierNoProviderID]string)
 
 	for _, result := range resNodeRAMCost {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -129,7 +129,7 @@ func buildGPUCostMap(
 	clusterAndNameToType := make(map[nodeIdentifierNoProviderID]string)
 
 	for _, result := range resNodeGPUCost {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -177,7 +177,7 @@ func buildGPUCountMap(
 	gpuCountMap := make(map[NodeIdentifier]float64)
 
 	for _, result := range resNodeGPUCount {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -210,7 +210,7 @@ func buildCPUCoresMap(
 	m := make(map[nodeIdentifierNoProviderID]float64)
 
 	for _, result := range resNodeCPUCores {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -238,7 +238,7 @@ func buildRAMBytesMap(resNodeRAMBytes []*prom.QueryResult) map[nodeIdentifierNoP
 	m := make(map[nodeIdentifierNoProviderID]float64)
 
 	for _, result := range resNodeRAMBytes {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -274,7 +274,7 @@ func buildCPUBreakdownMap(resNodeCPUModeTotal []*prom.QueryResult) map[nodeIdent
 	// Build intermediate structures for CPU usage by (cluster, node) and by
 	// (cluster, node, mode) for computing resource efficiency
 	for _, result := range resNodeCPUModeTotal {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -344,7 +344,7 @@ func buildRAMUserPctMap(resNodeRAMUserPct []*prom.QueryResult) map[nodeIdentifie
 	m := make(map[nodeIdentifierNoProviderID]float64)
 
 	for _, result := range resNodeRAMUserPct {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -373,7 +373,7 @@ func buildRAMSystemPctMap(resNodeRAMSystemPct []*prom.QueryResult) map[nodeIdent
 	m := make(map[nodeIdentifierNoProviderID]float64)
 
 	for _, result := range resNodeRAMSystemPct {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -408,7 +408,7 @@ func buildActiveDataMap(resActiveMins []*prom.QueryResult, resolution time.Durat
 	m := make(map[NodeIdentifier]activeData)
 
 	for _, result := range resActiveMins {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -464,7 +464,7 @@ func buildPreemptibleMap(
 		// GCP preemptible label
 		pre := result.Values[0].Value
 
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
@@ -499,7 +499,7 @@ func buildLabelsMap(
 
 	// Copy labels into node
 	for _, result := range resLabels {
-		cluster, err := result.GetString("cluster_id")
+		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
 		}

+ 4 - 3
pkg/costmodel/containerkeys.go

@@ -4,8 +4,9 @@ import (
 	"errors"
 	"strings"
 
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 )
 
 var (
@@ -21,7 +22,7 @@ var (
 	NoNamespaceErr     error = errors.New("Prometheus vector does not have namespace")
 	NoNamespaceNameErr error = errors.New("Prometheus vector does not have string namespace")
 	NoNodeNameErr      error = errors.New("Prometheus vector does not have string node")
-	NoClusterIDErr     error = errors.New("Prometheus vector does not have string cluster_id")
+	NoClusterIDErr     error = errors.New("Prometheus vector does not have string cluster id")
 )
 
 //--------------------------------------------------------------------------
@@ -191,7 +192,7 @@ func NewContainerMetricFromPrometheus(metrics map[string]interface{}, defaultClu
 	if !ok {
 		return nil, NoNodeNameErr
 	}
-	cid, ok := metrics["cluster_id"]
+	cid, ok := metrics[env.GetPromClusterLabel()]
 	if !ok {
 		log.Debugf("Prometheus vector does not have cluster id")
 		cid = defaultClusterID

+ 44 - 44
pkg/costmodel/costmodel.go

@@ -141,16 +141,16 @@ const (
 					count_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD", node!=""}[%s] %s)
 					*
 					avg_over_time(kube_pod_container_resource_requests_memory_bytes{container!="",container!="POD", node!=""}[%s] %s)
-				) by (namespace,container,pod,node,cluster_id) , "container_name","$1","container","(.+)"
+				) by (namespace,container,pod,node,%s) , "container_name","$1","container","(.+)"
 			), "pod_name","$1","pod","(.+)"
 		)
-	) by (namespace,container_name,pod_name,node,cluster_id)`
+	) by (namespace,container_name,pod_name,node,%s)`
 	queryRAMUsageStr = `sort_desc(
 		avg(
 			label_replace(count_over_time(container_memory_working_set_bytes{container_name!="",container_name!="POD", instance!=""}[%s] %s), "node", "$1", "instance","(.+)")
 			*
 			label_replace(avg_over_time(container_memory_working_set_bytes{container_name!="",container_name!="POD", instance!=""}[%s] %s), "node", "$1", "instance","(.+)")
-		) by (namespace,container_name,pod_name,node,cluster_id)
+		) by (namespace,container_name,pod_name,node,%s)
 	)`
 	queryCPURequestsStr = `avg(
 		label_replace(
@@ -159,17 +159,17 @@ const (
 					count_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD", node!=""}[%s] %s)
 					*
 					avg_over_time(kube_pod_container_resource_requests_cpu_cores{container!="",container!="POD", node!=""}[%s] %s)
-				) by (namespace,container,pod,node,cluster_id) , "container_name","$1","container","(.+)"
+				) by (namespace,container,pod,node,%s) , "container_name","$1","container","(.+)"
 			), "pod_name","$1","pod","(.+)"
 		)
-	) by (namespace,container_name,pod_name,node,cluster_id)`
+	) by (namespace,container_name,pod_name,node,%s)`
 	queryCPUUsageStr = `avg(
 		label_replace(
 		rate(
 			container_cpu_usage_seconds_total{container_name!="",container_name!="POD",instance!=""}[%s] %s
 		) , "node", "$1", "instance", "(.+)"
 		)
-	) by (namespace,container_name,pod_name,node,cluster_id)`
+	) by (namespace,container_name,pod_name,node,%s)`
 	queryGPURequestsStr = `avg(
 		label_replace(
 			label_replace(
@@ -178,15 +178,15 @@ const (
 					*
 					avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s] %s)
 					* %f
-				) by (namespace,container,pod,node,cluster_id) , "container_name","$1","container","(.+)"
+				) by (namespace,container,pod,node,%s) , "container_name","$1","container","(.+)"
 			), "pod_name","$1","pod","(.+)"
 		)
-	) by (namespace,container_name,pod_name,node,cluster_id)
-	* on (pod_name, namespace, cluster_id) group_left(container) label_replace(avg(avg_over_time(kube_pod_status_phase{phase="Running"}[%s] %s)) by (pod,namespace,cluster_id), "pod_name","$1","pod","(.+)")`
-	queryPVRequestsStr = `avg(avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id, kubernetes_node)
+	) by (namespace,container_name,pod_name,node,%s)
+	* on (pod_name, namespace, %s) group_left(container) label_replace(avg(avg_over_time(kube_pod_status_phase{phase="Running"}[%s] %s)) by (pod,namespace,%s), "pod_name","$1","pod","(.+)")`
+	queryPVRequestsStr = `avg(avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, namespace, volumename, %s, kubernetes_node)
 	*
-	on (persistentvolumeclaim, namespace, cluster_id, kubernetes_node) group_right(storageclass, volumename)
-	sum(kube_persistentvolumeclaim_resource_requests_storage_bytes{}) by (persistentvolumeclaim, namespace, cluster_id, kubernetes_node, kubernetes_name)) by (persistentvolumeclaim, storageclass, namespace, cluster_id, volumename, kubernetes_node)`
+	on (persistentvolumeclaim, namespace, %s, kubernetes_node) group_right(storageclass, volumename)
+	sum(kube_persistentvolumeclaim_resource_requests_storage_bytes{}) by (persistentvolumeclaim, namespace, %s, kubernetes_node, kubernetes_name)) by (persistentvolumeclaim, storageclass, namespace, %s, volumename, kubernetes_node)`
 	// queryRAMAllocationByteHours yields the total byte-hour RAM allocation over the given
 	// window, aggregated by container.
 	//  [line 3]  sum_over_time(each byte) = [byte*scrape] by metric
@@ -197,7 +197,7 @@ const (
 		label_replace(label_replace(
 			sum(
 				sum_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s])
-			) by (namespace,container,pod,node,cluster_id) * %f / 60 / 60
+			) by (namespace,container,pod,node,%s) * %f / 60 / 60
 		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	// queryCPUAllocationVCPUHours yields the total VCPU-hour CPU allocation over the given
 	// window, aggregated by container.
@@ -209,11 +209,11 @@ const (
 		label_replace(label_replace(
 			sum(
 				sum_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s])
-			) by (namespace,container,pod,node,cluster_id) * %f / 60 / 60
+			) by (namespace,container,pod,node,%s) * %f / 60 / 60
 		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	// queryPVCAllocationFmt yields the total byte-hour PVC allocation over the given window.
 	// sum_over_time(each byte) = [byte*scrape] by metric *(scalar(avg(prometheus_target_interval_length_seconds)) = [seconds/scrape] / 60 / 60 =  [hours/scrape] by pod
-	queryPVCAllocationFmt     = `sum(sum_over_time(pod_pvc_allocation[%s])) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim) * %f/60/60`
+	queryPVCAllocationFmt     = `sum(sum_over_time(pod_pvc_allocation[%s])) by (%s, namespace, pod, persistentvolume, persistentvolumeclaim) * %f/60/60`
 	queryPVHourlyCostFmt      = `avg_over_time(pv_hourly_cost[%s])`
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
@@ -221,23 +221,23 @@ const (
 	queryPodAnnotations       = `avg_over_time(kube_pod_annotations[%s])`
 	queryDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
 	queryStatefulsetLabels    = `avg_over_time(statefulSet_match_labels[%s])`
-	queryPodDaemonsets        = `sum(kube_pod_owner{owner_kind="DaemonSet"}) by (namespace,pod,owner_name,cluster_id)`
-	queryPodJobs              = `sum(kube_pod_owner{owner_kind="Job"}) by (namespace,pod,owner_name,cluster_id)`
+	queryPodDaemonsets        = `sum(kube_pod_owner{owner_kind="DaemonSet"}) by (namespace,pod,owner_name,%s)`
+	queryPodJobs              = `sum(kube_pod_owner{owner_kind="Job"}) by (namespace,pod,owner_name,%s)`
 	queryServiceLabels        = `avg_over_time(service_selector_labels[%s])`
-	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
-	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
-	queryInternetNetworkUsage = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
+	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s] %s)) by (namespace,pod_name,%s) / 1024 / 1024 / 1024`
+	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,%s) / 1024 / 1024 / 1024`
+	queryInternetNetworkUsage = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s] %s)) by (namespace,pod_name,%s) / 1024 / 1024 / 1024`
 	normalizationStr          = `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[%s] %s))`
 )
 
 func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, cp costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
-	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset)
-	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, window, offset)
-	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, window, offset, window, offset, 1.0, window, offset)
-	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
-	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, window, "")
-	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, window, "")
-	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, window, "")
+	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset, env.GetPromClusterLabel())
+	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, window, offset, env.GetPromClusterLabel())
+	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, window, offset, window, offset, 1.0, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), window, offset, env.GetPromClusterLabel())
+	queryPVRequests := fmt.Sprintf(queryPVRequestsStr, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, window, "", env.GetPromClusterLabel())
+	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, window, "", env.GetPromClusterLabel())
+	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, window, "", env.GetPromClusterLabel())
 	queryNormalization := fmt.Sprintf(normalizationStr, window, offset)
 
 	// Cluster ID is specific to the source cluster
@@ -747,9 +747,9 @@ func findDeletedNodeInfo(cli prometheusClient.Client, missingNodes map[string]*c
 			offsetStr = fmt.Sprintf("offset %s", offset)
 		}
 
-		queryHistoricalCPUCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, instance, cluster_id)`, window, offsetStr)
-		queryHistoricalRAMCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, instance, cluster_id)`, window, offsetStr)
-		queryHistoricalGPUCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, instance, cluster_id)`, window, offsetStr)
+		queryHistoricalCPUCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s] %s)) by (node, instance, %s)`, window, offsetStr, env.GetPromClusterLabel())
+		queryHistoricalRAMCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s] %s)) by (node, instance, %s)`, window, offsetStr, env.GetPromClusterLabel())
+		queryHistoricalGPUCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s] %s)) by (node, instance, %s)`, window, offsetStr, env.GetPromClusterLabel())
 
 		ctx := prom.NewContext(cli)
 		cpuCostResCh := ctx.Query(queryHistoricalCPUCost)
@@ -1569,19 +1569,19 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 
 	ctx := prom.NewContext(cli)
 
-	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, resStr, scrapeIntervalSeconds)
-	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, resStr, scrapeIntervalSeconds)
-	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, resStr, "", resStr, "")
-	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, resStr, "", resStr, "")
-	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, resStr, "", resStr, "")
-	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, resStr, "")
-	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, resStr, "", resStr, "", resolution.Hours(), resStr, "")
-	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
-	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, resStr, scrapeIntervalSeconds)
+	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, resStr, env.GetPromClusterLabel(), scrapeIntervalSeconds)
+	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, resStr, env.GetPromClusterLabel(), scrapeIntervalSeconds)
+	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, resStr, "", resStr, "", env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, resStr, "", resStr, "", env.GetPromClusterLabel())
+	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, resStr, "", resStr, "", env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, resStr, "", env.GetPromClusterLabel())
+	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, resStr, "", resStr, "", resolution.Hours(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), resStr, "", env.GetPromClusterLabel())
+	queryPVRequests := fmt.Sprintf(queryPVRequestsStr, env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, resStr, env.GetPromClusterLabel(), scrapeIntervalSeconds)
 	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostFmt, resStr)
-	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, resStr, "")
-	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, resStr, "")
-	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, resStr, "")
+	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, resStr, "", env.GetPromClusterLabel())
+	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, resStr, "", env.GetPromClusterLabel())
+	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, resStr, "", env.GetPromClusterLabel())
 	queryNormalization := fmt.Sprintf(normalizationStr, resStr, "")
 
 	// Submit all queries for concurrent evaluation
@@ -1605,8 +1605,8 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, cp costAnalyzerC
 	resChServiceLabels := ctx.QueryRange(fmt.Sprintf(queryServiceLabels, resStr), start, end, resolution)
 	resChDeploymentLabels := ctx.QueryRange(fmt.Sprintf(queryDeploymentLabels, resStr), start, end, resolution)
 	resChStatefulsetLabels := ctx.QueryRange(fmt.Sprintf(queryStatefulsetLabels, resStr), start, end, resolution)
-	resChJobs := ctx.QueryRange(queryPodJobs, start, end, resolution)
-	resChDaemonsets := ctx.QueryRange(queryPodDaemonsets, start, end, resolution)
+	resChJobs := ctx.QueryRange(fmt.Sprintf(queryPodJobs, env.GetPromClusterLabel()), start, end, resolution)
+	resChDaemonsets := ctx.QueryRange(fmt.Sprintf(queryPodDaemonsets, env.GetPromClusterLabel()), start, end, resolution)
 	resChNormalization := ctx.QueryRange(queryNormalization, start, end, resolution)
 
 	// Pull k8s pod, controller, service, and namespace details

+ 2 - 1
pkg/costmodel/networkcosts.go

@@ -2,6 +2,7 @@ package costmodel
 
 import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
@@ -151,7 +152,7 @@ func getNetworkUsage(qrs []*prom.QueryResult, defaultClusterID string) (map[stri
 			return nil, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			log.Debugf("Prometheus vector does not have cluster id")
 			clusterID = defaultClusterID

+ 14 - 13
pkg/costmodel/promparsers.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
@@ -38,7 +39,7 @@ func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*Pe
 	toReturn := make(map[string]*PersistentVolumeClaimData)
 
 	for _, val := range qrs {
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -62,7 +63,7 @@ func GetPVInfo(qrs []*prom.QueryResult, defaultClusterID string) (map[string]*Pe
 		pvClass, err := val.GetString("storageclass")
 		if err != nil {
 			// TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
-			log.Warningf("Storage Class not found for claim \"%s/%s\".", ns, pvcName)
+			log.DedupedWarningf(5, "Storage Class not found for claim \"%s/%s\".", ns, pvcName)
 			pvClass = ""
 		}
 
@@ -84,7 +85,7 @@ func GetPVAllocationMetrics(qrs []*prom.QueryResult, defaultClusterID string) (m
 	toReturn := make(map[string][]*PersistentVolumeClaimData)
 
 	for _, val := range qrs {
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -130,7 +131,7 @@ func GetPVCostMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[str
 	toReturn := make(map[string]*costAnalyzerCloud.PV)
 
 	for _, val := range qrs {
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -159,7 +160,7 @@ func GetNamespaceLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string)
 			return toReturn, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -191,7 +192,7 @@ func GetPodLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID string) (map[
 			return toReturn, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -220,7 +221,7 @@ func GetNamespaceAnnotationsMetrics(qrs []*prom.QueryResult, defaultClusterID st
 			return toReturn, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -252,7 +253,7 @@ func GetPodAnnotationsMetrics(qrs []*prom.QueryResult, defaultClusterID string)
 			return toReturn, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -285,7 +286,7 @@ func GetStatefulsetMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID
 			return toReturn, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -311,7 +312,7 @@ func GetPodDaemonsetsWithMetrics(qrs []*prom.QueryResult, defaultClusterID strin
 			return toReturn, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -342,7 +343,7 @@ func GetPodJobsWithMetrics(qrs []*prom.QueryResult, defaultClusterID string) (ma
 			return toReturn, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -374,7 +375,7 @@ func GetDeploymentMatchLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID s
 			return toReturn, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}
@@ -401,7 +402,7 @@ func GetServiceSelectorLabelsMetrics(qrs []*prom.QueryResult, defaultClusterID s
 			return toReturn, err
 		}
 
-		clusterID, err := val.GetString("cluster_id")
+		clusterID, err := val.GetString(env.GetPromClusterLabel())
 		if clusterID == "" {
 			clusterID = defaultClusterID
 		}

+ 7 - 0
pkg/env/costmodelenv.go

@@ -67,6 +67,8 @@ const (
 	ETLMaxBatchHours             = "ETL_MAX_BATCH_HOURS"
 	ETLResolutionSeconds         = "ETL_RESOLUTION_SECONDS"
 	LegacyExternalAPIDisabledVar = "LEGACY_EXTERNAL_API_DISABLED"
+
+	PromClusterIDLabelEnvVar = "PROM_CLUSTER_ID_LABEL"
 )
 
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
@@ -376,3 +378,8 @@ func GetETLResolution() time.Duration {
 func LegacyExternalCostsAPIDisabled() bool {
 	return GetBool(LegacyExternalAPIDisabledVar, false)
 }
+
+// GetPromClusterLabel returns the environment variable value for PromClusterIDLabelEnvVar
+func GetPromClusterLabel() string {
+	return Get(PromClusterIDLabelEnvVar, "cluster_id")
+}

+ 257 - 95
pkg/kubecost/allocation.go

@@ -693,6 +693,7 @@ func NewAllocationSet(start, end time.Time, allocs ...*Allocation) *AllocationSe
 type AllocationAggregationOptions struct {
 	FilterFuncs       []AllocationMatchFunc
 	SplitIdle         bool
+	IdleByNode        bool
 	MergeUnallocated  bool
 	ShareFuncs        []AllocationMatchFunc
 	ShareIdle         string
@@ -818,7 +819,7 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 
 	// (2) In order to correctly share idle and shared costs, we first compute
 	// sharing coefficients, which represent the proportion of each cost to
-	// share with each allocation. Idle allocations are shared per-cluster,
+	// share with each allocation. Idle allocations are shared per-cluster or per-node,
 	// per-allocation, and per-resource, while shared resources are shared per-
 	// allocation only.
 	//
@@ -880,7 +881,7 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 	//  idle:       20.00
 	//
 	// Note that this can happen for any field, not just cluster, so we again
-	// need to track this on a per-cluster, per-allocation, per-resource basis.
+	// need to track this on a per-cluster or per-node, per-allocation, per-resource basis.
 	var idleFiltrationCoefficients map[string]map[string]map[string]float64
 	if len(options.FilterFuncs) > 0 && options.ShareIdle == ShareNone {
 		idleFiltrationCoefficients, err = computeIdleCoeffs(options, as, shareSet)
@@ -927,10 +928,10 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 
 	// (3-5) Filter, distribute idle cost, and aggregate (in that order)
 	for _, alloc := range as.allocations {
-		cluster := alloc.Properties.Cluster
-		if cluster == "" {
-			log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
-			return fmt.Errorf("ClusterProp is not set")
+		idleKey, err := alloc.getIdleKey(options)
+		if err != nil {
+			log.DedupedInfof(5,"AllocationSet.AggregateBy: missing idleKey for allocation: %s", alloc.Name)
+			continue
 		}
 
 		skip := false
@@ -948,7 +949,7 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 			// entry will result in that proportional amount being removed
 			// from the idle allocation at the end of the process.)
 			if idleFiltrationCoefficients != nil {
-				if ifcc, ok := idleFiltrationCoefficients[cluster]; ok {
+				if ifcc, ok := idleFiltrationCoefficients[idleKey]; ok {
 					delete(ifcc, alloc.Name)
 				}
 			}
@@ -961,35 +962,35 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 		// all idle allocations will be in the aggSet at this point, so idleSet
 		// will be empty and we won't enter this block.
 		if idleSet.Length() > 0 {
-			// Distribute idle allocations by coefficient per-cluster, per-allocation
+			// Distribute idle allocations by coefficient per-idleKey, per-allocation
 			for _, idleAlloc := range idleSet.allocations {
-				// Only share idle if the cluster matches; i.e. the allocation
-				// is from the same cluster as the idle costs
-				idleCluster := idleAlloc.Properties.Cluster
-				if idleCluster == "" {
-					return fmt.Errorf("ClusterProp is not set")
+				// Only share idle if the idleKey matches; i.e. the allocation
+				// is from the same idleKey as the idle costs
+				iaIdleKey, err := idleAlloc.getIdleKey(options)
+				if err != nil {
+					return err
 				}
-				if idleCluster != cluster {
+				if iaIdleKey != idleKey {
 					continue
 				}
 
 				// Make sure idle coefficients exist
-				if _, ok := idleCoefficients[cluster]; !ok {
-					log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no cluster '%s' for '%s'", cluster, alloc.Name)
+				if _, ok := idleCoefficients[idleKey]; !ok {
+					log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no idleKey '%s' for '%s'", idleKey, alloc.Name)
 					continue
 				}
-				if _, ok := idleCoefficients[cluster][alloc.Name]; !ok {
+				if _, ok := idleCoefficients[idleKey][alloc.Name]; !ok {
 					log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
 					continue
 				}
 
-				alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[cluster][alloc.Name]["cpu"]
-				alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[cluster][alloc.Name]["gpu"]
-				alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[cluster][alloc.Name]["ram"]
+				alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleKey][alloc.Name]["cpu"]
+				alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleKey][alloc.Name]["gpu"]
+				alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleKey][alloc.Name]["ram"]
 
-				idleCPUCost := idleAlloc.CPUCost * idleCoefficients[cluster][alloc.Name]["cpu"]
-				idleGPUCost := idleAlloc.GPUCost * idleCoefficients[cluster][alloc.Name]["gpu"]
-				idleRAMCost := idleAlloc.RAMCost * idleCoefficients[cluster][alloc.Name]["ram"]
+				idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleKey][alloc.Name]["cpu"]
+				idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleKey][alloc.Name]["gpu"]
+				idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleKey][alloc.Name]["ram"]
 				alloc.CPUCost += idleCPUCost
 				alloc.GPUCost += idleGPUCost
 				alloc.RAMCost += idleRAMCost
@@ -1015,41 +1016,41 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 	// before sharing with the aggregated allocations.
 	if idleSet.Length() > 0 && shareSet.Length() > 0 {
 		for _, alloc := range shareSet.allocations {
-			cluster := alloc.Properties.Cluster
-			if cluster == "" {
-				log.Warningf("AllocationSet.AggregateBy: missing cluster for allocation: %s", alloc.Name)
-				return err
+			idleKey, err := alloc.getIdleKey(options)
+			if err != nil {
+				log.DedupedWarningf(5, "AllocationSet.AggregateBy: missing idleKey for allocation: %s", alloc.Name)
+				continue
 			}
 
-			// Distribute idle allocations by coefficient per-cluster, per-allocation
+			// Distribute idle allocations by coefficient per-idleKey, per-allocation
 			for _, idleAlloc := range idleSet.allocations {
-				// Only share idle if the cluster matches; i.e. the allocation
-				// is from the same cluster as the idle costs
-				idleCluster := idleAlloc.Properties.Cluster
-				if idleCluster == "" {
-					return fmt.Errorf("ClusterProp is not set")
+				// Only share idle if the idleKey matches; i.e. the allocation
+				// is from the same idleKey as the idle costs
+				iaIdleKey, err := idleAlloc.getIdleKey(options)
+				if err != nil {
+					return nil
 				}
-				if idleCluster != cluster {
+				if iaIdleKey != idleKey {
 					continue
 				}
 
 				// Make sure idle coefficients exist
-				if _, ok := idleCoefficients[cluster]; !ok {
-					log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no cluster '%s' for '%s'", cluster, alloc.Name)
+				if _, ok := idleCoefficients[idleKey]; !ok {
+					log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient: no idleKey '%s' for '%s'", idleKey, alloc.Name)
 					continue
 				}
-				if _, ok := idleCoefficients[cluster][alloc.Name]; !ok {
+				if _, ok := idleCoefficients[idleKey][alloc.Name]; !ok {
 					log.Warningf("AllocationSet.AggregateBy: error getting idle coefficient for '%s'", alloc.Name)
 					continue
 				}
 
-				alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[cluster][alloc.Name]["cpu"]
-				alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[cluster][alloc.Name]["gpu"]
-				alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[cluster][alloc.Name]["ram"]
+				alloc.CPUCoreHours += idleAlloc.CPUCoreHours * idleCoefficients[idleKey][alloc.Name]["cpu"]
+				alloc.GPUHours += idleAlloc.GPUHours * idleCoefficients[idleKey][alloc.Name]["gpu"]
+				alloc.RAMByteHours += idleAlloc.RAMByteHours * idleCoefficients[idleKey][alloc.Name]["ram"]
 
-				idleCPUCost := idleAlloc.CPUCost * idleCoefficients[cluster][alloc.Name]["cpu"]
-				idleGPUCost := idleAlloc.GPUCost * idleCoefficients[cluster][alloc.Name]["gpu"]
-				idleRAMCost := idleAlloc.RAMCost * idleCoefficients[cluster][alloc.Name]["ram"]
+				idleCPUCost := idleAlloc.CPUCost * idleCoefficients[idleKey][alloc.Name]["cpu"]
+				idleGPUCost := idleAlloc.GPUCost * idleCoefficients[idleKey][alloc.Name]["gpu"]
+				idleRAMCost := idleAlloc.RAMCost * idleCoefficients[idleKey][alloc.Name]["ram"]
 				alloc.CPUCost += idleCPUCost
 				alloc.GPUCost += idleGPUCost
 				alloc.RAMCost += idleRAMCost
@@ -1057,17 +1058,18 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 		}
 	}
 
-	// clusterIdleFiltrationCoeffs is used to track per-resource idle
-	// coefficients on a cluster-by-cluster basis. It is, essentailly, an
-	// aggregation of idleFiltrationCoefficients after they have been
+	// groupingIdleFiltrationCoeffs is used to track per-resource idle
+	// coefficients on a cluster-by-cluster or node-by-node basis depending
+	// on the IdleByNode option. It is, essentially, an aggregation of
+	// idleFiltrationCoefficients after they have been
 	// filtered above (in step 3)
-	var clusterIdleFiltrationCoeffs map[string]map[string]float64
+	var groupingIdleFiltrationCoeffs map[string]map[string]float64
 	if idleFiltrationCoefficients != nil {
-		clusterIdleFiltrationCoeffs = map[string]map[string]float64{}
+		groupingIdleFiltrationCoeffs = map[string]map[string]float64{}
 
-		for cluster, m := range idleFiltrationCoefficients {
-			if _, ok := clusterIdleFiltrationCoeffs[cluster]; !ok {
-				clusterIdleFiltrationCoeffs[cluster] = map[string]float64{
+		for idleKey, m := range idleFiltrationCoefficients {
+			if _, ok := groupingIdleFiltrationCoeffs[idleKey]; !ok {
+				groupingIdleFiltrationCoeffs[idleKey] = map[string]float64{
 					"cpu": 0.0,
 					"gpu": 0.0,
 					"ram": 0.0,
@@ -1076,7 +1078,7 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 
 			for _, n := range m {
 				for resource, val := range n {
-					clusterIdleFiltrationCoeffs[cluster][resource] += val
+					groupingIdleFiltrationCoeffs[idleKey][resource] += val
 				}
 			}
 		}
@@ -1084,17 +1086,17 @@ func (as *AllocationSet) AggregateBy(aggregateBy []string, options *AllocationAg
 
 	// (7) If we have both un-shared idle allocations and idle filtration
 	// coefficients then apply those. See step (2b) for an example.
-	if len(aggSet.idleKeys) > 0 && clusterIdleFiltrationCoeffs != nil {
+	if len(aggSet.idleKeys) > 0 && groupingIdleFiltrationCoeffs != nil {
 		for idleKey := range aggSet.idleKeys {
 			idleAlloc := aggSet.Get(idleKey)
 
-			cluster := idleAlloc.Properties.Cluster
-			if cluster == "" {
-				log.Warningf("AllocationSet.AggregateBy: idle allocation without cluster: %s", idleAlloc)
+			iaIdleKey, err := idleAlloc.getIdleKey(options)
+			if err != nil {
+				log.Warningf("AllocationSet.AggregateBy: idle allocation without IdleKey: %s", idleAlloc)
 				continue
 			}
 
-			if resourceCoeffs, ok := clusterIdleFiltrationCoeffs[cluster]; ok {
+			if resourceCoeffs, ok := groupingIdleFiltrationCoeffs[iaIdleKey]; ok {
 				idleAlloc.CPUCost *= resourceCoeffs["cpu"]
 				idleAlloc.CPUCoreHours *= resourceCoeffs["cpu"]
 				idleAlloc.RAMCost *= resourceCoeffs["ram"]
@@ -1236,43 +1238,43 @@ func computeIdleCoeffs(options *AllocationAggregationOptions, as *AllocationSet,
 			continue
 		}
 
-		// We need to key the allocations by cluster id
-		clusterID := alloc.Properties.Cluster
-		if clusterID == "" {
-			return nil, fmt.Errorf("ClusterProp is not set")
+		idleKey, err := alloc.getIdleKey(options)
+		if err != nil {
+			// skip allocations that are missing idleKey
+			continue
 		}
 
 		// get the name key for the allocation
 		name := alloc.Name
 
-		// Create cluster based tables if they don't exist
-		if _, ok := coeffs[clusterID]; !ok {
-			coeffs[clusterID] = map[string]map[string]float64{}
+		// Create key based tables if they don't exist
+		if _, ok := coeffs[idleKey]; !ok {
+			coeffs[idleKey] = map[string]map[string]float64{}
 		}
-		if _, ok := totals[clusterID]; !ok {
-			totals[clusterID] = map[string]float64{}
+		if _, ok := totals[idleKey]; !ok {
+			totals[idleKey] = map[string]float64{}
 		}
 
-		if _, ok := coeffs[clusterID][name]; !ok {
-			coeffs[clusterID][name] = map[string]float64{}
+		if _, ok := coeffs[idleKey][name]; !ok {
+			coeffs[idleKey][name] = map[string]float64{}
 		}
 
 		if shareType == ShareEven {
 			for _, r := range types {
 				// Not additive - hard set to 1.0
-				coeffs[clusterID][name][r] = 1.0
+				coeffs[idleKey][name][r] = 1.0
 
 				// totals are additive
-				totals[clusterID][r] += 1.0
+				totals[idleKey][r] += 1.0
 			}
 		} else {
-			coeffs[clusterID][name]["cpu"] += alloc.CPUTotalCost()
-			coeffs[clusterID][name]["gpu"] += alloc.GPUTotalCost()
-			coeffs[clusterID][name]["ram"] += alloc.RAMTotalCost()
+			coeffs[idleKey][name]["cpu"] += alloc.CPUTotalCost()
+			coeffs[idleKey][name]["gpu"] += alloc.GPUTotalCost()
+			coeffs[idleKey][name]["ram"] += alloc.RAMTotalCost()
 
-			totals[clusterID]["cpu"] += alloc.CPUTotalCost()
-			totals[clusterID]["gpu"] += alloc.GPUTotalCost()
-			totals[clusterID]["ram"] += alloc.RAMTotalCost()
+			totals[idleKey]["cpu"] += alloc.CPUTotalCost()
+			totals[idleKey]["gpu"] += alloc.GPUTotalCost()
+			totals[idleKey]["ram"] += alloc.RAMTotalCost()
 		}
 	}
 
@@ -1283,43 +1285,43 @@ func computeIdleCoeffs(options *AllocationAggregationOptions, as *AllocationSet,
 			continue
 		}
 
-		// We need to key the allocations by cluster id
-		clusterID := alloc.Properties.Cluster
-		if clusterID == "" {
-			return nil, fmt.Errorf("ClusterProp is not set")
+		// idleKey will be providerId or cluster
+		idleKey, err := alloc.getIdleKey(options)
+		if err != nil {
+			return nil, err
 		}
 
 		// get the name key for the allocation
 		name := alloc.Name
 
-		// Create cluster based tables if they don't exist
-		if _, ok := coeffs[clusterID]; !ok {
-			coeffs[clusterID] = map[string]map[string]float64{}
+		// Create idleKey based tables if they don't exist
+		if _, ok := coeffs[idleKey]; !ok {
+			coeffs[idleKey] = map[string]map[string]float64{}
 		}
-		if _, ok := totals[clusterID]; !ok {
-			totals[clusterID] = map[string]float64{}
+		if _, ok := totals[idleKey]; !ok {
+			totals[idleKey] = map[string]float64{}
 		}
 
-		if _, ok := coeffs[clusterID][name]; !ok {
-			coeffs[clusterID][name] = map[string]float64{}
+		if _, ok := coeffs[idleKey][name]; !ok {
+			coeffs[idleKey][name] = map[string]float64{}
 		}
 
 		if shareType == ShareEven {
 			for _, r := range types {
 				// Not additive - hard set to 1.0
-				coeffs[clusterID][name][r] = 1.0
+				coeffs[idleKey][name][r] = 1.0
 
 				// totals are additive
-				totals[clusterID][r] += 1.0
+				totals[idleKey][r] += 1.0
 			}
 		} else {
-			coeffs[clusterID][name]["cpu"] += alloc.CPUTotalCost()
-			coeffs[clusterID][name]["gpu"] += alloc.GPUTotalCost()
-			coeffs[clusterID][name]["ram"] += alloc.RAMTotalCost()
+			coeffs[idleKey][name]["cpu"] += alloc.CPUTotalCost()
+			coeffs[idleKey][name]["gpu"] += alloc.GPUTotalCost()
+			coeffs[idleKey][name]["ram"] += alloc.RAMTotalCost()
 
-			totals[clusterID]["cpu"] += alloc.CPUTotalCost()
-			totals[clusterID]["gpu"] += alloc.GPUTotalCost()
-			totals[clusterID]["ram"] += alloc.RAMTotalCost()
+			totals[idleKey]["cpu"] += alloc.CPUTotalCost()
+			totals[idleKey]["gpu"] += alloc.GPUTotalCost()
+			totals[idleKey]["ram"] += alloc.RAMTotalCost()
 		}
 	}
 
@@ -1337,6 +1339,26 @@ func computeIdleCoeffs(options *AllocationAggregationOptions, as *AllocationSet,
 	return coeffs, nil
 }
 
+// getIdleKey returns the key used to group idle costs for the Allocation: Properties.ProviderID when
+// options.IdleByNode is set, otherwise Properties.Cluster; errors if that field is empty. options and a.Properties must be non-nil.
+func (a *Allocation) getIdleKey(options *AllocationAggregationOptions) (string, error) {
+	var idleKey string
+	if options.IdleByNode {
+		// Key the allocation by ProviderID so it can be matched against its node
+		idleKey = a.Properties.ProviderID
+		if idleKey == "" {
+			return idleKey, fmt.Errorf("ProviderId is not set")
+		}
+	} else {
+		// Key the allocation by its cluster ID
+		idleKey = a.Properties.Cluster
+		if idleKey == "" {
+			return idleKey, fmt.Errorf("ClusterProp is not set")
+		}
+	}
+	return idleKey, nil
+}
+
 func (a *Allocation) generateKey(aggregateBy []string) string {
 	if a == nil {
 		return ""
@@ -1545,6 +1567,7 @@ func (as *AllocationSet) ComputeIdleAllocations(assetSet *AssetSet) (map[string]
 				// the entire node cost and we should make everything 0
 				// without dividing by 0.
 				adjustmentRate = 0.0
+				log.DedupedWarningf(5, "Compute Idle Allocations: Node Cost Adjusted to $0.00 for %s", node.properties.Name)
 			} else if node.Adjustment() != 0.0 {
 				// adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
 				// to cost-without-adjustment (i.e. TotalCost - Adjustment).
@@ -1631,6 +1654,145 @@ func (as *AllocationSet) ComputeIdleAllocations(assetSet *AssetSet) (map[string]
 	return idleAllocs, nil
 }
 
+// ComputeIdleAllocationsByNode computes one idle allocation per node for the
+// AllocationSet, given a set of Assets. Only *Node assets with a non-empty
+// ProviderID are considered (the first one seen per ProviderID is used), and
+// only CPU, GPU and RAM costs contribute. Node adjustments are applied
+// proportionally to each resource so that allocation-plus-idle reflects the
+// adjusted node cost. The result is keyed by node ProviderID; an error is
+// returned for a nil receiver, a nil assetSet, or mismatched windows.
+func (as *AllocationSet) ComputeIdleAllocationsByNode(assetSet *AssetSet) (map[string]*Allocation, error) {
+	if as == nil {
+		return nil, fmt.Errorf("cannot compute idle allocation for nil AllocationSet")
+	}
+
+	if assetSet == nil {
+		return nil, fmt.Errorf("cannot compute idle allocation with nil AssetSet")
+	}
+
+	if !as.Window.Equal(assetSet.Window) {
+		return nil, fmt.Errorf("cannot compute idle allocation for sets with mismatched windows: %s != %s", as.Window, assetSet.Window)
+	}
+
+	window := as.Window
+
+	// Build a map of node asset costs, per resource, keyed by ProviderID;
+	// i.e. providerID-to-{cpu|gpu|ram}-to-cost.
+	assetNodeResourceCosts := map[string]map[string]float64{}
+	nodesByProviderId := map[string]*Node{}
+	assetSet.Each(func(key string, a Asset) {
+		if node, ok := a.(*Node); ok {
+			if _, ok := assetNodeResourceCosts[node.Properties().ProviderID]; ok || node.Properties().ProviderID == "" {
+				return
+			}
+
+			nodesByProviderId[node.Properties().ProviderID] = node
+			assetNodeResourceCosts[node.Properties().ProviderID] = map[string]float64{}
+
+			// adjustmentRate is used to scale resource costs proportionally
+			// by the adjustment. This is necessary because we only get one
+			// adjustment per Node, not one per-resource-per-Node.
+			//
+			// e.g. total cost = $90, adjustment = -$10 => 0.9
+			// e.g. total cost = $150, adjustment = -$300 => 0.3333
+			// e.g. total cost = $150, adjustment = $50 => 1.5
+			adjustmentRate := 1.0
+			if node.TotalCost()-node.Adjustment() == 0 {
+				// If (totalCost - adjustment) is 0.0 then adjustment cancels
+				// the entire node cost and we should make everything 0
+				// without dividing by 0.
+				adjustmentRate = 0.0
+				log.DedupedWarningf(5, "Compute Idle Allocations: Node Cost Adjusted to $0.00 for %s", node.properties.Name)
+			} else if node.Adjustment() != 0.0 {
+				// adjustmentRate is the ratio of cost-with-adjustment (i.e. TotalCost)
+				// to cost-without-adjustment (i.e. TotalCost - Adjustment).
+				adjustmentRate = node.TotalCost() / (node.TotalCost() - node.Adjustment())
+			}
+
+			cpuCost := node.CPUCost * (1.0 - node.Discount) * adjustmentRate
+			gpuCost := node.GPUCost * (1.0 - node.Discount) * adjustmentRate
+			ramCost := node.RAMCost * (1.0 - node.Discount) * adjustmentRate
+
+			assetNodeResourceCosts[node.Properties().ProviderID]["cpu"] += cpuCost
+			assetNodeResourceCosts[node.Properties().ProviderID]["gpu"] += gpuCost
+			assetNodeResourceCosts[node.Properties().ProviderID]["ram"] += ramCost
+		}
+	})
+
+	// Determine start, end on a per-node basis (keyed by ProviderID)
+	nodeStarts := map[string]time.Time{}
+	nodeEnds := map[string]time.Time{}
+
+	// Subtract allocated costs from asset costs, leaving only the remaining
+	// idle costs.
+	as.Each(func(name string, a *Allocation) {
+		providerId := a.Properties.ProviderID
+		if providerId == "" {
+			// Allocation has no ProviderID, so it cannot be matched to a node
+			return
+		}
+
+		if _, ok := assetNodeResourceCosts[providerId]; !ok {
+			// No node asset was recorded for this allocation's ProviderID
+			return
+		}
+
+		// Set node (start, end) if they are either not currently set,
+		// or if the detected (start, end) of the current allocation falls
+		// before or after, respectively, the current values.
+		if s, ok := nodeStarts[providerId]; !ok || a.Start.Before(s) {
+			nodeStarts[providerId] = a.Start
+		}
+		if e, ok := nodeEnds[providerId]; !ok || a.End.After(e) {
+			nodeEnds[providerId] = a.End
+		}
+
+		assetNodeResourceCosts[providerId]["cpu"] -= a.CPUTotalCost()
+		assetNodeResourceCosts[providerId]["gpu"] -= a.GPUTotalCost()
+		assetNodeResourceCosts[providerId]["ram"] -= a.RAMTotalCost()
+	})
+
+	// Turn remaining un-allocated asset costs into idle allocations
+	idleAllocs := map[string]*Allocation{}
+	for providerId, resources := range assetNodeResourceCosts {
+		// Default start and end to the (start, end) of the given window, but
+		// use the actual, detected (start, end) pair if they are available.
+		start := *window.Start()
+		if s, ok := nodeStarts[providerId]; ok && window.Contains(s) {
+			start = s
+		}
+		end := *window.End()
+		if e, ok := nodeEnds[providerId]; ok && window.Contains(e) {
+			end = e
+		}
+		node := nodesByProviderId[providerId]
+		idleAlloc := &Allocation{
+			Name:   fmt.Sprintf("%s/%s", node.properties.Name, IdleSuffix),
+			Window: window.Clone(),
+			Properties: &AllocationProperties{
+				Cluster:    node.properties.Cluster,
+				Node:       node.properties.Name,
+				ProviderID: providerId,
+			},
+			Start:   start,
+			End:     end,
+			CPUCost: resources["cpu"],
+			GPUCost: resources["gpu"],
+			RAMCost: resources["ram"],
+		}
+
+		// Defensive check: providerId comes from ranging over a map, so a
+		// duplicate idle allocation for a single node is not expected here.
+		if _, ok := idleAllocs[providerId]; ok {
+			return nil, fmt.Errorf("duplicate idle allocations for node Provider ID: %s", providerId)
+		}
+
+		idleAllocs[providerId] = idleAlloc
+	}
+
+	return idleAllocs, nil
+}
+
 // Reconcile calculate the exact cost of Allocation by resource(cpu, ram, gpu etc) based on Asset(s) on which
 // the Allocation depends.
 func (as *AllocationSet) Reconcile(assetSet *AssetSet) error {

Разница между файлами не показана из-за своего большого размера
+ 937 - 607
pkg/kubecost/allocation_test.go


+ 6 - 6
pkg/kubecost/kubecost_codecs_test.go

@@ -25,9 +25,9 @@ func BenchmarkAllocationSetRange_BinaryEncoding(b *testing.B) {
 	var err error
 
 	asr0 = NewAllocationSetRange(
-		generateAllocationSet(startD0),
-		generateAllocationSet(startD1),
-		generateAllocationSet(startD2),
+		generateAllocationSetClusterIdle(startD0),
+		generateAllocationSetClusterIdle(startD1),
+		generateAllocationSetClusterIdle(startD2),
 	)
 
 	for it := 0; it < b.N; it++ {
@@ -90,9 +90,9 @@ func TestAllocationSetRange_BinaryEncoding(t *testing.T) {
 	var err error
 
 	asr0 = NewAllocationSetRange(
-		generateAllocationSet(startD0),
-		generateAllocationSet(startD1),
-		generateAllocationSet(startD2),
+		generateAllocationSetClusterIdle(startD0),
+		generateAllocationSetClusterIdle(startD1),
+		generateAllocationSetClusterIdle(startD2),
 	)
 
 	bs, err = asr0.MarshalBinary()

Некоторые файлы не были показаны из-за большого количества измененных файлов