
Update allocations to use DataSource

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Matt Bolt
parent commit dcf7fd823d

+ 56 - 0
core/pkg/source/datasource.go

@@ -118,12 +118,68 @@ type ClusterMetricsQuerier interface {
 	QueryClusterNodesByProvider(provider string, start, end time.Time, step time.Duration) QueryResultsChan
 }
 
+type AllocationMetricsQuerier interface {
+	QueryPods(start, end time.Time) QueryResultsChan
+	QueryPodsUID(start, end time.Time) QueryResultsChan
+	QueryRAMBytesAllocated(start, end time.Time) QueryResultsChan
+	QueryRAMRequests(start, end time.Time) QueryResultsChan
+	QueryRAMUsageAvg(start, end time.Time) QueryResultsChan
+	QueryRAMUsageMax(start, end time.Time) QueryResultsChan
+	QueryCPUCoresAllocated(start, end time.Time) QueryResultsChan
+	QueryCPURequests(start, end time.Time) QueryResultsChan
+	QueryCPUUsageAvg(start, end time.Time) QueryResultsChan
+	QueryCPUUsageMax(start, end time.Time) QueryResultsChan
+	QueryGPUsRequested(start, end time.Time) QueryResultsChan
+	QueryGPUsUsageAvg(start, end time.Time) QueryResultsChan
+	QueryGPUsUsageMax(start, end time.Time) QueryResultsChan
+	QueryGPUsAllocated(start, end time.Time) QueryResultsChan
+	QueryNodeCostPerCPUHr(start, end time.Time) QueryResultsChan
+	QueryNodeCostPerRAMGiBHr(start, end time.Time) QueryResultsChan
+	QueryNodeCostPerGPUHr(start, end time.Time) QueryResultsChan
+	QueryNodeIsSpot2(start, end time.Time) QueryResultsChan
+	QueryPVCInfo2(start, end time.Time) QueryResultsChan
+	QueryPodPVCAllocation(start, end time.Time) QueryResultsChan
+	QueryPVCBytesRequested(start, end time.Time) QueryResultsChan
+	QueryPVActiveMins(start, end time.Time) QueryResultsChan
+	QueryPVBytes(start, end time.Time) QueryResultsChan
+	QueryPVCostPerGiBHour(start, end time.Time) QueryResultsChan
+	QueryPVMeta(start, end time.Time) QueryResultsChan
+	QueryNetZoneGiB(start, end time.Time) QueryResultsChan
+	QueryNetZoneCostPerGiB(start, end time.Time) QueryResultsChan
+	QueryNetRegionGiB(start, end time.Time) QueryResultsChan
+	QueryNetRegionCostPerGiB(start, end time.Time) QueryResultsChan
+	QueryNetInternetGiB(start, end time.Time) QueryResultsChan
+	QueryNetInternetCostPerGiB(start, end time.Time) QueryResultsChan
+	QueryNetReceiveBytes(start, end time.Time) QueryResultsChan
+	QueryNetTransferBytes(start, end time.Time) QueryResultsChan
+	QueryNodeLabels(start, end time.Time) QueryResultsChan
+	QueryNamespaceLabels(start, end time.Time) QueryResultsChan
+	QueryNamespaceAnnotations(start, end time.Time) QueryResultsChan
+	QueryPodLabels(start, end time.Time) QueryResultsChan
+	QueryPodAnnotations(start, end time.Time) QueryResultsChan
+	QueryServiceLabels(start, end time.Time) QueryResultsChan
+	QueryDeploymentLabels(start, end time.Time) QueryResultsChan
+	QueryStatefulSetLabels(start, end time.Time) QueryResultsChan
+	QueryDaemonSetLabels(start, end time.Time) QueryResultsChan
+	QueryJobLabels(start, end time.Time) QueryResultsChan
+	QueryPodsWithReplicaSetOwner(start, end time.Time) QueryResultsChan
+	QueryReplicaSetsWithoutOwners(start, end time.Time) QueryResultsChan
+	QueryReplicaSetsWithRollout(start, end time.Time) QueryResultsChan
+	QueryLBCostPerHr(start, end time.Time) QueryResultsChan
+	QueryLBActiveMins(start, end time.Time) QueryResultsChan
+	QueryDataCoverage(limitDays int) (time.Time, time.Time, error)
+	QueryIsGPUShared(start, end time.Time) QueryResultsChan
+	QueryGetGPUInfo(start, end time.Time) QueryResultsChan
+}
+
 type OpenCostDataSource interface {
 	InstantMetricsQuerier
 	RangeMetricsQuerier
 	ClusterMetricsQuerier
+	AllocationMetricsQuerier
 
 	RegisterEndPoints(router *httprouter.Router)
 
 	BatchDuration() time.Duration
+	Resolution() time.Duration
 }
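
For orientation, here is a minimal consumer-side sketch (not part of this commit; the function name computeWindow and the variable ds are purely illustrative, with ds standing for any OpenCostDataSource implementation). Each querier method returns a QueryResultsChan immediately, so several metrics for a window can be requested up front and awaited afterwards; Await() returning the results and an error follows the usage visible elsewhere in this diff.

// computeWindow is a hypothetical consumer sketch (illustrative only): it fans
// out a few allocation queries for one window and then awaits the result channels.
func computeWindow(ds source.OpenCostDataSource, start, end time.Time) error {
	resPods := ds.QueryPods(start, end)             // running pods over the window
	resRAM := ds.QueryRAMBytesAllocated(start, end) // RAM bytes allocated per container
	resCPU := ds.QueryCPUCoresAllocated(start, end) // CPU cores allocated per container

	pods, err := resPods.Await()
	if err != nil {
		return err
	}
	ramBytes, err := resRAM.Await()
	if err != nil {
		return err
	}
	cpuCores, err := resCPU.Await()
	if err != nil {
		return err
	}

	log.Infof("pods=%d ram=%d cpu=%d results at resolution %s",
		len(pods), len(ramBytes), len(cpuCores), ds.Resolution())
	return nil
}

Resolution() (added below for the Prometheus implementation) gives consumers the data resolution that was previously read from environment configuration, and BatchDuration() continues to expose the maximum query window per batch.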

+ 929 - 19
modules/prometheus-source/pkg/prom/datasource.go

@@ -228,7 +228,7 @@ func NewPrometheusDataSource(promConfig *OpenCostPrometheusConfig, thanosConfig
 				thanosClient = thanosCli
 			}
 
-			thanosContexts = NewContextFactory(thanosClient, thanosContexts.config)
+			thanosContexts = NewContextFactory(thanosClient, thanosConfig.OpenCostPrometheusConfig)
 		} else {
 			log.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
 		}
@@ -264,15 +264,17 @@ func (pds *PrometheusDataSource) prometheusRecordingRules(w http.ResponseWriter,
 
 	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
 	if err != nil {
-		fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
+		fmt.Fprintf(w, "error creating Prometheus rule request: %s", err)
+		return
 	}
 
 	_, body, err := pds.promClient.Do(r.Context(), req)
 	if err != nil {
-		fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
-	} else {
-		w.Write(body)
+		fmt.Fprintf(w, "error making Prometheus rule request: %s", err)
+		return
 	}
+
+	w.Write(body)
 }
 
 // prometheusConfig returns the current configuration of the prometheus server
@@ -301,15 +303,17 @@ func (pds *PrometheusDataSource) prometheusTargets(w http.ResponseWriter, r *htt
 
 	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
 	if err != nil {
-		fmt.Fprintf(w, "Error creating Prometheus rule request: "+err.Error())
+		fmt.Fprintf(w, "error creating Prometheus targets request: %s", err)
+		return
 	}
 
 	_, body, err := pds.promClient.Do(r.Context(), req)
 	if err != nil {
-		fmt.Fprintf(w, "Error making Prometheus rule request: "+err.Error())
-	} else {
-		w.Write(body)
+		fmt.Fprintf(w, "error making Prometheus targets request: %s", err)
+		return
 	}
+
+	w.Write(body)
 }
 
 // status returns the status of the prometheus client
@@ -322,9 +326,9 @@ func (pds *PrometheusDataSource) status(w http.ResponseWriter, r *http.Request,
 	api := prometheusAPI.NewAPI(pds.promClient)
 	result, err := api.Buildinfo(r.Context())
 	if err != nil {
-		fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
+		fmt.Fprintf(w, "Using Prometheus at %s. Error: %s", promServer, err)
 	} else {
-		fmt.Fprintf(w, "Using Prometheus at "+promServer+". Version: "+result.Version)
+		fmt.Fprintf(w, "Using Prometheus at %s. Version: %s", promServer, result.Version)
 	}
 }
 
@@ -379,7 +383,7 @@ func (pds *PrometheusDataSource) prometheusQueryRange(w http.ResponseWriter, r *
 
 	start, end, duration, err := toStartEndStep(qp)
 	if err != nil {
-		fmt.Fprintf(w, err.Error())
+		fmt.Fprintf(w, "error: %s", err)
 		return
 	}
 
@@ -455,7 +459,7 @@ func (pds *PrometheusDataSource) thanosQueryRange(w http.ResponseWriter, r *http
 
 	start, end, duration, err := toStartEndStep(qp)
 	if err != nil {
-		fmt.Fprintf(w, err.Error())
+		fmt.Fprintf(w, "error: %s", err)
 		return
 	}
 
@@ -542,6 +546,10 @@ func (pds *PrometheusDataSource) BatchDuration() time.Duration {
 	return pds.promConfig.MaxQueryDuration
 }
 
+func (pds *PrometheusDataSource) Resolution() time.Duration {
+	return pds.promConfig.DataResolution
+}
+
 func (pds *PrometheusDataSource) QueryRAMUsage(window string, offset string) source.QueryResultsChan {
 	const ramUsageQuery = `avg(
 		label_replace(
@@ -1966,12 +1974,914 @@ func (pds *PrometheusDataSource) QueryClusterNodesByProvider(provider string, st
 	return ctx.QueryRange(clusterNodesCostQuery, start, end, step)
 }
 
-func newEmptyResult() source.QueryResultsChan {
-	ch := make(source.QueryResultsChan)
-	go func() {
-		results := source.NewQueryResults("")
-		ch <- results
-	}()
+// AllocationMetricsQuerier
+
+func (pds *PrometheusDataSource) QueryPods(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPods = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%s]`
+	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
+
+	cfg := pds.promConfig
+	resolution := cfg.DataResolution
+	resStr := timeutil.DurationString(resolution)
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPods")
+	}
+
+	queryPods := fmt.Sprintf(queryFmtPods, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPods, end)
+}
+
+func (pds *PrometheusDataSource) QueryPodsUID(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPodsUID = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%s]`
+	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
+
+	cfg := pds.promConfig
+	resolution := cfg.DataResolution
+	resStr := timeutil.DurationString(resolution)
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPodsUID")
+	}
+
+	queryPodsUID := fmt.Sprintf(queryFmtPodsUID, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPodsUID, end)
+}
+
+func (pds *PrometheusDataSource) QueryRAMBytesAllocated(start, end time.Time) source.QueryResultsChan {
+	const queryFmtRAMBytesAllocated = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s, provider_id)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryRAMBytesAllocated")
+	}
+
+	queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryRAMBytesAllocated, end)
+}
+
+func (pds *PrometheusDataSource) QueryRAMRequests(start, end time.Time) source.QueryResultsChan {
+	const queryFmtRAMRequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryRAMRequests")
+	}
+
+	queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryRAMRequests, end)
+}
+
+func (pds *PrometheusDataSource) QueryRAMUsageAvg(start, end time.Time) source.QueryResultsChan {
+	const queryFmtRAMUsageAvg = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryRAMUsageAvg")
+	}
+
+	queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryRAMUsageAvg, end)
+}
+
+func (pds *PrometheusDataSource) QueryRAMUsageMax(start, end time.Time) source.QueryResultsChan {
+	const queryFmtRAMUsageMax = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryRAMUsageMax")
+	}
+
+	queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryRAMUsageMax, end)
+}
+
+func (pds *PrometheusDataSource) QueryCPUCoresAllocated(start, end time.Time) source.QueryResultsChan {
+	const queryFmtCPUCoresAllocated = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryCPUCoresAllocated")
+	}
+
+	queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryCPUCoresAllocated, end)
+}
+
+func (pds *PrometheusDataSource) QueryCPURequests(start, end time.Time) source.QueryResultsChan {
+	const queryFmtCPURequests = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryCPURequests")
+	}
+
+	queryCPURequests := fmt.Sprintf(queryFmtCPURequests, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryCPURequests, end)
+}
+
+func (pds *PrometheusDataSource) QueryCPUUsageAvg(start, end time.Time) source.QueryResultsChan {
+	const queryFmtCPUUsageAvg = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryCPUUsageAvg")
+	}
+
+	queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryCPUUsageAvg, end)
+}
+
+func (pds *PrometheusDataSource) QueryCPUUsageMax(start, end time.Time) source.QueryResultsChan {
+	// Because we use container_cpu_usage_seconds_total to calculate CPU usage
+	// at any given "instant" of time, we need to use an irate or rate. To then
+	// calculate a max (or any aggregation) we have to perform an aggregation
+	// query on top of an instant-by-instant maximum. Prometheus supports this
+	// type of query with a "subquery" [1], however it is reportedly expensive
+	// to make such a query. By default, Kubecost's Prometheus config includes
+	// a recording rule that keeps track of the instant-by-instant irate for CPU
+	// usage. The metric in this query is created by that recording rule.
+	//
+	// [1] https://prometheus.io/blog/2019/01/28/subquery-support/
+	//
+	// If changing the name of the recording rule, make sure to update the
+	// corresponding diagnostic query to avoid confusion.
+	const queryFmtCPUUsageMaxRecordingRule = `max(max_over_time(kubecost_container_cpu_usage_irate{%s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	// This is the subquery equivalent of the above recording rule query. It is
+	// more expensive, but does not require the recording rule. It should be
+	// used as a fallback query if the recording rule data does not exist.
+	//
+	// The parameter after the colon [:<thisone>] in the subquery affects the
+	// resolution of the subquery.
+	// The parameter after the metric ...{}[<thisone>] should be set to 2x
+	// the resolution, to make sure the irate always has two points to query
+	// in case the Prom scrape duration has been reduced to be equal to the
+	// ETL resolution.
+	const queryFmtCPUUsageMaxSubquery = `max(max_over_time(irate(container_cpu_usage_seconds_total{container!="POD", container!="", %s}[%s])[%s:%s])) by (container, pod_name, pod, namespace, instance, %s)`
+	// env.GetPromClusterFilter(), doubleResStr, durStr, resStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryCPUUsageMax")
+	}
+
+	queryCPUUsageMaxRecordingRule := fmt.Sprintf(queryFmtCPUUsageMaxRecordingRule, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	resCPUUsageMaxRR := ctx.QueryAtTime(queryCPUUsageMaxRecordingRule, end)
+	resCPUUsageMax, _ := resCPUUsageMaxRR.Await()
+
+	if len(resCPUUsageMax) > 0 {
+		return wrapResults(queryCPUUsageMaxRecordingRule, resCPUUsageMax)
+	}
+
+	resolution := cfg.DataResolution
+	resStr := timeutil.DurationString(resolution)
+	doubleResStr := timeutil.DurationString(2 * resolution)
+
+	queryCPUUsageMaxSubquery := fmt.Sprintf(queryFmtCPUUsageMaxSubquery, cfg.ClusterFilter, doubleResStr, durStr, resStr, cfg.ClusterLabel)
+	return ctx.QueryAtTime(queryCPUUsageMaxSubquery, end)
+}
+
+func (pds *PrometheusDataSource) QueryGPUsRequested(start, end time.Time) source.QueryResultsChan {
+	const queryFmtGPUsRequested = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryGPUsRequested")
+	}
+
+	queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryGPUsRequested, end)
+}
+
+func (pds *PrometheusDataSource) QueryGPUsUsageAvg(start, end time.Time) source.QueryResultsChan {
+	const queryFmtGPUsUsageAvg = `avg(avg_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, %s)`
+	// durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryGPUsUsageAvg")
+	}
+
+	queryGPUsUsageAvg := fmt.Sprintf(queryFmtGPUsUsageAvg, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryGPUsUsageAvg, end)
+}
+
+func (pds *PrometheusDataSource) QueryGPUsUsageMax(start, end time.Time) source.QueryResultsChan {
+	const queryFmtGPUsUsageMax = `max(max_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, %s)`
+	// durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryGPUsUsageMax")
+	}
+
+	queryGPUsUsageMax := fmt.Sprintf(queryFmtGPUsUsageMax, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryGPUsUsageMax, end)
+}
+
+func (pds *PrometheusDataSource) QueryGPUsAllocated(start, end time.Time) source.QueryResultsChan {
+	const queryFmtGPUsAllocated = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryGPUsAllocated")
+	}
+
+	queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryGPUsAllocated, end)
+}
+
+func (pds *PrometheusDataSource) QueryNodeCostPerCPUHr(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNodeCostPerCPUHr = `avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNodeCostPerCPUHr")
+	}
+
+	queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
+}
+
+func (pds *PrometheusDataSource) QueryNodeCostPerRAMGiBHr(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNodeCostPerRAMGiBHr = `avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNodeCostPerRAMGiBHr")
+	}
+
+	queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
+}
+
+func (pds *PrometheusDataSource) QueryNodeCostPerGPUHr(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNodeCostPerGPUHr = `avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNodeCostPerGPUHr")
+	}
+
+	queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
+}
+
+func (pds *PrometheusDataSource) QueryNodeIsSpot2(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNodeIsSpot = `avg_over_time(kubecost_node_is_spot{%s}[%s])`
+	// env.GetPromClusterFilter(), durStr)
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNodeIsSpot2")
+	}
+
+	queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNodeIsSpot, end)
+}
+
+func (pds *PrometheusDataSource) QueryPVCInfo2(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != "", %s}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
+	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
+
+	cfg := pds.promConfig
+	resolution := cfg.DataResolution
+	resStr := timeutil.DurationString(resolution)
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPVCInfo2")
+	}
+
+	queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPVCInfo, end)
+}
+
+func (pds *PrometheusDataSource) QueryPodPVCAllocation(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPodPVCAllocation = `avg(avg_over_time(pod_pvc_allocation{%s}[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPodPVCAllocation")
+	}
+
+	queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPodPVCAllocation, end)
+}
+
+func (pds *PrometheusDataSource) QueryPVCBytesRequested(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPVCBytesRequested = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{%s}[%s])) by (persistentvolumeclaim, namespace, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPVCBytesRequested")
+	}
+
+	queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPVCBytesRequested, end)
+}
+
+func (pds *PrometheusDataSource) QueryPVActiveMins(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPVActiveMins = `count(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%s]`
+	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
+
+	cfg := pds.promConfig
+	resolution := cfg.DataResolution
+	resStr := timeutil.DurationString(resolution)
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPVActiveMins")
+	}
+
+	queryPVActiveMins := fmt.Sprintf(queryFmtPVActiveMins, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPVActiveMins, end)
+}
+
+func (pds *PrometheusDataSource) QueryPVBytes(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPVBytes = `avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (persistentvolume, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPVBytes")
+	}
+
+	queryPVBytes := fmt.Sprintf(queryFmtPVBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPVBytes, end)
+}
+
+func (pds *PrometheusDataSource) QueryPVCostPerGiBHour(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPVCostPerGiBHour = `avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (volumename, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPVCostPerGiBHour")
+	}
+
+	queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPVCostPerGiBHour, end)
+}
+
+func (pds *PrometheusDataSource) QueryPVMeta(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPVMeta = `avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, provider_id)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPVMeta")
+	}
+
+	queryPVMeta := fmt.Sprintf(queryFmtPVMeta, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPVMeta, end)
+}
+
+func (pds *PrometheusDataSource) QueryNetZoneGiB(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNetZoneGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetZoneGiB")
+	}
+
+	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNetZoneGiB, end)
+}
+
+func (pds *PrometheusDataSource) QueryNetZoneCostPerGiB(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNetZoneCostPerGiB = `avg(avg_over_time(kubecost_network_zone_egress_cost{%s}[%s])) by (%s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetZoneCostPerGiB")
+	}
+
+	queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
+}
+
+func (pds *PrometheusDataSource) QueryNetRegionGiB(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNetRegionGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetRegionGiB")
+	}
+
+	queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNetRegionGiB, end)
+}
+
+func (pds *PrometheusDataSource) QueryNetRegionCostPerGiB(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNetRegionCostPerGiB = `avg(avg_over_time(kubecost_network_region_egress_cost{%s}[%s])) by (%s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetRegionCostPerGiB")
+	}
+
+	queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
+}
+
+func (pds *PrometheusDataSource) QueryNetInternetGiB(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNetInternetGiB = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetInternetGiB")
+	}
+
+	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNetInternetGiB, end)
+}
+
+func (pds *PrometheusDataSource) QueryNetInternetCostPerGiB(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNetInternetCostPerGiB = `avg(avg_over_time(kubecost_network_internet_egress_cost{%s}[%s])) by (%s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel()
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetInternetCostPerGiB")
+	}
+
+	queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
+}
+
+func (pds *PrometheusDataSource) QueryNetReceiveBytes(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNetReceiveBytes = `sum(increase(container_network_receive_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetReceiveBytes")
+	}
+
+	queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNetReceiveBytes, end)
+}
+
+func (pds *PrometheusDataSource) QueryNetTransferBytes(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNetTransferBytes = `sum(increase(container_network_transmit_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNetTransferBytes")
+	}
+
+	queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNetTransferBytes, end)
+}
+
+func (pds *PrometheusDataSource) QueryNodeLabels2(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNodeLabels = `avg_over_time(kube_node_labels{%s}[%s])`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNodeLabels2")
+	}
+
+	queryNodeLabels := fmt.Sprintf(queryFmtNodeLabels, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNodeLabels, end)
+}
+
+func (pds *PrometheusDataSource) QueryNamespaceLabels(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNamespaceLabels = `avg_over_time(kube_namespace_labels{%s}[%s])`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNamespaceLabels")
+	}
+
+	queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNamespaceLabels, end)
+}
+
+func (pds *PrometheusDataSource) QueryNamespaceAnnotations(start, end time.Time) source.QueryResultsChan {
+	const queryFmtNamespaceAnnotations = `avg_over_time(kube_namespace_annotations{%s}[%s])`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryNamespaceAnnotations")
+	}
+
+	queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryNamespaceAnnotations, end)
+}
+
+func (pds *PrometheusDataSource) QueryPodLabels(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPodLabels = `avg_over_time(kube_pod_labels{%s}[%s])`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPodLabels")
+	}
+
+	queryPodLabels := fmt.Sprintf(queryFmtPodLabels, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPodLabels, end)
+}
+
+func (pds *PrometheusDataSource) QueryPodAnnotations(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPodAnnotations = `avg_over_time(kube_pod_annotations{%s}[%s])`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPodAnnotations")
+	}
+
+	queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPodAnnotations, end)
+}
+
+func (pds *PrometheusDataSource) QueryServiceLabels(start, end time.Time) source.QueryResultsChan {
+	const queryFmtServiceLabels = `avg_over_time(service_selector_labels{%s}[%s])`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryServiceLabels")
+	}
+
+	queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryServiceLabels, end)
+}
+
+func (pds *PrometheusDataSource) QueryDeploymentLabels(start, end time.Time) source.QueryResultsChan {
+	const queryFmtDeploymentLabels = `avg_over_time(deployment_match_labels{%s}[%s])`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryDeploymentLabels")
+	}
+
+	queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryDeploymentLabels, end)
+}
+
+func (pds *PrometheusDataSource) QueryStatefulSetLabels(start, end time.Time) source.QueryResultsChan {
+	const queryFmtStatefulSetLabels = `avg_over_time(statefulSet_match_labels{%s}[%s])`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryStatefulSetLabels")
+	}
+
+	queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryStatefulSetLabels, end)
+}
+
+func (pds *PrometheusDataSource) QueryDaemonSetLabels(start, end time.Time) source.QueryResultsChan {
+	const queryFmtDaemonSetLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet", %s}[%s])) by (pod, owner_name, namespace, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryDaemonSetLabels")
+	}
+
+	queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryDaemonSetLabels, end)
+}
+
+func (pds *PrometheusDataSource) QueryJobLabels(start, end time.Time) source.QueryResultsChan {
+	const queryFmtJobLabels = `sum(avg_over_time(kube_pod_owner{owner_kind="Job", %s}[%s])) by (pod, owner_name, namespace ,%s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryJobLabels")
+	}
+
+	queryJobLabels := fmt.Sprintf(queryFmtJobLabels, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryJobLabels, end)
+}
+
+func (pds *PrometheusDataSource) QueryPodsWithReplicaSetOwner(start, end time.Time) source.QueryResultsChan {
+	const queryFmtPodsWithReplicaSetOwner = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet", %s}[%s])) by (pod, owner_name, namespace ,%s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryPodsWithReplicaSetOwner")
+	}
+
+	queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
+}
+
+func (pds *PrometheusDataSource) QueryReplicaSetsWithoutOwners(start, end time.Time) source.QueryResultsChan {
+	const queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>", %s}[%s])) by (replicaset, namespace, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryReplicaSetsWithoutOwners")
+	}
+
+	queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
+}
+
+func (pds *PrometheusDataSource) QueryReplicaSetsWithRollout(start, end time.Time) source.QueryResultsChan {
+	const queryFmtReplicaSetsWithRolloutOwner = `avg(avg_over_time(kube_replicaset_owner{owner_kind="Rollout", %s}[%s])) by (replicaset, namespace, owner_kind, owner_name, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryReplicaSetsWithRollout")
+	}
+
+	queryReplicaSetsWithRolloutOwner := fmt.Sprintf(queryFmtReplicaSetsWithRolloutOwner, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryReplicaSetsWithRolloutOwner, end)
+}
+
+func (pds *PrometheusDataSource) QueryLBCostPerHr(start, end time.Time) source.QueryResultsChan {
+	const queryFmtLBCostPerHr = `avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, ingress_ip, %s)`
+	// env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryLBCostPerHr")
+	}
+
+	queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryLBCostPerHr, end)
+}
+
+func (pds *PrometheusDataSource) QueryLBActiveMins(start, end time.Time) source.QueryResultsChan {
+	const queryFmtLBActiveMins = `count(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s)[%s:%s]`
+	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
+
+	cfg := pds.promConfig
+	resolution := cfg.DataResolution
+	resStr := timeutil.DurationString(resolution)
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryLBActiveMins")
+	}
+
+	queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryLBActiveMins, end)
+}
+
+func (pds *PrometheusDataSource) QueryDataCoverage(limitDays int) (time.Time, time.Time, error) {
+	const (
+		queryFmtOldestSample = `min_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`
+		queryFmtNewestSample = `max_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`
+	)
+
+	cfg := pds.promConfig
+	now := time.Now()
+	durStr := fmt.Sprintf("%dd", limitDays)
+
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	queryOldest := fmt.Sprintf(queryFmtOldestSample, cfg.ClusterFilter, durStr, "1h")
+	resOldestFut := ctx.QueryAtTime(queryOldest, now)
+
+	resOldest, err := resOldestFut.Await()
+	if err != nil {
+		return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: %w", err)
+	}
+	if len(resOldest) == 0 || len(resOldest[0].Values) == 0 {
+		return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: no results")
+	}
+
+	oldest := time.Unix(int64(resOldest[0].Values[0].Value), 0)
+
+	queryNewest := fmt.Sprintf(queryFmtNewestSample, cfg.ClusterFilter, durStr, "1h")
+	resNewestFut := ctx.QueryAtTime(queryNewest, now)
+
+	resNewest, err := resNewestFut.Await()
+	if err != nil {
+		return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: %w", err)
+	}
+	if len(resNewest) == 0 || len(resNewest[0].Values) == 0 {
+		return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: no results")
+	}
+
+	newest := time.Unix(int64(resNewest[0].Values[0].Value), 0)
+
+	return oldest, newest, nil
+}
+
+func (pds *PrometheusDataSource) QueryIsGPUShared(start, end time.Time) source.QueryResultsChan {
+	const queryFmtIsGPUShared = `avg(avg_over_time(kube_pod_container_resource_requests{container!="", node != "", pod != "", container!= "", unit = "integer",  %s}[%s])) by (container, pod, namespace, node, resource)`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryIsGPUShared")
+	}
+
+	queryIsGPUShared := fmt.Sprintf(queryFmtIsGPUShared, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryIsGPUShared, end)
+}
+
+func (pds *PrometheusDataSource) QueryGetGPUInfo(start, end time.Time) source.QueryResultsChan {
+	const queryFmtGetGPUInfo = `avg(avg_over_time(DCGM_FI_DEV_DEC_UTIL{container!="",%s}[%s])) by (container, pod, namespace, device, modelName, UUID)`
+	// env.GetPromClusterFilter(), durStr
+
+	cfg := pds.promConfig
+
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryGetGPUInfo")
+	}
+
+	queryGetGPUInfo := fmt.Sprintf(queryFmtGetGPUInfo, cfg.ClusterFilter, durStr)
+	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
+	return ctx.QueryAtTime(queryGetGPUInfo, end)
+}
+
+func newEmptyResult() source.QueryResultsChan {
+	ch := make(source.QueryResultsChan)
+	go func() {
+		results := source.NewQueryResults("")
+		ch <- results
+	}()
+	return ch
+}
+
+func wrapResults(query string, results []*source.QueryResult) source.QueryResultsChan {
+	ch := make(source.QueryResultsChan)
+
+	go func() {
+		r := source.NewQueryResults(query)
+		r.Results = results
+		ch <- r
+	}()
+
 	return ch
 }
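
As a design observation: nearly every new Query* method above follows the same skeleton (derive the window duration string, format a PromQL template with the cluster filter and label, then run it against the allocation context at the window end). A hypothetical helper along these lines, not part of this commit and named purely for illustration, could collapse that boilerplate for the methods whose templates take arguments in the common filter/duration/label order:

// queryAllocAtEnd is a hypothetical helper (illustrative only) for the common
// template shape: %s cluster filter, %s window duration, %s cluster label.
func (pds *PrometheusDataSource) queryAllocAtEnd(queryFmt string, start, end time.Time) source.QueryResultsChan {
	cfg := pds.promConfig

	durStr := timeutil.DurationString(end.Sub(start))
	if durStr == "" {
		panic("failed to parse duration string for allocation query")
	}

	query := fmt.Sprintf(queryFmt, cfg.ClusterFilter, durStr, cfg.ClusterLabel)
	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
	return ctx.QueryAtTime(query, end)
}

With such a helper, a method like QueryPVBytes would reduce to a single call; the subquery-style variants (the [%s:%s] templates that also need the resolution string) and the label-only queries would still format their own arguments.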
 

+ 2 - 2
modules/prometheus-source/pkg/prom/diagnostics.go

@@ -213,7 +213,7 @@ func GetPrometheusMetrics(client prometheus.Client, config *OpenCostPrometheusCo
 
 		// log the errror, append to results anyways, and continue
 		if err != nil {
-			log.Errorf(err.Error())
+			log.Errorf("error: %s", err.Error())
 		}
 		result = append(result, pd)
 	}
@@ -233,7 +233,7 @@ func GetPrometheusMetricsByID(ids []string, client prometheus.Client, config *Op
 
 			// log the errror, append to results anyways, and continue
 			if err != nil {
-				log.Errorf(err.Error())
+				log.Errorf("error: %s", err.Error())
 			}
 			result = append(result, pd)
 		} else {

+ 1 - 3
modules/prometheus-source/pkg/prom/query_test.go

@@ -10,9 +10,7 @@ import (
 )
 
 func TestWarningsFrom(t *testing.T) {
-	var results interface{}
-
-	results = map[string]interface{}{
+	var results any = map[string]interface{}{
 		"status": "success",
 		"warnings": []string{
 			"Warning #1",

+ 1 - 1
modules/prometheus-source/pkg/prom/result.go

@@ -89,7 +89,7 @@ func NewQueryResults(query string, queryResult interface{}, resultKeys *source.R
 			qrs.Error = err
 			return qrs
 		}
-		qrs.Error = fmt.Errorf(e)
+		qrs.Error = fmt.Errorf("%s", e)
 		return qrs
 	}
 

+ 1 - 1
pkg/cloud/gcp/provider.go

@@ -959,7 +959,7 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]models.Key, pvKeys m
 		if t == "nextPageToken" {
 			pageToken, err := dec.Token()
 			if err != nil {
-				log.Errorf("Error parsing nextpage token: " + err.Error())
+				log.Errorf("Error parsing nextpage token: %s", err)
 				return nil, "", err
 			}
 			if pageToken.(string) != "" {

+ 63 - 267
pkg/costmodel/allocation.go

@@ -10,92 +10,6 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/pkg/env"
-	"github.com/opencost/opencost/pkg/prom"
-)
-
-const (
-	// https://kubecost.atlassian.net/browse/BURNDOWN-234
-	// upstream KSM has implementation change vs OC internal KSM - it sets metric to 0 when pod goes down
-	// VS OC implementation which stops emitting it
-	// by adding != 0 filter, we keep just the active times in the prom result
-	queryFmtPods    = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%s]`
-	queryFmtPodsUID = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%s]`
-
-	queryFmtRAMBytesAllocated           = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s, provider_id)`
-	queryFmtRAMRequests                 = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtRAMUsageAvg                 = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtRAMUsageMax                 = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtCPUCoresAllocated           = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtCPURequests                 = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtCPUUsageAvg                 = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD", %s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtGPUsRequested               = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtGPUsUsageAvg                = `avg(avg_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, %s)`
-	queryFmtGPUsUsageMax                = `max(max_over_time(DCGM_FI_PROF_GR_ENGINE_ACTIVE{container!=""}[%s])) by (container, pod, namespace, %s)`
-	queryFmtGPUsAllocated               = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`
-	queryFmtNodeCostPerCPUHr            = `avg(avg_over_time(node_cpu_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
-	queryFmtNodeCostPerRAMGiBHr         = `avg(avg_over_time(node_ram_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
-	queryFmtNodeCostPerGPUHr            = `avg(avg_over_time(node_gpu_hourly_cost{%s}[%s])) by (node, %s, instance_type, provider_id)`
-	queryFmtNodeIsSpot                  = `avg_over_time(kubecost_node_is_spot{%s}[%s])`
-	queryFmtPVCInfo                     = `avg(kube_persistentvolumeclaim_info{volumename != "", %s}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
-	queryFmtPodPVCAllocation            = `avg(avg_over_time(pod_pvc_allocation{%s}[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
-	queryFmtPVCBytesRequested           = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{%s}[%s])) by (persistentvolumeclaim, namespace, %s)`
-	queryFmtPVActiveMins                = `count(kube_persistentvolume_capacity_bytes{%s}) by (persistentvolume, %s)[%s:%s]`
-	queryFmtPVBytes                     = `avg(avg_over_time(kube_persistentvolume_capacity_bytes{%s}[%s])) by (persistentvolume, %s)`
-	queryFmtPVCostPerGiBHour            = `avg(avg_over_time(pv_hourly_cost{%s}[%s])) by (volumename, %s)`
-	queryFmtPVMeta                      = `avg(avg_over_time(kubecost_pv_info{%s}[%s])) by (%s, persistentvolume, provider_id)`
-	queryFmtNetZoneGiB                  = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
-	queryFmtNetZoneCostPerGiB           = `avg(avg_over_time(kubecost_network_zone_egress_cost{%s}[%s])) by (%s)`
-	queryFmtNetRegionGiB                = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", same_zone="false", same_region="false", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
-	queryFmtNetRegionCostPerGiB         = `avg(avg_over_time(kubecost_network_region_egress_cost{%s}[%s])) by (%s)`
-	queryFmtNetInternetGiB              = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
-	queryFmtNetInternetCostPerGiB       = `avg(avg_over_time(kubecost_network_internet_egress_cost{%s}[%s])) by (%s)`
-	queryFmtNetReceiveBytes             = `sum(increase(container_network_receive_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
-	queryFmtNetTransferBytes            = `sum(increase(container_network_transmit_bytes_total{pod!="", %s}[%s])) by (pod_name, pod, namespace, %s)`
-	queryFmtNodeLabels                  = `avg_over_time(kube_node_labels{%s}[%s])`
-	queryFmtNamespaceLabels             = `avg_over_time(kube_namespace_labels{%s}[%s])`
-	queryFmtNamespaceAnnotations        = `avg_over_time(kube_namespace_annotations{%s}[%s])`
-	queryFmtPodLabels                   = `avg_over_time(kube_pod_labels{%s}[%s])`
-	queryFmtPodAnnotations              = `avg_over_time(kube_pod_annotations{%s}[%s])`
-	queryFmtServiceLabels               = `avg_over_time(service_selector_labels{%s}[%s])`
-	queryFmtDeploymentLabels            = `avg_over_time(deployment_match_labels{%s}[%s])`
-	queryFmtStatefulSetLabels           = `avg_over_time(statefulSet_match_labels{%s}[%s])`
-	queryFmtDaemonSetLabels             = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet", %s}[%s])) by (pod, owner_name, namespace, %s)`
-	queryFmtJobLabels                   = `sum(avg_over_time(kube_pod_owner{owner_kind="Job", %s}[%s])) by (pod, owner_name, namespace ,%s)`
-	queryFmtPodsWithReplicaSetOwner     = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet", %s}[%s])) by (pod, owner_name, namespace ,%s)`
-	queryFmtReplicaSetsWithoutOwners    = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>", %s}[%s])) by (replicaset, namespace, %s)`
-	queryFmtReplicaSetsWithRolloutOwner = `avg(avg_over_time(kube_replicaset_owner{owner_kind="Rollout", %s}[%s])) by (replicaset, namespace, owner_kind, owner_name, %s)`
-	queryFmtLBCostPerHr                 = `avg(avg_over_time(kubecost_load_balancer_cost{%s}[%s])) by (namespace, service_name, ingress_ip, %s)`
-	queryFmtLBActiveMins                = `count(kubecost_load_balancer_cost{%s}) by (namespace, service_name, %s)[%s:%s]`
-	queryFmtOldestSample                = `min_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`
-	queryFmtNewestSample                = `max_over_time(timestamp(group(node_cpu_hourly_cost{%s}))[%s:%s])`
-	queryFmtIsGPuShared                 = `avg(avg_over_time(kube_pod_container_resource_requests{container!="", node != "", pod != "", container!= "", unit = "integer",  %s}[%s])) by (container, pod, namespace, node, resource)`
-	queryFmtGetGPuInfo                  = `avg(avg_over_time(DCGM_FI_DEV_DEC_UTIL{container!="",%s}[%s])) by (container, pod, namespace, device, modelName, UUID)`
-
-	// Because we use container_cpu_usage_seconds_total to calculate CPU usage
-	// at any given "instant" of time, we need to use an irate or rate. To then
-	// calculate a max (or any aggregation) we have to perform an aggregation
-	// query on top of an instant-by-instant maximum. Prometheus supports this
-	// type of query with a "subquery" [1], however it is reportedly expensive
-	// to make such a query. By default, Kubecost's Prometheus config includes
-	// a recording rule that keeps track of the instant-by-instant irate for CPU
-	// usage. The metric in this query is created by that recording rule.
-	//
-	// [1] https://prometheus.io/blog/2019/01/28/subquery-support/
-	//
-	// If changing the name of the recording rule, make sure to update the
-	// corresponding diagnostic query to avoid confusion.
-	queryFmtCPUUsageMaxRecordingRule = `max(max_over_time(kubecost_container_cpu_usage_irate{%s}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	// This is the subquery equivalent of the above recording rule query. It is
-	// more expensive, but does not require the recording rule. It should be
-	// used as a fallback query if the recording rule data does not exist.
-	//
-	// The parameter after the colon [:<thisone>] in the subquery affects the
-	// resolution of the subquery.
-	// The parameter after the metric ...{}[<thisone>] should be set to 2x
-	// the resolution, to make sure the irate always has two points to query
-	// in case the Prom scrape duration has been reduced to be equal to the
-	// ETL resolution.
-	queryFmtCPUUsageMaxSubquery = `max(max_over_time(irate(container_cpu_usage_seconds_total{container!="POD", container!="", %s}[%s])[%s:%s])) by (container, pod_name, pod, namespace, instance, %s)`
 )
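
For comparison with the DataSource methods that replace them, this is roughly how the deleted format constants were consumed: each template was filled in with fmt.Sprintf and dispatched on a named Prometheus context. A minimal sketch of that pre-change pattern, assuming cm, end, and durStr are in scope as in the original allocation function further down:

	ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB,
		env.GetPromClusterFilter(), // first %s: cluster filter labels
		durStr,                     // second %s: window duration, e.g. "24h"
		env.GetPromClusterLabel(),  // third %s: cluster label in the by() clause
	)
	resNetZoneGiB, _ := ctx.QueryAtTime(queryNetZoneGiB, end).Await()
	_ = resNetZoneGiB // consumed like the other allocation query results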
 
 // Constants for Network Cost Subtype
@@ -305,29 +219,8 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 
 // DateRange checks the available data (up to limitDays in the past) and returns the oldest and newest sample timestamps from the OpenCost scraping metrics.
 // It is intended to be a good indicator of available allocation data.
-func (cm *CostModel) DateRange() (time.Time, time.Time, error) {
-	ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
-	exportCsvDaysFmt := fmt.Sprintf("%dd", env.GetExportCSVMaxDays())
-
-	resOldest, _, err := ctx.QuerySync(fmt.Sprintf(queryFmtOldestSample, env.GetPromClusterFilter(), exportCsvDaysFmt, "1h"))
-	if err != nil {
-		return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: %w", err)
-	}
-	if len(resOldest) == 0 || len(resOldest[0].Values) == 0 {
-		return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: no results")
-	}
-	oldest := time.Unix(int64(resOldest[0].Values[0].Value), 0)
-
-	resNewest, _, err := ctx.QuerySync(fmt.Sprintf(queryFmtNewestSample, env.GetPromClusterFilter(), exportCsvDaysFmt, "1h"))
-	if err != nil {
-		return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: %w", err)
-	}
-	if len(resNewest) == 0 || len(resNewest[0].Values) == 0 {
-		return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: no results")
-	}
-	newest := time.Unix(int64(resNewest[0].Values[0].Value), 0)
-
-	return oldest, newest, nil
+func (cm *CostModel) DateRange(limitDays int) (time.Time, time.Time, error) {
+	return cm.DataSource.QueryDataCoverage(limitDays)
 }
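
DateRange now simply forwards to the data source. A plausible Prometheus-backed QueryDataCoverage is sketched below under the assumption that it reuses the oldest/newest-sample queries deleted from this file; the real implementation lives behind the DataSource interface and may differ, and the function name here is illustrative only.

	// queryDataCoverage is an illustrative name; the real QueryDataCoverage
	// implementation is part of the DataSource and is not shown in this change.
	func queryDataCoverage(ctx *prom.Context, limitDays int) (time.Time, time.Time, error) {
		limitStr := fmt.Sprintf("%dd", limitDays)

		// Oldest sample within the limit window (same query the old DateRange used).
		resOldest, _, err := ctx.QuerySync(fmt.Sprintf(queryFmtOldestSample, env.GetPromClusterFilter(), limitStr, "1h"))
		if err != nil {
			return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: %w", err)
		}
		if len(resOldest) == 0 || len(resOldest[0].Values) == 0 {
			return time.Time{}, time.Time{}, fmt.Errorf("querying oldest sample: no results")
		}
		oldest := time.Unix(int64(resOldest[0].Values[0].Value), 0)

		// Newest sample within the limit window.
		resNewest, _, err := ctx.QuerySync(fmt.Sprintf(queryFmtNewestSample, env.GetPromClusterFilter(), limitStr, "1h"))
		if err != nil {
			return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: %w", err)
		}
		if len(resNewest) == 0 || len(resNewest[0].Values) == 0 {
			return time.Time{}, time.Time{}, fmt.Errorf("querying newest sample: no results")
		}
		newest := time.Unix(int64(resNewest[0].Values[0].Value), 0)

		return oldest, newest, nil
	}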
 
 func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, map[nodeKey]*nodePricing, error) {
@@ -378,7 +271,7 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	}
 
 	// TODO:CLEANUP remove "max batch" idea and clusterStart/End
-	err := cm.buildPodMap(window, resolution, env.GetETLMaxPrometheusQueryDuration(), podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
+	err := cm.buildPodMap(window, cm.BatchDuration, podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
 	if err != nil {
 		log.Errorf("CostModel.ComputeAllocation: failed to build pod map: %s", err.Error())
 	}
@@ -390,178 +283,83 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 		return allocSet, nil, fmt.Errorf("illegal duration value for %s", opencost.NewClosedWindow(start, end))
 	}
 
-	// Convert resolution duration to a query-ready string
-	resStr := timeutil.DurationString(resolution)
+	grp := source.NewQueryGroup()
+	ds := cm.DataSource
 
-	ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
+	resChRAMBytesAllocated := grp.With(ds.QueryRAMBytesAllocated(start, end))
+	resChRAMRequests := grp.With(ds.QueryRAMRequests(start, end))
+	resChRAMUsageAvg := grp.With(ds.QueryRAMUsageAvg(start, end))
+	resChRAMUsageMax := grp.With(ds.QueryRAMUsageMax(start, end))
 
-	queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChRAMBytesAllocated := ctx.QueryAtTime(queryRAMBytesAllocated, end)
-
-	queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChRAMRequests := ctx.QueryAtTime(queryRAMRequests, end)
-
-	queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChRAMUsageAvg := ctx.QueryAtTime(queryRAMUsageAvg, end)
-
-	queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChRAMUsageMax := ctx.QueryAtTime(queryRAMUsageMax, end)
-
-	queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChCPUCoresAllocated := ctx.QueryAtTime(queryCPUCoresAllocated, end)
-
-	queryCPURequests := fmt.Sprintf(queryFmtCPURequests, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChCPURequests := ctx.QueryAtTime(queryCPURequests, end)
-
-	queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChCPUUsageAvg := ctx.QueryAtTime(queryCPUUsageAvg, end)
-
-	queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMaxRecordingRule, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChCPUUsageMax := ctx.QueryAtTime(queryCPUUsageMax, end)
+	resChCPUCoresAllocated := grp.With(ds.QueryCPUCoresAllocated(start, end))
+	resChCPURequests := grp.With(ds.QueryCPURequests(start, end))
+	resChCPUUsageAvg := grp.With(ds.QueryCPUUsageAvg(start, end))
+	resChCPUUsageMax := grp.With(ds.QueryCPUUsageMax(start, end))
 	resCPUUsageMax, _ := resChCPUUsageMax.Await()
-	// If the recording rule has no data, try to fall back to the subquery.
-	if len(resCPUUsageMax) == 0 {
-		// The parameter after the metric ...{}[<thisone>] should be set to 2x
-		// the resolution, to make sure the irate always has two points to query
-		// in case the Prom scrape duration has been reduced to be equal to the
-		// resolution.
-		doubleResStr := timeutil.DurationString(2 * resolution)
-		queryCPUUsageMax = fmt.Sprintf(queryFmtCPUUsageMaxSubquery, env.GetPromClusterFilter(), doubleResStr, durStr, resStr, env.GetPromClusterLabel())
-		resChCPUUsageMax = ctx.QueryAtTime(queryCPUUsageMax, end)
-		resCPUUsageMax, _ = resChCPUUsageMax.Await()
-
-		// This avoids logspam if there is no data for either metric (e.g. if
-		// the Prometheus didn't exist in the queried window of time).
-		if len(resCPUUsageMax) > 0 {
-			log.Debugf("CPU usage recording rule query returned an empty result when queried at %s over %s. Fell back to subquery. Consider setting up Kubecost CPU usage recording role to reduce query load on Prometheus; subqueries are expensive.", end.String(), durStr)
-		}
+	// This avoids logspam if there is no data for either metric (e.g. if
+	// the Prometheus server didn't exist in the queried window of time).
+	if len(resCPUUsageMax) > 0 {
+		log.Debugf("CPU usage recording rule query returned an empty result when queried at %s over %s. Fell back to subquery. Consider setting up Kubecost CPU usage recording rule to reduce query load on Prometheus; subqueries are expensive.", end.String(), durStr)
 	}
 
 	// GPU Queries
-	//queryIsGpuShared := fmt.Sprintf(queryFmtIsGPuShared, durStr)
-	queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChGPUsRequested := ctx.QueryAtTime(queryGPUsRequested, end)
-
-	queryGPUsUsageAvg := fmt.Sprintf(queryFmtGPUsUsageAvg, durStr, env.GetPromClusterLabel())
-	resChGPUsUsageAvg := ctx.Query(queryGPUsUsageAvg)
-
-	queryGPUsUsageMax := fmt.Sprintf(queryFmtGPUsUsageMax, durStr, env.GetPromClusterLabel())
-	resChGPUsUsageMax := ctx.Query(queryGPUsUsageMax)
-
-	queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChGPUsAllocated := ctx.QueryAtTime(queryGPUsAllocated, end)
+	resChIsGpuShared := grp.With(ds.QueryIsGPUShared(start, end))
+	resChGPUsAllocated := grp.With(ds.QueryGPUsAllocated(start, end))
+	resChGPUsRequested := grp.With(ds.QueryGPUsRequested(start, end))
+	resChGPUsUsageAvg := grp.With(ds.QueryGPUsUsageAvg(start, end))
+	resChGPUsUsageMax := grp.With(ds.QueryGPUsUsageMax(start, end))
+	resChGetGPUInfo := grp.With(ds.QueryGetGPUInfo(start, end))
 
-	queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNodeCostPerCPUHr := ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
+	resChNodeCostPerCPUHr := grp.With(ds.QueryNodeCostPerCPUHr(start, end))
+	resChNodeCostPerRAMGiBHr := grp.With(ds.QueryNodeCostPerRAMGiBHr(start, end))
+	resChNodeCostPerGPUHr := grp.With(ds.QueryNodeCostPerGPUHr(start, end))
 
-	queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNodeCostPerRAMGiBHr := ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
+	resChNodeIsSpot := grp.With(ds.QueryNodeIsSpot2(start, end))
+	resChPVCInfo := grp.With(ds.QueryPVCInfo2(start, end))
 
-	queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNodeCostPerGPUHr := ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
+	resChPodPVCAllocation := grp.With(ds.QueryPodPVCAllocation(start, end))
+	resChPVCBytesRequested := grp.With(ds.QueryPVCBytesRequested(start, end))
+	resChPVActiveMins := grp.With(ds.QueryPVActiveMins(start, end))
+	resChPVBytes := grp.With(ds.QueryPVBytes(start, end))
+	resChPVCostPerGiBHour := grp.With(ds.QueryPVCostPerGiBHour(start, end))
+	resChPVMeta := grp.With(ds.QueryPVMeta(start, end))
 
-	queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, env.GetPromClusterFilter(), durStr)
-	resChNodeIsSpot := ctx.QueryAtTime(queryNodeIsSpot, end)
+	resChNetTransferBytes := grp.With(ds.QueryNetTransferBytes(start, end))
+	resChNetReceiveBytes := grp.With(ds.QueryNetReceiveBytes(start, end))
 
-	queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
-	resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, end)
+	resChNetZoneGiB := grp.With(ds.QueryNetZoneGiB(start, end))
+	resChNetZoneCostPerGiB := grp.With(ds.QueryNetZoneCostPerGiB(start, end))
 
-	queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChPodPVCAllocation := ctx.QueryAtTime(queryPodPVCAllocation, end)
+	resChNetRegionGiB := grp.With(ds.QueryNetRegionGiB(start, end))
+	resChNetRegionCostPerGiB := grp.With(ds.QueryNetRegionCostPerGiB(start, end))
 
-	queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChPVCBytesRequested := ctx.QueryAtTime(queryPVCBytesRequested, end)
+	resChNetInternetGiB := grp.With(ds.QueryNetInternetGiB(start, end))
+	resChNetInternetCostPerGiB := grp.With(ds.QueryNetInternetCostPerGiB(start, end))
 
-	queryPVActiveMins := fmt.Sprintf(queryFmtPVActiveMins, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
-	resChPVActiveMins := ctx.QueryAtTime(queryPVActiveMins, end)
-
-	queryPVBytes := fmt.Sprintf(queryFmtPVBytes, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChPVBytes := ctx.QueryAtTime(queryPVBytes, end)
-
-	queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChPVCostPerGiBHour := ctx.QueryAtTime(queryPVCostPerGiBHour, end)
-
-	queryPVMeta := fmt.Sprintf(queryFmtPVMeta, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChPVMeta := ctx.QueryAtTime(queryPVMeta, end)
-
-	queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNetTransferBytes := ctx.QueryAtTime(queryNetTransferBytes, end)
-
-	queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNetReceiveBytes := ctx.QueryAtTime(queryNetReceiveBytes, end)
-
-	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNetZoneGiB := ctx.QueryAtTime(queryNetZoneGiB, end)
-
-	queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNetZoneCostPerGiB := ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
-
-	queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNetRegionGiB := ctx.QueryAtTime(queryNetRegionGiB, end)
-
-	queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNetRegionCostPerGiB := ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
-
-	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNetInternetGiB := ctx.QueryAtTime(queryNetInternetGiB, end)
-
-	queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
-
-	//GPU Queries
-	queryIsGpuShared := fmt.Sprintf(queryFmtIsGPuShared, env.GetPromClusterFilter(), durStr)
-	resChIsGpuShared := ctx.QueryAtTime(queryIsGpuShared, end)
-
-	queryGetGPUInfo := fmt.Sprintf(queryFmtGetGPuInfo, env.GetPromClusterFilter(), durStr)
-	resChGetGPUInfo := ctx.QueryAtTime(queryGetGPUInfo, end)
-
-	var resChNodeLabels source.QueryResultsChan
+	var resChNodeLabels *source.QueryGroupAsyncResult
 	if env.GetAllocationNodeLabelsEnabled() {
-		queryNodeLabels := fmt.Sprintf(queryFmtNodeLabels, env.GetPromClusterFilter(), durStr)
-		resChNodeLabels = ctx.QueryAtTime(queryNodeLabels, end)
+		resChNodeLabels = grp.With(ds.QueryNodeLabels(start, end))
 	}
 
-	queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, env.GetPromClusterFilter(), durStr)
-	resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
-
-	queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, env.GetPromClusterFilter(), durStr)
-	resChNamespaceAnnotations := ctx.QueryAtTime(queryNamespaceAnnotations, end)
-
-	queryPodLabels := fmt.Sprintf(queryFmtPodLabels, env.GetPromClusterFilter(), durStr)
-	resChPodLabels := ctx.QueryAtTime(queryPodLabels, end)
-
-	queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, env.GetPromClusterFilter(), durStr)
-	resChPodAnnotations := ctx.QueryAtTime(queryPodAnnotations, end)
-
-	queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, env.GetPromClusterFilter(), durStr)
-	resChServiceLabels := ctx.QueryAtTime(queryServiceLabels, end)
+	resChNamespaceLabels := grp.With(ds.QueryNamespaceLabels(start, end))
+	resChNamespaceAnnotations := grp.With(ds.QueryNamespaceAnnotations(start, end))
 
-	queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, env.GetPromClusterFilter(), durStr)
-	resChDeploymentLabels := ctx.QueryAtTime(queryDeploymentLabels, end)
+	resChPodLabels := grp.With(ds.QueryPodLabels(start, end))
+	resChPodAnnotations := grp.With(ds.QueryPodAnnotations(start, end))
 
-	queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, env.GetPromClusterFilter(), durStr)
-	resChStatefulSetLabels := ctx.QueryAtTime(queryStatefulSetLabels, end)
+	resChServiceLabels := grp.With(ds.QueryServiceLabels(start, end))
+	resChDeploymentLabels := grp.With(ds.QueryDeploymentLabels(start, end))
+	resChStatefulSetLabels := grp.With(ds.QueryStatefulSetLabels(start, end))
+	resChDaemonSetLabels := grp.With(ds.QueryDaemonSetLabels(start, end))
 
-	queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChDaemonSetLabels := ctx.QueryAtTime(queryDaemonSetLabels, end)
+	resChPodsWithReplicaSetOwner := grp.With(ds.QueryPodsWithReplicaSetOwner(start, end))
+	resChReplicaSetsWithoutOwners := grp.With(ds.QueryReplicaSetsWithoutOwners(start, end))
+	resChReplicaSetsWithRolloutOwner := grp.With(ds.QueryReplicaSetsWithRollout(start, end))
 
-	queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChPodsWithReplicaSetOwner := ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
+	resChJobLabels := grp.With(ds.QueryJobLabels(start, end))
 
-	queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChReplicaSetsWithoutOwners := ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
-
-	queryReplicaSetsWithRolloutOwner := fmt.Sprintf(queryFmtReplicaSetsWithRolloutOwner, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChReplicaSetsWithRolloutOwner := ctx.QueryAtTime(queryReplicaSetsWithRolloutOwner, end)
-
-	queryJobLabels := fmt.Sprintf(queryFmtJobLabels, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChJobLabels := ctx.QueryAtTime(queryJobLabels, end)
-
-	queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, env.GetPromClusterFilter(), durStr, env.GetPromClusterLabel())
-	resChLBCostPerHr := ctx.QueryAtTime(queryLBCostPerHr, end)
-
-	queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
-	resChLBActiveMins := ctx.QueryAtTime(queryLBActiveMins, end)
+	resChLBCostPerHr := grp.With(ds.QueryLBCostPerHr(start, end))
+	resChLBActiveMins := grp.With(ds.QueryLBActiveMins(start, end))
 
 	resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
 	resCPURequests, _ := resChCPURequests.Await()
@@ -582,7 +380,7 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	resNodeCostPerRAMGiBHr, _ := resChNodeCostPerRAMGiBHr.Await()
 	resNodeCostPerGPUHr, _ := resChNodeCostPerGPUHr.Await()
 	resNodeIsSpot, _ := resChNodeIsSpot.Await()
-	nodeExtendedData, _ := queryExtendedNodeData(ctx, start, end, durStr, resStr)
+	nodeExtendedData, _ := queryExtendedNodeData(grp, ds, start, end)
 
 	resPVActiveMins, _ := resChPVActiveMins.Await()
 	resPVBytes, _ := resChPVBytes.Await()
@@ -604,9 +402,7 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 
 	var resNodeLabels []*source.QueryResult
 	if env.GetAllocationNodeLabelsEnabled() {
-		if env.GetAllocationNodeLabelsEnabled() {
-			resNodeLabels, _ = resChNodeLabels.Await()
-		}
+		resNodeLabels, _ = resChNodeLabels.Await()
 	}
 	resNamespaceLabels, _ := resChNamespaceLabels.Await()
 	resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
@@ -623,12 +419,12 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	resLBCostPerHr, _ := resChLBCostPerHr.Await()
 	resLBActiveMins, _ := resChLBActiveMins.Await()
 
-	if ctx.HasErrors() {
-		for _, err := range ctx.Errors() {
+	if grp.HasErrors() {
+		for _, err := range grp.Errors() {
 			log.Errorf("CostModel.ComputeAllocation: query context error %s", err)
 		}
 
-		return allocSet, nil, ctx.ErrorCollection()
+		return allocSet, nil, grp.Error()
 	}
 
 	// We choose to apply allocation before requests in the cases of RAM and

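The replacement pattern above is uniform: each metric becomes a DataSource method returning a result channel, each channel is registered on a QueryGroup, and the group collects errors from every awaited query. A condensed sketch of that shape, using two of the queries from this file (the wrapper function and variable names are illustrative):

	// queryAllocationInputs is an illustrative wrapper; the real code issues the
	// same calls inline within computeAllocation.
	func queryAllocationInputs(ds source.OpenCostDataSource, start, end time.Time) ([]*source.QueryResult, []*source.QueryResult, error) {
		grp := source.NewQueryGroup()

		// Fire the queries up front; With registers each result channel on the
		// group so completion and errors can be tracked together.
		resChRAM := grp.With(ds.QueryRAMBytesAllocated(start, end))
		resChCPU := grp.With(ds.QueryCPUCoresAllocated(start, end))

		// Await blocks until the underlying query resolves.
		resRAM, _ := resChRAM.Await()
		resCPU, _ := resChCPU.Await()

		// Any error from an awaited query is surfaced through the group.
		if grp.HasErrors() {
			return nil, nil, grp.Error()
		}
		return resRAM, resCPU, nil
	}

The fan-out/await structure of the old Prometheus context is kept, but PromQL, cluster filters, and window formatting now live behind the data source.
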
+ 67 - 83
pkg/costmodel/allocation_helpers.go

@@ -14,7 +14,6 @@ import (
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/cloud/provider"
 	"github.com/opencost/opencost/pkg/env"
-	"github.com/opencost/opencost/pkg/prom"
 	"k8s.io/apimachinery/pkg/labels"
 )
 
@@ -39,14 +38,12 @@ const (
 
 /* Pod Helpers */
 
-func (cm *CostModel) buildPodMap(window opencost.Window, resolution, maxBatchSize time.Duration, podMap map[podKey]*pod, clusterStart, clusterEnd map[string]time.Time, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) error {
+func (cm *CostModel) buildPodMap(window opencost.Window, maxBatchSize time.Duration, podMap map[podKey]*pod, clusterStart, clusterEnd map[string]time.Time, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) error {
 	// Assumes that window is positive and closed
 	start, end := *window.Start(), *window.End()
 
-	// Convert resolution duration to a query-ready string
-	resStr := timeutil.DurationString(resolution)
-
-	ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
+	grp := source.NewQueryGroup()
+	ds := cm.DataSource
 
 	// Query for (start, end) by (pod, namespace, cluster) over the given
 	// window, using the given resolution, and if necessary in batches no
@@ -83,18 +80,17 @@ func (cm *CostModel) buildPodMap(window opencost.Window, resolution, maxBatchSiz
 
 			// Submit and profile query
 
-			var queryPods string
-			// If ingesting UIDs, avg on them
+			var queryPodsResult *source.QueryGroupAsyncResult
 			if ingestPodUID {
-				queryPods = fmt.Sprintf(queryFmtPodsUID, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
+				queryPodsResult = grp.With(ds.QueryPodsUID(batchStart, batchEnd))
 			} else {
-				queryPods = fmt.Sprintf(queryFmtPods, env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
+				queryPodsResult = grp.With(ds.QueryPods(batchStart, batchEnd))
 			}
 
 			queryProfile := time.Now()
-			resPods, err = ctx.QueryAtTime(queryPods, batchEnd).Await()
+			resPods, err = queryPodsResult.Await()
 			if err != nil {
-				log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
+				log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, err))
 				resPods = nil
 			}
 		}
@@ -124,7 +120,7 @@ func (cm *CostModel) buildPodMap(window opencost.Window, resolution, maxBatchSiz
 			}
 		}
 
-		applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods, ingestPodUID, podUIDKeyMap)
+		applyPodResults(window, ds.Resolution(), podMap, clusterStart, clusterEnd, resPods, ingestPodUID, podUIDKeyMap)
 
 		coverage = coverage.ExpandEnd(batchEnd)
 		numQuery++
@@ -213,13 +209,13 @@ func applyPodResults(window opencost.Window, resolution time.Duration, podMap ma
 
 func applyCPUCoresAllocated(podMap map[podKey]*pod, resCPUCoresAllocated []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resCPUCoresAllocated {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation result missing field: %s", err)
 			continue
 		}
 
-		container, err := res.GetString("container")
+		container, err := res.GetContainer()
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU allocation query result missing 'container': %s", key)
 			continue
@@ -268,13 +264,13 @@ func applyCPUCoresAllocated(podMap map[podKey]*pod, resCPUCoresAllocated []*sour
 
 func applyCPUCoresRequested(podMap map[podKey]*pod, resCPUCoresRequested []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resCPUCoresRequested {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request result missing field: %s", err)
 			continue
 		}
 
-		container, err := res.GetString("container")
+		container, err := res.GetContainer()
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU request query result missing 'container': %s", key)
 			continue
@@ -326,19 +322,16 @@ func applyCPUCoresRequested(podMap map[podKey]*pod, resCPUCoresRequested []*sour
 
 func applyCPUCoresUsedAvg(podMap map[podKey]*pod, resCPUCoresUsedAvg []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resCPUCoresUsedAvg {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg result missing field: %s", err)
 			continue
 		}
 
-		container, err := res.GetString("container")
-		if container == "" || err != nil {
-			container, err = res.GetString("container_name")
-			if err != nil {
-				log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
-				continue
-			}
+		container, err := res.GetContainer()
+		if err != nil {
+			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage avg query result missing 'container': %s", key)
+			continue
 		}
 
 		var pods []*pod
@@ -374,19 +367,16 @@ func applyCPUCoresUsedAvg(podMap map[podKey]*pod, resCPUCoresUsedAvg []*source.Q
 
 func applyCPUCoresUsedMax(podMap map[podKey]*pod, resCPUCoresUsedMax []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resCPUCoresUsedMax {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max result missing field: %s", err)
 			continue
 		}
 
-		container, err := res.GetString("container")
-		if container == "" || err != nil {
-			container, err = res.GetString("container_name")
-			if err != nil {
-				log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
-				continue
-			}
+		container, err := res.GetContainer()
+		if err != nil {
+			log.DedupedWarningf(10, "CostModel.ComputeAllocation: CPU usage max query result missing 'container': %s", key)
+			continue
 		}
 
 		var pods []*pod
@@ -424,13 +414,13 @@ func applyCPUCoresUsedMax(podMap map[podKey]*pod, resCPUCoresUsedMax []*source.Q
 
 func applyRAMBytesAllocated(podMap map[podKey]*pod, resRAMBytesAllocated []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resRAMBytesAllocated {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation result missing field: %s", err)
 			continue
 		}
 
-		container, err := res.GetString("container")
+		container, err := res.GetContainer()
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM allocation query result missing 'container': %s", key)
 			continue
@@ -475,13 +465,13 @@ func applyRAMBytesAllocated(podMap map[podKey]*pod, resRAMBytesAllocated []*sour
 
 func applyRAMBytesRequested(podMap map[podKey]*pod, resRAMBytesRequested []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resRAMBytesRequested {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request result missing field: %s", err)
 			continue
 		}
 
-		container, err := res.GetString("container")
+		container, err := res.GetContainer()
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM request query result missing 'container': %s", key)
 			continue
@@ -530,19 +520,16 @@ func applyRAMBytesRequested(podMap map[podKey]*pod, resRAMBytesRequested []*sour
 
 func applyRAMBytesUsedAvg(podMap map[podKey]*pod, resRAMBytesUsedAvg []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resRAMBytesUsedAvg {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM avg usage result missing field: %s", err)
 			continue
 		}
 
-		container, err := res.GetString("container")
-		if container == "" || err != nil {
-			container, err = res.GetString("container_name")
-			if err != nil {
-				log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
-				continue
-			}
+		container, err := res.GetContainer()
+		if err != nil {
+			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage avg query result missing 'container': %s", key)
+			continue
 		}
 
 		var pods []*pod
@@ -574,19 +561,16 @@ func applyRAMBytesUsedAvg(podMap map[podKey]*pod, resRAMBytesUsedAvg []*source.Q
 
 func applyRAMBytesUsedMax(podMap map[podKey]*pod, resRAMBytesUsedMax []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resRAMBytesUsedMax {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max result missing field: %s", err)
 			continue
 		}
 
-		container, err := res.GetString("container")
-		if container == "" || err != nil {
-			container, err = res.GetString("container_name")
-			if err != nil {
-				log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
-				continue
-			}
+		container, err := res.GetContainer()
+		if err != nil {
+			log.DedupedWarningf(10, "CostModel.ComputeAllocation: RAM usage max query result missing 'container': %s", key)
+			continue
 		}
 
 		var pods []*pod
@@ -626,7 +610,7 @@ func applyRAMBytesUsedMax(podMap map[podKey]*pod, resRAMBytesUsedMax []*source.Q
 func applyGPUUsage(podMap map[podKey]*pod, resGPUUsageAvgOrMax []*source.QueryResult, podUIDKeyMap map[podKey][]podKey, mode string) {
 	// Example PromQueryResult: {container="dcgmproftester12", namespace="gpu", pod="dcgmproftester3-deployment-fc89c8dd6-ph7z5"} 0.997307
 	for _, res := range resGPUUsageAvgOrMax {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU usage avg/max result missing field: %s", err)
 			continue
@@ -649,7 +633,7 @@ func applyGPUUsage(podMap map[podKey]*pod, resGPUUsageAvgOrMax []*source.QueryRe
 		}
 
 		for _, thisPod := range pods {
-			container, err := res.GetString("container")
+			container, err := res.GetContainer()
 			if err != nil {
 				log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU usage avg/max query result missing 'container': %s", key)
 				continue
@@ -726,13 +710,13 @@ func applyGPUsAllocated(podMap map[podKey]*pod, resGPUsRequested []*source.Query
 		resGPUsRequested = resGPUsAllocated
 	}
 	for _, res := range resGPUsRequested {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request result missing field: %s", err)
 			continue
 		}
 
-		container, err := res.GetString("container")
+		container, err := res.GetContainer()
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: GPU request query result missing 'container': %s", key)
 			continue
@@ -782,7 +766,7 @@ func applyGPUsAllocated(podMap map[podKey]*pod, resGPUsRequested []*source.Query
 
 func applyNetworkTotals(podMap map[podKey]*pod, resNetworkTransferBytes []*source.QueryResult, resNetworkReceiveBytes []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resNetworkTransferBytes {
-		podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		podKey, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Transfer Bytes query result missing field: %s", err)
 			continue
@@ -812,7 +796,7 @@ func applyNetworkTotals(podMap map[podKey]*pod, resNetworkTransferBytes []*sourc
 		}
 	}
 	for _, res := range resNetworkReceiveBytes {
-		podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		podKey, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network Receive Bytes query result missing field: %s", err)
 			continue
@@ -856,7 +840,7 @@ func applyNetworkAllocation(podMap map[podKey]*pod, resNetworkGiB []*source.Quer
 	}
 
 	for _, res := range resNetworkGiB {
-		podKey, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		podKey, err := resultPodKey(res)
 		if err != nil {
 			log.DedupedWarningf(10, "CostModel.ComputeAllocation: Network allocation query result missing field: %s", err)
 			continue
@@ -904,7 +888,7 @@ func resToNodeLabels(resNodeLabels []*source.QueryResult) map[nodeKey]map[string
 	nodeLabels := map[nodeKey]map[string]string{}
 
 	for _, res := range resNodeLabels {
-		nodeKey, err := resultNodeKey(res, env.GetPromClusterLabel(), "node")
+		nodeKey, err := resultNodeKey(res)
 		if err != nil {
 			continue
 		}
@@ -940,7 +924,7 @@ func resToNamespaceLabels(resNamespaceLabels []*source.QueryResult) map[namespac
 	namespaceLabels := map[namespaceKey]map[string]string{}
 
 	for _, res := range resNamespaceLabels {
-		nsKey, err := resultNamespaceKey(res, env.GetPromClusterLabel(), "namespace")
+		nsKey, err := resultNamespaceKey(res)
 		if err != nil {
 			continue
 		}
@@ -961,7 +945,7 @@ func resToPodLabels(resPodLabels []*source.QueryResult, podUIDKeyMap map[podKey]
 	podLabels := map[podKey]map[string]string{}
 
 	for _, res := range resPodLabels {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			continue
 		}
@@ -1017,7 +1001,7 @@ func resToPodAnnotations(resPodAnnotations []*source.QueryResult, podUIDKeyMap m
 	podAnnotations := map[podKey]map[string]string{}
 
 	for _, res := range resPodAnnotations {
-		key, err := resultPodKey(res, env.GetPromClusterLabel(), "namespace")
+		key, err := resultPodKey(res)
 		if err != nil {
 			continue
 		}
@@ -1132,7 +1116,7 @@ func resToDeploymentLabels(resDeploymentLabels []*source.QueryResult) map[contro
 	deploymentLabels := map[controllerKey]map[string]string{}
 
 	for _, res := range resDeploymentLabels {
-		controllerKey, err := resultDeploymentKey(res, env.GetPromClusterLabel(), "namespace", "deployment")
+		controllerKey, err := resultDeploymentKey(res, "deployment")
 		if err != nil {
 			continue
 		}
@@ -1165,7 +1149,7 @@ func resToStatefulSetLabels(resStatefulSetLabels []*source.QueryResult) map[cont
 	statefulSetLabels := map[controllerKey]map[string]string{}
 
 	for _, res := range resStatefulSetLabels {
-		controllerKey, err := resultStatefulSetKey(res, env.GetPromClusterLabel(), "namespace", "statefulSet")
+		controllerKey, err := resultStatefulSetKey(res, "statefulSet")
 		if err != nil {
 			continue
 		}
@@ -1227,12 +1211,12 @@ func resToPodDaemonSetMap(resDaemonSetLabels []*source.QueryResult, podUIDKeyMap
 	daemonSetLabels := map[podKey]controllerKey{}
 
 	for _, res := range resDaemonSetLabels {
-		controllerKey, err := resultDaemonSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
+		controllerKey, err := resultDaemonSetKey(res, "owner_name")
 		if err != nil {
 			continue
 		}
 
-		pod, err := res.GetString("pod")
+		pod, err := res.GetPod()
 		if err != nil {
 			log.Warnf("CostModel.ComputeAllocation: DaemonSetLabel result without pod: %s", controllerKey)
 		}
@@ -1263,7 +1247,7 @@ func resToPodJobMap(resJobLabels []*source.QueryResult, podUIDKeyMap map[podKey]
 	jobLabels := map[podKey]controllerKey{}
 
 	for _, res := range resJobLabels {
-		controllerKey, err := resultJobKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
+		controllerKey, err := resultJobKey(res, "owner_name")
 		if err != nil {
 			continue
 		}
@@ -1275,7 +1259,7 @@ func resToPodJobMap(resJobLabels []*source.QueryResult, podUIDKeyMap map[podKey]
 			controllerKey.Controller = match[1]
 		}
 
-		pod, err := res.GetString("pod")
+		pod, err := res.GetPod()
 		if err != nil {
 			log.Warnf("CostModel.ComputeAllocation: JobLabel result without pod: %s", controllerKey)
 		}
@@ -1313,7 +1297,7 @@ func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*source.QueryResult, res
 
 	// Create unowned ReplicaSet controller keys
 	for _, res := range resReplicaSetsWithoutOwners {
-		controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
+		controllerKey, err := resultReplicaSetKey(res, "replicaset")
 		if err != nil {
 			continue
 		}
@@ -1323,7 +1307,7 @@ func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*source.QueryResult, res
 
 	// Create Rollout-owned ReplicaSet controller keys
 	for _, res := range resReplicaSetsWithRolloutOwner {
-		controllerKey, err := resultReplicaSetRolloutKey(res, env.GetPromClusterLabel(), "namespace", "replicaset")
+		controllerKey, err := resultReplicaSetRolloutKey(res, "replicaset")
 		if err != nil {
 			continue
 		}
@@ -1337,13 +1321,13 @@ func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*source.QueryResult, res
 
 	for _, res := range resPodsWithReplicaSetOwner {
 		// First, check if this pod is owned by an unowned ReplicaSet
-		controllerKey, err := resultReplicaSetKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
+		controllerKey, err := resultReplicaSetKey(res, "owner_name")
 		if err != nil {
 			continue
 		} else if _, ok := replicaSets[controllerKey]; !ok {
 			// If the pod is not owned by an unowned ReplicaSet, check if
 			// it's owned by a Rollout-owned ReplicaSet
-			controllerKey, err = resultReplicaSetRolloutKey(res, env.GetPromClusterLabel(), "namespace", "owner_name")
+			controllerKey, err = resultReplicaSetRolloutKey(res, "owner_name")
 			if err != nil {
 				continue
 			} else if _, ok := replicaSets[controllerKey]; !ok {
@@ -1351,7 +1335,7 @@ func resToPodReplicaSetMap(resPodsWithReplicaSetOwner []*source.QueryResult, res
 			}
 		}
 
-		pod, err := res.GetString("pod")
+		pod, err := res.GetPod()
 		if err != nil {
 			log.Warnf("CostModel.ComputeAllocation: ReplicaSet result without pod: %s", controllerKey)
 		}
@@ -1393,7 +1377,7 @@ func getServiceLabels(resServiceLabels []*source.QueryResult) map[serviceKey]map
 	serviceLabels := map[serviceKey]map[string]string{}
 
 	for _, res := range resServiceLabels {
-		serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service")
+		serviceKey, err := resultServiceKey(res, "service")
 		if err != nil {
 			continue
 		}
@@ -1467,7 +1451,7 @@ func applyServicesToPods(podMap map[podKey]*pod, podLabels map[podKey]map[string
 
 func getLoadBalancerCosts(lbMap map[serviceKey]*lbCost, resLBCost, resLBActiveMins []*source.QueryResult, resolution time.Duration, window opencost.Window) {
 	for _, res := range resLBActiveMins {
-		serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
+		serviceKey, err := resultServiceKey(res, "service_name")
 		if err != nil || len(res.Values) == 0 {
 			continue
 		}
@@ -1485,7 +1469,7 @@ func getLoadBalancerCosts(lbMap map[serviceKey]*lbCost, resLBCost, resLBActiveMi
 	}
 
 	for _, res := range resLBCost {
-		serviceKey, err := resultServiceKey(res, env.GetPromClusterLabel(), "namespace", "service_name")
+		serviceKey, err := resultServiceKey(res, "service_name")
 		if err != nil {
 			continue
 		}
@@ -1920,7 +1904,7 @@ func (cm *CostModel) getNodePricing(nodeMap map[nodeKey]*nodePricing, nodeKey no
 
 func buildPVMap(resolution time.Duration, pvMap map[pvKey]*pv, resPVCostPerGiBHour, resPVActiveMins, resPVMeta []*source.QueryResult, window opencost.Window) {
 	for _, result := range resPVActiveMins {
-		key, err := resultPVKey(result, env.GetPromClusterLabel(), "persistentvolume")
+		key, err := resultPVKey(result, "persistentvolume")
 		if err != nil {
 			log.Warnf("CostModel.ComputeAllocation: pv bytes query result missing field: %s", err)
 			continue
@@ -1940,7 +1924,7 @@ func buildPVMap(resolution time.Duration, pvMap map[pvKey]*pv, resPVCostPerGiBHo
 	}
 
 	for _, result := range resPVCostPerGiBHour {
-		key, err := resultPVKey(result, env.GetPromClusterLabel(), "volumename")
+		key, err := resultPVKey(result, "volumename")
 		if err != nil {
 			log.Warnf("CostModel.ComputeAllocation: thisPV bytes query result missing field: %s", err)
 			continue
@@ -1957,7 +1941,7 @@ func buildPVMap(resolution time.Duration, pvMap map[pvKey]*pv, resPVCostPerGiBHo
 	}
 
 	for _, result := range resPVMeta {
-		key, err := resultPVKey(result, env.GetPromClusterLabel(), "persistentvolume")
+		key, err := resultPVKey(result, "persistentvolume")
 		if err != nil {
 			log.Warnf("error getting key for PV: %v", err)
 			continue
@@ -1978,7 +1962,7 @@ func buildPVMap(resolution time.Duration, pvMap map[pvKey]*pv, resPVCostPerGiBHo
 
 func applyPVBytes(pvMap map[pvKey]*pv, resPVBytes []*source.QueryResult) {
 	for _, res := range resPVBytes {
-		key, err := resultPVKey(res, env.GetPromClusterLabel(), "persistentvolume")
+		key, err := resultPVKey(res, "persistentvolume")
 		if err != nil {
 			log.Warnf("CostModel.ComputeAllocation: pv bytes query result missing field: %s", err)
 			continue
@@ -2046,7 +2030,7 @@ func buildPVCMap(resolution time.Duration, pvcMap map[pvcKey]*pvc, pvMap map[pvK
 
 func applyPVCBytesRequested(pvcMap map[pvcKey]*pvc, resPVCBytesRequested []*source.QueryResult) {
 	for _, res := range resPVCBytesRequested {
-		key, err := resultPVCKey(res, env.GetPromClusterLabel(), "namespace", "persistentvolumeclaim")
+		key, err := resultPVCKey(res, "persistentvolumeclaim")
 		if err != nil {
 			continue
 		}

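After this change the apply/resTo helpers all share one shape: build a key from typed accessors, then read well-known labels through getters such as GetContainer instead of GetString with caller-supplied label names. A generic sketch of that loop (the metric-specific body is elided and the function name is illustrative):

	// applyExampleMetric is illustrative; each real helper substitutes its own
	// metric-specific logic where the final comment indicates.
	func applyExampleMetric(podMap map[podKey]*pod, results []*source.QueryResult, podUIDKeyMap map[podKey][]podKey) {
		for _, res := range results {
			// Cluster, namespace, and pod come from typed accessors; the
			// cluster-ID fallback is handled inside resultPodKey.
			key, err := resultPodKey(res)
			if err != nil {
				log.DedupedWarningf(10, "CostModel.ComputeAllocation: example result missing field: %s", err)
				continue
			}

			// GetContainer replaces GetString("container"); the old
			// container_name fallback presumably moves behind this accessor.
			container, err := res.GetContainer()
			if err != nil {
				log.DedupedWarningf(10, "CostModel.ComputeAllocation: example result missing 'container': %s", key)
				continue
			}

			// Metric-specific work: resolve pods via podMap / podUIDKeyMap and
			// record the value for (key, container).
			_ = container
		}
	}
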
+ 2 - 2
pkg/costmodel/allocationnode.go

@@ -5,7 +5,7 @@ package costmodel
 import (
 	"time"
 
-	"github.com/opencost/opencost/pkg/prom"
+	"github.com/opencost/opencost/core/pkg/source"
 )
 
 // These implementations are placeholders to allow conditional compilation of
@@ -22,7 +22,7 @@ import (
 type extendedNodeQueryResults struct{}
 
 // queryExtendedNodeData is a placeholder function for the incubating feature
-func queryExtendedNodeData(ctx *prom.Context, start, end time.Time, durStr, resStr string) (*extendedNodeQueryResults, error) {
+func queryExtendedNodeData(grp *source.QueryGroup, ds source.OpenCostDataSource, start, end time.Time) (*extendedNodeQueryResults, error) {
 	return &extendedNodeQueryResults{}, nil
 }
 

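With the new signature, a future non-placeholder implementation would issue its node queries through the shared group rather than a raw Prometheus context. The following is hypothetical only; the shipped placeholder above returns an empty struct and extendedNodeQueryResults has no fields:

	// queryExtendedNodeDataSketch is hypothetical and shows only how the new
	// parameters would be used; it is not part of this change.
	func queryExtendedNodeDataSketch(grp *source.QueryGroup, ds source.OpenCostDataSource, start, end time.Time) (*extendedNodeQueryResults, error) {
		res, err := grp.With(ds.QueryNodeCostPerGPUHr(start, end)).Await()
		if err != nil {
			return nil, err
		}
		_ = res // a real implementation would fold these results into the struct
		return &extendedNodeQueryResults{}, nil
	}
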
+ 6 - 123
pkg/costmodel/costmodel.go

@@ -45,7 +45,6 @@ type CostModel struct {
 	ClusterMap      clusters.ClusterMap
 	BatchDuration   time.Duration
 	RequestGroup    *singleflight.Group
-	RefreshInterval time.Duration
 	DataSource      source.OpenCostDataSource
 	Provider        costAnalyzerCloud.Provider
 	pricingMetadata *costAnalyzerCloud.PricingMatchMetadata
@@ -57,19 +56,17 @@ func NewCostModel(
 	cache clustercache.ClusterCache,
 	clusterMap clusters.ClusterMap,
 	batchDuration time.Duration,
-	refreshInterval time.Duration,
 ) *CostModel {
 	// request grouping to prevent over-requesting the same data prior to caching
 	requestGroup := new(singleflight.Group)
 
 	return &CostModel{
-		Cache:           cache,
-		ClusterMap:      clusterMap,
-		BatchDuration:   batchDuration,
-		DataSource:      dataSource,
-		Provider:        provider,
-		RequestGroup:    requestGroup,
-		RefreshInterval: refreshInterval,
+		Cache:         cache,
+		ClusterMap:    clusterMap,
+		BatchDuration: batchDuration,
+		DataSource:    dataSource,
+		Provider:      provider,
+		RequestGroup:  requestGroup,
 	}
 }
 
@@ -136,121 +133,7 @@ func (cd *CostData) GetController() (name string, kind string, hasController boo
 	return name, kind, hasController
 }
 
-const (
-	queryRAMRequestsStr = `avg(
-		label_replace(
-			label_replace(
-				sum_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="",container!="POD", node!="", %s}[%s] %s)
-				, "container_name","$1","container","(.+)"
-			), "pod_name","$1","pod","(.+)"
-		)
-	) by (namespace,container_name,pod_name,node,%s)`
-	queryRAMUsageStr = `avg(
-		label_replace(
-			label_replace(
-				label_replace(
-					sum_over_time(container_memory_working_set_bytes{container!="", container!="POD", instance!="", %s}[%s] %s), "node", "$1", "instance", "(.+)"
-				), "container_name", "$1", "container", "(.+)"
-			), "pod_name", "$1", "pod", "(.+)"
-		)
-	) by (namespace, container_name, pod_name, node, %s)`
-	queryCPURequestsStr = `avg(
-		label_replace(
-			label_replace(
-				sum_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="",container!="POD", node!="", %s}[%s] %s)
-				, "container_name","$1","container","(.+)"
-			), "pod_name","$1","pod","(.+)"
-		)
-	) by (namespace,container_name,pod_name,node,%s)`
-	queryCPUUsageStr = `avg(
-		label_replace(
-			label_replace(
-				label_replace(
-					rate(
-						container_cpu_usage_seconds_total{container!="", container!="POD", instance!="", %s}[%s] %s
-					), "node", "$1", "instance", "(.+)"
-				), "container_name", "$1", "container", "(.+)"
-			), "pod_name", "$1", "pod", "(.+)"
-		)
-	) by (namespace, container_name, pod_name, node, %s)`
-	queryGPURequestsStr = `avg(
-		label_replace(
-			label_replace(
-				sum_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!="", %s}[%s] %s),
-				"container_name","$1","container","(.+)"
-			), "pod_name","$1","pod","(.+)"
-		)
-	) by (namespace,container_name,pod_name,node,%s)`
-	queryPVRequestsStr = `avg(avg(kube_persistentvolumeclaim_info{volumename != "", %s}) by (persistentvolumeclaim, storageclass, namespace, volumename, %s, kubernetes_node)
-	*
-	on (persistentvolumeclaim, namespace, %s, kubernetes_node) group_right(storageclass, volumename)
-	sum(kube_persistentvolumeclaim_resource_requests_storage_bytes{%s}) by (persistentvolumeclaim, namespace, %s, kubernetes_node, kubernetes_name)) by (persistentvolumeclaim, storageclass, namespace, %s, volumename, kubernetes_node)`
-	// queryRAMAllocationByteHours yields the total byte-hour RAM allocation over the given
-	// window, aggregated by container.
-	//  [line 3]  sum_over_time(each byte) = [byte*scrape] by metric
-	//  [line 4] (scalar(avg(prometheus_target_interval_length_seconds)) = [seconds/scrape] / 60 / 60 =  [hours/scrape] by container
-	//  [lines 2,4]  sum(") by unique container key and multiply [byte*scrape] * [hours/scrape] for byte*hours
-	//  [lines 1,5]  relabeling
-	queryRAMAllocationByteHours = `
-		label_replace(label_replace(
-			sum(
-				sum_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!="", %s}[%s])
-			) by (namespace,container,pod,node,%s) * %f / 60 / 60
-		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
-	// queryCPUAllocationVCPUHours yields the total VCPU-hour CPU allocation over the given
-	// window, aggregated by container.
-	//  [line 3] sum_over_time(each VCPU*mins in window) = [VCPU*scrape] by metric
-	//  [line 4] (scalar(avg(prometheus_target_interval_length_seconds)) = [seconds/scrape] / 60 / 60 =  [hours/scrape] by container
-	//  [lines 2,4]  sum(") by unique container key and multiply [VCPU*scrape] * [hours/scrape] for VCPU*hours
-	//  [lines 1,5]  relabeling
-	queryCPUAllocationVCPUHours = `
-		label_replace(label_replace(
-			sum(
-				sum_over_time(container_cpu_allocation{container!="",container!="POD", node!="", %s}[%s])
-			) by (namespace,container,pod,node,%s) * %f / 60 / 60
-		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
-	// queryPVCAllocationFmt yields the total byte-hour PVC allocation over the given window.
-	// sum_over_time(each byte) = [byte*scrape] by metric *(scalar(avg(prometheus_target_interval_length_seconds)) = [seconds/scrape] / 60 / 60 =  [hours/scrape] by pod
-	queryPVCAllocationFmt     = `sum(sum_over_time(pod_pvc_allocation{%s}[%s])) by (%s, namespace, pod, persistentvolume, persistentvolumeclaim) * %f/60/60`
-	queryPVHourlyCostFmt      = `avg_over_time(pv_hourly_cost{%s}[%s])`
-	queryNSLabels             = `avg_over_time(kube_namespace_labels{%s}[%s])`
-	queryPodLabels            = `avg_over_time(kube_pod_labels{%s}[%s])`
-	queryNSAnnotations        = `avg_over_time(kube_namespace_annotations{%s}[%s])`
-	queryPodAnnotations       = `avg_over_time(kube_pod_annotations{%s}[%s])`
-	queryDeploymentLabels     = `avg_over_time(deployment_match_labels{%s}[%s])`
-	queryStatefulsetLabels    = `avg_over_time(statefulSet_match_labels{%s}[%s])`
-	queryPodDaemonsets        = `sum(kube_pod_owner{owner_kind="DaemonSet", %s}) by (namespace,pod,owner_name,%s)`
-	queryPodJobs              = `sum(kube_pod_owner{owner_kind="Job", %s}) by (namespace,pod,owner_name,%s)`
-	queryServiceLabels        = `avg_over_time(service_selector_labels{%s}[%s])`
-	queryZoneNetworkUsage     = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true", %s}[%s] %s)) by (namespace,pod_name,%s) / 1024 / 1024 / 1024`
-	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false", %s}[%s] %s)) by (namespace,pod_name,%s) / 1024 / 1024 / 1024`
-	queryInternetNetworkUsage = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true", %s}[%s] %s)) by (namespace,pod_name,%s) / 1024 / 1024 / 1024`
-	normalizationStr          = `max(count_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", %s}[%s] %s))`
-)
-
 func (cm *CostModel) ComputeCostData(window string, offset string, filterNamespace string) (map[string]*CostData, error) {
-
-	/*
-		queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, env.GetPromClusterFilter(), window, offset, env.GetPromClusterLabel())
-		queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, env.GetPromClusterFilter(), window, offset, env.GetPromClusterLabel())
-		queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, env.GetPromClusterFilter(), window, "", env.GetPromClusterLabel())
-		queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, env.GetPromClusterFilter(), window, "", env.GetPromClusterLabel())
-		queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, env.GetPromClusterFilter(), window, "", env.GetPromClusterLabel())
-		queryNormalization := fmt.Sprintf(normalizationStr, env.GetPromClusterFilter(), window, offset)
-
-		// Cluster ID is specific to the source cluster
-		clusterID := env.GetClusterID()
-
-		// Submit all Prometheus queries asynchronously
-		ctx := prom.NewNamedContext(cli, prom.ComputeCostDataContextName)
-		resChRAMUsage := ctx.Query(queryRAMUsage)
-		resChCPUUsage := ctx.Query(queryCPUUsage)
-		resChNetZoneRequests := ctx.Query(queryNetZoneRequests)
-		resChNetRegionRequests := ctx.Query(queryNetRegionRequests)
-		resChNetInternetRequests := ctx.Query(queryNetInternetRequests)
-		resChNormalization := ctx.Query(queryNormalization)
-	*/
-
 	// Cluster ID is specific to the source cluster
 	clusterID := env.GetClusterID()
 	cp := cm.Provider

+ 3 - 2
pkg/costmodel/csv_export.go

@@ -14,12 +14,13 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/pkg/env"
 	"github.com/opencost/opencost/pkg/filemanager"
 )
 
 type AllocationModel interface {
 	ComputeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error)
-	DateRange() (time.Time, time.Time, error)
+	DateRange(limitDays int) (time.Time, time.Time, error)
 }
 
 var errNoData = errors.New("no data")
@@ -134,7 +135,7 @@ func (e *csvExporter) updateExportCSV(ctx context.Context, previousExportTmp *os
 }
 
 func (e *csvExporter) availableAllocationDates() (map[time.Time]struct{}, error) {
-	start, end, err := e.Model.DateRange()
+	start, end, err := e.Model.DateRange(env.GetExportCSVMaxDays())
 	if err != nil {
 		return nil, err
 	}

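Callers now choose the coverage limit rather than the model reading it from the environment. A small sketch of a consumer written against the updated contract (the helper name is illustrative; any AllocationModel implementation works):

	// hasAllocationData reports whether any allocation samples exist within the
	// last limitDays, using only the updated AllocationModel interface.
	func hasAllocationData(model AllocationModel, limitDays int) (bool, error) {
		start, end, err := model.DateRange(limitDays)
		if err != nil {
			return false, err
		}
		return end.After(start), nil
	}
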
+ 5 - 5
pkg/costmodel/csv_export_test.go

@@ -19,7 +19,7 @@ func Test_UpdateCSV(t *testing.T) {
 	t.Run("previous data doesn't exist, upload new data", func(t *testing.T) {
 		storage := &filemanager.InMemoryFile{}
 		model := &AllocationModelMock{
-			DateRangeFunc: func() (time.Time, time.Time, error) {
+			DateRangeFunc: func(_ int) (time.Time, time.Time, error) {
 				return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), nil
 			},
 			ComputeAllocationFunc: func(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
@@ -74,7 +74,7 @@ func Test_UpdateCSV(t *testing.T) {
 	t.Run("export labels", func(t *testing.T) {
 		storage := &filemanager.InMemoryFile{}
 		model := &AllocationModelMock{
-			DateRangeFunc: func() (time.Time, time.Time, error) {
+			DateRangeFunc: func(_ int) (time.Time, time.Time, error) {
 				return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), nil
 			},
 			ComputeAllocationFunc: func(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
@@ -113,7 +113,7 @@ func Test_UpdateCSV(t *testing.T) {
 `),
 		}
 		model := &AllocationModelMock{
-			DateRangeFunc: func() (time.Time, time.Time, error) {
+			DateRangeFunc: func(_ int) (time.Time, time.Time, error) {
 				return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), nil
 			},
 			ComputeAllocationFunc: func(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
@@ -151,7 +151,7 @@ func Test_UpdateCSV(t *testing.T) {
 			Data: []byte(data),
 		}
 		model := &AllocationModelMock{
-			DateRangeFunc: func() (time.Time, time.Time, error) {
+			DateRangeFunc: func(_ int) (time.Time, time.Time, error) {
 				return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), nil
 			},
 		}
@@ -168,7 +168,7 @@ func Test_UpdateCSV(t *testing.T) {
 					Allocations: nil,
 				}, nil
 			},
-			DateRangeFunc: func() (time.Time, time.Time, error) {
+			DateRangeFunc: func(_ int) (time.Time, time.Time, error) {
 				return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), nil
 			},
 		}

+ 40 - 43
pkg/costmodel/key.go

@@ -34,28 +34,28 @@ func newContainerKey(cluster, namespace, pod, container string) containerKey {
 // "cluster_id" as the containerKey's Cluster field. If a given field does not
 // exist on the result, an error is returned. (The only exception to that is
 // clusterLabel, which we expect may not exist, but has a default value.)
-func resultContainerKey(res *source.QueryResult, clusterLabel, namespaceLabel, podLabel, containerLabel string) (containerKey, error) {
+func resultContainerKey(res *source.QueryResult) (containerKey, error) {
 	key := containerKey{}
 
-	cluster, err := res.GetString(clusterLabel)
+	cluster, err := res.GetCluster()
 	if err != nil {
 		cluster = env.GetClusterID()
 	}
 	key.Cluster = cluster
 
-	namespace, err := res.GetString(namespaceLabel)
+	namespace, err := res.GetNamespace()
 	if err != nil {
 		return key, err
 	}
 	key.Namespace = namespace
 
-	pod, err := res.GetString(podLabel)
+	pod, err := res.GetPod()
 	if err != nil {
 		return key, err
 	}
 	key.Pod = pod
 
-	container, err := res.GetString(containerLabel)
+	container, err := res.GetContainer()
 	if err != nil {
 		return key, err
 	}
@@ -94,27 +94,24 @@ func getUnmountedPodKey(cluster string) podKey {
 // as the podKey's Cluster field. If a given field does not exist on the
 // result, an error is returned. (The only exception to that is clusterLabel,
 // which we expect may not exist, but has a default value.)
-func resultPodKey(res *source.QueryResult, clusterLabel, namespaceLabel string) (podKey, error) {
+func resultPodKey(res *source.QueryResult) (podKey, error) {
 	key := podKey{}
 
-	cluster, err := res.GetString(clusterLabel)
+	cluster, err := res.GetCluster()
 	if err != nil {
 		cluster = env.GetClusterID()
 	}
 	key.Cluster = cluster
 
-	namespace, err := res.GetString(namespaceLabel)
+	namespace, err := res.GetNamespace()
 	if err != nil {
 		return key, err
 	}
 	key.Namespace = namespace
 
-	pod, err := res.GetString("pod")
-	if pod == "" || err != nil {
-		pod, err = res.GetString("pod_name")
-		if err != nil {
-			return key, err
-		}
+	pod, err := res.GetPod()
+	if err != nil {
+		return key, err
 	}
 	key.Pod = pod
 
@@ -143,16 +140,16 @@ func newNamespaceKey(cluster, namespace string) namespaceKey {
 // "cluster_id" as the namespaceKey's Cluster field. If a given field does not
 // exist on the result, an error is returned. (The only exception to that is
 // clusterLabel, which we expect may not exist, but has a default value.)
-func resultNamespaceKey(res *source.QueryResult, clusterLabel, namespaceLabel string) (namespaceKey, error) {
+func resultNamespaceKey(res *source.QueryResult) (namespaceKey, error) {
 	key := namespaceKey{}
 
-	cluster, err := res.GetString(clusterLabel)
+	cluster, err := res.GetCluster()
 	if err != nil {
 		cluster = env.GetClusterID()
 	}
 	key.Cluster = cluster
 
-	namespace, err := res.GetString(namespaceLabel)
+	namespace, err := res.GetNamespace()
 	if err != nil {
 		return key, err
 	}
@@ -187,16 +184,16 @@ func newControllerKey(cluster, namespace, controllerKind, controller string) con
 // "cluster_id" as the controllerKey's Cluster field. If a given field does not
 // exist on the result, an error is returned. (The only exception to that is
 // clusterLabel, which we expect may not exist, but has a default value.)
-func resultControllerKey(controllerKind string, res *source.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
+func resultControllerKey(controllerKind string, res *source.QueryResult, controllerLabel string) (controllerKey, error) {
 	key := controllerKey{}
 
-	cluster, err := res.GetString(clusterLabel)
+	cluster, err := res.GetCluster()
 	if err != nil {
 		cluster = env.GetClusterID()
 	}
 	key.Cluster = cluster
 
-	namespace, err := res.GetString(namespaceLabel)
+	namespace, err := res.GetNamespace()
 	if err != nil {
 		return key, err
 	}
@@ -215,38 +212,38 @@ func resultControllerKey(controllerKind string, res *source.QueryResult, cluster
 
 // resultDeploymentKey creates a controllerKey for a Deployment.
 // (See resultControllerKey for more.)
-func resultDeploymentKey(res *source.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
-	return resultControllerKey("deployment", res, clusterLabel, namespaceLabel, controllerLabel)
+func resultDeploymentKey(res *source.QueryResult, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("deployment", res, controllerLabel)
 }
 
 // resultStatefulSetKey creates a controllerKey for a StatefulSet.
 // (See resultControllerKey for more.)
-func resultStatefulSetKey(res *source.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
-	return resultControllerKey("statefulset", res, clusterLabel, namespaceLabel, controllerLabel)
+func resultStatefulSetKey(res *source.QueryResult, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("statefulset", res, controllerLabel)
 }
 
 // resultDaemonSetKey creates a controllerKey for a DaemonSet.
 // (See resultControllerKey for more.)
-func resultDaemonSetKey(res *source.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
-	return resultControllerKey("daemonset", res, clusterLabel, namespaceLabel, controllerLabel)
+func resultDaemonSetKey(res *source.QueryResult, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("daemonset", res, controllerLabel)
 }
 
 // resultJobKey creates a controllerKey for a Job.
 // (See resultControllerKey for more.)
-func resultJobKey(res *source.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
-	return resultControllerKey("job", res, clusterLabel, namespaceLabel, controllerLabel)
+func resultJobKey(res *source.QueryResult, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("job", res, controllerLabel)
 }
 
 // resultReplicaSetKey creates a controllerKey for a ReplicaSet.
 // (See resultControllerKey for more.)
-func resultReplicaSetKey(res *source.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
-	return resultControllerKey("replicaset", res, clusterLabel, namespaceLabel, controllerLabel)
+func resultReplicaSetKey(res *source.QueryResult, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("replicaset", res, controllerLabel)
 }
 
 // resultReplicaSetRolloutKey creates a controllerKey for a ReplicaSet owned by a Rollout.
 // (See resultControllerKey for more.)
-func resultReplicaSetRolloutKey(res *source.QueryResult, clusterLabel, namespaceLabel, controllerLabel string) (controllerKey, error) {
-	return resultControllerKey("rollout", res, clusterLabel, namespaceLabel, controllerLabel)
+func resultReplicaSetRolloutKey(res *source.QueryResult, controllerLabel string) (controllerKey, error) {
+	return resultControllerKey("rollout", res, controllerLabel)
 }
 
 type serviceKey struct {
@@ -273,16 +270,16 @@ func newServiceKey(cluster, namespace, service string) serviceKey {
 // "cluster_id" as the serviceKey's Cluster field. If a given field does not
 // exist on the result, an error is returned. (The only exception to that is
 // clusterLabel, which we expect may not exist, but has a default value.)
-func resultServiceKey(res *source.QueryResult, clusterLabel, namespaceLabel, serviceLabel string) (serviceKey, error) {
+func resultServiceKey(res *source.QueryResult, serviceLabel string) (serviceKey, error) {
 	key := serviceKey{}
 
-	cluster, err := res.GetString(clusterLabel)
+	cluster, err := res.GetCluster()
 	if err != nil {
 		cluster = env.GetClusterID()
 	}
 	key.Cluster = cluster
 
-	namespace, err := res.GetString(namespaceLabel)
+	namespace, err := res.GetNamespace()
 	if err != nil {
 		return key, err
 	}
@@ -319,16 +316,16 @@ func newNodeKey(cluster, node string) nodeKey {
 // "cluster_id" as the nodeKey's Cluster field. If a given field does not
 // exist on the result, an error is returned. (The only exception to that is
 // clusterLabel, which we expect may not exist, but has a default value.)
-func resultNodeKey(res *source.QueryResult, clusterLabel, nodeLabel string) (nodeKey, error) {
+func resultNodeKey(res *source.QueryResult) (nodeKey, error) {
 	key := nodeKey{}
 
-	cluster, err := res.GetString(clusterLabel)
+	cluster, err := res.GetCluster()
 	if err != nil {
 		cluster = env.GetClusterID()
 	}
 	key.Cluster = cluster
 
-	node, err := res.GetString(nodeLabel)
+	node, err := res.GetNode()
 	if err != nil {
 		return key, err
 	}
@@ -361,16 +358,16 @@ func newPVCKey(cluster, namespace, persistentVolumeClaim string) pvcKey {
 // "cluster_id" as the pvcKey's Cluster field. If a given field does not
 // exist on the result, an error is returned. (The only exception to that is
 // clusterLabel, which we expect may not exist, but has a default value.)
-func resultPVCKey(res *source.QueryResult, clusterLabel, namespaceLabel, pvcLabel string) (pvcKey, error) {
+func resultPVCKey(res *source.QueryResult, pvcLabel string) (pvcKey, error) {
 	key := pvcKey{}
 
-	cluster, err := res.GetString(clusterLabel)
+	cluster, err := res.GetCluster()
 	if err != nil {
 		cluster = env.GetClusterID()
 	}
 	key.Cluster = cluster
 
-	namespace, err := res.GetString(namespaceLabel)
+	namespace, err := res.GetNamespace()
 	if err != nil {
 		return key, err
 	}
@@ -407,10 +404,10 @@ func newPVKey(cluster, persistentVolume string) pvKey {
 // "cluster_id" as the pvKey's Cluster field. If a given field does not
 // exist on the result, an error is returned. (The only exception to that is
 // clusterLabel, which we expect may not exist, but has a default value.)
-func resultPVKey(res *source.QueryResult, clusterLabel, persistentVolumeLabel string) (pvKey, error) {
+func resultPVKey(res *source.QueryResult, persistentVolumeLabel string) (pvKey, error) {
 	key := pvKey{}
 
-	cluster, err := res.GetString(clusterLabel)
+	cluster, err := res.GetCluster()
 	if err != nil {
 		cluster = env.GetClusterID()
 	}
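
The key builders above stop taking caller-supplied label names and instead read well-known fields through typed accessors on source.QueryResult. Those accessors are not part of this diff, so the following is only a sketch of the shape they imply, including the pod/pod_name fallback that resultPodKey used to inline. The label names are assumptions; GetString mirrors the accessor the old code already called.

```go
// Illustrative sketch of typed QueryResult accessors; not the actual core/pkg/source code.
package source

import "fmt"

type QueryResult struct {
	// Metric holds the label set returned for this series (assumed shape).
	Metric map[string]interface{}
}

// GetString looks up a label by name, as the old key builders did directly.
func (qr *QueryResult) GetString(field string) (string, error) {
	raw, ok := qr.Metric[field]
	if !ok {
		return "", fmt.Errorf("'%s' field does not exist in result", field)
	}
	s, ok := raw.(string)
	if !ok {
		return "", fmt.Errorf("'%s' field is not a string", field)
	}
	return s, nil
}

// Typed accessors hide the label names from callers. Callers still default the
// cluster to env.GetClusterID() when GetCluster fails, as the key builders above show.
func (qr *QueryResult) GetCluster() (string, error)   { return qr.GetString("cluster_id") }
func (qr *QueryResult) GetNamespace() (string, error) { return qr.GetString("namespace") }
func (qr *QueryResult) GetContainer() (string, error) { return qr.GetString("container") }
func (qr *QueryResult) GetNode() (string, error)      { return qr.GetString("node") }

// GetPod is assumed to absorb the pod/pod_name fallback that resultPodKey
// previously handled itself.
func (qr *QueryResult) GetPod() (string, error) {
	if pod, err := qr.GetString("pod"); err == nil && pod != "" {
		return pod, nil
	}
	return qr.GetString("pod_name")
}
```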

+ 2 - 4
pkg/costmodel/metrics.go

@@ -18,7 +18,6 @@ import (
 	"github.com/opencost/opencost/pkg/env"
 	"github.com/opencost/opencost/pkg/errors"
 	"github.com/opencost/opencost/pkg/metrics"
-	"github.com/opencost/opencost/pkg/prom"
 
 	promclient "github.com/prometheus/client_golang/api"
 	"github.com/prometheus/client_golang/prometheus"
@@ -325,7 +324,7 @@ type CostModelMetricsEmitter struct {
 }
 
 // NewCostModelMetricsEmitter creates a new cost-model metrics emitter. Use Start() to begin metric emission.
-func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clustercache.ClusterCache, provider models.Provider, clusterInfo clusters.ClusterInfoProvider, model *CostModel) *CostModelMetricsEmitter {
+func NewCostModelMetricsEmitter(clusterCache clustercache.ClusterCache, provider models.Provider, clusterInfo clusters.ClusterInfoProvider, model *CostModel) *CostModelMetricsEmitter {
 
 	// Get metric configurations, if any
 	metricsConfig, err := metrics.GetMetricsConfig()
@@ -352,7 +351,6 @@ func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clust
 	metrics.InitOpencostTelemetry(metricsConfig)
 
 	return &CostModelMetricsEmitter{
-		PrometheusClient:              promClient,
 		KubeClusterCache:              clusterCache,
 		CloudProvider:                 provider,
 		Model:                         model,
@@ -460,7 +458,7 @@ func (cmme *CostModelMetricsEmitter) Start() bool {
 				// For an error collection, we'll just log the length of the errors (ComputeCostData already logs the
 				// actual errors)
 				if source.IsErrorCollection(err) {
-					if ec, ok := err.(prom.QueryErrorCollection); ok {
+					if ec, ok := err.(source.QueryErrorCollection); ok {
 						log.Errorf("Error in price recording: %d errors occurred", len(ec.Errors()))
 					}
 				} else {
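
Query error collections are now typed in the source package rather than prom. Here is a sketch of the surrounding handling, assuming source.QueryErrorCollection exposes Errors() []error as its prom predecessor did; the body of the else branch is an assumption, since the hunk is truncated at the else.

```go
// Sketch of the price-recording error handling shown above; the else branch is assumed.
if err != nil {
	if source.IsErrorCollection(err) {
		// ComputeCostData already logs the individual errors, so only report the count here.
		if ec, ok := err.(source.QueryErrorCollection); ok {
			log.Errorf("Error in price recording: %d errors occurred", len(ec.Errors()))
		}
	} else {
		log.Errorf("Error in price recording: %s", err)
	}
}
```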

+ 10 - 4
pkg/costmodel/moq_allocation_model_test.go

@@ -36,7 +36,7 @@ type AllocationModelMock struct {
 	ComputeAllocationFunc func(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error)
 
 	// DateRangeFunc mocks the DateRange method.
-	DateRangeFunc func() (time.Time, time.Time, error)
+	DateRangeFunc func(int) (time.Time, time.Time, error)
 
 	// calls tracks calls to the methods.
 	calls struct {
@@ -51,6 +51,7 @@ type AllocationModelMock struct {
 		}
 		// DateRange holds details about calls to the DateRange method.
 		DateRange []struct {
+			LimitDays int 
 		}
 	}
 	lockComputeAllocation sync.RWMutex
@@ -98,16 +99,19 @@ func (mock *AllocationModelMock) ComputeAllocationCalls() []struct {
 }
 
 // DateRange calls DateRangeFunc.
-func (mock *AllocationModelMock) DateRange() (time.Time, time.Time, error) {
+func (mock *AllocationModelMock) DateRange(limitDays int) (time.Time, time.Time, error) {
 	if mock.DateRangeFunc == nil {
 		panic("AllocationModelMock.DateRangeFunc: method is nil but AllocationModel.DateRange was just called")
 	}
 	callInfo := struct {
-	}{}
+		LimitDays int
+	}{
+		LimitDays: limitDays,
+	}
 	mock.lockDateRange.Lock()
 	mock.calls.DateRange = append(mock.calls.DateRange, callInfo)
 	mock.lockDateRange.Unlock()
-	return mock.DateRangeFunc()
+	return mock.DateRangeFunc(limitDays)
 }
 
 // DateRangeCalls gets all the calls that were made to DateRange.
@@ -115,8 +119,10 @@ func (mock *AllocationModelMock) DateRange() (time.Time, time.Time, error) {
 //
 //	len(mockedAllocationModel.DateRangeCalls())
 func (mock *AllocationModelMock) DateRangeCalls() []struct {
+	LimitDays int
 } {
 	var calls []struct {
+		LimitDays int
 	}
 	mock.lockDateRange.RLock()
 	calls = mock.calls.DateRange
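
Because the regenerated mock records LimitDays for every call, a test can assert on the limit that the code under test passed. A brief sketch, meant to run inside a test function; the 90-day limit is purely illustrative.

```go
// Sketch: asserting the limitDays value recorded by the mock (90 is illustrative).
model := &AllocationModelMock{
	DateRangeFunc: func(limitDays int) (time.Time, time.Time, error) {
		return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
			time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), nil
	},
}
if _, _, err := model.DateRange(90); err != nil {
	t.Fatal(err)
}
if calls := model.DateRangeCalls(); len(calls) != 1 || calls[0].LimitDays != 90 {
	t.Errorf("expected a single DateRange call with LimitDays=90, got %+v", calls)
}
```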

+ 3 - 4
pkg/costmodel/resultparsers.go

@@ -10,7 +10,6 @@ import (
 	"github.com/opencost/opencost/core/pkg/util"
 	costAnalyzerCloud "github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/clustercache"
-	"github.com/opencost/opencost/pkg/prom"
 )
 
 func GetPVInfoLocal(cache clustercache.ClusterCache, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
@@ -487,10 +486,10 @@ func getCost(qrs []*source.QueryResult) (map[string][]*util.Vector, error) {
 // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
 func getNormalization(qrs []*source.QueryResult) (float64, error) {
 	if len(qrs) == 0 {
-		return 0.0, prom.NoDataErr("getNormalization")
+		return 0.0, source.NewNoDataError("getNormalization")
 	}
 	if len(qrs[0].Values) == 0 {
-		return 0.0, prom.NoDataErr("getNormalization")
+		return 0.0, source.NewNoDataError("getNormalization")
 	}
 	return qrs[0].Values[0].Value, nil
 }
@@ -499,7 +498,7 @@ func getNormalization(qrs []*source.QueryResult) (float64, error) {
 // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
 func getNormalizations(qrs []*source.QueryResult) ([]*util.Vector, error) {
 	if len(qrs) == 0 {
-		return nil, prom.NoDataErr("getNormalizations")
+		return nil, source.NewNoDataError("getNormalizations")
 	}
 
 	return qrs[0].Values, nil
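
The prom-specific NoDataErr is replaced by a constructor owned by the source package. Its implementation is not shown in this diff, so the sketch below is only a guess at its likely shape, carrying the name of the query that returned no data.

```go
// Hypothetical sketch of a no-data error; the real core/pkg/source implementation may differ.
package source

import "fmt"

type NoDataError struct {
	Query string
}

func (e *NoDataError) Error() string {
	return fmt.Sprintf("query '%s' returned no data: the time window may be invalid, or kube-state-metrics and node-exporter may not be running", e.Query)
}

// NewNoDataError constructs the error that the normalization helpers above return.
func NewNoDataError(query string) error {
	return &NoDataError{Query: query}
}
```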

+ 4 - 4
pkg/costmodel/router.go

@@ -517,8 +517,8 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	fields := r.URL.Query().Get("filterFields")
 	namespace := r.URL.Query().Get("namespace")
 	cluster := r.URL.Query().Get("cluster")
-	remote := r.URL.Query().Get("remote")
-	remoteEnabled := env.IsRemoteEnabled() && remote != "false"
+	//remote := r.URL.Query().Get("remote")
+	//remoteEnabled := env.IsRemoteEnabled() && remote != "false"
 
 	layout := "2006-01-02T15:04:05.000Z"
 	start, err := time.Parse(layout, startStr)
@@ -1165,8 +1165,8 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 	} else {
 		pc = promCli
 	}
-	costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
-	metricsEmitter := NewCostModelMetricsEmitter(dataSource, k8sCache, cloudProvider, clusterInfoProvider, costModel)
+	costModel := NewCostModel(dataSource, cloudProvider, k8sCache, clusterMap, dataSource.BatchDuration())
+	metricsEmitter := NewCostModelMetricsEmitter(k8sCache, cloudProvider, clusterInfoProvider, costModel)
 
 	a := &Accesses{
 		httpServices: services.NewCostModelServices(),