Просмотр исходного кода

Spike solution of ETL Allocation hourly build/rebuild with accumulation; convert Allocation queries to use QueryAtTime; guard ClusterNodes, Disks, and LoadBalancers from bad input

Niko Kovacevic 4 года назад
Родитель
Commit
bcdca0470f
5 измененных файлов с 338 добавлено и 170 удалено
  1. 295 135
      pkg/costmodel/allocation.go
  2. 10 0
      pkg/costmodel/cluster.go
  3. 15 13
      pkg/costmodel/costmodel.go
  4. 9 13
      pkg/env/costmodelenv.go
  5. 9 9
      pkg/kubecost/status.go

+ 295 - 135
pkg/costmodel/allocation.go

@@ -18,47 +18,47 @@ import (
 )
 
 const (
-	queryFmtPods                     = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]%s`
-	queryFmtRAMBytesAllocated        = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s, provider_id)`
-	queryFmtRAMRequests              = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
-	queryFmtRAMUsageAvg              = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtRAMUsageMax              = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtCPUCoresAllocated        = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
-	queryFmtCPURequests              = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
-	queryFmtCPUUsageAvg              = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtCPUUsageMax              = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s]%s)) by (container_name, container, pod_name, pod, namespace, instance, %s)`
-	queryFmtGPUsRequested            = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
-	queryFmtGPUsAllocated            = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s]%s)) by (container, pod, namespace, node, %s)`
-	queryFmtNodeCostPerCPUHr         = `avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
-	queryFmtNodeCostPerRAMGiBHr      = `avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
-	queryFmtNodeCostPerGPUHr         = `avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (node, %s, instance_type, provider_id)`
-	queryFmtNodeIsSpot               = `avg_over_time(kubecost_node_is_spot[%s]%s)`
-	queryFmtPVCInfo                  = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]%s`
-	queryFmtPVBytes                  = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (persistentvolume, %s)`
-	queryFmtPodPVCAllocation         = `avg(avg_over_time(pod_pvc_allocation[%s]%s)) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
-	queryFmtPVCBytesRequested        = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s]%s)) by (persistentvolumeclaim, namespace, %s)`
-	queryFmtPVCostPerGiBHour         = `avg(avg_over_time(pv_hourly_cost[%s]%s)) by (volumename, %s)`
-	queryFmtNetZoneGiB               = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
-	queryFmtNetZoneCostPerGiB        = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s]%s)) by (%s)`
-	queryFmtNetRegionGiB             = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
-	queryFmtNetRegionCostPerGiB      = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s]%s)) by (%s)`
-	queryFmtNetInternetGiB           = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s]%s)) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
-	queryFmtNetInternetCostPerGiB    = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s]%s)) by (%s)`
-	queryFmtNetReceiveBytes          = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`
-	queryFmtNetTransferBytes         = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s]%s)) by (pod_name, pod, namespace, %s)`
-	queryFmtNamespaceLabels          = `avg_over_time(kube_namespace_labels[%s]%s)`
-	queryFmtNamespaceAnnotations     = `avg_over_time(kube_namespace_annotations[%s]%s)`
-	queryFmtPodLabels                = `avg_over_time(kube_pod_labels[%s]%s)`
-	queryFmtPodAnnotations           = `avg_over_time(kube_pod_annotations[%s]%s)`
-	queryFmtServiceLabels            = `avg_over_time(service_selector_labels[%s]%s)`
-	queryFmtDeploymentLabels         = `avg_over_time(deployment_match_labels[%s]%s)`
-	queryFmtStatefulSetLabels        = `avg_over_time(statefulSet_match_labels[%s]%s)`
-	queryFmtDaemonSetLabels          = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s]%s)) by (pod, owner_name, namespace, %s)`
-	queryFmtJobLabels                = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
-	queryFmtPodsWithReplicaSetOwner  = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s]%s)) by (pod, owner_name, namespace ,%s)`
-	queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s]%s)) by (replicaset, namespace, %s)`
-	queryFmtLBCostPerHr              = `avg(avg_over_time(kubecost_load_balancer_cost[%s]%s)) by (namespace, service_name, %s)`
-	queryFmtLBActiveMins             = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]%s`
+	queryFmtPods                     = `avg(kube_pod_container_status_running{}) by (pod, namespace, %s)[%s:%s]`
+	queryFmtRAMBytesAllocated        = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s, provider_id)`
+	queryFmtRAMRequests              = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtRAMUsageAvg              = `avg(avg_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	queryFmtRAMUsageMax              = `max(max_over_time(container_memory_working_set_bytes{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	queryFmtCPUCoresAllocated        = `avg(avg_over_time(container_cpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtCPURequests              = `avg(avg_over_time(kube_pod_container_resource_requests{resource="cpu", unit="core", container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtCPUUsageAvg              = `avg(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	queryFmtCPUUsageMax              = `max(rate(container_cpu_usage_seconds_total{container!="", container_name!="POD", container!="POD"}[%s])) by (container_name, container, pod_name, pod, namespace, instance, %s)`
+	queryFmtGPUsRequested            = `avg(avg_over_time(kube_pod_container_resource_requests{resource="nvidia_com_gpu", container!="",container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtGPUsAllocated            = `avg(avg_over_time(container_gpu_allocation{container!="", container!="POD", node!=""}[%s])) by (container, pod, namespace, node, %s)`
+	queryFmtNodeCostPerCPUHr         = `avg(avg_over_time(node_cpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
+	queryFmtNodeCostPerRAMGiBHr      = `avg(avg_over_time(node_ram_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
+	queryFmtNodeCostPerGPUHr         = `avg(avg_over_time(node_gpu_hourly_cost[%s])) by (node, %s, instance_type, provider_id)`
+	queryFmtNodeIsSpot               = `avg_over_time(kubecost_node_is_spot[%s])`
+	queryFmtPVCInfo                  = `avg(kube_persistentvolumeclaim_info{volumename != ""}) by (persistentvolumeclaim, storageclass, volumename, namespace, %s)[%s:%s]`
+	queryFmtPVBytes                  = `avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (persistentvolume, %s)`
+	queryFmtPodPVCAllocation         = `avg(avg_over_time(pod_pvc_allocation[%s])) by (persistentvolume, persistentvolumeclaim, pod, namespace, %s)`
+	queryFmtPVCBytesRequested        = `avg(avg_over_time(kube_persistentvolumeclaim_resource_requests_storage_bytes{}[%s])) by (persistentvolumeclaim, namespace, %s)`
+	queryFmtPVCostPerGiBHour         = `avg(avg_over_time(pv_hourly_cost[%s])) by (volumename, %s)`
+	queryFmtNetZoneGiB               = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	queryFmtNetZoneCostPerGiB        = `avg(avg_over_time(kubecost_network_zone_egress_cost{}[%s])) by (%s)`
+	queryFmtNetRegionGiB             = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	queryFmtNetRegionCostPerGiB      = `avg(avg_over_time(kubecost_network_region_egress_cost{}[%s])) by (%s)`
+	queryFmtNetInternetGiB           = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s])) by (pod_name, namespace, %s) / 1024 / 1024 / 1024`
+	queryFmtNetInternetCostPerGiB    = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
+	queryFmtNetReceiveBytes          = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
+	queryFmtNetTransferBytes         = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
+	queryFmtNamespaceLabels          = `avg_over_time(kube_namespace_labels[%s])`
+	queryFmtNamespaceAnnotations     = `avg_over_time(kube_namespace_annotations[%s])`
+	queryFmtPodLabels                = `avg_over_time(kube_pod_labels[%s])`
+	queryFmtPodAnnotations           = `avg_over_time(kube_pod_annotations[%s])`
+	queryFmtServiceLabels            = `avg_over_time(service_selector_labels[%s])`
+	queryFmtDeploymentLabels         = `avg_over_time(deployment_match_labels[%s])`
+	queryFmtStatefulSetLabels        = `avg_over_time(statefulSet_match_labels[%s])`
+	queryFmtDaemonSetLabels          = `sum(avg_over_time(kube_pod_owner{owner_kind="DaemonSet"}[%s])) by (pod, owner_name, namespace, %s)`
+	queryFmtJobLabels                = `sum(avg_over_time(kube_pod_owner{owner_kind="Job"}[%s])) by (pod, owner_name, namespace ,%s)`
+	queryFmtPodsWithReplicaSetOwner  = `sum(avg_over_time(kube_pod_owner{owner_kind="ReplicaSet"}[%s])) by (pod, owner_name, namespace ,%s)`
+	queryFmtReplicaSetsWithoutOwners = `avg(avg_over_time(kube_replicaset_owner{owner_kind="<none>", owner_name="<none>"}[%s])) by (replicaset, namespace, %s)`
+	queryFmtLBCostPerHr              = `avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s)`
+	queryFmtLBActiveMins             = `count(kubecost_load_balancer_cost) by (namespace, service_name, %s)[%s:%s]`
 )
 
 // This is a bit of a hack to work around garbage data from cadvisor
@@ -83,6 +83,169 @@ func (cm *CostModel) Name() string {
 // for the window defined by the given start and end times. The Allocations
 // returned are unaggregated (i.e. down to the container level).
 func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
+	// If the duration is short enough, compute the AllocationSet directly
+	if end.Sub(start) <= cm.MaxPrometheusQueryDuration {
+		return cm.computeAllocation(start, end, resolution)
+	}
+
+	// If the duration exceeds the configured MaxPrometheusQueryDuration, then
+	// query for maximum-sized AllocationSets, collect them, and accumulate.
+
+	// s and e track the coverage of the entire given window over multiple
+	// internal queries.
+	s, e := start, start
+
+	// Collect AllocationSets in a range, then accumulate
+	// TODO optimize by collecting consecutive AllocationSets, accumulating as we go
+	asr := kubecost.NewAllocationSetRange()
+
+	for e.Before(end) {
+		// By default, query for the full remaining duration. But do not let
+		// any individual query duration exceed the configured max Prometheus
+		// query duration.
+		duration := end.Sub(e)
+		if duration > cm.MaxPrometheusQueryDuration {
+			duration = cm.MaxPrometheusQueryDuration
+		}
+
+		// Set start and end parameters (s, e) for next individual computation.
+		e = s.Add(duration)
+
+		// Compute the individual AllocationSet for just (s, e)
+		as, err := cm.computeAllocation(s, e, resolution)
+		if err != nil {
+			return kubecost.NewAllocationSet(start, end), fmt.Errorf("error computing allocation for %s: %s", kubecost.NewClosedWindow(s, e), err)
+		}
+
+		// TODO remove log
+		log.Infof("[Hourly] computeAllocation => AllocationSet{ %s window, %d allocations, %.5f total cost }", as.Window, as.Length(), as.TotalCost())
+
+		// Append to the range
+		asr.Append(as)
+
+		// Set s equal to e to set up the next query, if one exists.
+		s = e
+	}
+
+	// TODO remove log
+	log.Infof("[Hourly] ComputeAllocation(%s) => AllocationSetRange{ %s window, %d sets, %.5f total cost }", kubecost.NewClosedWindow(start, end), asr.Window(), asr.Length(), asr.TotalCost())
+
+	// Populate annotations, labels, and services on each Allocation. This is
+	// necessary because Properties.Intersection does not propagate any values
+	// stored in maps or slices for performance reasons. In this case, however,
+	// it is both acceptable and necessary to do so.
+	allocationAnnotations := map[string]map[string]string{}
+	allocationLabels := map[string]map[string]string{}
+	allocationServices := map[string]map[string]bool{}
+
+	asr.Each(func(i int, as *kubecost.AllocationSet) {
+		as.Each(func(k string, a *kubecost.Allocation) {
+			if len(a.Properties.Annotations) > 0 {
+				if _, ok := allocationAnnotations[k]; !ok {
+					allocationAnnotations[k] = map[string]string{}
+				}
+				for name, val := range a.Properties.Annotations {
+					allocationAnnotations[k][name] = val
+				}
+			}
+
+			if len(a.Properties.Labels) > 0 {
+				if _, ok := allocationLabels[k]; !ok {
+					allocationLabels[k] = map[string]string{}
+				}
+				for name, val := range a.Properties.Labels {
+					allocationLabels[k][name] = val
+				}
+			}
+
+			if len(a.Properties.Services) > 0 {
+				if _, ok := allocationServices[k]; !ok {
+					allocationServices[k] = map[string]bool{}
+				}
+				for _, val := range a.Properties.Services {
+					allocationServices[k][val] = true
+				}
+			}
+		})
+	})
+
+	// Accumulate to yield the result AllocationSet. After this step, we will
+	// be nearly complete, but without the raw allocation data, which must be
+	// recomputed.
+	result, err := asr.Accumulate()
+	if err != nil {
+		return kubecost.NewAllocationSet(start, end), fmt.Errorf("error accumulating data for %s: %s", kubecost.NewClosedWindow(s, e), err)
+	}
+
+	// Apply the annotations, labels, and services to the post-accumulation
+	// results. (See above for why this is necessary.)
+	result.Each(func(k string, a *kubecost.Allocation) {
+		if annotations, ok := allocationAnnotations[k]; ok {
+			a.Properties.Annotations = annotations
+		}
+
+		if labels, ok := allocationLabels[k]; ok {
+			a.Properties.Labels = labels
+		}
+
+		if services, ok := allocationServices[k]; ok {
+			a.Properties.Services = []string{}
+			for s := range services {
+				a.Properties.Services = append(a.Properties.Services, s)
+			}
+		}
+
+		// Expand the Window of all Allocations within the AllocationSet
+		// to match the Window of the AllocationSet, which gets expanded
+		// at the end of this function.
+		a.Window = a.Window.ExpandStart(start).ExpandEnd(end)
+	})
+
+	// Maintain RAM and CPU max usage values by iterating over the range,
+	// computing maximums on a rolling basis, and setting on the result set.
+	asr.Each(func(i int, as *kubecost.AllocationSet) {
+
+		// TODO pass through warnings and errors
+
+		as.Each(func(key string, alloc *kubecost.Allocation) {
+			resultAlloc := result.Get(key)
+			if resultAlloc == nil {
+				return
+			}
+
+			if resultAlloc.RawAllocationOnly == nil {
+				resultAlloc.RawAllocationOnly = &kubecost.RawAllocationOnlyData{}
+			}
+
+			if alloc.RawAllocationOnly == nil {
+				// This will happen inevitably for unmounted disks, but should
+				// ideally not happen for any allocation with CPU and RAM data.
+				if !alloc.IsUnmounted() {
+					log.DedupedWarningf(10, "ComputeAllocation: raw allocation data missing for %s", key)
+				}
+				return
+			}
+
+			if alloc.RawAllocationOnly.CPUCoreUsageMax > resultAlloc.RawAllocationOnly.CPUCoreUsageMax {
+				resultAlloc.RawAllocationOnly.CPUCoreUsageMax = alloc.RawAllocationOnly.CPUCoreUsageMax
+			}
+
+			if alloc.RawAllocationOnly.RAMBytesUsageMax > resultAlloc.RawAllocationOnly.RAMBytesUsageMax {
+				resultAlloc.RawAllocationOnly.RAMBytesUsageMax = alloc.RawAllocationOnly.RAMBytesUsageMax
+			}
+		})
+	})
+
+	// Expand the window to match the queried time range.
+	result.Window = result.Window.ExpandStart(start).ExpandEnd(end)
+
+	// TODO remove log
+	log.Infof("[Hourly] ComputeAllocation(%s) => AllocationSet{ %s window, %d allocations, %.5f total cost }", kubecost.NewClosedWindow(start, end), result.Window, result.Length(), result.TotalCost())
+
+	return result, nil
+}
+
+func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Duration) (*kubecost.AllocationSet, error) {
 	// 1. Build out Pod map from resolution-tuned, batched Pod start/end query
 	// 2. Run and apply the results of the remaining queries to
 	// 3. Build out AllocationSet from completed Pod map
@@ -110,16 +273,15 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 	clusterStart := map[string]time.Time{}
 	clusterEnd := map[string]time.Time{}
 
-	cm.buildPodMap(window, resolution, env.GetETLMaxBatchDuration(), podMap, clusterStart, clusterEnd)
+	// TODO:CLEANUP remove "max batch" idea and clusterStart/End
+	cm.buildPodMap(window, resolution, env.GetETLMaxPrometheusQueryDuration(), podMap, clusterStart, clusterEnd)
 
 	// (2) Run and apply remaining queries
 
-	// Convert window (start, end) to (duration, offset) for querying Prometheus,
-	// including handling Thanos offset
-	durStr, offStr, err := window.DurationOffsetForPrometheus()
-	if err != nil {
-		// Negative duration, so return empty set
-		return allocSet, nil
+	// Query for the duration between start and end
+	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		return allocSet, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
 	}
 
 	// Convert resolution duration to a query-ready string
@@ -127,125 +289,125 @@ func (cm *CostModel) ComputeAllocation(start, end time.Time, resolution time.Dur
 
 	ctx := prom.NewNamedContext(cm.PrometheusClient, prom.AllocationContextName)
 
-	queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, offStr, env.GetPromClusterLabel())
-	resChRAMBytesAllocated := ctx.Query(queryRAMBytesAllocated)
+	queryRAMBytesAllocated := fmt.Sprintf(queryFmtRAMBytesAllocated, durStr, env.GetPromClusterLabel())
+	resChRAMBytesAllocated := ctx.QueryAtTime(queryRAMBytesAllocated, end)
 
-	queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, offStr, env.GetPromClusterLabel())
-	resChRAMRequests := ctx.Query(queryRAMRequests)
+	queryRAMRequests := fmt.Sprintf(queryFmtRAMRequests, durStr, env.GetPromClusterLabel())
+	resChRAMRequests := ctx.QueryAtTime(queryRAMRequests, end)
 
-	queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, offStr, env.GetPromClusterLabel())
-	resChRAMUsageAvg := ctx.Query(queryRAMUsageAvg)
+	queryRAMUsageAvg := fmt.Sprintf(queryFmtRAMUsageAvg, durStr, env.GetPromClusterLabel())
+	resChRAMUsageAvg := ctx.QueryAtTime(queryRAMUsageAvg, end)
 
-	queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, offStr, env.GetPromClusterLabel())
-	resChRAMUsageMax := ctx.Query(queryRAMUsageMax)
+	queryRAMUsageMax := fmt.Sprintf(queryFmtRAMUsageMax, durStr, env.GetPromClusterLabel())
+	resChRAMUsageMax := ctx.QueryAtTime(queryRAMUsageMax, end)
 
-	queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, offStr, env.GetPromClusterLabel())
-	resChCPUCoresAllocated := ctx.Query(queryCPUCoresAllocated)
+	queryCPUCoresAllocated := fmt.Sprintf(queryFmtCPUCoresAllocated, durStr, env.GetPromClusterLabel())
+	resChCPUCoresAllocated := ctx.QueryAtTime(queryCPUCoresAllocated, end)
 
-	queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, offStr, env.GetPromClusterLabel())
-	resChCPURequests := ctx.Query(queryCPURequests)
+	queryCPURequests := fmt.Sprintf(queryFmtCPURequests, durStr, env.GetPromClusterLabel())
+	resChCPURequests := ctx.QueryAtTime(queryCPURequests, end)
 
-	queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, offStr, env.GetPromClusterLabel())
-	resChCPUUsageAvg := ctx.Query(queryCPUUsageAvg)
+	queryCPUUsageAvg := fmt.Sprintf(queryFmtCPUUsageAvg, durStr, env.GetPromClusterLabel())
+	resChCPUUsageAvg := ctx.QueryAtTime(queryCPUUsageAvg, end)
 
-	queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, offStr, env.GetPromClusterLabel())
-	resChCPUUsageMax := ctx.Query(queryCPUUsageMax)
+	queryCPUUsageMax := fmt.Sprintf(queryFmtCPUUsageMax, durStr, env.GetPromClusterLabel())
+	resChCPUUsageMax := ctx.QueryAtTime(queryCPUUsageMax, end)
 
-	queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, offStr, env.GetPromClusterLabel())
-	resChGPUsRequested := ctx.Query(queryGPUsRequested)
+	queryGPUsRequested := fmt.Sprintf(queryFmtGPUsRequested, durStr, env.GetPromClusterLabel())
+	resChGPUsRequested := ctx.QueryAtTime(queryGPUsRequested, end)
 
-	queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, offStr, env.GetPromClusterLabel())
-	resChGPUsAllocated := ctx.Query(queryGPUsAllocated)
+	queryGPUsAllocated := fmt.Sprintf(queryFmtGPUsAllocated, durStr, env.GetPromClusterLabel())
+	resChGPUsAllocated := ctx.QueryAtTime(queryGPUsAllocated, end)
 
-	queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, offStr, env.GetPromClusterLabel())
-	resChNodeCostPerCPUHr := ctx.Query(queryNodeCostPerCPUHr)
+	queryNodeCostPerCPUHr := fmt.Sprintf(queryFmtNodeCostPerCPUHr, durStr, env.GetPromClusterLabel())
+	resChNodeCostPerCPUHr := ctx.QueryAtTime(queryNodeCostPerCPUHr, end)
 
-	queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, offStr, env.GetPromClusterLabel())
-	resChNodeCostPerRAMGiBHr := ctx.Query(queryNodeCostPerRAMGiBHr)
+	queryNodeCostPerRAMGiBHr := fmt.Sprintf(queryFmtNodeCostPerRAMGiBHr, durStr, env.GetPromClusterLabel())
+	resChNodeCostPerRAMGiBHr := ctx.QueryAtTime(queryNodeCostPerRAMGiBHr, end)
 
-	queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, offStr, env.GetPromClusterLabel())
-	resChNodeCostPerGPUHr := ctx.Query(queryNodeCostPerGPUHr)
+	queryNodeCostPerGPUHr := fmt.Sprintf(queryFmtNodeCostPerGPUHr, durStr, env.GetPromClusterLabel())
+	resChNodeCostPerGPUHr := ctx.QueryAtTime(queryNodeCostPerGPUHr, end)
 
-	queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr, offStr)
-	resChNodeIsSpot := ctx.Query(queryNodeIsSpot)
+	queryNodeIsSpot := fmt.Sprintf(queryFmtNodeIsSpot, durStr)
+	resChNodeIsSpot := ctx.QueryAtTime(queryNodeIsSpot, end)
 
-	queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr, offStr)
-	resChPVCInfo := ctx.Query(queryPVCInfo)
+	queryPVCInfo := fmt.Sprintf(queryFmtPVCInfo, env.GetPromClusterLabel(), durStr, resStr)
+	resChPVCInfo := ctx.QueryAtTime(queryPVCInfo, end)
 
-	queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, offStr, env.GetPromClusterLabel())
-	resChPVBytes := ctx.Query(queryPVBytes)
+	queryPVBytes := fmt.Sprintf(queryFmtPVBytes, durStr, env.GetPromClusterLabel())
+	resChPVBytes := ctx.QueryAtTime(queryPVBytes, end)
 
-	queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, offStr, env.GetPromClusterLabel())
-	resChPodPVCAllocation := ctx.Query(queryPodPVCAllocation)
+	queryPodPVCAllocation := fmt.Sprintf(queryFmtPodPVCAllocation, durStr, env.GetPromClusterLabel())
+	resChPodPVCAllocation := ctx.QueryAtTime(queryPodPVCAllocation, end)
 
-	queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, offStr, env.GetPromClusterLabel())
-	resChPVCBytesRequested := ctx.Query(queryPVCBytesRequested)
+	queryPVCBytesRequested := fmt.Sprintf(queryFmtPVCBytesRequested, durStr, env.GetPromClusterLabel())
+	resChPVCBytesRequested := ctx.QueryAtTime(queryPVCBytesRequested, end)
 
-	queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, offStr, env.GetPromClusterLabel())
-	resChPVCostPerGiBHour := ctx.Query(queryPVCostPerGiBHour)
+	queryPVCostPerGiBHour := fmt.Sprintf(queryFmtPVCostPerGiBHour, durStr, env.GetPromClusterLabel())
+	resChPVCostPerGiBHour := ctx.QueryAtTime(queryPVCostPerGiBHour, end)
 
-	queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, offStr, env.GetPromClusterLabel())
-	resChNetTransferBytes := ctx.Query(queryNetTransferBytes)
+	queryNetTransferBytes := fmt.Sprintf(queryFmtNetTransferBytes, durStr, env.GetPromClusterLabel())
+	resChNetTransferBytes := ctx.QueryAtTime(queryNetTransferBytes, end)
 
-	queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, offStr, env.GetPromClusterLabel())
-	resChNetReceiveBytes := ctx.Query(queryNetReceiveBytes)
+	queryNetReceiveBytes := fmt.Sprintf(queryFmtNetReceiveBytes, durStr, env.GetPromClusterLabel())
+	resChNetReceiveBytes := ctx.QueryAtTime(queryNetReceiveBytes, end)
 
-	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, offStr, env.GetPromClusterLabel())
-	resChNetZoneGiB := ctx.Query(queryNetZoneGiB)
+	queryNetZoneGiB := fmt.Sprintf(queryFmtNetZoneGiB, durStr, env.GetPromClusterLabel())
+	resChNetZoneGiB := ctx.QueryAtTime(queryNetZoneGiB, end)
 
-	queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
-	resChNetZoneCostPerGiB := ctx.Query(queryNetZoneCostPerGiB)
+	queryNetZoneCostPerGiB := fmt.Sprintf(queryFmtNetZoneCostPerGiB, durStr, env.GetPromClusterLabel())
+	resChNetZoneCostPerGiB := ctx.QueryAtTime(queryNetZoneCostPerGiB, end)
 
-	queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, offStr, env.GetPromClusterLabel())
-	resChNetRegionGiB := ctx.Query(queryNetRegionGiB)
+	queryNetRegionGiB := fmt.Sprintf(queryFmtNetRegionGiB, durStr, env.GetPromClusterLabel())
+	resChNetRegionGiB := ctx.QueryAtTime(queryNetRegionGiB, end)
 
-	queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
-	resChNetRegionCostPerGiB := ctx.Query(queryNetRegionCostPerGiB)
+	queryNetRegionCostPerGiB := fmt.Sprintf(queryFmtNetRegionCostPerGiB, durStr, env.GetPromClusterLabel())
+	resChNetRegionCostPerGiB := ctx.QueryAtTime(queryNetRegionCostPerGiB, end)
 
-	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, offStr, env.GetPromClusterLabel())
-	resChNetInternetGiB := ctx.Query(queryNetInternetGiB)
+	queryNetInternetGiB := fmt.Sprintf(queryFmtNetInternetGiB, durStr, env.GetPromClusterLabel())
+	resChNetInternetGiB := ctx.QueryAtTime(queryNetInternetGiB, end)
 
-	queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, offStr, env.GetPromClusterLabel())
-	resChNetInternetCostPerGiB := ctx.Query(queryNetInternetCostPerGiB)
+	queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, env.GetPromClusterLabel())
+	resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
 
-	queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr, offStr)
-	resChNamespaceLabels := ctx.Query(queryNamespaceLabels)
+	queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr)
+	resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
 
-	queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr, offStr)
-	resChNamespaceAnnotations := ctx.Query(queryNamespaceAnnotations)
+	queryNamespaceAnnotations := fmt.Sprintf(queryFmtNamespaceAnnotations, durStr)
+	resChNamespaceAnnotations := ctx.QueryAtTime(queryNamespaceAnnotations, end)
 
-	queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr, offStr)
-	resChPodLabels := ctx.Query(queryPodLabels)
+	queryPodLabels := fmt.Sprintf(queryFmtPodLabels, durStr)
+	resChPodLabels := ctx.QueryAtTime(queryPodLabels, end)
 
-	queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr, offStr)
-	resChPodAnnotations := ctx.Query(queryPodAnnotations)
+	queryPodAnnotations := fmt.Sprintf(queryFmtPodAnnotations, durStr)
+	resChPodAnnotations := ctx.QueryAtTime(queryPodAnnotations, end)
 
-	queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr, offStr)
-	resChServiceLabels := ctx.Query(queryServiceLabels)
+	queryServiceLabels := fmt.Sprintf(queryFmtServiceLabels, durStr)
+	resChServiceLabels := ctx.QueryAtTime(queryServiceLabels, end)
 
-	queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr, offStr)
-	resChDeploymentLabels := ctx.Query(queryDeploymentLabels)
+	queryDeploymentLabels := fmt.Sprintf(queryFmtDeploymentLabels, durStr)
+	resChDeploymentLabels := ctx.QueryAtTime(queryDeploymentLabels, end)
 
-	queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr, offStr)
-	resChStatefulSetLabels := ctx.Query(queryStatefulSetLabels)
+	queryStatefulSetLabels := fmt.Sprintf(queryFmtStatefulSetLabels, durStr)
+	resChStatefulSetLabels := ctx.QueryAtTime(queryStatefulSetLabels, end)
 
-	queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, offStr, env.GetPromClusterLabel())
-	resChDaemonSetLabels := ctx.Query(queryDaemonSetLabels)
+	queryDaemonSetLabels := fmt.Sprintf(queryFmtDaemonSetLabels, durStr, env.GetPromClusterLabel())
+	resChDaemonSetLabels := ctx.QueryAtTime(queryDaemonSetLabels, end)
 
-	queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, offStr, env.GetPromClusterLabel())
-	resChPodsWithReplicaSetOwner := ctx.Query(queryPodsWithReplicaSetOwner)
+	queryPodsWithReplicaSetOwner := fmt.Sprintf(queryFmtPodsWithReplicaSetOwner, durStr, env.GetPromClusterLabel())
+	resChPodsWithReplicaSetOwner := ctx.QueryAtTime(queryPodsWithReplicaSetOwner, end)
 
-	queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, offStr, env.GetPromClusterLabel())
-	resChReplicaSetsWithoutOwners := ctx.Query(queryReplicaSetsWithoutOwners)
+	queryReplicaSetsWithoutOwners := fmt.Sprintf(queryFmtReplicaSetsWithoutOwners, durStr, env.GetPromClusterLabel())
+	resChReplicaSetsWithoutOwners := ctx.QueryAtTime(queryReplicaSetsWithoutOwners, end)
 
-	queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, offStr, env.GetPromClusterLabel())
-	resChJobLabels := ctx.Query(queryJobLabels)
+	queryJobLabels := fmt.Sprintf(queryFmtJobLabels, durStr, env.GetPromClusterLabel())
+	resChJobLabels := ctx.QueryAtTime(queryJobLabels, end)
 
-	queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, offStr, env.GetPromClusterLabel())
-	resChLBCostPerHr := ctx.Query(queryLBCostPerHr)
+	queryLBCostPerHr := fmt.Sprintf(queryFmtLBCostPerHr, durStr, env.GetPromClusterLabel())
+	resChLBCostPerHr := ctx.QueryAtTime(queryLBCostPerHr, end)
 
-	queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr, offStr)
-	resChLBActiveMins := ctx.Query(queryLBActiveMins)
+	queryLBActiveMins := fmt.Sprintf(queryFmtLBActiveMins, env.GetPromClusterLabel(), durStr, resStr)
+	resChLBActiveMins := ctx.QueryAtTime(queryLBActiveMins, end)
 
 	resCPUCoresAllocated, _ := resChCPUCoresAllocated.Await()
 	resCPURequests, _ := resChCPURequests.Await()
@@ -542,7 +704,6 @@ func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSiz
 		if batchEnd.After(end) {
 			batchEnd = end
 		}
-		batchWindow := kubecost.NewWindow(&batchStart, &batchEnd)
 
 		var resPods []*prom.QueryResult
 		var err error
@@ -551,10 +712,9 @@ func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSiz
 		for resPods == nil && numTries < maxTries {
 			numTries++
 
-			// Convert window (start, end) to (duration, offset) for querying Prometheus,
-			// including handling Thanos offset
-			durStr, offStr, err := batchWindow.DurationOffsetForPrometheus()
-			if err != nil || durStr == "" {
+			// Query for the duration between start and end
+			durStr := timeutil.DurationString(batchEnd.Sub(batchStart))
+			if durStr == "" {
 				// Negative duration, so set empty results and don't query
 				resPods = []*prom.QueryResult{}
 				err = nil
@@ -562,9 +722,9 @@ func (cm *CostModel) buildPodMap(window kubecost.Window, resolution, maxBatchSiz
 			}
 
 			// Submit and profile query
-			queryPods := fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr, offStr)
+			queryPods := fmt.Sprintf(queryFmtPods, env.GetPromClusterLabel(), durStr, resStr)
 			queryProfile := time.Now()
-			resPods, err = ctx.Query(queryPods).Await()
+			resPods, err = ctx.QueryAtTime(queryPods, batchEnd).Await()
 			if err != nil {
 				log.Profile(queryProfile, fmt.Sprintf("CostModel.ComputeAllocation: pod query %d try %d failed: %s", numQuery, numTries, queryPods))
 				resPods = nil

+ 10 - 0
pkg/costmodel/cluster.go

@@ -5,6 +5,7 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/kubecost/cost-model/pkg/kubecost"
 	"github.com/kubecost/cost-model/pkg/util/timeutil"
 
 	"github.com/kubecost/cost-model/pkg/cloud"
@@ -126,6 +127,9 @@ type DiskIdentifier struct {
 func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
 	// Query for the duration between start and end
 	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
+	}
 
 	// Start from the time "end", querying backwards
 	t := end
@@ -377,6 +381,9 @@ func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[No
 func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
 	// Query for the duration between start and end
 	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
+	}
 
 	// Start from the time "end", querying backwards
 	t := end
@@ -530,6 +537,9 @@ type LoadBalancer struct {
 func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
 	// Query for the duration between start and end
 	durStr := timeutil.DurationString(end.Sub(start))
+	if durStr == "" {
+		return nil, fmt.Errorf("illegal duration value for %s", kubecost.NewClosedWindow(start, end))
+	}
 
 	// Start from the time "end", querying backwards
 	t := end

+ 15 - 13
pkg/costmodel/costmodel.go

@@ -47,13 +47,14 @@ const (
 var isCron = regexp.MustCompile(`^(.+)-\d{10}$`)
 
 type CostModel struct {
-	Cache            clustercache.ClusterCache
-	ClusterMap       clusters.ClusterMap
-	RequestGroup     *singleflight.Group
-	ScrapeInterval   time.Duration
-	PrometheusClient prometheus.Client
-	Provider         costAnalyzerCloud.Provider
-	pricingMetadata  *costAnalyzerCloud.PricingMatchMetadata
+	Cache                      clustercache.ClusterCache
+	ClusterMap                 clusters.ClusterMap
+	MaxPrometheusQueryDuration time.Duration
+	RequestGroup               *singleflight.Group
+	ScrapeInterval             time.Duration
+	PrometheusClient           prometheus.Client
+	Provider                   costAnalyzerCloud.Provider
+	pricingMetadata            *costAnalyzerCloud.PricingMatchMetadata
 }
 
 func NewCostModel(client prometheus.Client, provider costAnalyzerCloud.Provider, cache clustercache.ClusterCache, clusterMap clusters.ClusterMap, scrapeInterval time.Duration) *CostModel {
@@ -61,12 +62,13 @@ func NewCostModel(client prometheus.Client, provider costAnalyzerCloud.Provider,
 	requestGroup := new(singleflight.Group)
 
 	return &CostModel{
-		Cache:            cache,
-		ClusterMap:       clusterMap,
-		PrometheusClient: client,
-		Provider:         provider,
-		RequestGroup:     requestGroup,
-		ScrapeInterval:   scrapeInterval,
+		Cache:                      cache,
+		ClusterMap:                 clusterMap,
+		MaxPrometheusQueryDuration: env.GetETLMaxPrometheusQueryDuration(),
+		PrometheusClient:           client,
+		Provider:                   provider,
+		RequestGroup:               requestGroup,
+		ScrapeInterval:             scrapeInterval,
 	}
 }
 

+ 9 - 13
pkg/env/costmodelenv.go

@@ -62,11 +62,11 @@ const (
 
 	UTCOffsetEnvVar = "UTC_OFFSET"
 
-	CacheWarmingEnabledEnvVar    = "CACHE_WARMING_ENABLED"
-	ETLEnabledEnvVar             = "ETL_ENABLED"
-	ETLMaxBatchHours             = "ETL_MAX_BATCH_HOURS"
-	ETLResolutionSeconds         = "ETL_RESOLUTION_SECONDS"
-	LegacyExternalAPIDisabledVar = "LEGACY_EXTERNAL_API_DISABLED"
+	CacheWarmingEnabledEnvVar            = "CACHE_WARMING_ENABLED"
+	ETLEnabledEnvVar                     = "ETL_ENABLED"
+	ETLMaxPrometheusQueryDurationMinutes = "ETL_MAX_PROMETHEUS_QUERY_DURATION_MINUTES"
+	ETLResolutionSeconds                 = "ETL_RESOLUTION_SECONDS"
+	LegacyExternalAPIDisabledVar         = "LEGACY_EXTERNAL_API_DISABLED"
 
 	PromClusterIDLabelEnvVar = "PROM_CLUSTER_ID_LABEL"
 
@@ -429,14 +429,10 @@ func IsETLEnabled() bool {
 	return GetBool(ETLEnabledEnvVar, true)
 }
 
-// GetETLMaxBatchDuration limits the window duration of the most expensive ETL
-// queries to a maximum batch size, such that queries can be tuned to avoid
-// timeout for large windows; e.g. if a 24h query is expected to timeout, but
-// a 6h query is expected to complete in 1m, then 6h could be a good value.
-func GetETLMaxBatchDuration() time.Duration {
-	// Default to 6h
-	hrs := time.Duration(GetInt64(ETLMaxBatchHours, 6))
-	return hrs * time.Hour
+func GetETLMaxPrometheusQueryDuration() time.Duration {
+	dayMins := 60 * 24
+	mins := time.Duration(GetInt64(ETLMaxPrometheusQueryDurationMinutes, int64(dayMins)))
+	return mins * time.Minute
 }
 
 // GetETLResolution determines the resolution of ETL queries. The smaller the

+ 9 - 9
pkg/kubecost/status.go

@@ -4,15 +4,15 @@ import "time"
 
 // ETLStatus describes ETL metadata
 type ETLStatus struct {
-	Coverage    Window           `json:"coverage"`
-	LastRun     time.Time        `json:"lastRun"`
-	Progress    float64          `json:"progress"`
-	RefreshRate string           `json:"refreshRate"`
-	Resolution  string           `json:"resolution"`
-	MaxBatch    string           `json:"maxBatch"`
-	StartTime   time.Time        `json:"startTime"`
-	UTCOffset   string           `json:"utcOffset"`
-	Backup      *DirectoryStatus `json:"backup,omitempty"`
+	Coverage                   Window           `json:"coverage"`
+	LastRun                    time.Time        `json:"lastRun"`
+	Progress                   float64          `json:"progress"`
+	RefreshRate                string           `json:"refreshRate"`
+	Resolution                 string           `json:"resolution"`
+	MaxPrometheusQueryDuration string           `json:"maxPrometheusQueryDuration"`
+	StartTime                  time.Time        `json:"startTime"`
+	UTCOffset                  string           `json:"utcOffset"`
+	Backup                     *DirectoryStatus `json:"backup,omitempty"`
 }
 
 // DirectoryStatus describes metadata of a directory of files