Browse Source

Allow passing time to Prometheus queries; pass time in Asset node and disk queries; remove 1m addition to end for nodes and disks

Niko Kovacevic 4 years ago
parent
commit
2eb995d7b9

+ 56 - 54
pkg/costmodel/cluster.go

@@ -118,12 +118,12 @@ type Disk struct {
 	Breakdown  *ClusterCostsBreakdown
 }
 
-func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, offset time.Duration) (map[string]*Disk, error) {
-	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
-	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
-	if offset < time.Minute {
-		offsetStr = ""
-	}
+func ClusterDisks(client prometheus.Client, provider cloud.Provider, start, end time.Time) (map[string]*Disk, error) {
+	// Query for the duration between start and end
+	durStr := timeutil.DurationString(end.Sub(start))
+
+	// Start from the time "end", querying backwards
+	t := end
 
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
@@ -140,22 +140,22 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 	costPerGBHr := 0.04 / 730.0
 
 	ctx := prom.NewNamedContext(client, prom.ClusterContextName)
-	queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s]%s)) by (%s, persistentvolume,provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s]%s)) by (%s, persistentvolume)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
-
-	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
-	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, hourlyToCumulative, costPerGBHr)
-	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]%s)`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
-	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
-
-	resChPVCost := ctx.Query(queryPVCost)
-	resChPVSize := ctx.Query(queryPVSize)
-	resChActiveMins := ctx.Query(queryActiveMins)
-	resChLocalStorageCost := ctx.Query(queryLocalStorageCost)
-	resChLocalStorageUsedCost := ctx.Query(queryLocalStorageUsedCost)
-	resChLocalStorageBytes := ctx.Query(queryLocalStorageBytes)
-	resChLocalActiveMins := ctx.Query(queryLocalActiveMins)
+	queryPVCost := fmt.Sprintf(`avg(avg_over_time(pv_hourly_cost[%s])) by (%s, persistentvolume,provider_id)`, durStr, env.GetPromClusterLabel())
+	queryPVSize := fmt.Sprintf(`avg(avg_over_time(kube_persistentvolume_capacity_bytes[%s])) by (%s, persistentvolume)`, durStr, env.GetPromClusterLabel())
+	queryActiveMins := fmt.Sprintf(`count(pv_hourly_cost) by (%s, persistentvolume)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+
+	queryLocalStorageCost := fmt.Sprintf(`sum_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageUsedCost := fmt.Sprintf(`sum_over_time(sum(container_fs_usage_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm]) / 1024 / 1024 / 1024 * %f * %f`, env.GetPromClusterLabel(), durStr, minsPerResolution, hourlyToCumulative, costPerGBHr)
+	queryLocalStorageBytes := fmt.Sprintf(`avg_over_time(sum(container_fs_limit_bytes{device!="tmpfs", id="/"}) by (instance, %s)[%s:%dm])`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+	queryLocalActiveMins := fmt.Sprintf(`count(node_total_hourly_cost) by (%s, node)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+
+	resChPVCost := ctx.QueryAtTime(queryPVCost, t)
+	resChPVSize := ctx.QueryAtTime(queryPVSize, t)
+	resChActiveMins := ctx.QueryAtTime(queryActiveMins, t)
+	resChLocalStorageCost := ctx.QueryAtTime(queryLocalStorageCost, t)
+	resChLocalStorageUsedCost := ctx.QueryAtTime(queryLocalStorageUsedCost, t)
+	resChLocalStorageBytes := ctx.QueryAtTime(queryLocalStorageBytes, t)
+	resChLocalActiveMins := ctx.QueryAtTime(queryLocalActiveMins, t)
 
 	resPVCost, _ := resChPVCost.Await()
 	resPVSize, _ := resChPVSize.Await()
@@ -274,7 +274,7 @@ func ClusterDisks(client prometheus.Client, provider cloud.Provider, duration, o
 		}
 
 		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
 		mins := e.Sub(s).Minutes()
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
@@ -369,12 +369,12 @@ func costTimesMinute(activeDataMap map[NodeIdentifier]activeData, costMap map[No
 	}
 }
 
-func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset time.Duration) (map[NodeIdentifier]*Node, error) {
-	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
-	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
-	if offset < time.Minute {
-		offsetStr = ""
-	}
+func ClusterNodes(cp cloud.Provider, client prometheus.Client, start, end time.Time) (map[NodeIdentifier]*Node, error) {
+	// Query for the duration between start and end
+	durStr := timeutil.DurationString(end.Sub(start))
+
+	// Start from the time "end", querying backwards
+	t := end
 
 	// minsPerResolution determines accuracy and resource use for the following
 	// queries. Smaller values (higher resolution) result in better accuracy,
@@ -385,34 +385,34 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	requiredCtx := prom.NewNamedContext(client, prom.ClusterContextName)
 	optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
 
-	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s]%s)) by (%s, node)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s]%s)) by (%s, node, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s]%s)) by (%s, node, instance_type, provider_id)`, durationStr, offsetStr, env.GetPromClusterLabel())
-	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, %s, mode)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel())
-	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr, env.GetPromClusterLabel(), env.GetPromClusterLabel())
-	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]%s`, env.GetPromClusterLabel(), durationStr, minsPerResolution, offsetStr)
-	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
+	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
+	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
+	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
+	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
+	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
+	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
+	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
+	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm])) by (instance, %s) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm])) by (node, %s), "instance", "$1", "node", "(.*)")) by (instance, %s)`, durStr, minsPerResolution, env.GetPromClusterLabel(), durStr, minsPerResolution, env.GetPromClusterLabel(), env.GetPromClusterLabel())
+	queryActiveMins := fmt.Sprintf(`avg(node_total_hourly_cost) by (node, %s, provider_id)[%s:%dm]`, env.GetPromClusterLabel(), durStr, minsPerResolution)
+	queryIsSpot := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm])`, durStr, minsPerResolution)
+	queryLabels := fmt.Sprintf(`count_over_time(kube_node_labels[%s:%dm])`, durStr, minsPerResolution)
 
 	// Return errors if these fail
-	resChNodeCPUHourlyCost := requiredCtx.Query(queryNodeCPUHourlyCost)
-	resChNodeCPUCores := requiredCtx.Query(queryNodeCPUCores)
-	resChNodeRAMHourlyCost := requiredCtx.Query(queryNodeRAMHourlyCost)
-	resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
-	resChNodeGPUCount := requiredCtx.Query(queryNodeGPUCount)
-	resChNodeGPUHourlyCost := requiredCtx.Query(queryNodeGPUHourlyCost)
-	resChActiveMins := requiredCtx.Query(queryActiveMins)
-	resChIsSpot := requiredCtx.Query(queryIsSpot)
+	resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
+	resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
+	resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
+	resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
+	resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
+	resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
+	resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
+	resChIsSpot := requiredCtx.QueryAtTime(queryIsSpot, t)
 
 	// Do not return errors if these fail, but log warnings
-	resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
-	resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
-	resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
-	resChLabels := optionalCtx.Query(queryLabels)
+	resChNodeCPUModeTotal := optionalCtx.QueryAtTime(queryNodeCPUModeTotal, t)
+	resChNodeRAMSystemPct := optionalCtx.QueryAtTime(queryNodeRAMSystemPct, t)
+	resChNodeRAMUserPct := optionalCtx.QueryAtTime(queryNodeRAMUserPct, t)
+	resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
 
 	resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
 	resNodeCPUCores, _ := resChNodeCPUCores.Await()
@@ -475,6 +475,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		preemptibleMap,
 		labelsMap,
 		clusterAndNameToType,
+		resolution,
 	)
 
 	c, err := cp.GetConfig()
@@ -513,6 +514,7 @@ type LoadBalancer struct {
 	Minutes    float64
 }
 
+// TODO rewrite this more-or-less altogether
 func ClusterLoadBalancers(client prometheus.Client, duration, offset time.Duration) (map[string]*LoadBalancer, error) {
 	durationStr := fmt.Sprintf("%dm", int64(duration.Minutes()))
 	offsetStr := fmt.Sprintf(" offset %dm", int64(offset.Minutes()))
@@ -1105,7 +1107,7 @@ func pvCosts(diskMap map[string]*Disk, resolution time.Duration, resActiveMins,
 			}
 		}
 		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
 		mins := e.Sub(s).Minutes()
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?

+ 3 - 2
pkg/costmodel/cluster_helpers.go

@@ -518,7 +518,7 @@ func buildActiveDataMap(resActiveMins []*prom.QueryResult, resolution time.Durat
 		}
 
 		s := time.Unix(int64(result.Values[0].Timestamp), 0)
-		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0).Add(resolution)
+		e := time.Unix(int64(result.Values[len(result.Values)-1].Timestamp), 0)
 		mins := e.Sub(s).Minutes()
 
 		// TODO niko/assets if mins >= threshold, interpolate for missing data?
@@ -705,6 +705,7 @@ func buildNodeMap(
 	preemptibleMap map[NodeIdentifier]bool,
 	labelsMap map[nodeIdentifierNoProviderID]map[string]string,
 	clusterAndNameToType map[nodeIdentifierNoProviderID]string,
+	res time.Duration,
 ) map[NodeIdentifier]*Node {
 
 	nodeMap := make(map[NodeIdentifier]*Node)
@@ -740,7 +741,7 @@ func buildNodeMap(
 		checkForKeyAndInitIfMissing(nodeMap, id, clusterAndNameToType)
 		nodeMap[id].Start = activeData.start
 		nodeMap[id].End = activeData.end
-		nodeMap[id].Minutes = activeData.minutes
+		nodeMap[id].Minutes = nodeMap[id].End.Sub(nodeMap[id].Start).Minutes()
 	}
 
 	// We now merge in data that doesn't have a provider id by looping over

+ 3 - 1
pkg/costmodel/cluster_helpers_test.go

@@ -1,11 +1,12 @@
 package costmodel
 
 import (
-	"github.com/kubecost/cost-model/pkg/config"
 	"reflect"
 	"testing"
 	"time"
 
+	"github.com/kubecost/cost-model/pkg/config"
+
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
@@ -687,6 +688,7 @@ func TestBuildNodeMap(t *testing.T) {
 				testCase.preemptibleMap,
 				testCase.labelsMap,
 				testCase.clusterAndNameToType,
+				time.Minute,
 			)
 
 			if !reflect.DeepEqual(result, testCase.expected) {

+ 16 - 2
pkg/costmodel/router.go

@@ -692,8 +692,15 @@ func (a *Accesses) PrometheusQuery(w http.ResponseWriter, r *http.Request, _ htt
 		return
 	}
 
+	// TODO test to make sure "time" does not get set, if not given
+
+	var ts time.Time
+	if qp.GetInt64("time", 0) > 0 {
+		ts = time.Unix(qp.GetInt64("time", 0), 0)
+	}
+
 	ctx := prom.NewNamedContext(a.PrometheusClient, prom.FrontendContextName)
-	body, err := ctx.RawQuery(query)
+	body, err := ctx.RawQuery(query, ts)
 	if err != nil {
 		w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
 		return
@@ -745,8 +752,15 @@ func (a *Accesses) ThanosQuery(w http.ResponseWriter, r *http.Request, _ httprou
 		return
 	}
 
+	// TODO test to make sure "time" does not get set, if not given
+
+	var ts time.Time
+	if qp.GetInt64("time", 0) > 0 {
+		ts = time.Unix(qp.GetInt64("time", 0), 0)
+	}
+
 	ctx := prom.NewNamedContext(a.ThanosClient, prom.FrontendContextName)
-	body, err := ctx.RawQuery(query)
+	body, err := ctx.RawQuery(query, ts)
 	if err != nil {
 		w.Write(WrapData(nil, fmt.Errorf("Error running query %s. Error: %s", query, err)))
 		return

+ 34 - 16
pkg/prom/query.go

@@ -89,7 +89,19 @@ func (ctx *Context) ErrorCollection() error {
 func (ctx *Context) Query(query string) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go runQuery(query, ctx, resCh, "")
+	go runQuery(query, ctx, resCh, time.Now(), "")
+
+	return resCh
+}
+
+// QueryWithTime returns a QueryResultsChan, then runs the given query at the
+// given time (see time parameter here: https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries)
+// and sends the results on the provided channel. Receiver is responsible for
+// closing the channel, preferably using the Read method.
+func (ctx *Context) QueryAtTime(query string, t time.Time) QueryResultsChan {
+	resCh := make(QueryResultsChan)
+
+	go runQuery(query, ctx, resCh, t, "")
 
 	return resCh
 }
@@ -100,7 +112,7 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 func (ctx *Context) ProfileQuery(query string, profileLabel string) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go runQuery(query, ctx, resCh, profileLabel)
+	go runQuery(query, ctx, resCh, time.Now(), profileLabel)
 
 	return resCh
 }
@@ -134,7 +146,7 @@ func (ctx *Context) ProfileQueryAll(queries ...string) []QueryResultsChan {
 }
 
 func (ctx *Context) QuerySync(query string) ([]*QueryResult, prometheus.Warnings, error) {
-	raw, warnings, err := ctx.query(query)
+	raw, warnings, err := ctx.query(query, time.Now())
 	if err != nil {
 		return nil, warnings, err
 	}
@@ -154,11 +166,11 @@ func (ctx *Context) QueryURL() *url.URL {
 
 // runQuery executes the prometheus query asynchronously, collects results and
 // errors, and passes them through the results channel.
-func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel string) {
+func runQuery(query string, ctx *Context, resCh QueryResultsChan, t time.Time, profileLabel string) {
 	defer errors.HandlePanic()
 	startQuery := time.Now()
 
-	raw, warnings, requestError := ctx.query(query)
+	raw, warnings, requestError := ctx.query(query, t)
 	results := NewQueryResults(query, raw)
 
 	// report all warnings, request, and parse errors (nils will be ignored)
@@ -172,18 +184,24 @@ func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel s
 }
 
 // RawQuery is a direct query to the prometheus client and returns the body of the response
-func (ctx *Context) RawQuery(query string) ([]byte, error) {
+func (ctx *Context) RawQuery(query string, t time.Time) ([]byte, error) {
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
 	q.Set("query", query)
 
-	// for non-range queries, we set the timestamp for the query to time-offset
-	// this is a special use case that's typically only used when our primary
-	// prom db has delayed insertion (thanos, cortex, etc...)
-	if promQueryOffset != 0 && ctx.name != AllocationContextName {
-		q.Set("time", time.Now().Add(-promQueryOffset).UTC().Format(time.RFC3339))
+	if !t.IsZero() {
+		// TODO remove log
+		log.Infof("[Prom] time=%s query=%s", strconv.FormatInt(t.Unix(), 10), query)
+		q.Set("time", strconv.FormatInt(t.Unix(), 10))
 	} else {
-		q.Set("time", time.Now().UTC().Format(time.RFC3339))
+		// for non-range queries, we set the timestamp for the query to time-offset
+		// this is a special use case that's typically only used when our primary
+		// prom db has delayed insertion (thanos, cortex, etc...)
+		if promQueryOffset != 0 && ctx.name != AllocationContextName {
+			q.Set("time", time.Now().Add(-promQueryOffset).UTC().Format(time.RFC3339))
+		} else {
+			q.Set("time", time.Now().UTC().Format(time.RFC3339))
+		}
 	}
 
 	u.RawQuery = q.Encode()
@@ -221,8 +239,8 @@ func (ctx *Context) RawQuery(query string) ([]byte, error) {
 	return body, err
 }
 
-func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error) {
-	body, err := ctx.RawQuery(query)
+func (ctx *Context) query(query string, t time.Time) (interface{}, prometheus.Warnings, error) {
+	body, err := ctx.RawQuery(query, t)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -230,7 +248,7 @@ func (ctx *Context) query(query string) (interface{}, prometheus.Warnings, error
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, nil, fmt.Errorf("Unmarshal Error: %s\nQuery: %s", err, query)
+		return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
 	}
 
 	warnings := warningsFrom(toReturn)
@@ -354,7 +372,7 @@ func (ctx *Context) queryRange(query string, start, end time.Time, step time.Dur
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, nil, fmt.Errorf("Unmarshal Error: %s\nQuery: %s", err, query)
+		return nil, nil, fmt.Errorf("query '%s' caused unmarshal error: %s", query, err)
 	}
 
 	warnings := warningsFrom(toReturn)