Просмотр исходного кода

Merge pull request #315 from kubecost/niko/idle

Return cumulative and rate data for cluster costs
Niko Kovacevic 6 лет назад
Родитель
Коммит
1d4e3cffd0
9 измененных файлов с 437 добавлено и 44 удалено
  1. 2 2
      cloud/awsprovider.go
  2. 2 2
      cloud/azureprovider.go
  3. 2 2
      cloud/customprovider.go
  4. 24 3
      cloud/gcpprovider.go
  5. 1 1
      cloud/provider.go
  6. 278 33
      costmodel/cluster.go
  7. 1 1
      costmodel/router.go
  8. 39 0
      util/errors.go
  9. 88 0
      util/time.go

+ 2 - 2
cloud/awsprovider.go

@@ -207,8 +207,8 @@ var regionToBillingRegionCode = map[string]string{
 	"us-gov-west-1":  "UGW1",
 }
 
-func (aws *AWS) GetLocalStorageQuery(offset string) (string, error) {
-	return "", nil
+// GetLocalStorageQuery returns the PromQL query for local storage cost over the
+// given window/offset; rate selects a monthly rate instead of a cumulative cost.
+// AWS provides no local storage pricing here, so the query is empty.
+func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool) string {
+	return ""
 }
 
 // KubeAttrConversion maps the k8s labels for region to an aws region

+ 2 - 2
cloud/azureprovider.go

@@ -620,6 +620,6 @@ func (az *Azure) PVPricing(PVKey) (*PV, error) {
 	return nil, nil
 }
 
-func (az *Azure) GetLocalStorageQuery(offset string) (string, error) {
-	return "", nil
+// GetLocalStorageQuery returns the PromQL query for local storage cost over the
+// given window/offset; rate selects a monthly rate instead of a cumulative cost.
+// Azure provides no local storage pricing here, so the query is empty.
+func (az *Azure) GetLocalStorageQuery(window, offset string, rate bool) string {
+	return ""
 }

+ 2 - 2
cloud/customprovider.go

@@ -37,8 +37,8 @@ type customProviderKey struct {
 	Labels         map[string]string
 }
 
-func (*CustomProvider) GetLocalStorageQuery(offset string) (string, error) {
-	return "", nil
+// GetLocalStorageQuery returns the PromQL query for local storage cost over the
+// given window/offset; rate selects a monthly rate instead of a cumulative cost.
+// The custom provider defines no local storage pricing, so the query is empty.
+func (*CustomProvider) GetLocalStorageQuery(window, offset string, rate bool) string {
+	return ""
 }
 
 func (cp *CustomProvider) GetConfig() (*CustomPricing, error) {

+ 24 - 3
cloud/gcpprovider.go

@@ -81,9 +81,30 @@ func gcpAllocationToOutOfClusterAllocation(gcpAlloc gcpAllocation) *OutOfCluster
 	}
 }
 
-func (gcp *GCP) GetLocalStorageQuery(offset string) (string, error) {
-	localStorageCost := 0.04 // TODO: Set to the price for the appropriate storage class. It's not trivial to determine the local storage disk type
-	return fmt.Sprintf(`sum(sum(container_fs_limit_bytes{device!="tmpfs", id="/"} %s) by (instance, cluster_id)) by (cluster_id) / 1024 / 1024 / 1024 * %f`, offset, localStorageCost), nil
+// GetLocalStorageQuery returns a PromQL query for the cost of local (node)
+// storage, aggregated by cluster_id. If rate is true the query yields a
+// monthly rate; otherwise it yields the cumulative cost over the window.
+func (gcp *GCP) GetLocalStorageQuery(window, offset string, rate bool) string {
+	// TODO Set to the price for the appropriate storage class. It's not trivial to determine the local storage disk type
+	// See https://cloud.google.com/compute/disks-image-pricing#persistentdisk
+	localStorageCost := 0.04
+
+	// Convert a bare offset like "1h" into PromQL's "offset 1h" (empty if unset).
+	fmtOffset := ""
+	if offset != "" {
+		fmtOffset = fmt.Sprintf("offset %s", offset)
+	}
+
+	// Cumulative: sum per-minute capacity samples (GiB-minutes), then convert to
+	// dollars via /60 (minutes per hour) and /730 (hours per month) — assumes
+	// localStorageCost is a monthly $/GiB price; TODO confirm.
+	fmtCumulativeQuery := `sum(
+		sum_over_time(container_fs_limit_bytes{device!="tmpfs", id="/"}[%s:1m]%s)
+	) by (cluster_id) / 60 / 730 / 1024 / 1024 / 1024 * %f`
+
+	// Monthly rate: average capacity over the window times the monthly price.
+	fmtMonthlyQuery := `sum(
+		avg_over_time(container_fs_limit_bytes{device!="tmpfs", id="/"}[%s:1m]%s)
+	) by (cluster_id) / 1024 / 1024 / 1024 * %f`
+
+	fmtQuery := fmtCumulativeQuery
+	if rate {
+		fmtQuery = fmtMonthlyQuery
+	}
+
+	return fmt.Sprintf(fmtQuery, window, fmtOffset, localStorageCost)
 }
 
 func (gcp *GCP) GetConfig() (*CustomPricing, error) {

+ 1 - 1
cloud/provider.go

@@ -168,7 +168,7 @@ type Provider interface {
 	UpdateConfigFromConfigMap(map[string]string) (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetManagementPlatform() (string, error)
-	GetLocalStorageQuery(offset string) (string, error)
+	GetLocalStorageQuery(string, string, bool) string
 	ExternalAllocations(string, string, string, string, string) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 }

+ 278 - 33
costmodel/cluster.go

@@ -3,11 +3,12 @@ package costmodel
 import (
 	"fmt"
 	"os"
+	"sync"
 	"time"
 
-	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
-	prometheusClient "github.com/prometheus/client_golang/api"
-
+	"github.com/kubecost/cost-model/cloud"
+	"github.com/kubecost/cost-model/util"
+	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 )
 
@@ -33,6 +34,257 @@ const (
 	  ) by (cluster_id) %s`
 )
 
+// PromQueryContext carries the shared state for concurrent Prometheus queries:
+// the client to query with, an error collector shared across goroutines, and
+// an optional WaitGroup (may be nil) to signal query completion on.
+// TODO move this to a package-accessible helper
+type PromQueryContext struct {
+	client prometheus.Client
+	ec     *util.ErrorCollector
+	wg     *sync.WaitGroup
+}
+
+// AsyncPromQuery issues a single Prometheus query, reports any query or parse
+// error to the context's ErrorCollector, and sends the parsed results (which
+// may be nil on error) on resultCh. If the context carries a WaitGroup, Done
+// is called once on return.
+// TODO move this to a package-accessible helper function once dependencies are able to
+// be extricated from costmodel package (PromQueryResult -> Vector). Otherwise, circular deps.
+func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
+	if ctx.wg != nil {
+		defer ctx.wg.Done()
+	}
+
+	raw, promErr := Query(ctx.client, query)
+	ctx.ec.Report(promErr)
+
+	results, parseErr := NewQueryResults(raw)
+	ctx.ec.Report(parseErr)
+
+	// Send even on error so readers blocked on the channel always unblock.
+	resultCh <- results
+}
+
+// ClusterCosts represents cumulative and monthly-rate cluster costs over a
+// given duration, broken down by CPU, GPU, memory (RAM), and storage, with
+// totals. Start and End bound the window the cumulative figures cover.
+type ClusterCosts struct {
+	Start             *time.Time `json:"startTime"`
+	End               *time.Time `json:"endTime"`
+	CPUCumulative     float64    `json:"cpuCumulativeCost"`
+	CPUMonthly        float64    `json:"cpuMonthlyCost"`
+	GPUCumulative     float64    `json:"gpuCumulativeCost"`
+	GPUMonthly        float64    `json:"gpuMonthlyCost"`
+	RAMCumulative     float64    `json:"ramCumulativeCost"`
+	RAMMonthly        float64    `json:"ramMonthlyCost"`
+	StorageCumulative float64    `json:"storageCumulativeCost"`
+	StorageMonthly    float64    `json:"storageMonthlyCost"`
+	TotalCumulative   float64    `json:"totalCumulativeCost"`
+	TotalMonthly      float64    `json:"totalMonthlyCost"`
+}
+
+// NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
+// the associated monthly rate data, and returns the Costs.
+// dataHours is the number of hours the data actually covers; pass 0 to derive
+// it from window and offset. Monthly figures extrapolate the cumulative costs
+// to util.HoursPerMonth (730) hours.
+func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
+	start, end, err := util.ParseTimeRange(window, offset)
+	if err != nil {
+		return nil, err
+	}
+
+	// If the number of hours is not given (i.e. is zero) compute one from the window and offset
+	if dataHours == 0 {
+		dataHours = end.Sub(*start).Hours()
+	}
+
+	// Do not allow zero-length windows to prevent divide-by-zero issues
+	if dataHours == 0 {
+		return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
+	}
+
+	cc := &ClusterCosts{
+		Start:             start,
+		End:               end,
+		CPUCumulative:     cpu,
+		GPUCumulative:     gpu,
+		RAMCumulative:     ram,
+		StorageCumulative: storage,
+		TotalCumulative:   cpu + gpu + ram + storage,
+		CPUMonthly:        cpu / dataHours * (util.HoursPerMonth),
+		GPUMonthly:        gpu / dataHours * (util.HoursPerMonth),
+		RAMMonthly:        ram / dataHours * (util.HoursPerMonth),
+		StorageMonthly:    storage / dataHours * (util.HoursPerMonth),
+	}
+	cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
+
+	return cc, nil
+}
+
+// NewClusterCostsFromMonthly takes monthly-rate cost data over a given time range, computes
+// the associated cumulative cost data, and returns the Costs.
+// dataHours is the number of hours the data actually covers; pass 0 to derive
+// it from window and offset. Cumulative figures scale the monthly rates by
+// dataHours / util.HoursPerMonth (the inverse of NewClusterCostsFromCumulative).
+func NewClusterCostsFromMonthly(cpuMonthly, gpuMonthly, ramMonthly, storageMonthly float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
+	start, end, err := util.ParseTimeRange(window, offset)
+	if err != nil {
+		return nil, err
+	}
+
+	// If the number of hours is not given (i.e. is zero) compute one from the window and offset
+	if dataHours == 0 {
+		dataHours = end.Sub(*start).Hours()
+	}
+
+	// Do not allow zero-length windows to prevent divide-by-zero issues
+	if dataHours == 0 {
+		return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
+	}
+
+	cc := &ClusterCosts{
+		Start:             start,
+		End:               end,
+		CPUMonthly:        cpuMonthly,
+		GPUMonthly:        gpuMonthly,
+		RAMMonthly:        ramMonthly,
+		StorageMonthly:    storageMonthly,
+		TotalMonthly:      cpuMonthly + gpuMonthly + ramMonthly + storageMonthly,
+		CPUCumulative:     cpuMonthly / util.HoursPerMonth * dataHours,
+		GPUCumulative:     gpuMonthly / util.HoursPerMonth * dataHours,
+		RAMCumulative:     ramMonthly / util.HoursPerMonth * dataHours,
+		StorageCumulative: storageMonthly / util.HoursPerMonth * dataHours,
+	}
+	cc.TotalCumulative = cc.CPUCumulative + cc.GPUCumulative + cc.RAMCumulative + cc.StorageCumulative
+
+	return cc, nil
+}
+
+// ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
+// Results are keyed by cluster_id; results lacking that label fall back to the
+// cluster ID from the environment (clusterIDKey).
+func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*ClusterCosts, error) {
+	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
+	start, end, err := util.ParseTimeRange(window, offset)
+	if err != nil {
+		return nil, err
+	}
+	mins := end.Sub(*start).Minutes()
+
+	// Each cost query sums per-minute samples of hourly-cost metrics over the
+	// window ([window:1m] subqueries, divided by 60), aggregated per cluster.
+	const fmtQueryDataCount = `max(count_over_time(kube_node_status_capacity_cpu_cores[%s:1m]%s))`
+
+	const fmtQueryTotalGPU = `sum(
+		sum_over_time(node_gpu_hourly_cost[%s:1m]%s) / 60
+	) by (node, cluster_id)`
+
+	const fmtQueryTotalCPU = `sum(
+		sum(sum_over_time(kube_node_status_capacity_cpu_cores[%s:1m]%s)) by (node, cluster_id) *
+		avg(avg_over_time(node_cpu_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
+	)`
+
+	const fmtQueryTotalRAM = `sum(
+		sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1m]%s) / 1024 / 1024 / 1024) by (node, cluster_id) *
+		avg(avg_over_time(node_ram_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
+	)`
+
+	const fmtQueryTotalStorage = `sum(
+		sum(sum_over_time(kube_persistentvolume_capacity_bytes[%s:1m]%s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024 *
+		avg(avg_over_time(pv_hourly_cost[%s:1m]%s)) by (persistentvolume, cluster_id) / 60
+	)%s`
+
+	// Append the provider's local storage query (if any) to the PV storage total.
+	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false)
+	if queryTotalLocalStorage != "" {
+		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
+	}
+
+	// Convert a bare offset like "1h" into PromQL's "offset 1h" (empty if unset).
+	fmtOffset := ""
+	if offset != "" {
+		fmtOffset = fmt.Sprintf("offset %s", offset)
+	}
+
+	queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, fmtOffset)
+	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, fmtOffset)
+	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
+	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
+	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset, queryTotalLocalStorage)
+	numQueries := 5
+
+	klog.V(4).Infof("[Debug] queryDataCount: %s", queryDataCount)
+	klog.V(4).Infof("[Debug] queryTotalGPU: %s", queryTotalGPU)
+	klog.V(4).Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
+	klog.V(4).Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
+	klog.V(4).Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
+
+	// Submit queries to Prometheus asynchronously
+	var ec util.ErrorCollector
+	var wg sync.WaitGroup
+	ctx := PromQueryContext{client, &ec, &wg}
+	ctx.wg.Add(numQueries)
+
+	chDataCount := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryDataCount, chDataCount, ctx)
+
+	chTotalGPU := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalGPU, chTotalGPU, ctx)
+
+	chTotalCPU := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalCPU, chTotalCPU, ctx)
+
+	chTotalRAM := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalRAM, chTotalRAM, ctx)
+
+	chTotalStorage := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
+
+	// After queries complete, retrieve results
+	wg.Wait()
+
+	resultsDataCount := <-chDataCount
+	close(chDataCount)
+
+	resultsTotalGPU := <-chTotalGPU
+	close(chTotalGPU)
+
+	resultsTotalCPU := <-chTotalCPU
+	close(chTotalCPU)
+
+	resultsTotalRAM := <-chTotalRAM
+	close(chTotalRAM)
+
+	resultsTotalStorage := <-chTotalStorage
+	close(chTotalStorage)
+
+	// Scale by the number of minutes actually scraped if the data-count query
+	// returned one; otherwise assume the full window was covered.
+	dataMins := mins
+	if len(resultsDataCount) > 0 && len(resultsDataCount[0].Values) > 0 {
+		dataMins = resultsDataCount[0].Values[0].Value
+	} else {
+		klog.V(3).Infof("[Warning] cluster cost data count returned no results")
+	}
+
+	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
+	costData := make(map[string]map[string]float64)
+	defaultClusterID := os.Getenv(clusterIDKey)
+
+	// Helper function to iterate over Prom query results, parsing the raw values into
+	// the intermediate costData structure.
+	setCostsFromResults := func(costData map[string]map[string]float64, results []*PromQueryResult, name string) {
+		for _, result := range results {
+			clusterID, _ := result.GetString("cluster_id")
+			if clusterID == "" {
+				clusterID = defaultClusterID
+			}
+			if _, ok := costData[clusterID]; !ok {
+				costData[clusterID] = map[string]float64{}
+			}
+			if len(result.Values) > 0 {
+				costData[clusterID][name] += result.Values[0].Value
+				costData[clusterID]["total"] += result.Values[0].Value
+			}
+		}
+	}
+	setCostsFromResults(costData, resultsTotalGPU, "gpu")
+	setCostsFromResults(costData, resultsTotalCPU, "cpu")
+	setCostsFromResults(costData, resultsTotalRAM, "ram")
+	setCostsFromResults(costData, resultsTotalStorage, "storage")
+
+	// Convert intermediate structure to Costs instances
+	costsByCluster := map[string]*ClusterCosts{}
+	for id, cd := range costData {
+		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"], window, offset, dataMins/util.MinsPerHour)
+		if err != nil {
+			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
+			return nil, err
+		}
+		costsByCluster[id] = costs
+	}
+
+	// NOTE(review): errors reported to ec by the async queries are never checked
+	// here, so a failed query silently yields missing/zero costs — TODO confirm
+	// this is intended rather than returning an error when ec.IsError().
+	return costsByCluster, nil
+}
+
 type Totals struct {
 	TotalCost   [][]string `json:"totalcost"`
 	CPUCost     [][]string `json:"cpucost"`
@@ -103,34 +355,33 @@ func resultToTotal(qr interface{}) (map[string][][]string, error) {
 }
 
 // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
-func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (map[string]*Totals, error) {
-
-	if offset != "" {
-		offset = fmt.Sprintf("offset %s", offset)
-	}
-
-	localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
-	if err != nil {
-		return nil, err
-	}
+func ClusterCostsForAllClusters(cli prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*Totals, error) {
+	localStorageQuery := provider.GetLocalStorageQuery(window, offset, true)
 	if localStorageQuery != "" {
 		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
-	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
+	fmtOffset := ""
+	if offset != "" {
+		fmtOffset = fmt.Sprintf("offset %s", offset)
+	}
+
+	qCores := fmt.Sprintf(queryClusterCores, window, fmtOffset, window, fmtOffset, window, fmtOffset)
+	qRAM := fmt.Sprintf(queryClusterRAM, window, fmtOffset, window, fmtOffset)
+	qStorage := fmt.Sprintf(queryStorage, window, fmtOffset, window, fmtOffset, localStorageQuery)
 
 	klog.V(4).Infof("Running query %s", qCores)
 	resultClusterCores, err := Query(cli, qCores)
 	if err != nil {
 		return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
 	}
+
 	klog.V(4).Infof("Running query %s", qRAM)
 	resultClusterRAM, err := Query(cli, qRAM)
 	if err != nil {
 		return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
 	}
+
 	klog.V(4).Infof("Running query %s", qRAM)
 	resultStorage, err := Query(cli, qStorage)
 	if err != nil {
@@ -175,25 +426,23 @@ func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerC
 	return toReturn, nil
 }
 
-// ClusterCosts gives the current full cluster costs averaged over a window of time.
-func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (*Totals, error) {
-
+// AverageClusterTotals gives the current full cluster costs averaged over a window of time.
+// Used to be ClutserCosts, but has been deprecated for that use.
+func AverageClusterTotals(cli prometheus.Client, provider cloud.Provider, windowString, offset string) (*Totals, error) {
 	// turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
+	fmtOffset := ""
 	if offset != "" {
-		offset = fmt.Sprintf("offset %s", offset)
+		fmtOffset = fmt.Sprintf("offset %s", offset)
 	}
 
-	localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
-	if err != nil {
-		return nil, err
-	}
+	localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true)
 	if localStorageQuery != "" {
 		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
-	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
+	qCores := fmt.Sprintf(queryClusterCores, windowString, fmtOffset, windowString, fmtOffset, windowString, fmtOffset)
+	qRAM := fmt.Sprintf(queryClusterRAM, windowString, fmtOffset, windowString, fmtOffset)
+	qStorage := fmt.Sprintf(queryStorage, windowString, fmtOffset, windowString, fmtOffset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 
 	resultClusterCores, err := Query(cli, qCores)
@@ -246,12 +495,8 @@ func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider,
 }
 
 // ClusterCostsOverTime gives the full cluster costs over time
-func ClusterCostsOverTime(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
-
-	localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
-	if err != nil {
-		return nil, err
-	}
+func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
+	localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true)
 	if localStorageQuery != "" {
 		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 	}

+ 1 - 1
costmodel/router.go

@@ -317,7 +317,7 @@ func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httpr
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
 
-	data, err := ClusterCosts(a.PrometheusClient, a.Cloud, window, offset)
+	data, err := ComputeClusterCosts(a.PrometheusClient, a.Cloud, window, offset)
 	w.Write(WrapData(data, err))
 }
 

+ 39 - 0
util/errors.go

@@ -0,0 +1,39 @@
+package util
+
+import "sync"
+
+// ErrorCollector accumulates errors reported from multiple goroutines behind
+// a mutex. The zero value is ready to use; it must not be copied after first
+// use because it contains a sync.Mutex.
+type ErrorCollector struct {
+	m      sync.Mutex
+	errors []error
+}
+
+// Report records an error in the collector. A nil error is ignored, so
+// callers can pass results through unconditionally.
+func (ec *ErrorCollector) Report(e error) {
+	if e == nil {
+		return
+	}
+
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	ec.errors = append(ec.errors, e)
+}
+
+// IsError reports whether the collector has caught at least one error.
+func (ec *ErrorCollector) IsError() bool {
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	return len(ec.errors) > 0
+}
+
+// Errors returns a copy of the errors caught by the collector; the copy
+// keeps callers from racing on the internal slice.
+func (ec *ErrorCollector) Errors() []error {
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	errs := make([]error, len(ec.errors))
+	copy(errs, ec.errors)
+	return errs
+}

+ 88 - 0
util/time.go

@@ -0,0 +1,88 @@
+package util
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+)
+
+// Time-unit conversion constants. HoursPerMonth and DaysPerMonth are
+// year-averaged figures (730 hours, 30.42 days), not any calendar month.
+const (
+	MinsPerHour   = 60.0
+	HoursPerDay   = 24.0
+	HoursPerMonth = 730.0
+	DaysPerMonth  = 30.42
+)
+
+// ParseDuration converts a Prometheus-style duration string into a Duration.
+// The accepted form is an integer followed by one unit: s, m, h, or d
+// (days are handled here because time.ParseDuration rejects "d").
+// NOTE(review): an empty string panics on the slice below — TODO guard or
+// confirm callers never pass one.
+func ParseDuration(duration string) (*time.Duration, error) {
+	unitStr := duration[len(duration)-1:]
+	var unit time.Duration
+	switch unitStr {
+	case "s":
+		unit = time.Second
+	case "m":
+		unit = time.Minute
+	case "h":
+		unit = time.Hour
+	case "d":
+		unit = 24.0 * time.Hour
+	default:
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	}
+
+	// Everything before the unit must be a base-10 integer.
+	amountStr := duration[:len(duration)-1]
+	amount, err := strconv.ParseInt(amountStr, 10, 64)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	}
+
+	dur := time.Duration(amount) * unit
+	return &dur, nil
+}
+
+// ParseTimeRange returns a start and end time, respectively, which are converted from
+// a duration and offset, defined as strings with Prometheus-style syntax.
+// The end time is now minus the offset (if any); the start time is the end
+// time minus the duration.
+func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
+	// endTime defaults to the current time, unless an offset is explicity declared,
+	// in which case it shifts endTime back by given duration
+	endTime := time.Now()
+	if offset != "" {
+		o, err := ParseDuration(offset)
+		if err != nil {
+			return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
+		}
+		endTime = endTime.Add(-1 * *o)
+	}
+
+	// if duration is defined in terms of days, convert to hours
+	// e.g. convert "2d" to "48h"
+	durationNorm, err := normalizeTimeParam(duration)
+	if err != nil {
+		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
+	}
+
+	// convert time duration into start and end times, formatted
+	// as ISO datetime strings
+	// NOTE(review): "errorf" in the message below looks like a typo for
+	// "error" — left as-is since it is a runtime string.
+	dur, err := time.ParseDuration(durationNorm)
+	if err != nil {
+		return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
+	}
+	startTime := endTime.Add(-1 * dur)
+
+	return &startTime, &endTime, nil
+}
+
+// normalizeTimeParam converts a day-based duration (e.g. "2d") into the
+// hour-based form ("48h") accepted by time.ParseDuration; all other inputs
+// pass through unchanged. Assumes param is non-empty — an empty string
+// panics on the slice below; TODO confirm callers guarantee this.
+func normalizeTimeParam(param string) (string, error) {
+	// convert days to hours
+	if param[len(param)-1:] == "d" {
+		count := param[:len(param)-1]
+		val, err := strconv.ParseInt(count, 10, 64)
+		if err != nil {
+			return "", err
+		}
+		val = val * 24
+		param = fmt.Sprintf("%dh", val)
+	}
+
+	return param, nil
+}