Просмотр исходного кода

Update all query with range to run through prom3 helper func, clean o… (#3182)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 11 месяцев назад
Родитель
Commit
d888cb464f

+ 11 - 14
core/pkg/exporter/controller.go

@@ -97,13 +97,12 @@ func (cd *EventExportController[T]) Stop() {
 // ComputeExportController[T] is a controller type which leverages a `ComputeSource[T]` and `Exporter[T]`
 // to regularly compute the data for the current resolution and export it on a specific interval.
 type ComputeExportController[T any] struct {
-	runState         atomic.AtomicRunState
-	source           ComputeSource[T]
-	exporter         ComputeExporter[T]
-	resolution       time.Duration
-	sourceResolution time.Duration
-	lastExport       time.Time
-	typeName         string
+	runState   atomic.AtomicRunState
+	source     ComputeSource[T]
+	exporter   ComputeExporter[T]
+	resolution time.Duration
+	lastExport time.Time
+	typeName   string
 }
 
 // NewComputeExportController creates a new `ComputeExportController[T]` instance.
@@ -111,14 +110,12 @@ func NewComputeExportController[T any](
 	source ComputeSource[T],
 	exporter ComputeExporter[T],
 	resolution time.Duration,
-	sourceResolution time.Duration,
 ) *ComputeExportController[T] {
 	return &ComputeExportController[T]{
-		source:           source,
-		resolution:       resolution,
-		sourceResolution: sourceResolution,
-		exporter:         exporter,
-		typeName:         reflect.TypeOf((*T)(nil)).Elem().String(),
+		source:     source,
+		resolution: resolution,
+		exporter:   exporter,
+		typeName:   reflect.TypeOf((*T)(nil)).Elem().String(),
 	}
 }
 
@@ -225,7 +222,7 @@ func (cd *ComputeExportController[T]) export(window opencost.Window) error {
 		return fmt.Errorf("cannot compute window: [Start: %s, End: %s]", start, end)
 	}
 
-	set, err := cd.source.Compute(start, end, cd.sourceResolution)
+	set, err := cd.source.Compute(start, end)
 	// all errors but NoDataError are considered a halt to the export
 	if err != nil && !source.IsNoDataError(err) {
 		return err

+ 2 - 2
core/pkg/exporter/source.go

@@ -19,8 +19,8 @@ type ComputeSource[T any] interface {
 	// cannot compute and allow another Source to handle the computation.
 	CanCompute(start, end time.Time) bool
 
-	// Compute should compute a single T for the given time range, optionally using the given resolution.
-	Compute(start, end time.Time, resolution time.Duration) (*T, error)
+	// Compute should compute a single T for the given time range
+	Compute(start, end time.Time) (*T, error)
 
 	// Name returns the name of the ComputeSource
 	Name() string

+ 4 - 4
core/pkg/opencost/exporter/allocation/source.go

@@ -9,7 +9,7 @@ import (
 )
 
 type AllocationSource interface {
-	ComputeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error)
+	ComputeAllocation(start, end time.Time) (*opencost.AllocationSet, error)
 }
 
 type AllocationComputeSource struct {
@@ -32,9 +32,9 @@ func (acs *AllocationComputeSource) CanCompute(start, end time.Time) bool {
 	return true
 }
 
-// Compute should compute a single T for the given time range, optionally using the given resolution.
-func (acs *AllocationComputeSource) Compute(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
-	return acs.src.ComputeAllocation(start, end, resolution)
+// Compute should compute a single T for the given time range.
+func (acs *AllocationComputeSource) Compute(start, end time.Time) (*opencost.AllocationSet, error) {
+	return acs.src.ComputeAllocation(start, end)
 }
 
 // Name returns the name of the ComputeSource

+ 2 - 2
core/pkg/opencost/exporter/asset/source.go

@@ -32,8 +32,8 @@ func (acs *AssetsComputeSource) CanCompute(start, end time.Time) bool {
 	return true
 }
 
-// Compute should compute a single T for the given time range, optionally using the given resolution.
-func (acs *AssetsComputeSource) Compute(start, end time.Time, resolution time.Duration) (*opencost.AssetSet, error) {
+// Compute should compute a single T for the given time range.
+func (acs *AssetsComputeSource) Compute(start, end time.Time) (*opencost.AssetSet, error) {
 	return acs.src.ComputeAssets(start, end)
 }
 

+ 3 - 3
core/pkg/opencost/exporter/controllers.go

@@ -84,7 +84,7 @@ func NewPipelineExportControllers(clusterId string, store storage.Storage, cm Co
 			continue
 		}
 
-		allocController, err := NewComputePipelineExportController(clusterId, store, allocSource, res, sourceResolution)
+		allocController, err := NewComputePipelineExportController(clusterId, store, allocSource, res)
 		if err != nil {
 			log.Errorf("Failed to create allocation export controller for resolution: %s - %v", timeutil.DurationString(res), err)
 			continue
@@ -103,7 +103,7 @@ func NewPipelineExportControllers(clusterId string, store storage.Storage, cm Co
 			continue
 		}
 
-		assetController, err := NewComputePipelineExportController(clusterId, store, assetSource, res, sourceResolution)
+		assetController, err := NewComputePipelineExportController(clusterId, store, assetSource, res)
 		if err != nil {
 			log.Errorf("Failed to create asset export controller for resolution: %s - %v", timeutil.DurationString(res), err)
 			continue
@@ -122,7 +122,7 @@ func NewPipelineExportControllers(clusterId string, store storage.Storage, cm Co
 			continue
 		}
 
-		networkInsightController, err := NewComputePipelineExportController(clusterId, store, networkInsightSource, res, sourceResolution)
+		networkInsightController, err := NewComputePipelineExportController(clusterId, store, networkInsightSource, res)
 		if err != nil {
 			log.Errorf("Failed to create network insight export controller for resolution: %s - %v", timeutil.DurationString(res), err)
 			continue

+ 9 - 9
core/pkg/opencost/exporter/exporter_test.go

@@ -29,7 +29,7 @@ type MockSource[T any] struct {
 func (ms *MockSource[T]) CanCompute(start, end time.Time) bool {
 	return true
 }
-func (ms *MockSource[T]) Compute(start, end time.Time, resolution time.Duration) (*T, error) {
+func (ms *MockSource[T]) Compute(start, end time.Time) (*T, error) {
 	return ms.generate(start, end), nil
 }
 func (ms *MockSource[T]) Name() string {
@@ -105,14 +105,14 @@ func NewMockPipelineComputeSourceWith(srcResolution time.Duration) *MockPipeline
 	}
 }
 
-func (mpcs *MockPipelineComputeSource) ComputeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
-	return mpcs.allocSource.Compute(start, end, resolution)
+func (mpcs *MockPipelineComputeSource) ComputeAllocation(start, end time.Time) (*opencost.AllocationSet, error) {
+	return mpcs.allocSource.Compute(start, end)
 }
 func (mpcs *MockPipelineComputeSource) ComputeAssets(start, end time.Time) (*opencost.AssetSet, error) {
-	return mpcs.assetSource.Compute(start, end, TestResolution)
+	return mpcs.assetSource.Compute(start, end)
 }
-func (mpcs *MockPipelineComputeSource) ComputeNetworkInsights(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error) {
-	return mpcs.netSource.Compute(start, end, resolution)
+func (mpcs *MockPipelineComputeSource) ComputeNetworkInsights(start, end time.Time) (*opencost.NetworkInsightSet, error) {
+	return mpcs.netSource.Compute(start, end)
 }
 func (mpcs *MockPipelineComputeSource) GetDataSource() source.OpenCostDataSource {
 	return mpcs.ds
@@ -157,7 +157,7 @@ func TestExporters(t *testing.T) {
 		end := time.Now().UTC().Truncate(TestResolution)
 		start := end.Add(-TestResolution)
 
-		data, err := allocSource.Compute(start, end, TestResolution)
+		data, err := allocSource.Compute(start, end)
 		if err != nil {
 			t.Fatalf("failed to compute allocation data: %v", err)
 		}
@@ -186,7 +186,7 @@ func TestExporters(t *testing.T) {
 		end := time.Now().UTC().Truncate(TestResolution)
 		start := end.Add(-TestResolution)
 
-		data, err := assetSource.Compute(start, end, TestResolution)
+		data, err := assetSource.Compute(start, end)
 		if err != nil {
 			t.Fatalf("failed to compute asset data: %v", err)
 		}
@@ -215,7 +215,7 @@ func TestExporters(t *testing.T) {
 		end := time.Now().UTC().Truncate(TestResolution)
 		start := end.Add(-TestResolution)
 
-		data, err := netInsightSource.Compute(start, end, TestResolution)
+		data, err := netInsightSource.Compute(start, end)
 		if err != nil {
 			t.Fatalf("failed to compute net insights data: %v", err)
 		}

+ 1 - 2
core/pkg/opencost/exporter/exporters.go

@@ -44,12 +44,11 @@ func NewComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S
 	store storage.Storage,
 	source export.ComputeSource[T],
 	resolution time.Duration,
-	sourceResolution time.Duration,
 ) (*export.ComputeExportController[T], error) {
 	exporter, err := NewComputePipelineExporter[T, U, S](clusterId, resolution, store)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create compute exporter: %w", err)
 	}
 
-	return export.NewComputeExportController(source, exporter, resolution, sourceResolution), nil
+	return export.NewComputeExportController(source, exporter, resolution), nil
 }

+ 3 - 3
core/pkg/opencost/exporter/networkinsight/source.go

@@ -9,7 +9,7 @@ import (
 )
 
 type NetworkInsightSource interface {
-	ComputeNetworkInsights(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error)
+	ComputeNetworkInsights(start, end time.Time) (*opencost.NetworkInsightSet, error)
 }
 
 type NetworkInsightsComputeSource struct {
@@ -33,8 +33,8 @@ func (acs *NetworkInsightsComputeSource) CanCompute(start, end time.Time) bool {
 }
 
 // Compute should compute a single T for the given time range, optionally using the given resolution.
-func (acs *NetworkInsightsComputeSource) Compute(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error) {
-	return acs.src.ComputeNetworkInsights(start, end, resolution)
+func (acs *NetworkInsightsComputeSource) Compute(start, end time.Time) (*opencost.NetworkInsightSet, error) {
+	return acs.src.ComputeNetworkInsights(start, end)
 }
 
 // Name returns the name of the ComputeSource

+ 28 - 24
modules/prometheus-source/pkg/prom/metricsquerier.go

@@ -91,19 +91,18 @@ func (pds *PrometheusMetricsQuerier) QueryPVUsedMax(start, end time.Time) *sourc
 }
 
 func (pds *PrometheusMetricsQuerier) QueryPVCInfo(start, end time.Time) *source.Future[source.PVCInfoResult] {
-	const queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != "", %s}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
+	const queryFmtPVCInfo = `avg(kube_persistentvolumeclaim_info{volumename != "", %s}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%dm]`
 	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
 
 	cfg := pds.promConfig
-	resolution := cfg.DataResolution
-	resStr := timeutil.DurationString(resolution)
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryPVCInfo")
 	}
 
-	queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
+	queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
 	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
 	return source.NewFuture(source.DecodePVCInfoResult, ctx.QueryAtTime(queryPVCInfo, end))
 }
@@ -488,37 +487,35 @@ func (pds *PrometheusMetricsQuerier) QueryClusterManagementPricePerHr(start, end
 // AllocationMetricQuerier
 
 func (pds *PrometheusMetricsQuerier) QueryPods(start, end time.Time) *source.Future[source.PodsResult] {
-	const queryFmtPods = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%s]`
+	const queryFmtPods = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%dm]`
 	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
 
 	cfg := pds.promConfig
-	resolution := cfg.DataResolution
-	resStr := timeutil.DurationString(resolution)
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryPods")
 	}
 
-	queryPods := fmt.Sprintf(queryFmtPods, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
+	queryPods := fmt.Sprintf(queryFmtPods, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
 	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
 	return source.NewFuture(source.DecodePodsResult, ctx.QueryAtTime(queryPods, end))
 }
 
 func (pds *PrometheusMetricsQuerier) QueryPodsUID(start, end time.Time) *source.Future[source.PodsResult] {
-	const queryFmtPodsUID = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%s]`
+	const queryFmtPodsUID = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%dm]`
 	// env.GetPromClusterFilter(), env.GetPromClusterLabel(), durStr, resStr)
 
 	cfg := pds.promConfig
-	resolution := cfg.DataResolution
-	resStr := timeutil.DurationString(resolution)
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	durStr := timeutil.DurationString(end.Sub(start))
+	durStr := pds.durationStringFor(start, end, minsPerResolution)
 	if durStr == "" {
 		panic("failed to parse duration string passed to QueryPodsUID")
 	}
 
-	queryPodsUID := fmt.Sprintf(queryFmtPodsUID, cfg.ClusterFilter, cfg.ClusterLabel, durStr, resStr)
+	queryPodsUID := fmt.Sprintf(queryFmtPodsUID, cfg.ClusterFilter, cfg.ClusterLabel, durStr, minsPerResolution)
 	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
 	return source.NewFuture(source.DecodePodsResult, ctx.QueryAtTime(queryPodsUID, end))
 }
@@ -662,7 +659,7 @@ func (pds *PrometheusMetricsQuerier) QueryCPUUsageMax(start, end time.Time) *sou
 	// the resolution, to make sure the irate always has two points to query
 	// in case the Prom scrape duration has been reduced to be equal to the
 	// ETL resolution.
-	const queryFmtCPUUsageMaxSubquery = `max(max_over_time(irate(container_cpu_usage_seconds_total{container!="POD", container!="", %s}[%s])[%s:%s])) by (container, pod_name, pod, namespace, node, instance, %s)`
+	const queryFmtCPUUsageMaxSubquery = `max(max_over_time(irate(container_cpu_usage_seconds_total{container!="POD", container!="", %s}[%dm])[%s:%dm])) by (container, pod_name, pod, namespace, node, instance, %s)`
 	// env.GetPromClusterFilter(), doubleResStr, durStr, resStr, env.GetPromClusterLabel()
 
 	cfg := pds.promConfig
@@ -681,11 +678,14 @@ func (pds *PrometheusMetricsQuerier) QueryCPUUsageMax(start, end time.Time) *sou
 		return wrapResults(queryCPUUsageMaxRecordingRule, source.DecodeCPUUsageMaxResult, resCPUUsageMax)
 	}
 
-	resolution := cfg.DataResolution
-	resStr := timeutil.DurationString(resolution)
-	doubleResStr := timeutil.DurationString(2 * resolution)
+	minsPerResolution := cfg.DataResolutionMinutes
 
-	queryCPUUsageMaxSubquery := fmt.Sprintf(queryFmtCPUUsageMaxSubquery, cfg.ClusterFilter, doubleResStr, durStr, resStr, cfg.ClusterLabel)
+	durStr = pds.durationStringFor(start, end, minsPerResolution)
+	if durStr == "" {
+		panic("failed to parse duration string passed to QueryCPUUsageMax")
+	}
+
+	queryCPUUsageMaxSubquery := fmt.Sprintf(queryFmtCPUUsageMaxSubquery, cfg.ClusterFilter, 2*minsPerResolution, durStr, minsPerResolution, cfg.ClusterLabel)
 	return source.NewFuture(source.DecodeCPUUsageMaxResult, ctx.QueryAtTime(queryCPUUsageMaxSubquery, end))
 }
 
@@ -1319,12 +1319,16 @@ func (pds *PrometheusMetricsQuerier) QueryDataCoverage(limitDays int) (time.Time
 	)
 
 	cfg := pds.promConfig
-	now := time.Now()
-	durStr := fmt.Sprintf("%dd", limitDays)
+	minutesPerDuration := 60
+	dur := time.Duration(limitDays) * timeutil.Day
+	end := time.Now().UTC().Truncate(timeutil.Day).Add(timeutil.Day)
+	start := end.Add(-dur)
+
+	durStr := pds.durationStringFor(start, end, minutesPerDuration)
 
 	ctx := pds.promContexts.NewNamedContext(AllocationContextName)
 	queryOldest := fmt.Sprintf(queryFmtOldestSample, cfg.ClusterFilter, durStr, "1h")
-	resOldestFut := ctx.QueryAtTime(queryOldest, now)
+	resOldestFut := ctx.QueryAtTime(queryOldest, end)
 
 	resOldest, err := resOldestFut.Await()
 	if err != nil {
@@ -1337,7 +1341,7 @@ func (pds *PrometheusMetricsQuerier) QueryDataCoverage(limitDays int) (time.Time
 	oldest := time.Unix(int64(resOldest[0].Values[0].Value), 0)
 
 	queryNewest := fmt.Sprintf(queryFmtNewestSample, cfg.ClusterFilter, durStr, "1h")
-	resNewestFut := ctx.QueryAtTime(queryNewest, now)
+	resNewestFut := ctx.QueryAtTime(queryNewest, end)
 
 	resNewest, err := resNewestFut.Await()
 	if err != nil {

+ 2 - 10
pkg/costmodel/aggregation.go

@@ -74,10 +74,6 @@ func (a *Accesses) ComputeAllocationHandlerSummary(w http.ResponseWriter, r *htt
 	// computed. Defaults to the window size, making one set.
 	step := qp.GetDuration("step", window.Duration())
 
-	// Resolution is an optional parameter, defaulting to the configured ETL
-	// resolution.
-	resolution := qp.GetDuration("resolution", env.GetETLResolution())
-
 	// Aggregation is a required comma-separated list of fields by which to
 	// aggregate results. Some fields allow a sub-field, which is distinguished
 	// with a colon; e.g. "label:app".
@@ -103,7 +99,7 @@ func (a *Accesses) ComputeAllocationHandlerSummary(w http.ResponseWriter, r *htt
 		stepEnd := stepStart.Add(step)
 		stepWindow := opencost.NewWindow(&stepStart, &stepEnd)
 
-		as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End(), resolution)
+		as, err := a.Model.ComputeAllocation(*stepWindow.Start(), *stepWindow.End())
 		if err != nil {
 			proto.WriteError(w, proto.InternalServerError(err.Error()))
 			return
@@ -183,10 +179,6 @@ func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Reque
 		http.Error(w, fmt.Sprintf("Invalid 'window' parameter: %s", err), http.StatusBadRequest)
 	}
 
-	// Resolution is an optional parameter, defaulting to the configured ETL
-	// resolution.
-	resolution := qp.GetDuration("resolution", env.GetETLResolution())
-
 	// Step is an optional parameter that defines the duration per-set, i.e.
 	// the window for an AllocationSet, of the AllocationSetRange to be
 	// computed. Defaults to the window size, making one set.
@@ -235,7 +227,7 @@ func (a *Accesses) ComputeAllocationHandler(w http.ResponseWriter, r *http.Reque
 	// Get allocation filter if provided
 	allocationFilter := qp.Get("filter", "")
 
-	asr, err := a.Model.QueryAllocation(window, resolution, step, aggregateBy, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer, accumulateBy, shareIdle)
+	asr, err := a.Model.QueryAllocation(window, step, aggregateBy, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer, accumulateBy, shareIdle)
 	if err != nil {
 		if strings.Contains(strings.ToLower(err.Error()), "bad request") {
 			proto.WriteError(w, proto.BadRequest(err.Error()))

+ 6 - 13
pkg/costmodel/allocation.go

@@ -29,11 +29,11 @@ func (cm *CostModel) Name() string {
 // ComputeAllocation uses the CostModel instance to compute an AllocationSet
 // for the window defined by the given start and end times. The Allocations
 // returned are unaggregated (i.e. down to the container level).
-func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
+func (cm *CostModel) ComputeAllocation(start, end time.Time) (*opencost.AllocationSet, error) {
 
 	// If the duration is short enough, compute the AllocationSet directly
 	if end.Sub(start) <= cm.BatchDuration {
-		as, _, err := cm.computeAllocation(start, end, resolution)
+		as, _, err := cm.computeAllocation(start, end)
 		return as, err
 	}
 
@@ -61,7 +61,7 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 		e = s.Add(duration)
 
 		// Compute the individual AllocationSet for just (s, e)
-		as, _, err := cm.computeAllocation(s, e, resolution)
+		as, _, err := cm.computeAllocation(s, e)
 		if err != nil {
 			return opencost.NewAllocationSet(start, end), fmt.Errorf("error computing allocation for %s: %s", opencost.NewClosedWindow(s, e), err)
 		}
@@ -216,10 +216,11 @@ func (cm *CostModel) DateRange(limitDays int) (time.Time, time.Time, error) {
 	return cm.DataSource.Metrics().QueryDataCoverage(limitDays)
 }
 
-func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, map[nodeKey]*nodePricing, error) {
+func (cm *CostModel) computeAllocation(start, end time.Time) (*opencost.AllocationSet, map[nodeKey]*nodePricing, error) {
 	// 1. Build out Pod map from resolution-tuned, batched Pod start/end query
 	// 2. Run and apply the results of the remaining queries to
 	// 3. Build out AllocationSet from completed Pod map
+	resolution := cm.DataSource.Resolution()
 
 	// Create a window spanning the requested query
 	window := opencost.NewWindow(&start, &end)
@@ -237,13 +238,6 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	// totals from measured rate data.
 	podMap := map[podKey]*pod{}
 
-	// clusterStarts and clusterEnds record the earliest start and latest end
-	// times, respectively, on a cluster-basis. These are used for unmounted
-	// PVs and other "virtual" Allocations so that minutes are maximally
-	// accurate during start-up or spin-down of a cluster
-	clusterStart := map[string]time.Time{}
-	clusterEnd := map[string]time.Time{}
-
 	// If ingesting pod UID, we query kube_pod_container_status_running avg
 	// by uid as well as the default values, and all podKeys/pods have their
 	// names changed to "<pod_name> <pod_uid>". Because other metrics need
@@ -263,8 +257,7 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 		log.Debugf("CostModel.ComputeAllocation: ingesting UID data from KSM metrics...")
 	}
 
-	// TODO:CLEANUP remove "max batch" idea and clusterStart/End
-	err := cm.buildPodMap(window, cm.BatchDuration, podMap, clusterStart, clusterEnd, ingestPodUID, podUIDKeyMap)
+	err := cm.buildPodMap(window, podMap, ingestPodUID, podUIDKeyMap)
 	if err != nil {
 		log.Errorf("CostModel.ComputeAllocation: failed to build pod map: %s", err.Error())
 	}

+ 44 - 82
pkg/costmodel/allocation_helpers.go

@@ -11,7 +11,6 @@ import (
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/util"
-	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/cloud/provider"
 	"github.com/opencost/opencost/pkg/env"
 	"k8s.io/apimachinery/pkg/labels"
@@ -38,7 +37,7 @@ const (
 
 /* Pod Helpers */
 
-func (cm *CostModel) buildPodMap(window opencost.Window, maxBatchSize time.Duration, podMap map[podKey]*pod, clusterStart, clusterEnd map[string]time.Time, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) error {
+func (cm *CostModel) buildPodMap(window opencost.Window, podMap map[podKey]*pod, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) error {
 	// Assumes that window is positive and closed
 	start, end := *window.Start(), *window.End()
 
@@ -46,91 +45,61 @@ func (cm *CostModel) buildPodMap(window opencost.Window, maxBatchSize time.Durat
 	ds := cm.DataSource.Metrics()
 	resolution := cm.DataSource.Resolution()
 
-	// Query for (start, end) by (pod, namespace, cluster) over the given
-	// window, using the given resolution, and if necessary in batches no
-	// larger than the given maximum batch size. If working in batches, track
-	// overall progress by starting with (window.start, window.start) and
-	// querying in batches no larger than maxBatchSize from start-to-end,
-	// folding each result set into podMap as the results come back.
-	coverage := opencost.NewWindow(&start, &start)
-
-	numQuery := 1
-	for coverage.End().Before(end) {
-		// Determine the (start, end) of the current batch
-		batchStart := *coverage.End()
-		batchEnd := coverage.End().Add(maxBatchSize)
-		if batchEnd.After(end) {
-			batchEnd = end
-		}
-
-		var resPods []*source.PodsResult
-		var err error
-		maxTries := 3
-		numTries := 0
-		for resPods == nil && numTries < maxTries {
-			numTries++
-
-			// Query for the duration between start and end
-			durStr := timeutil.DurationString(batchEnd.Sub(batchStart))
-			if durStr == "" {
-				// Negative duration, so set empty results and don't query
-				resPods = []*source.PodsResult{}
-				err = nil
-				break
-			}
+	var resPods []*source.PodsResult
+	var err error
+	maxTries := 3
+	numTries := 0
+	for resPods == nil && numTries < maxTries {
+		numTries++
 
-			// Submit and profile query
+		// Submit and profile query
 
-			var queryPodsResult *source.QueryGroupFuture[source.PodsResult]
-			if ingestPodUID {
-				queryPodsResult = source.WithGroup(grp, ds.QueryPodsUID(batchStart, batchEnd))
-			} else {
-				queryPodsResult = source.WithGroup(grp, ds.QueryPods(batchStart, batchEnd))
-			}
-
-			queryProfile := time.Now()
-			resPods, err = queryPodsResult.Await()
-			if err != nil {
-				log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, err))
-				resPods = nil
-			}
+		var queryPodsResult *source.QueryGroupFuture[source.PodsResult]
+		if ingestPodUID {
+			queryPodsResult = source.WithGroup(grp, ds.QueryPodsUID(start, end))
+		} else {
+			queryPodsResult = source.WithGroup(grp, ds.QueryPods(start, end))
 		}
 
+		queryProfile := time.Now()
+		resPods, err = queryPodsResult.Await()
 		if err != nil {
-			return err
+			log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query try %d failed: %s", numTries, err))
+			resPods = nil
 		}
+	}
 
-		// queryFmtPodsUID will return both UID-containing results, and non-UID-containing results,
-		// so filter out the non-containing results so we don't duplicate pods. This is due to the
-		// default setup of Kubecost having replicated kube_pod_container_status_running and
-		// included KSM kube_pod_container_status_running. Querying w/ UID will return both.
-		if ingestPodUID {
-			var resPodsUID []*source.PodsResult
+	if err != nil {
+		return err
+	}
 
-			for _, res := range resPods {
-				uid := res.UID
-				if uid != "" {
-					resPodsUID = append(resPodsUID, res)
-				}
-			}
+	// queryFmtPodsUID will return both UID-containing results, and non-UID-containing results,
+	// so filter out the non-containing results so we don't duplicate pods. This is due to the
+	// default setup of Kubecost having replicated kube_pod_container_status_running and
+	// included KSM kube_pod_container_status_running. Querying w/ UID will return both.
+	if ingestPodUID {
+		var resPodsUID []*source.PodsResult
 
-			if len(resPodsUID) > 0 {
-				resPods = resPodsUID
-			} else {
-				log.DedupedWarningf(5, "CostModel.ComputeAllocation: UID ingestion enabled, but query did not return any results with UID")
+		for _, res := range resPods {
+			uid := res.UID
+			if uid != "" {
+				resPodsUID = append(resPodsUID, res)
 			}
 		}
 
-		applyPodResults(window, resolution, podMap, clusterStart, clusterEnd, resPods, ingestPodUID, podUIDKeyMap)
-
-		coverage = coverage.ExpandEnd(batchEnd)
-		numQuery++
+		if len(resPodsUID) > 0 {
+			resPods = resPodsUID
+		} else {
+			log.DedupedWarningf(5, "CostModel.ComputeAllocation: UID ingestion enabled, but query did not return any results with UID")
+		}
 	}
 
+	applyPodResults(window, resolution, podMap, resPods, ingestPodUID, podUIDKeyMap)
+
 	return nil
 }
 
-func applyPodResults(window opencost.Window, resolution time.Duration, podMap map[podKey]*pod, clusterStart, clusterEnd map[string]time.Time, resPods []*source.PodsResult, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) {
+func applyPodResults(window opencost.Window, resolution time.Duration, podMap map[podKey]*pod, resPods []*source.PodsResult, ingestPodUID bool, podUIDKeyMap map[podKey][]podKey) {
 	for _, res := range resPods {
 		if len(res.Data) == 0 {
 			log.Warnf("CostModel.ComputeAllocation: empty minutes result")
@@ -177,18 +146,6 @@ func applyPodResults(window opencost.Window, resolution time.Duration, podMap ma
 			continue
 		}
 
-		// Set start if unset or this datum's start time is earlier than the
-		// current earliest time.
-		if _, ok := clusterStart[cluster]; !ok || allocStart.Before(clusterStart[cluster]) {
-			clusterStart[cluster] = allocStart
-		}
-
-		// Set end if unset or this datum's end time is later than the
-		// current latest time.
-		if _, ok := clusterEnd[cluster]; !ok || allocEnd.After(clusterEnd[cluster]) {
-			clusterEnd[cluster] = allocEnd
-		}
-
 		if thisPod, ok := podMap[key]; ok {
 			// Pod has already been recorded, so update it accordingly
 			if allocStart.Before(thisPod.Start) {
@@ -2480,6 +2437,11 @@ func calculateStartAndEnd(result []*util.Vector, resolution time.Duration, windo
 	if e.After(*window.End()) {
 		e = *window.End()
 	}
+	// prevent end times in the future
+	now := time.Now().UTC()
+	if e.After(now) {
+		e = now
+	}
 
 	return s, e
 }

+ 4 - 4
pkg/costmodel/cluster.go

@@ -93,7 +93,7 @@ type DiskIdentifier struct {
 }
 
 func ClusterDisks(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
-	resolution := env.GetETLResolution()
+	resolution := dataSource.Resolution()
 
 	grp := source.NewQueryGroup()
 	mq := dataSource.Metrics()
@@ -501,7 +501,7 @@ func costTimesMinute[T comparable](activeDataMap map[T]activeData, costMap map[T
 
 func ClusterNodes(dataSource source.OpenCostDataSource, cp models.Provider, start, end time.Time) (map[NodeIdentifier]*Node, error) {
 	mq := dataSource.Metrics()
-	resolution := env.GetETLResolution()
+	resolution := dataSource.Resolution()
 
 	requiredGrp := source.NewQueryGroup()
 	optionalGrp := source.NewQueryGroup()
@@ -642,7 +642,7 @@ type LoadBalancer struct {
 }
 
 func ClusterLoadBalancers(dataSource source.OpenCostDataSource, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
-	resolution := env.GetETLResolution()
+	resolution := dataSource.Resolution()
 
 	grp := source.NewQueryGroup()
 	mq := dataSource.Metrics()
@@ -697,7 +697,7 @@ func ClusterLoadBalancers(dataSource source.OpenCostDataSource, start, end time.
 }
 
 func ClusterManagement(dataSource source.OpenCostDataSource, start, end time.Time) (map[ClusterManagementIdentifier]*ClusterManagementCost, error) {
-	resolution := env.GetETLResolution()
+	resolution := dataSource.Resolution()
 
 	grp := source.NewQueryGroup()
 	mq := dataSource.Metrics()

+ 2 - 2
pkg/costmodel/costmodel.go

@@ -1539,7 +1539,7 @@ func measureTime(start time.Time, threshold time.Duration, name string) {
 	}
 }
 
-func (cm *CostModel) QueryAllocation(window opencost.Window, resolution, step time.Duration, aggregate []string, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer bool, accumulateBy opencost.AccumulateOption, shareIdle bool) (*opencost.AllocationSetRange, error) {
+func (cm *CostModel) QueryAllocation(window opencost.Window, step time.Duration, aggregate []string, includeIdle, idleByNode, includeProportionalAssetResourceCosts, includeAggregatedMetadata, sharedLoadBalancer bool, accumulateBy opencost.AccumulateOption, shareIdle bool) (*opencost.AllocationSetRange, error) {
 	// Validate window is legal
 	if window.IsOpen() || window.IsNegative() {
 		return nil, fmt.Errorf("illegal window: %s", window)
@@ -1563,7 +1563,7 @@ func (cm *CostModel) QueryAllocation(window opencost.Window, resolution, step ti
 	stepEnd := stepStart.Add(step)
 	var isAKS bool
 	for window.End().After(stepStart) {
-		allocSet, err := cm.ComputeAllocation(stepStart, stepEnd, resolution)
+		allocSet, err := cm.ComputeAllocation(stepStart, stepEnd)
 		if err != nil {
 			return nil, fmt.Errorf("error computing allocations for %s: %w", opencost.NewClosedWindow(stepStart, stepEnd), err)
 		}

+ 2 - 2
pkg/costmodel/csv_export.go

@@ -19,7 +19,7 @@ import (
 )
 
 type AllocationModel interface {
-	ComputeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error)
+	ComputeAllocation(start, end time.Time) (*opencost.AllocationSet, error)
 	DateRange(limitDays int) (time.Time, time.Time, error)
 }
 
@@ -328,7 +328,7 @@ func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, dates [
 	for _, date := range dates {
 		start := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, time.UTC)
 		end := start.AddDate(0, 0, 1)
-		data, err := e.Model.ComputeAllocation(start, end, 5*time.Minute)
+		data, err := e.Model.ComputeAllocation(start, end)
 		if err != nil {
 			return err
 		}

+ 4 - 4
pkg/costmodel/csv_export_test.go

@@ -22,7 +22,7 @@ func Test_UpdateCSV(t *testing.T) {
 			DateRangeFunc: func(_ int) (time.Time, time.Time, error) {
 				return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), nil
 			},
-			ComputeAllocationFunc: func(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
+			ComputeAllocationFunc: func(start time.Time, end time.Time) (*opencost.AllocationSet, error) {
 				return &opencost.AllocationSet{
 					Allocations: map[string]*opencost.Allocation{
 						"test": {
@@ -77,7 +77,7 @@ func Test_UpdateCSV(t *testing.T) {
 			DateRangeFunc: func(_ int) (time.Time, time.Time, error) {
 				return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), nil
 			},
-			ComputeAllocationFunc: func(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
+			ComputeAllocationFunc: func(start time.Time, end time.Time) (*opencost.AllocationSet, error) {
 				return &opencost.AllocationSet{
 					Allocations: map[string]*opencost.Allocation{
 						"test": {
@@ -116,7 +116,7 @@ func Test_UpdateCSV(t *testing.T) {
 			DateRangeFunc: func(_ int) (time.Time, time.Time, error) {
 				return time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), nil
 			},
-			ComputeAllocationFunc: func(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
+			ComputeAllocationFunc: func(start time.Time, end time.Time) (*opencost.AllocationSet, error) {
 				return &opencost.AllocationSet{
 					Allocations: map[string]*opencost.Allocation{
 						"test": {
@@ -163,7 +163,7 @@ func Test_UpdateCSV(t *testing.T) {
 
 	t.Run("allocation data is empty", func(t *testing.T) {
 		model := &AllocationModelMock{
-			ComputeAllocationFunc: func(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
+			ComputeAllocationFunc: func(start time.Time, end time.Time) (*opencost.AllocationSet, error) {
 				return &opencost.AllocationSet{
 					Allocations: nil,
 				}, nil

+ 3 - 9
pkg/costmodel/moq_allocation_model_test.go

@@ -33,7 +33,7 @@ var _ AllocationModel = &AllocationModelMock{}
 //	}
 type AllocationModelMock struct {
 	// ComputeAllocationFunc mocks the ComputeAllocation method.
-	ComputeAllocationFunc func(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error)
+	ComputeAllocationFunc func(start time.Time, end time.Time) (*opencost.AllocationSet, error)
 
 	// DateRangeFunc mocks the DateRange method.
 	DateRangeFunc func(limitDays int) (time.Time, time.Time, error)
@@ -46,8 +46,6 @@ type AllocationModelMock struct {
 			Start time.Time
 			// End is the end argument value.
 			End time.Time
-			// Resolution is the resolution argument value.
-			Resolution time.Duration
 		}
 		// DateRange holds details about calls to the DateRange method.
 		DateRange []struct {
@@ -60,23 +58,21 @@ type AllocationModelMock struct {
 }
 
 // ComputeAllocation calls ComputeAllocationFunc.
-func (mock *AllocationModelMock) ComputeAllocation(start time.Time, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
+func (mock *AllocationModelMock) ComputeAllocation(start time.Time, end time.Time) (*opencost.AllocationSet, error) {
 	if mock.ComputeAllocationFunc == nil {
 		panic("AllocationModelMock.ComputeAllocationFunc: method is nil but AllocationModel.ComputeAllocation was just called")
 	}
 	callInfo := struct {
 		Start      time.Time
 		End        time.Time
-		Resolution time.Duration
 	}{
 		Start:      start,
 		End:        end,
-		Resolution: resolution,
 	}
 	mock.lockComputeAllocation.Lock()
 	mock.calls.ComputeAllocation = append(mock.calls.ComputeAllocation, callInfo)
 	mock.lockComputeAllocation.Unlock()
-	return mock.ComputeAllocationFunc(start, end, resolution)
+	return mock.ComputeAllocationFunc(start, end)
 }
 
 // ComputeAllocationCalls gets all the calls that were made to ComputeAllocation.
@@ -86,12 +82,10 @@ func (mock *AllocationModelMock) ComputeAllocation(start time.Time, end time.Tim
 func (mock *AllocationModelMock) ComputeAllocationCalls() []struct {
 	Start      time.Time
 	End        time.Time
-	Resolution time.Duration
 } {
 	var calls []struct {
 		Start      time.Time
 		End        time.Time
-		Resolution time.Duration
 	}
 	mock.lockComputeAllocation.RLock()
 	calls = mock.calls.ComputeAllocation

+ 1 - 1
pkg/costmodel/networkinsight.go

@@ -10,7 +10,7 @@ import (
 	"github.com/opencost/opencost/pkg/env"
 )
 
-func (cm *CostModel) ComputeNetworkInsights(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error) {
+func (cm *CostModel) ComputeNetworkInsights(start, end time.Time) (*opencost.NetworkInsightSet, error) {
 	log.Debugf("Network Insight compute called on CostModel for window  %s", opencost.NewClosedWindow(start, end).String())
 
 	// If the duration is short enough, compute the network insight directly

+ 0 - 17
pkg/env/costmodelenv.go

@@ -61,9 +61,6 @@ const (
 
 	UTCOffsetEnvVar = "UTC_OFFSET"
 
-	ETLEnabledEnvVar     = "ETL_ENABLED"
-	ETLResolutionSeconds = "ETL_RESOLUTION_SECONDS"
-
 	PricingConfigmapName = "PRICING_CONFIGMAP_NAME"
 	MetricsConfigmapName = "METRICS_CONFIGMAP_NAME"
 
@@ -418,20 +415,6 @@ func GetParsedUTCOffset() time.Duration {
 	return offset
 }
 
-func IsETLEnabled() bool {
-	return env.GetBool(ETLEnabledEnvVar, true)
-}
-
-// GetETLResolution determines the resolution of ETL queries. The smaller the
-// duration, the higher the resolution; the higher the resolution, the more
-// accurate the query results, but the more computationally expensive.
-func GetETLResolution() time.Duration {
-	// Use the configured ETL resolution, or default to
-	// 5m (i.e. 300s)
-	secs := time.Duration(env.GetInt64(ETLResolutionSeconds, 300))
-	return secs * time.Second
-}
-
 // IsIngestingPodUID returns the env variable from ingestPodUID, which alters the
 // contents of podKeys in Allocation
 func IsIngestingPodUID() bool {