فهرست منبع

Merge pull request #1051 from kubecost/bolt/rate-limit-response-handling

Handle Rate Limit Responses in Prom Client
Matt Bolt 4 سال پیش
والد
کامیت
53fce803e4
12فایلهای تغییر یافته به همراه949 افزوده شده و 84 حذف شده
  1. 2 1
      go.mod
  2. 0 10
      go.sum
  3. 23 1
      pkg/cmd/agent/agent.go
  4. 37 2
      pkg/costmodel/router.go
  5. 23 1
      pkg/env/costmodelenv.go
  6. 12 0
      pkg/env/env.go
  7. 154 32
      pkg/prom/prom.go
  8. 382 0
      pkg/prom/ratelimitedclient_test.go
  9. 19 12
      pkg/thanos/thanos.go
  10. 65 0
      pkg/util/httputil/httputil.go
  11. 220 25
      pkg/util/timeutil/timeutil.go
  12. 12 0
      pkg/util/timeutil/timeutil_test.go

+ 2 - 1
go.mod

@@ -6,7 +6,6 @@ require (
 	cloud.google.com/go v0.81.0
 	cloud.google.com/go/bigquery v1.8.0
 	github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
-	github.com/Azure/azure-storage-blob-go v0.13.0
 	github.com/Azure/go-autorest/autorest v0.11.17
 	github.com/Azure/go-autorest/autorest/azure/auth v0.5.6
 	github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
@@ -22,6 +21,7 @@ require (
 	github.com/lib/pq v1.2.0
 	github.com/microcosm-cc/bluemonday v1.0.5
 	github.com/minio/minio-go/v7 v7.0.15
+	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
 	github.com/patrickmn/go-cache v2.1.0+incompatible
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.0.0
@@ -34,6 +34,7 @@ require (
 	golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
 	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	google.golang.org/api v0.44.0
+	gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
 	gopkg.in/yaml.v2 v2.4.0
 	k8s.io/api v0.20.4
 	k8s.io/apimachinery v0.20.4

+ 0 - 10
go.sum

@@ -41,19 +41,14 @@ cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
-github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
-github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
-github.com/Azure/azure-storage-blob-go v0.13.0 h1:lgWHvFh+UYBNVQLFHXkvul2f6yOPA9PIH82RTG2cSwc=
-github.com/Azure/azure-storage-blob-go v0.13.0/go.mod h1:pA9kNqtjUeQF2zOSu4s//nUdBD+e64lEuc4sVnuOfNs=
 github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
 github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
 github.com/Azure/go-autorest/autorest v0.11.1/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
 github.com/Azure/go-autorest/autorest v0.11.17 h1:2zCdHwNgRH+St1J+ZMf66xI8aLr/5KMy+wWLH97zwYM=
 github.com/Azure/go-autorest/autorest v0.11.17/go.mod h1:eipySxLmqSyC5s5k1CLupqet0PSENBEDP93LQ9a8QYw=
 github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
-github.com/Azure/go-autorest/autorest/adal v0.9.2/go.mod h1:/3SMAM86bP6wC9Ev35peQDUeqFZBMH07vvUOmg4z/fE=
 github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
 github.com/Azure/go-autorest/autorest/adal v0.9.10 h1:r6fZHMaHD8B6LDCn0o5vyBFHIHrM6Ywwx7mb49lPItI=
 github.com/Azure/go-autorest/autorest/adal v0.9.10/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
@@ -368,8 +363,6 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN
 github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
-github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI=
-github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
 github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
 github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
@@ -611,7 +604,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -682,7 +674,6 @@ golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -699,7 +690,6 @@ golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200828194041-157a740278f4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

+ 23 - 1
pkg/cmd/agent/agent.go

@@ -85,8 +85,30 @@ func newPrometheusClient() (prometheus.Client, error) {
 
 	timeout := 120 * time.Second
 	keepAlive := 120 * time.Second
+	tlsHandshakeTimeout := 10 * time.Second
 
-	promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
+	var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
+	if env.IsPrometheusRetryOnRateLimitResponse() {
+		rateLimitRetryOpts = &prom.RateLimitRetryOpts{
+			MaxRetries:       env.GetPrometheusRetryOnRateLimitMaxRetries(),
+			DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
+		}
+	}
+
+	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
+		Timeout:               timeout,
+		KeepAlive:             keepAlive,
+		TLSHandshakeTimeout:   tlsHandshakeTimeout,
+		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+		RateLimitRetryOpts:    rateLimitRetryOpts,
+		Auth: &prom.ClientAuth{
+			Username:    env.GetDBBasicAuthUsername(),
+			Password:    env.GetDBBasicAuthUserPassword(),
+			BearerToken: env.GetDBBearerToken(),
+		},
+		QueryConcurrency: queryConcurrency,
+		QueryLogFile:     "",
+	})
 	if err != nil {
 		return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
 	}

+ 37 - 2
pkg/costmodel/router.go

@@ -1342,9 +1342,31 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 
 	timeout := 120 * time.Second
 	keepAlive := 120 * time.Second
+	tlsHandshakeTimeout := 10 * time.Second
 	scrapeInterval := time.Minute
 
-	promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
+	var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
+	if env.IsPrometheusRetryOnRateLimitResponse() {
+		rateLimitRetryOpts = &prom.RateLimitRetryOpts{
+			MaxRetries:       env.GetPrometheusRetryOnRateLimitMaxRetries(),
+			DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
+		}
+	}
+
+	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
+		Timeout:               timeout,
+		KeepAlive:             keepAlive,
+		TLSHandshakeTimeout:   tlsHandshakeTimeout,
+		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+		RateLimitRetryOpts:    rateLimitRetryOpts,
+		Auth: &prom.ClientAuth{
+			Username:    env.GetDBBasicAuthUsername(),
+			Password:    env.GetDBBasicAuthUserPassword(),
+			BearerToken: env.GetDBBearerToken(),
+		},
+		QueryConcurrency: queryConcurrency,
+		QueryLogFile:     "",
+	})
 	if err != nil {
 		klog.Fatalf("Failed to create prometheus client, Error: %v", err)
 	}
@@ -1452,7 +1474,20 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		thanosAddress := thanos.QueryURL()
 
 		if thanosAddress != "" {
-			thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
+			thanosCli, _ := thanos.NewThanosClient(thanosAddress, &prom.PrometheusClientConfig{
+				Timeout:               timeout,
+				KeepAlive:             keepAlive,
+				TLSHandshakeTimeout:   tlsHandshakeTimeout,
+				TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+				RateLimitRetryOpts:    rateLimitRetryOpts,
+				Auth: &prom.ClientAuth{
+					Username:    env.GetMultiClusterBasicAuthUsername(),
+					Password:    env.GetMultiClusterBasicAuthPassword(),
+					BearerToken: env.GetMultiClusterBearerToken(),
+				},
+				QueryConcurrency: queryConcurrency,
+				QueryLogFile:     env.GetQueryLoggingFile(),
+			})
 
 			_, err = prom.Validate(thanosCli)
 			if err != nil {

+ 23 - 1
pkg/env/costmodelenv.go

@@ -81,7 +81,11 @@ const (
 	KubecostConfigBucketEnvVar    = "KUBECOST_CONFIG_BUCKET"
 	ClusterInfoFileEnabledEnvVar  = "CLUSTER_INFO_FILE_ENABLED"
 	ClusterCacheFileEnabledEnvVar = "CLUSTER_CACHE_FILE_ENABLED"
-	PrometheusQueryOffsetEnvVar   = "PROMETHEUS_QUERY_OFFSET"
+
+	PrometheusQueryOffsetEnvVar                 = "PROMETHEUS_QUERY_OFFSET"
+	PrometheusRetryOnRateLimitResponseEnvVar    = "PROMETHEUS_RETRY_ON_RATE_LIMIT"
+	PrometheusRetryOnRateLimitMaxRetriesEnvVar  = "PROMETHEUS_RETRY_ON_RATE_LIMIT_MAX_RETRIES"
+	PrometheusRetryOnRateLimitDefaultWaitEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT_DEFAULT_WAIT"
 )
 
 // GetKubecostConfigBucket returns a file location for a mounted bucket configuration which is used to store
@@ -102,6 +106,24 @@ func IsClusterCacheFileEnabled() bool {
 	return GetBool(ClusterCacheFileEnabledEnvVar, false)
 }
 
+// IsPrometheusRetryOnRateLimitResponse will attempt to retry if a 429 response is received OR a 400 with a body containing
+// ThrottleException (common in AWS services like AMP)
+func IsPrometheusRetryOnRateLimitResponse() bool {
+	return GetBool(PrometheusRetryOnRateLimitResponseEnvVar, true)
+}
+
+// GetPrometheusRetryOnRateLimitMaxRetries returns the maximum number of retries that should be attempted prior to failing.
+// Only used if IsPrometheusRetryOnRateLimitResponse() is true.
+func GetPrometheusRetryOnRateLimitMaxRetries() int {
+	return GetInt(PrometheusRetryOnRateLimitMaxRetriesEnvVar, 5)
+}
+
+// GetPrometheusRetryOnRateLimitDefaultWait returns the default wait time for a retriable rate limit response without a
+// Retry-After header.
+func GetPrometheusRetryOnRateLimitDefaultWait() time.Duration {
+	return GetDuration(PrometheusRetryOnRateLimitDefaultWaitEnvVar, 100*time.Millisecond)
+}
+
 // GetPrometheusQueryOffset returns the time.Duration to offset all prometheus queries by. NOTE: This env var is applied
 // to all non-range queries made via our query context. This should only be applied when there is a significant delay in
 // data arriving in the target prom db. For example, if supplying a thanos or cortex querier for the prometheus server, using

+ 12 - 0
pkg/env/env.go

@@ -2,6 +2,7 @@ package env
 
 import (
 	"os"
+	"time"
 
 	"github.com/kubecost/cost-model/pkg/util/mapper"
 )
@@ -116,6 +117,12 @@ func GetBool(key string, defaultValue bool) bool {
 	return envMapper.GetBool(key, defaultValue)
 }
 
+// GetDuration parses a time.Duration from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetDuration(key string, defaultValue time.Duration) time.Duration {
+	return envMapper.GetDuration(key, defaultValue)
+}
+
 // Set sets the environment variable for the key provided using the value provided.
 func Set(key string, value string) error {
 	return envMapper.Set(key, value)
@@ -175,3 +182,8 @@ func SetUInt64(key string, value uint64) error {
 func SetBool(key string, value bool) error {
 	return envMapper.SetBool(key, value)
 }
+
+// SetDuration sets the environment variable to a string formatted time.Duration
+func SetDuration(key string, value time.Duration) error {
+	return envMapper.SetDuration(key, value)
+}

+ 154 - 32
pkg/prom/prom.go

@@ -3,14 +3,15 @@ package prom
 import (
 	"context"
 	"crypto/tls"
+	"fmt"
 	"net"
 	"net/http"
 	"net/url"
 	"os"
+	"strings"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/collections"
-	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util/atomic"
 	"github.com/kubecost/cost-model/pkg/util/fileutil"
@@ -56,6 +57,53 @@ func (auth *ClientAuth) Apply(req *http.Request) {
 	}
 }
 
+//--------------------------------------------------------------------------
+//  Rate Limit Options
+//--------------------------------------------------------------------------
+
+// MaxRetryAfterDuration is the maximum amount of time we should ever wait
+// during a retry. This is to prevent starvation on the request threads
+const MaxRetryAfterDuration = 10 * time.Second
+
+// RateLimitRetryOpts contains retry options
+type RateLimitRetryOpts struct {
+	MaxRetries       int
+	DefaultRetryWait time.Duration
+}
+
+// RateLimitResponseStatus contains the status of the rate limited retries
+type RateLimitResponseStatus struct {
+	RetriesRemaining int
+	WaitTime         time.Duration
+}
+
+// String creates a string representation of the rate limit status
+func (rtrs *RateLimitResponseStatus) String() string {
+	return fmt.Sprintf("Wait Time: %.2f seconds, Retries Remaining: %d", rtrs.WaitTime.Seconds(), rtrs.RetriesRemaining)
+}
+
+// RateLimitedError contains a list of retry statuses that occurred during
+// retries on a rate limited response
+type RateLimitedResponseError struct {
+	RateLimitStatus []*RateLimitResponseStatus
+}
+
+// Error returns a string representation of the error, including the rate limit
+// status reports
+func (rlre *RateLimitedResponseError) Error() string {
+	var sb strings.Builder
+
+	sb.WriteString("Request was Rate Limited and Retries Exhausted:\n")
+
+	for _, rls := range rlre.RateLimitStatus {
+		sb.WriteString(" * ")
+		sb.WriteString(rls.String())
+		sb.WriteString("\n")
+	}
+
+	return sb.String()
+}
+
 //--------------------------------------------------------------------------
 //  RateLimitedPrometheusClient
 //--------------------------------------------------------------------------
@@ -63,13 +111,14 @@ func (auth *ClientAuth) Apply(req *http.Request) {
 // RateLimitedPrometheusClient is a prometheus client which limits the total number of
 // concurrent outbound requests allowed at a given moment.
 type RateLimitedPrometheusClient struct {
-	id         string
-	client     prometheus.Client
-	auth       *ClientAuth
-	queue      collections.BlockingQueue
-	decorator  QueryParamsDecorator
-	outbound   *atomic.AtomicInt32
-	fileLogger *golog.Logger
+	id             string
+	client         prometheus.Client
+	auth           *ClientAuth
+	queue          collections.BlockingQueue
+	decorator      QueryParamsDecorator
+	rateLimitRetry *RateLimitRetryOpts
+	outbound       *atomic.AtomicInt32
+	fileLogger     *golog.Logger
 }
 
 // requestCounter is used to determine if the prometheus client keeps track of
@@ -81,11 +130,14 @@ type requestCounter interface {
 
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
-func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, decorator QueryParamsDecorator, queryLogFile string) (prometheus.Client, error) {
-	c, err := prometheus.NewClient(config)
-	if err != nil {
-		return nil, err
-	}
+func NewRateLimitedClient(
+	id string,
+	client prometheus.Client,
+	maxConcurrency int,
+	auth *ClientAuth,
+	decorator QueryParamsDecorator,
+	rateLimitRetryOpts *RateLimitRetryOpts,
+	queryLogFile string) (prometheus.Client, error) {
 
 	queue := collections.NewBlockingQueue()
 	outbound := atomic.NewAtomicInt32(0)
@@ -105,14 +157,24 @@ func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency in
 		}
 	}
 
+	// default authentication
+	if auth == nil {
+		auth = &ClientAuth{
+			Username:    "",
+			Password:    "",
+			BearerToken: "",
+		}
+	}
+
 	rlpc := &RateLimitedPrometheusClient{
-		id:         id,
-		client:     c,
-		queue:      queue,
-		decorator:  decorator,
-		outbound:   outbound,
-		auth:       auth,
-		fileLogger: logger,
+		id:             id,
+		client:         client,
+		queue:          queue,
+		decorator:      decorator,
+		rateLimitRetry: rateLimitRetryOpts,
+		outbound:       outbound,
+		auth:           auth,
+		fileLogger:     logger,
 	}
 
 	// Start concurrent request processing
@@ -168,6 +230,9 @@ type workResponse struct {
 
 // worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
 func (rlpc *RateLimitedPrometheusClient) worker() {
+	retryOpts := rlpc.rateLimitRetry
+	retryRateLimit := retryOpts != nil
+
 	for {
 		// blocks until there is an item available
 		item := rlpc.queue.Dequeue()
@@ -198,6 +263,43 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 			roundTripStart := time.Now()
 			res, body, warnings, err := rlpc.client.Do(ctx, req)
 
+			// If retries on rate limited response is enabled:
+			// * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
+			// * Attempt to parse a Retry-After from response headers (common on 429)
+			// * If we couldn't determine how long to wait for a retry, use 1 second by default
+			if retryRateLimit {
+				var status []*RateLimitResponseStatus
+				var retries int = retryOpts.MaxRetries
+				var defaultWait time.Duration = retryOpts.DefaultRetryWait
+
+				for httputil.IsRateLimited(res, body) && retries > 0 {
+					// calculate amount of time to wait before retry, in the event the default wait is used,
+					// an exponential backoff is applied based on the number of times we've retried.
+					retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
+					retries--
+
+					status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
+					log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)
+
+					// To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
+					// to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
+					// don't want to block for 10 days.
+					if retryAfter > MaxRetryAfterDuration {
+						retryAfter = MaxRetryAfterDuration
+					}
+
+					// execute wait and retry
+					time.Sleep(retryAfter)
+					res, body, warnings, err = rlpc.client.Do(ctx, req)
+				}
+
+				// if we've broken out of our retry loop and the resp is still rate limited,
+				// then let's generate a meaningful error to pass back
+				if retries == 0 && httputil.IsRateLimited(res, body) {
+					err = &RateLimitedResponseError{RateLimitStatus: status}
+				}
+			}
+
 			// Decrement outbound counter
 			rlpc.outbound.Decrement()
 			LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
@@ -245,30 +347,50 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 //  Client Helpers
 //--------------------------------------------------------------------------
 
-func NewPrometheusClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
-	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
+// PrometheusClientConfig contains all configurable options for creating a new prometheus client
+type PrometheusClientConfig struct {
+	Timeout               time.Duration
+	KeepAlive             time.Duration
+	TLSHandshakeTimeout   time.Duration
+	TLSInsecureSkipVerify bool
+	RateLimitRetryOpts    *RateLimitRetryOpts
+	Auth                  *ClientAuth
+	QueryConcurrency      int
+	QueryLogFile          string
+}
 
-	// may be necessary for long prometheus queries. TODO: make this configurable
+// NewPrometheusClient creates a new rate limited client which limits by outbound concurrent requests.
+func NewPrometheusClient(address string, config *PrometheusClientConfig) (prometheus.Client, error) {
+	// may be necessary for long prometheus queries
 	pc := prometheus.Config{
 		Address: address,
 		RoundTripper: &http.Transport{
 			Proxy: http.ProxyFromEnvironment,
 			DialContext: (&net.Dialer{
-				Timeout:   timeout,
-				KeepAlive: keepAlive,
+				Timeout:   config.Timeout,
+				KeepAlive: config.KeepAlive,
 			}).DialContext,
-			TLSHandshakeTimeout: 10 * time.Second,
-			TLSClientConfig:     tlsConfig,
+			TLSHandshakeTimeout: config.TLSHandshakeTimeout,
+			TLSClientConfig: &tls.Config{
+				InsecureSkipVerify: config.TLSInsecureSkipVerify,
+			},
 		},
 	}
 
-	auth := &ClientAuth{
-		Username:    env.GetDBBasicAuthUsername(),
-		Password:    env.GetDBBasicAuthUserPassword(),
-		BearerToken: env.GetDBBearerToken(),
+	client, err := prometheus.NewClient(pc)
+	if err != nil {
+		return nil, err
 	}
 
-	return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, nil, queryLogFile)
+	return NewRateLimitedClient(
+		PrometheusClientID,
+		client,
+		config.QueryConcurrency,
+		config.Auth,
+		nil,
+		config.RateLimitRetryOpts,
+		config.QueryLogFile,
+	)
 }
 
 // LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent

+ 382 - 0
pkg/prom/ratelimitedclient_test.go

@@ -0,0 +1,382 @@
+package prom
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"math"
+	"net/http"
+
+	"net/url"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/httputil"
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
+// ResponseAndBody is just a test objet used to hold predefined responses
+// and response bodies
+type ResponseAndBody struct {
+	Response *http.Response
+	Body     []byte
+}
+
+// MockPromClient accepts a slice of responses and bodies to return on requests made.
+// It will cycle these responses linearly, then reset back to the first.
+// Also works with concurrent requests.
+type MockPromClient struct {
+	sync.Mutex
+	responses []*ResponseAndBody
+	current   int
+}
+
+// prometheus.Client URL()
+func (mpc *MockPromClient) URL(ep string, args map[string]string) *url.URL {
+	return nil
+}
+
+// prometheus.Client Do
+func (mpc *MockPromClient) Do(context.Context, *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
+	// fake latency
+	time.Sleep(250 * time.Millisecond)
+
+	mpc.Lock()
+	defer mpc.Unlock()
+	rnb := mpc.responses[mpc.current]
+	mpc.current++
+	if mpc.current >= len(mpc.responses) {
+		mpc.current = 0
+	}
+
+	return rnb.Response, rnb.Body, nil, nil
+}
+
+// Creates a new mock prometheus client
+func newMockPromClientWith(responses []*ResponseAndBody) prometheus.Client {
+	return &MockPromClient{
+		responses: responses,
+		current:   0,
+	}
+}
+
+// creates a ResponseAndBody representing a 200 status code
+func newSuccessfulResponse() *ResponseAndBody {
+	body := []byte("Success")
+
+	return &ResponseAndBody{
+		Response: &http.Response{
+			StatusCode: 200,
+			Body:       io.NopCloser(bytes.NewReader(body)),
+		},
+		Body: body,
+	}
+}
+
+// creates a ResponseAndBody representing a 400 status code
+func newFailureResponse() *ResponseAndBody {
+	body := []byte("Fail")
+
+	return &ResponseAndBody{
+		Response: &http.Response{
+			StatusCode: 400,
+			Body:       io.NopCloser(bytes.NewReader(body)),
+		},
+		Body: body,
+	}
+}
+
+// creates a ResponseAndBody representing a 429 status code and 'Retry-After' header
+func newNormalRateLimitedResponse(retryAfter string) *ResponseAndBody {
+	body := []byte("Rate Limitted")
+
+	return &ResponseAndBody{
+		Response: &http.Response{
+			StatusCode: 429,
+			Header: http.Header{
+				"Retry-After": []string{retryAfter},
+			},
+			Body: io.NopCloser(bytes.NewReader(body)),
+		},
+		Body: body,
+	}
+}
+
+// creates a ResponseAndBody representing some amazon services ThrottlingException 400 status
+func newHackyAmazonRateLimitedResponse() *ResponseAndBody {
+	body := []byte("<ThrottlingException>\n  <Message>Rate exceeded</Message>\n</ThrottlingException>\n")
+
+	return &ResponseAndBody{
+		Response: &http.Response{
+			StatusCode: 400,
+			Body:       io.NopCloser(bytes.NewReader(body)),
+		},
+		Body: body,
+	}
+}
+
+func newTestRetryOpts() *RateLimitRetryOpts {
+	return &RateLimitRetryOpts{
+		MaxRetries:       5,
+		DefaultRetryWait: 100 * time.Millisecond,
+	}
+}
+
+func TestRateLimitedOnceAndSuccess(t *testing.T) {
+	t.Parallel()
+
+	// creates a prom client with hard coded responses for any requests that
+	// are issued
+	promClient := newMockPromClientWith([]*ResponseAndBody{
+		newNormalRateLimitedResponse("2"),
+		newSuccessfulResponse(),
+	})
+
+	client, err := NewRateLimitedClient(
+		"TestClient",
+		promClient,
+		1,
+		nil,
+		nil,
+		newTestRetryOpts(),
+		"",
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req, err := http.NewRequest(http.MethodPost, "", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// we just need to execute this  once to see retries in effect
+	res, body, _, err := client.Do(context.Background(), req)
+
+	if res.StatusCode != 200 {
+		t.Fatalf("200 StatusCode expected. Got: %d", res.StatusCode)
+	}
+
+	if string(body) != "Success" {
+		t.Fatalf("Expected 'Success' message body. Got: %s", string(body))
+	}
+}
+
+func TestRateLimitedOnceAndFail(t *testing.T) {
+	t.Parallel()
+
+	// creates a prom client with hard coded responses for any requests that
+	// are issued
+	promClient := newMockPromClientWith([]*ResponseAndBody{
+		newNormalRateLimitedResponse("2"),
+		newFailureResponse(),
+	})
+
+	client, err := NewRateLimitedClient(
+		"TestClient",
+		promClient,
+		1,
+		nil,
+		nil,
+		newTestRetryOpts(),
+		"",
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req, err := http.NewRequest(http.MethodPost, "", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// we just need to execute this  once to see retries in effect
+	res, body, _, err := client.Do(context.Background(), req)
+
+	if res.StatusCode != 400 {
+		t.Fatalf("400 StatusCode expected. Got: %d", res.StatusCode)
+	}
+
+	if string(body) != "Fail" {
+		t.Fatalf("Expected 'fail' message body. Got: %s", string(body))
+	}
+}
+
+func TestRateLimitedResponses(t *testing.T) {
+	t.Parallel()
+
+	dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
+
+	// creates a prom client with hard coded responses for any requests that
+	// are issued
+	promClient := newMockPromClientWith([]*ResponseAndBody{
+		newNormalRateLimitedResponse("2"),
+		newNormalRateLimitedResponse(dateRetry),
+		newHackyAmazonRateLimitedResponse(),
+		newHackyAmazonRateLimitedResponse(),
+		newNormalRateLimitedResponse("3"),
+	})
+
+	client, err := NewRateLimitedClient(
+		"TestClient",
+		promClient,
+		1,
+		nil,
+		nil,
+		newTestRetryOpts(),
+		"",
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req, err := http.NewRequest(http.MethodPost, "", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// we just need to execute this  once to see retries in effect
+	_, _, _, err = client.Do(context.Background(), req)
+
+	if err == nil {
+		t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
+	}
+
+	rateLimitErr, ok := err.(*RateLimitedResponseError)
+	if !ok {
+		t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
+	}
+
+	t.Logf("%s\n", rateLimitErr.Error())
+
+	// RateLimitedResponseStatus checks just ensure that wait times were close configuration
+	rateLimitRetries := rateLimitErr.RateLimitStatus
+
+	if len(rateLimitRetries) != 5 {
+		t.Fatalf("Expected 5 retries. Got: %d", len(rateLimitRetries))
+	}
+
+	// check 2s wait after
+	seconds := rateLimitRetries[0].WaitTime.Seconds()
+	if !util.IsApproximately(seconds, 2.0) {
+		t.Fatalf("Expected 2.0 seconds. Got %.2f", seconds)
+	}
+
+	// check to see if fuzzed wait time for datetime parsing
+	seconds = rateLimitRetries[1].WaitTime.Seconds()
+	if math.Abs(seconds-2.0) > 3.0 {
+		t.Fatalf("Expected delta between 2s and resulting wait time to be within 3s. Seconds: %.2f, Delta: %.2f", seconds, math.Abs(seconds-2.0))
+	}
+
+	// check 1s wait
+	seconds = rateLimitRetries[2].WaitTime.Seconds()
+	if !util.IsApproximately(seconds, 0.4) {
+		t.Fatalf("Expected 0.4 seconds. Got %.2f", seconds)
+	}
+
+	// check 1s wait
+	seconds = rateLimitRetries[3].WaitTime.Seconds()
+	if !util.IsApproximately(seconds, 0.8) {
+		t.Fatalf("Expected 0.8 seconds. Got %.2f", seconds)
+	}
+
+	// check 3s wait
+	seconds = rateLimitRetries[4].WaitTime.Seconds()
+	if !util.IsApproximately(seconds, 3.0) {
+		t.Fatalf("Expected 3.0 seconds. Got %.2f", seconds)
+	}
+
+}
+
+//
+func AssertDurationEqual(t *testing.T, expected, actual time.Duration) {
+	if actual != expected {
+		t.Fatalf("Expected: %dms, Got: %dms", expected.Milliseconds(), actual.Milliseconds())
+	}
+}
+
+func TestExponentialBackOff(t *testing.T) {
+	var ExpectedResults = []time.Duration{
+		100 * time.Millisecond,
+		200 * time.Millisecond,
+		400 * time.Millisecond,
+		800 * time.Millisecond,
+		1600 * time.Millisecond,
+	}
+
+	w := 100 * time.Millisecond
+
+	for retry := 0; retry < 5; retry++ {
+		AssertDurationEqual(t, ExpectedResults[retry], httputil.ExponentialBackoffWaitFor(w, retry))
+	}
+}
+
+func TestConcurrentRateLimiting(t *testing.T) {
+	t.Parallel()
+
+	// Set QueryConcurrency to 3 here, then add a few for total requests
+	const QueryConcurrency = 3
+	const TotalRequests = QueryConcurrency + 2
+
+	dateRetry := time.Now().Add(5 * time.Second).Format(time.RFC1123)
+
+	// creates a prom client with hard coded responses for any requests that
+	// are issued
+	promClient := newMockPromClientWith([]*ResponseAndBody{
+		newNormalRateLimitedResponse("2"),
+		newNormalRateLimitedResponse(dateRetry),
+		newHackyAmazonRateLimitedResponse(),
+		newHackyAmazonRateLimitedResponse(),
+		newNormalRateLimitedResponse("3"),
+	})
+
+	client, err := NewRateLimitedClient(
+		"TestClient",
+		promClient,
+		QueryConcurrency,
+		nil,
+		nil,
+		newTestRetryOpts(),
+		"",
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	errs := make(chan error, TotalRequests)
+
+	for i := 0; i < TotalRequests; i++ {
+		go func() {
+			req, err := http.NewRequest(http.MethodPost, "", nil)
+			if err != nil {
+				errs <- err
+				return
+			}
+
+			// we just need to execute this  once to see retries in effect
+			_, _, _, err = client.Do(context.Background(), req)
+
+			errs <- err
+		}()
+	}
+
+	for i := 0; i < TotalRequests; i++ {
+		err := <-errs
+		if err == nil {
+			t.Fatal("Expected a RateLimitedResponseError. Err was nil.")
+		}
+
+		rateLimitErr, ok := err.(*RateLimitedResponseError)
+		if !ok {
+			t.Fatal("Expected a RateLimitedResponseError. Got unexpected type.")
+		}
+
+		t.Logf("%s\n", rateLimitErr.Error())
+	}
+}

+ 19 - 12
pkg/thanos/thanos.go

@@ -67,26 +67,25 @@ func QueryOffset() string {
 	return queryOffset
 }
 
-func NewThanosClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
-	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
-
+func NewThanosClient(address string, config *prom.PrometheusClientConfig) (prometheus.Client, error) {
 	tc := prometheus.Config{
 		Address: address,
 		RoundTripper: &http.Transport{
 			Proxy: http.ProxyFromEnvironment,
 			DialContext: (&net.Dialer{
-				Timeout:   timeout,
-				KeepAlive: keepAlive,
+				Timeout:   config.Timeout,
+				KeepAlive: config.KeepAlive,
 			}).DialContext,
-			TLSHandshakeTimeout: 10 * time.Second,
-			TLSClientConfig:     tlsConfig,
+			TLSHandshakeTimeout: config.TLSHandshakeTimeout,
+			TLSClientConfig: &tls.Config{
+				InsecureSkipVerify: config.TLSInsecureSkipVerify,
+			},
 		},
 	}
 
-	auth := &prom.ClientAuth{
-		Username:    env.GetMultiClusterBasicAuthUsername(),
-		Password:    env.GetMultiClusterBasicAuthPassword(),
-		BearerToken: env.GetMultiClusterBearerToken(),
+	client, err := prometheus.NewClient(tc)
+	if err != nil {
+		return nil, err
 	}
 
 	// max source resolution decorator
@@ -97,5 +96,13 @@ func NewThanosClient(address string, timeout, keepAlive time.Duration, queryConc
 		return queryParams
 	}
 
-	return prom.NewRateLimitedClient(prom.ThanosClientID, tc, queryConcurrency, auth, maxSourceDecorator, queryLogFile)
+	return prom.NewRateLimitedClient(
+		prom.ThanosClientID,
+		client,
+		config.QueryConcurrency,
+		config.Auth,
+		maxSourceDecorator,
+		config.RateLimitRetryOpts,
+		config.QueryLogFile,
+	)
 }

+ 65 - 0
pkg/util/httputil/httputil.go

@@ -3,9 +3,12 @@ package httputil
 import (
 	"context"
 	"fmt"
+	"math"
 	"net/http"
 	"net/url"
+	"strconv"
 	"strings"
+	"time"
 
 	"github.com/kubecost/cost-model/pkg/util/mapper"
 )
@@ -90,6 +93,68 @@ func SetQuery(r *http.Request, query string) *http.Request {
 //  Package Funcs
 //--------------------------------------------------------------------------
 
+// IsRateLimited accepts a response and body to determine if either indicate
+// a rate limited return
+func IsRateLimited(resp *http.Response, body []byte) bool {
+	return IsRateLimitedResponse(resp) || IsRateLimitedBody(resp, body)
+}
+
+// RateLimitedRetryFor returns the parsed Retry-After header relative to the
+// current time. If the Retry-After header does not exist, the defaultWait parameter
+// is returned.
+func RateLimitedRetryFor(resp *http.Response, defaultWait time.Duration, retry int) time.Duration {
+	if resp.Header == nil {
+		return ExponentialBackoffWaitFor(defaultWait, retry)
+	}
+
+	// Retry-After is either the number of seconds to wait or a target datetime (RFC1123)
+	value := resp.Header.Get("Retry-After")
+	if value == "" {
+		return defaultWait
+	}
+
+	seconds, err := strconv.ParseInt(value, 10, 64)
+	if err == nil {
+		return time.Duration(seconds) * time.Second
+	}
+
+	// failed to parse an integer, try datetime RFC1123
+	t, err := time.Parse(time.RFC1123, value)
+	if err == nil {
+		// return 0 if the datetime has already elapsed
+		result := t.Sub(time.Now())
+		if result < 0 {
+			return 0
+		}
+		return result
+	}
+
+	// failed to parse datetime, return default
+	return defaultWait
+}
+
+// ExpontentialBackoffWatiFor accepts a default wait duration and the current retry count
+// and returns a new duration
+func ExponentialBackoffWaitFor(defaultWait time.Duration, retry int) time.Duration {
+	return time.Duration(math.Pow(2, float64(retry))*float64(defaultWait.Milliseconds())) * time.Millisecond
+}
+
+// IsRateLimitedResponse returns true if the status code is a 429 (TooManyRequests)
+func IsRateLimitedResponse(resp *http.Response) bool {
+	return resp.StatusCode == http.StatusTooManyRequests
+}
+
+// IsRateLimitedBody attempts to determine if a response body indicates throttling
+// has occurred. This function is a result of some API providers (AWS) returning
+// a 400 status code instead of 429 for rate limit exceptions.
+func IsRateLimitedBody(resp *http.Response, body []byte) bool {
+	// ignore non-400 status
+	if resp.StatusCode < http.StatusBadRequest || resp.StatusCode >= http.StatusInternalServerError {
+		return false
+	}
+	return strings.Contains(string(body), "ThrottlingException")
+}
+
 // HeaderString writes the request/response http.Header to a string.
 func HeaderString(h http.Header) string {
 	var sb strings.Builder

+ 220 - 25
pkg/util/timeutil/timeutil.go

@@ -1,6 +1,7 @@
 package timeutil
 
 import (
+	"errors"
 	"fmt"
 	"regexp"
 	"strconv"
@@ -86,35 +87,129 @@ func FormatStoreResolution(dur time.Duration) string {
 	return fmt.Sprint(dur)
 }
 
-// ParseDuration converts a Prometheus-style duration string into a Duration
+// ParseDuration parses a duration string.
+// A duration string is a possibly signed sequence of
+// decimal numbers, each with optional fraction and a unit suffix,
+// such as "300ms", "-1.5h" or "2h45m".
+// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d"
 func ParseDuration(duration string) (time.Duration, error) {
-	// Trim prefix of Prometheus format duration
 	duration = CleanDurationString(duration)
-	if len(duration) < 2 {
-		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
-	}
-	unitStr := duration[len(duration)-1:]
-	var unit time.Duration
-	switch unitStr {
-	case "s":
-		unit = time.Second
-	case "m":
-		unit = time.Minute
-	case "h":
-		unit = time.Hour
-	case "d":
-		unit = 24.0 * time.Hour
-	default:
-		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
-	}
-
-	amountStr := duration[:len(duration)-1]
-	amount, err := strconv.ParseInt(amountStr, 10, 64)
-	if err != nil {
-		return 0, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	return goParseDuration(duration)
+}
+
+// unitMap contains a list of units that can be parsed by ParseDuration
+var unitMap = map[string]int64{
+	"ns": int64(time.Nanosecond),
+	"us": int64(time.Microsecond),
+	"µs": int64(time.Microsecond), // U+00B5 = micro symbol
+	"μs": int64(time.Microsecond), // U+03BC = Greek letter mu
+	"ms": int64(time.Millisecond),
+	"s":  int64(time.Second),
+	"m":  int64(time.Minute),
+	"h":  int64(time.Hour),
+	"d":  int64(time.Hour * 24),
+}
+
+// goParseDuration is time.ParseDuration lifted from the go std library and enhanced with the ability to
+// handle the "d" (day) unit. The contents of the function itself are identical to the std library, it is
+// only the unitMap above that contains the added unit.
+func goParseDuration(s string) (time.Duration, error) {
+	// [-+]?([0-9]*(\.[0-9]*)?[a-z]+)+
+	orig := s
+	var d int64
+	neg := false
+
+	// Consume [-+]?
+	if s != "" {
+		c := s[0]
+		if c == '-' || c == '+' {
+			neg = c == '-'
+			s = s[1:]
+		}
+	}
+	// Special case: if all that is left is "0", this is zero.
+	if s == "0" {
+		return 0, nil
+	}
+	if s == "" {
+		return 0, errors.New("time: invalid duration " + quote(orig))
+	}
+	for s != "" {
+		var (
+			v, f  int64       // integers before, after decimal point
+			scale float64 = 1 // value = v + f/scale
+		)
+
+		var err error
+
+		// The next character must be [0-9.]
+		if !(s[0] == '.' || '0' <= s[0] && s[0] <= '9') {
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
+		// Consume [0-9]*
+		pl := len(s)
+		v, s, err = leadingInt(s)
+		if err != nil {
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
+		pre := pl != len(s) // whether we consumed anything before a period
+
+		// Consume (\.[0-9]*)?
+		post := false
+		if s != "" && s[0] == '.' {
+			s = s[1:]
+			pl := len(s)
+			f, scale, s = leadingFraction(s)
+			post = pl != len(s)
+		}
+		if !pre && !post {
+			// no digits (e.g. ".s" or "-.s")
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
+
+		// Consume unit.
+		i := 0
+		for ; i < len(s); i++ {
+			c := s[i]
+			if c == '.' || '0' <= c && c <= '9' {
+				break
+			}
+		}
+		if i == 0 {
+			return 0, errors.New("time: missing unit in duration " + quote(orig))
+		}
+		u := s[:i]
+		s = s[i:]
+		unit, ok := unitMap[u]
+		if !ok {
+			return 0, errors.New("time: unknown unit " + quote(u) + " in duration " + quote(orig))
+		}
+		if v > (1<<63-1)/unit {
+			// overflow
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
+		v *= unit
+		if f > 0 {
+			// float64 is needed to be nanosecond accurate for fractions of hours.
+			// v >= 0 && (f*unit/scale) <= 3.6e+12 (ns/h, h is the largest unit)
+			v += int64(float64(f) * (float64(unit) / scale))
+			if v < 0 {
+				// overflow
+				return 0, errors.New("time: invalid duration " + quote(orig))
+			}
+		}
+		d += v
+		if d < 0 {
+			// overflow
+			return 0, errors.New("time: invalid duration " + quote(orig))
+		}
 	}
 
-	return time.Duration(amount) * unit, nil
+	if neg {
+		d = -d
+	}
+
+	return time.Duration(d), nil
 }
 
 // CleanDurationString removes prometheus formatted prefix "offset " allong with leading a trailing whitespace
@@ -238,3 +333,103 @@ func (jt *JobTicker) TickIn(d time.Duration) {
 		}
 	}(d)
 }
+
+// NOTE: The following functions were lifted from the go std library to support the ParseDuration enhancement
+// NOTE: described above.
+
+const (
+	lowerhex  = "0123456789abcdef"
+	runeSelf  = 0x80
+	runeError = '\uFFFD'
+)
+
+// quote is lifted from the go std library to support the custom ParseDuration enhancement
+func quote(s string) string {
+	buf := make([]byte, 1, len(s)+2) // slice will be at least len(s) + quotes
+	buf[0] = '"'
+	for i, c := range s {
+		if c >= runeSelf || c < ' ' {
+			// This means you are asking us to parse a time.Duration or
+			// time.Location with unprintable or non-ASCII characters in it.
+			// We don't expect to hit this case very often. We could try to
+			// reproduce strconv.Quote's behavior with full fidelity but
+			// given how rarely we expect to hit these edge cases, speed and
+			// conciseness are better.
+			var width int
+			if c == runeError {
+				width = 1
+				if i+2 < len(s) && s[i:i+3] == string(runeError) {
+					width = 3
+				}
+			} else {
+				width = len(string(c))
+			}
+			for j := 0; j < width; j++ {
+				buf = append(buf, `\x`...)
+				buf = append(buf, lowerhex[s[i+j]>>4])
+				buf = append(buf, lowerhex[s[i+j]&0xF])
+			}
+		} else {
+			if c == '"' || c == '\\' {
+				buf = append(buf, '\\')
+			}
+			buf = append(buf, string(c)...)
+		}
+	}
+	buf = append(buf, '"')
+	return string(buf)
+}
+
+// leadingFraction consumes the leading [0-9]* from s.
+// It is used only for fractions, so does not return an error on overflow,
+// it just stops accumulating precision.
+func leadingFraction(s string) (x int64, scale float64, rem string) {
+	i := 0
+	scale = 1
+	overflow := false
+	for ; i < len(s); i++ {
+		c := s[i]
+		if c < '0' || c > '9' {
+			break
+		}
+		if overflow {
+			continue
+		}
+		if x > (1<<63-1)/10 {
+			// It's possible for overflow to give a positive number, so take care.
+			overflow = true
+			continue
+		}
+		y := x*10 + int64(c) - '0'
+		if y < 0 {
+			overflow = true
+			continue
+		}
+		x = y
+		scale *= 10
+	}
+	return x, scale, s[i:]
+}
+
+var errLeadingInt = errors.New("time: bad [0-9]*") // never printed
+
+// leadingInt consumes the leading [0-9]* from s.
+func leadingInt(s string) (x int64, rem string, err error) {
+	i := 0
+	for ; i < len(s); i++ {
+		c := s[i]
+		if c < '0' || c > '9' {
+			break
+		}
+		if x > (1<<63-1)/10 {
+			// overflow
+			return 0, "", errLeadingInt
+		}
+		x = x*10 + int64(c) - '0'
+		if x < 0 {
+			// overflow
+			return 0, "", errLeadingInt
+		}
+	}
+	return x, s[i:], nil
+}

+ 12 - 0
pkg/util/timeutil/timeutil_test.go

@@ -262,6 +262,18 @@ func Test_ParseDuration(t *testing.T) {
 			input:    " offset 3d ",
 			expected: 24.0 * time.Hour * 3,
 		},
+		"ms duration": {
+			input:    "100ms",
+			expected: 100 * time.Millisecond,
+		},
+		"complex duration": {
+			input:    "2d3h14m2s",
+			expected: (24 * time.Hour * 2) + (3 * time.Hour) + (14 * time.Minute) + (2 * time.Second),
+		},
+		"negative duration": {
+			input:    "-2d",
+			expected: -48 * time.Hour,
+		},
 		"zero": {
 			input:    "0h",
 			expected: time.Duration(0),