2
0
Эх сурвалжийг харах

Merge pull request #1127 from kubecost/niko/query-time

Improve Asset querying by allowing precise times to be passed to Prometheus queries
Niko Kovacevic 4 жил өмнө
parent
commit
f7356452f5

+ 125 - 102
pkg/costmodel/cluster.go

@@ -118,12 +118,17 @@ type Disk struct {
 	Breakdown  *ClusterCostsBreakdown
 }
 
-func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, error) {
-	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
-	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
-	if offset < time.Minute {
-		offsetStr = ""
-	}
+type DiskIdentifier struct {
+	Cluster string
+	Name    string
+}
+
+func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[DiskIdentifier]*Disk, error) {
+	// Query for the duration between start and end
+	durStr := timeutil.DurationString(end.Sub(start))
+
+	// Start from the time "end", querying backwards
+	t := end
 
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
@@ -140,22 +145,22 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	costPerGBHr := 0.04 / 730.0
 
 	ctx := prom.NewNamedContext(client, prom.ClusterContextName)
-	queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s]%s)) by (%s, persistentvolume,provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (%s, persistentvolume)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
-
-	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
-	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
-	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s)`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
-	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
-
-	resChPVCost := ctx.Query(queryPVCost)
-	resChPVSize := ctx.Query(queryPVSize)
-	resChActiveMins := ctx.Query(queryActiveMins)
-	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
-	resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
-	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
-	resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
+	queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
+	queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
+	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+
+	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+
+	resChPVCost := ctx.QueryAtTime(queryPVCost, t)
+	resChPVSize := ctx.QueryAtTime(queryPVSize, t)
+	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
+	resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
+	resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
+	resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
+	resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
 
 	resPVCost, _ := resChPVCost.Await()
 	resPVSize, _ := resChPVSize.Await()
@@ -168,7 +173,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		return nil, ctx.ErrorCollection()
 	}
 
-	diskMap := map[string]*Disk{}
+	diskMap := map[DiskIdentifier]*Disk{}
 
 	pvCosts(diskMap, resolution, resActiveMins, resPVSize, resPVCost, provider)
 
@@ -185,7 +190,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		}
 
 		cost := result.Values[0].Value
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -210,7 +215,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		}
 
 		cost := result.Values[0].Value
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -235,7 +240,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		}
 
 		bytes := result.Values[0].Value
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -263,7 +268,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 			continue
 		}
 
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			log.DedupedWarningf(5, "ClusterDisks: local active mins for unidentified disk or disk deleted from analysis")
 			continue
@@ -274,7 +279,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		}
 
 		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
 		mins := e.Sub(s).Minutes()
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
@@ -369,12 +374,12 @@ func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[No
 	}
 }
 
-func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[NodeIdentifier]*Node, error) {
-	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
-	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
-	if offset < time.Minute {
-		offsetStr = ""
-	}
+func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
+	// Query for the duration between start and end
+	durStr := timeutil.DurationString(end.Sub(start))
+
+	// Start from the time "end", querying backwards
+	t := end
 
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
@@ -385,34 +390,34 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
 	optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
 
-	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s]%s)) by (%s, node, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, %s, mode)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel())
-	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
-	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
+	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
+	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
+	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
+	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
+	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
+	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
+	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
+	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)
 
 	// Return errors if these fail
-	resChNodeCPUHourlyCost := requiredCtx.Query(queryNodeCPUHourlyCost)
-	resChNodeCPUCores := requiredCtx.Query(queryNodeCPUCores)
-	resChNodeRAMHourlyCost := requiredCtx.Query(queryNodeRAMHourlyCost)
-	resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
-	resChNodeGPUCount := requiredCtx.Query(queryNodeGPUCount)
-	resChNodeGPUHourlyCost := requiredCtx.Query(queryNodeGPUHourlyCost)
-	resChActiveMins := requiredCtx.Query(queryActiveMins)
-	resChIsSpot := requiredCtx.Query(queryIsSpot)
+	resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
+	resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
+	resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
+	resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
+	resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
+	resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
+	resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
+	resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
 
 	// Do not return errors if these fail, but log warnings
-	resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
-	resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
-	resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
-	resChLabels := optionalCtx.Query(queryLabels)
+	resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
+	resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
+	resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
+	resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
 
 	resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
 	resNodeCPUCores, _ := resChNodeCPUCores.Await()
@@ -475,6 +480,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		preemptibleMap,
 		labelsMap,
 		clusterAndNameToType,
+		resolution,
 	)
 
 	c, err := cp.GetConfig()
@@ -504,38 +510,42 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	return nodeMap, nil
 }
 
+type LoadBalancerIdentifier struct {
+	Cluster   string
+	Namespace string
+	Name      string
+}
+
 type LoadBalancer struct {
 	Cluster    string
+	Namespace  string
 	Name       string
 	ProviderID string
 	Cost       float64
 	Start      time.Time
+	End        time.Time
 	Minutes    float64
 }
 
-func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
-	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
-	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
-	if offset < time.Minute {
-		offsetStr = ""
-	}
+func ClusterLoadBalancers(client prometheus.Client, start, end time.Time) (map[LoadBalancerIdentifier]*LoadBalancer, error) {
+	// Query for the duration between start and end
+	durStr := timeutil.DurationString(end.Sub(start))
+
+	// Start from the time "end", querying backwards
+	t := end
 
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
 	// but more expensive queries, and vice-a-versa.
-	minsPerResolution := 5
-
-	// hourlyToCumulative is a scaling factor that, when multiplied by an hourly
-	// value, converts it to a cumulative value; i.e.
-	// [$/hr] * [min/res]*[hr/min] = [$/res]
-	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
+	minsPerResolution := 1
 
 	ctx := prom.NewNamedContext(client, prom.ClusterContextName)
-	queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip))[%s:%dm]%s) * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
-	queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
 
-	resChLBCost := ctx.Query(queryLBCost)
-	resChActiveMins := ctx.Query(queryActiveMins)
+	queryLBCost := fmt.Sprintf(`avg(avg_over_time(kubecost_load_balancer_cost[%s])) by (namespace, service_name, %s, ingress_ip)`, durStr, env.GetPromClusterLabel())
+	queryActiveMins := fmt.Sprintf(`avg(kubecost_load_balancer_cost) by (namespace, service_name, %s, ingress_ip)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+
+	resChLBCost := ctx.QueryAtTime(queryLBCost, t)
+	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
 
 	resLBCost, _ := resChLBCost.Await()
 	resActiveMins, _ := resChActiveMins.Await()
@@ -544,9 +554,9 @@ func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Durati
 		return nil, ctx.ErrorCollection()
 	}
 
-	loadBalancerMap := map[string]*LoadBalancer{}
+	loadBalancerMap := make(map[LoadBalancerIdentifier]*LoadBalancer, len(resActiveMins))
 
-	for _, result := range resLBCost {
+	for _, result := range resActiveMins {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -556,7 +566,7 @@ func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Durati
 			log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
 			continue
 		}
-		serviceName, err := result.GetString("service_name")
+		name, err := result.GetString("service_name")
 		if err != nil {
 			log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
 			continue
@@ -566,25 +576,43 @@ func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Durati
 			log.DedupedWarningf(5, "ClusterLoadBalancers: LB cost data missing ingress_ip")
 			providerID = ""
 		}
-		lbCost := result.Values[0].Value
 
-		key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
+		key := LoadBalancerIdentifier{
+			Cluster:   cluster,
+			Namespace: namespace,
+			Name:      name,
+		}
+
+		// Skip if there are no data
+		if len(result.Values) == 0 {
+			continue
+		}
+
+		// Add load balancer to the set of load balancers
 		if _, ok := loadBalancerMap[key]; !ok {
 			loadBalancerMap[key] = &LoadBalancer{
 				Cluster:    cluster,
-				Name:       namespace + "/" + serviceName,
+				Namespace:  namespace,
+				Name:       fmt.Sprintf("%s/%s", namespace, name), // TODO:ETL this is kept for backwards-compatibility, but not good
 				ProviderID: cloud.ParseLBID(providerID),
 			}
 		}
+
+		// Append start, end, and minutes. This should come before all other data.
+		s := time.Unix(int64(result.Values[0].Timestamp), 0)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
+		loadBalancerMap[key].Start = s
+		loadBalancerMap[key].End = e
+		loadBalancerMap[key].Minutes = e.Sub(s).Minutes()
+
 		// Fill in Provider ID if it is available and missing in the loadBalancerMap
 		// Prevents there from being a duplicate LoadBalancers on the same day
 		if providerID != "" && loadBalancerMap[key].ProviderID == "" {
 			loadBalancerMap[key].ProviderID = providerID
 		}
-		loadBalancerMap[key].Cost += lbCost
 	}
 
-	for _, result := range resActiveMins {
+	for _, result := range resLBCost {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
 			cluster = env.GetClusterID()
@@ -594,28 +622,28 @@ func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Durati
 			log.Warningf("ClusterLoadBalancers: LB cost data missing namespace")
 			continue
 		}
-		serviceName, err := result.GetString("service_name")
+		name, err := result.GetString("service_name")
 		if err != nil {
 			log.Warningf("ClusterLoadBalancers: LB cost data missing service_name")
 			continue
 		}
-		key := fmt.Sprintf("%s/%s/%s", cluster, namespace, serviceName)
 
-		if len(result.Values) == 0 {
-			continue
+		key := LoadBalancerIdentifier{
+			Cluster:   cluster,
+			Namespace: namespace,
+			Name:      name,
 		}
 
+		// Apply cost as price-per-hour * hours
 		if lb, ok := loadBalancerMap[key]; ok {
-			s := time.Unix(int64(result.Values[0].Timestamp), 0)
-			e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
-			mins := e.Sub(s).Minutes()
-
-			lb.Start = s
-			lb.Minutes = mins
+			lbPricePerHr := result.Values[0].Value
+			hrs := lb.Minutes / 60.0
+			lb.Cost += lbPricePerHr * hrs
 		} else {
 			log.DedupedWarningf(20, "ClusterLoadBalancers: found minutes for key that does not exist: %s", key)
 		}
 	}
+
 	return loadBalancerMap, nil
 }
 
@@ -1079,7 +1107,7 @@ func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startS
 	}, nil
 }
 
-func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
+func pvCosts(diskMap map[DiskIdentifier]*Disk, resolution time.Duration, resActiveMins, resPVSize, resPVCost []*prom.QueryResult, cp cloud.Provider) {
 	for _, result := range resActiveMins {
 		cluster, err := result.GetString(env.GetPromClusterLabel())
 		if err != nil {
@@ -1096,7 +1124,7 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 			continue
 		}
 
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -1105,7 +1133,7 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 			}
 		}
 		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
 		mins := e.Sub(s).Minutes()
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
@@ -1130,7 +1158,7 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 		// TODO niko/assets storage class
 
 		bytes := result.Values[0].Value
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,
@@ -1162,9 +1190,7 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 		// TODO niko/assets storage class
 
 		var cost float64
-
 		if customPricingEnabled && customPricingConfig != nil {
-
 			customPVCostStr := customPricingConfig.Storage
 
 			customPVCost, err := strconv.ParseFloat(customPVCostStr, 64)
@@ -1173,14 +1199,11 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 			}
 
 			cost = customPVCost
-
 		} else {
-
 			cost = result.Values[0].Value
-
 		}
 
-		key := fmt.Sprintf("%s/%s", cluster, name)
+		key := DiskIdentifier{cluster, name}
 		if _, ok := diskMap[key]; !ok {
 			diskMap[key] = &Disk{
 				Cluster:   cluster,

+ 3 - 2
pkg/costmodel/cluster_helpers.go

@@ -518,7 +518,7 @@ func buildActiveDataMap(resActiveMins []*prom.QueryResult, resolution time.Durat
 		}
 
 		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
 		mins := e.Sub(s).Minutes()
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
@@ -705,6 +705,7 @@ func buildNodeMap(
 	preemptibleMap map[NodeIdentifier]bool,
 	labelsMap map[nodeIdentifierNoProviderID]map[string]string,
 	clusterAndNameToType map[nodeIdentifierNoProviderID]string,
+	res time.Duration,
 ) map[NodeIdentifier]*Node {
 
 	nodeMap := make(map[NodeIdentifier]*Node)
@@ -740,7 +741,7 @@ func buildNodeMap(
 		checkForKeyAndInitIfMissing(nodeMap, id, clusterAndNameToType)
 		nodeMap[id].Start = activeData.start
 		nodeMap[id].End = activeData.end
-		nodeMap[id].Minutes = activeData.minutes
+		nodeMap[id].Minutes = nodeMap[id].End.Sub(nodeMap[id].Start).Minutes()
 	}
 
 	// We now merge in data that doesn't have a provider id by looping over

+ 9 - 4
pkg/costmodel/cluster_helpers_test.go

@@ -1,12 +1,12 @@
 package costmodel
 
 import (
-	"github.com/kubecost/cost-model/pkg/config"
 	"reflect"
 	"testing"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/config"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
 
@@ -687,6 +687,7 @@ func TestBuildNodeMap(t *testing.T) {
 				testCase.preemptibleMap,
 				testCase.labelsMap,
 				testCase.clusterAndNameToType,
+				time.Minute,
 			)
 
 			if !reflect.DeepEqual(result, testCase.expected) {
@@ -925,7 +926,11 @@ func TestAssetCustompricing(t *testing.T) {
 			Values: []*util.Vector{
 				&util.Vector{
 					Timestamp: 0,
-					Value:     60.0,
+					Value:     1.0,
+				},
+				&util.Vector{
+					Timestamp: 3600.0,
+					Value:     1.0,
 				},
 			},
 		},
@@ -994,10 +999,10 @@ func TestAssetCustompricing(t *testing.T) {
 			ramResult := ramMap[nodeKey]
 			gpuResult := gpuMap[nodeKey]
 
-			diskMap := map[string]*Disk{}
+			diskMap := map[DiskIdentifier]*Disk{}
 			pvCosts(diskMap, time.Hour, pvMinsPromResult, pvSizePromResult, pvCostPromResult, testProvider)
 
-			diskResult := diskMap["cluster1/pvc1"].Cost
+			diskResult := diskMap[DiskIdentifier{"cluster1", "pvc1"}].Cost
 
 			if !util.IsApproximately(cpuResult, testCase.expectedPricing["CPU"]) {
 				t.Errorf("CPU custom pricing error in %s. Got %v but expected %v", testCase.name, cpuResult, testCase.expectedPricing["CPU"])

+ 34 - 2
pkg/costmodel/router.go

@@ -692,8 +692,24 @@ func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ htt
 		return
 	}
 
+	// Attempt to parse time as either a unix timestamp or as an RFC3339 value
+	var timeVal time.Time
+	timeStr := qp.Get("time", "")
+	if len(timeStr) > 0 {
+		if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
+			timeVal = time.Unix(t, 0)
+		} else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
+			timeVal = t
+		}
+
+		// If time is given, but not parse-able, return an error
+		if timeVal.IsZero() {
+			http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
+		}
+	}
+
 	ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
-	body, err := ctx.RawQuery(query)
+	body, err := ctx.RawQuery(query, timeVal)
 	if err != nil {
 		w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
 		return
@@ -745,8 +761,24 @@ func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprou
 		return
 	}
 
+	// Attempt to parse time as either a unix timestamp or as an RFC3339 value
+	var timeVal time.Time
+	timeStr := qp.Get("time", "")
+	if len(timeStr) > 0 {
+		if t, err := strconv.ParseInt(timeStr, 10, 64); err == nil {
+			timeVal = time.Unix(t, 0)
+		} else if t, err := time.Parse(time.RFC3339, timeStr); err == nil {
+			timeVal = t
+		}
+
+		// If time is given, but not parse-able, return an error
+		if timeVal.IsZero() {
+			http.Error(w, fmt.Sprintf("time must be a unix timestamp or RFC3339 value; illegal value given: %s", timeStr), http.StatusBadRequest)
+		}
+	}
+
 	ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
-	body, err := ctx.RawQuery(query)
+	body, err := ctx.RawQuery(query, timeVal)
 	if err != nil {
 		w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
 		return

+ 32 - 16
pkg/prom/query.go

@@ -89,7 +89,19 @@ func (ctx *Context) ErrorCollection() error {
 func (ctx *Context) Query(query string) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go runQuery(query, ctx, resCh, "")
+	go runQuery(query, ctx, resCh, time.Now(), "")
+
+	return resCh
+}
+
+// QueryWithTime returns a QueryResultsChan, then runs the given query at the
+// given time (see time parameter here: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries)
+// and sends the results on the provided channel. Receiver is responsible for
+// closing the channel, preferably using the Read method.
+func (ctx *Context) QueryAtTime(query string, t time.Time) QueryResultsChan {
+	resCh := make(QueryResultsChan)
+
+	go runQuery(query, ctx, resCh, t, "")
 
 	return resCh
 }
@@ -100,7 +112,7 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 func (ctx *Context) ProfileQuery(query string, profileLabel string) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go runQuery(query, ctx, resCh, profileLabel)
+	go runQuery(query, ctx, resCh, time.Now(), profileLabel)
 
 	return resCh
 }
@@ -134,7 +146,7 @@ func (ctx *Context) ProfileQueryAll(queries ...string) []QueryResultsChan {
 }
 
 func (ctx *Context) QuerySync(query string) ([]*QueryResult, prometheus.Warnings, error) {
-	raw, warnings, err := ctx.query(query)
+	raw, warnings, err := ctx.query(query, time.Now())
 	if err != nil {
 		return nil, warnings, err
 	}
@@ -154,11 +166,11 @@ func (ctx *Context) QueryURL() *url.URL {
 
 // runQuery executes the prometheus query asynchronously, collects results and
 // errors, and passes them through the results channel.
-func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel string) {
+func runQuery(query string, ctx *Context, resCh QueryResultsChan, t time.Time, profileLabel string) {
 	defer errors.HandlePanic()
 	startQuery := time.Now()
 
-	raw, warnings, requestError := ctx.query(query)
+	raw, warnings, requestError := ctx.query(query, t)
 	results := NewQueryResults(query, raw)
 
 	// report all warnings, request, and parse errors (nils will be ignored)
@@ -172,18 +184,22 @@ func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel s
 }
 
 // RawQuery is a direct query to the prometheus client and returns the body of the response
-func (ctx *Context) RawQuery(query string) ([]byte, error) {
+func (ctx *Context) RawQuery(query string, t time.Time) ([]byte, error) {
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
 	q.Set("query", query)
 
-	// for non-range queries, we set the timestamp for the query to time-offset
-	// this is a special use case that's typically only used when our primary
-	// prom db has delayed insertion (thanos, cortex, etc...)
-	if promQueryOffset != 0 && ctx.name != AllocationContextName {
-		q.Set("time", time.Now().Add(-promQueryOffset).UTC().Format(time.RFC3339))
+	if !t.IsZero() {
+		q.Set("time", strconv.FormatInt(t.Unix(), 10))
 	} else {
-		q.Set("time", time.Now().UTC().Format(time.RFC3339))
+		// for non-range queries, we set the timestamp for the query to time-offset
+		// this is a special use case that's typically only used when our primary
+		// prom db has delayed insertion (thanos, cortex, etc...)
+		if promQueryOffset != 0 && ctx.name != AllocationContextName {
+			q.Set("time", time.Now().Add(-promQueryOffset).UTC().Format(time.RFC3339))
+		} else {
+			q.Set("time", time.Now().UTC().Format(time.RFC3339))
+		}
 	}
 
 	u.RawQuery = q.Encode()
@@ -221,8 +237,8 @@ func (ctx *Context) RawQuery(query string) ([]byte, error) {
 	return body, err
 }
 
-func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error) {
-	body, err := ctx.RawQuery(query)
+func (ctx *Context) query(query string, t time.Time) (interface{}, prometheus.Warnings, error) {
+	body, err := ctx.RawQuery(query, t)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -230,7 +246,7 @@ func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, nil, fmt.Errorf("Unmarshal Error: %s\nQuery: %s", err, query)
+		return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
 	}
 
 	warnings := warningsFrom(toReturn)
@@ -354,7 +370,7 @@ func (ctx *Context) queryRange(query string, start, end time.Time, step time.Dur
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, nil, fmt.Errorf("Unmarshal Error: %s\nQuery: %s", err, query)
+		return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
 	}
 
 	warnings := warningsFrom(toReturn)