فهرست منبع

Allow max retry to be set via env var, allow default retry wait time to be set via env var. Remaining: Update timeutil.ParseDuration to accept durations other than s|m|h|d

Matt Bolt 4 سال پیش
والد
کامیت
844bccd888
8 فایل‌های تغییر یافته به همراه 147 افزوده شده و 56 حذف شده
  1. 13 5
      pkg/cmd/agent/agent.go
  2. 14 6
      pkg/costmodel/router.go
  3. 16 2
      pkg/env/costmodelenv.go
  4. 12 0
      pkg/env/env.go
  5. 41 32
      pkg/prom/prom.go
  6. 40 8
      pkg/prom/ratelimitedclient_test.go
  7. 1 1
      pkg/thanos/thanos.go
  8. 10 2
      pkg/util/httputil/httputil.go

+ 13 - 5
pkg/cmd/agent/agent.go

@@ -87,12 +87,20 @@ func newPrometheusClient() (prometheus.Client, error) {
 	keepAlive := 120 * time.Second
 	tlsHandshakeTimeout := 10 * time.Second
 
+	var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
+	if env.IsPrometheusRetryOnRateLimitResponse() {
+		rateLimitRetryOpts = &prom.RateLimitRetryOpts{
+			MaxRetries:       env.GetPrometheusRetryOnRateLimitMaxRetries(),
+			DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
+		}
+	}
+
 	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
-		Timeout:                  timeout,
-		KeepAlive:                keepAlive,
-		TLSHandshakeTimeout:      tlsHandshakeTimeout,
-		TLSInsecureSkipVerify:    env.GetInsecureSkipVerify(),
-		RetryOnRateLimitResponse: env.IsPrometheusRetryOnRateLimitResponse(),
+		Timeout:               timeout,
+		KeepAlive:             keepAlive,
+		TLSHandshakeTimeout:   tlsHandshakeTimeout,
+		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+		RateLimitRetryOpts:    rateLimitRetryOpts,
 		Auth: &prom.ClientAuth{
 			Username:    env.GetDBBasicAuthUsername(),
 			Password:    env.GetDBBasicAuthUserPassword(),

+ 14 - 6
pkg/costmodel/router.go

@@ -1345,12 +1345,20 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	tlsHandshakeTimeout := 10 * time.Second
 	scrapeInterval := time.Minute
 
+	var rateLimitRetryOpts *prom.RateLimitRetryOpts = nil
+	if env.IsPrometheusRetryOnRateLimitResponse() {
+		rateLimitRetryOpts = &prom.RateLimitRetryOpts{
+			MaxRetries:       env.GetPrometheusRetryOnRateLimitMaxRetries(),
+			DefaultRetryWait: env.GetPrometheusRetryOnRateLimitDefaultWait(),
+		}
+	}
+
 	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
-		Timeout:                  timeout,
-		KeepAlive:                keepAlive,
-		TLSHandshakeTimeout:      tlsHandshakeTimeout,
-		TLSInsecureSkipVerify:    env.GetInsecureSkipVerify(),
-		RetryOnRateLimitResponse: env.IsPrometheusRetryOnRateLimitResponse(),
+		Timeout:               timeout,
+		KeepAlive:             keepAlive,
+		TLSHandshakeTimeout:   tlsHandshakeTimeout,
+		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+		RateLimitRetryOpts:    rateLimitRetryOpts,
 		Auth: &prom.ClientAuth{
 			Username:    env.GetDBBasicAuthUsername(),
 			Password:    env.GetDBBasicAuthUserPassword(),
@@ -1471,7 +1479,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 				KeepAlive:                keepAlive,
 				TLSHandshakeTimeout:      tlsHandshakeTimeout,
 				TLSInsecureSkipVerify:    env.GetInsecureSkipVerify(),
-				RetryOnRateLimitResponse: env.IsPrometheusRetryOnRateLimitResponse(),
+				RateLimitRetryOpts:       rateLimitRetryOpts,
 				Auth: &prom.ClientAuth{
 					Username:    env.GetMultiClusterBasicAuthUsername(),
 					Password:    env.GetMultiClusterBasicAuthPassword(),

+ 16 - 2
pkg/env/costmodelenv.go

@@ -82,8 +82,10 @@ const (
 	ClusterInfoFileEnabledEnvVar  = "CLUSTER_INFO_FILE_ENABLED"
 	ClusterCacheFileEnabledEnvVar = "CLUSTER_CACHE_FILE_ENABLED"
 
-	PrometheusQueryOffsetEnvVar              = "PROMETHEUS_QUERY_OFFSET"
-	PrometheusRetryOnRateLimitResponseEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT"
+	PrometheusQueryOffsetEnvVar                 = "PROMETHEUS_QUERY_OFFSET"
+	PrometheusRetryOnRateLimitResponseEnvVar    = "PROMETHEUS_RETRY_ON_RATE_LIMIT"
+	PrometheusRetryOnRateLimitMaxRetriesEnvVar  = "PROMETHEUS_RETRY_ON_RATE_LIMIT_MAX_RETRIES"
+	PrometheusRetryOnRateLimitDefaultWaitEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT_DEFAULT_WAIT"
 )
 
 // GetKubecostConfigBucket returns a file location for a mounted bucket configuration which is used to store
@@ -110,6 +112,18 @@ func IsPrometheusRetryOnRateLimitResponse() bool {
 	return GetBool(PrometheusRetryOnRateLimitResponseEnvVar, true)
 }
 
+// GetPrometheusRetryOnRateLimitMaxRetries returns the maximum number of retries that should be attempted prior to failing.
+// Only used if IsPrometheusRetryOnRateLimitResponse() is true.
+func GetPrometheusRetryOnRateLimitMaxRetries() int {
+	return GetInt(PrometheusRetryOnRateLimitMaxRetriesEnvVar, 5)
+}
+
+// GetPrometheusRetryOnRateLimitDefaultWait returns the default wait time for a retriable rate limit response without a
+// Retry-After header.
+func GetPrometheusRetryOnRateLimitDefaultWait() time.Duration {
+	return GetDuration(PrometheusRetryOnRateLimitDefaultWaitEnvVar, 100*time.Millisecond)
+}
+
 // GetPrometheusQueryOffset returns the time.Duration to offset all prometheus queries by. NOTE: This env var is applied
 // to all non-range queries made via our query context. This should only be applied when there is a significant delay in
 // data arriving in the target prom db. For example, if supplying a thanos or cortex querier for the prometheus server, using

+ 12 - 0
pkg/env/env.go

@@ -2,6 +2,7 @@ package env
 
 import (
 	"os"
+	"time"
 
 	"github.com/kubecost/cost-model/pkg/util/mapper"
 )
@@ -116,6 +117,12 @@ func GetBool(key string, defaultValue bool) bool {
 	return envMapper.GetBool(key, defaultValue)
 }
 
+// GetDuration parses a time.Duration from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetDuration(key string, defaultValue time.Duration) time.Duration {
+	return envMapper.GetDuration(key, defaultValue)
+}
+
 // Set sets the environment variable for the key provided using the value provided.
 func Set(key string, value string) error {
 	return envMapper.Set(key, value)
@@ -175,3 +182,8 @@ func SetUInt64(key string, value uint64) error {
 func SetBool(key string, value bool) error {
 	return envMapper.SetBool(key, value)
 }
+
+// SetDuration sets the environment variable to a string formatted time.Duration
+func SetDuration(key string, value time.Duration) error {
+	return envMapper.SetDuration(key, value)
+}

+ 41 - 32
pkg/prom/prom.go

@@ -58,13 +58,19 @@ func (auth *ClientAuth) Apply(req *http.Request) {
 }
 
 //--------------------------------------------------------------------------
-//  Rate Limited Error
+//  Rate Limit Options
 //--------------------------------------------------------------------------
 
 // MaxRetryAfterDuration is the maximum amount of time we should ever wait
 // during a retry. This is to prevent starvation on the request threads
 const MaxRetryAfterDuration = 10 * time.Second
 
+// RateLimitRetryOpts contains retry options
+type RateLimitRetryOpts struct {
+	MaxRetries       int
+	DefaultRetryWait time.Duration
+}
+
 // RateLimitResponseStatus contains the status of the rate limited retries
 type RateLimitResponseStatus struct {
 	RetriesRemaining int
@@ -105,14 +111,14 @@ func (rlre *RateLimitedResponseError) Error() string {
 // RateLimitedPrometheusClient is a prometheus client which limits the total number of
 // concurrent outbound requests allowed at a given moment.
 type RateLimitedPrometheusClient struct {
-	id               string
-	client           prometheus.Client
-	auth             *ClientAuth
-	queue            collections.BlockingQueue
-	decorator        QueryParamsDecorator
-	retryOnRateLimit bool
-	outbound         *atomic.AtomicInt32
-	fileLogger       *golog.Logger
+	id             string
+	client         prometheus.Client
+	auth           *ClientAuth
+	queue          collections.BlockingQueue
+	decorator      QueryParamsDecorator
+	rateLimitRetry *RateLimitRetryOpts
+	outbound       *atomic.AtomicInt32
+	fileLogger     *golog.Logger
 }
 
 // requestCounter is used to determine if the prometheus client keeps track of
@@ -130,7 +136,7 @@ func NewRateLimitedClient(
 	maxConcurrency int,
 	auth *ClientAuth,
 	decorator QueryParamsDecorator,
-	retryOnRateLimit bool,
+	rateLimitRetryOpts *RateLimitRetryOpts,
 	queryLogFile string) (prometheus.Client, error) {
 
 	queue := collections.NewBlockingQueue()
@@ -161,14 +167,14 @@ func NewRateLimitedClient(
 	}
 
 	rlpc := &RateLimitedPrometheusClient{
-		id:               id,
-		client:           client,
-		queue:            queue,
-		decorator:        decorator,
-		retryOnRateLimit: retryOnRateLimit,
-		outbound:         outbound,
-		auth:             auth,
-		fileLogger:       logger,
+		id:             id,
+		client:         client,
+		queue:          queue,
+		decorator:      decorator,
+		rateLimitRetry: rateLimitRetryOpts,
+		outbound:       outbound,
+		auth:           auth,
+		fileLogger:     logger,
 	}
 
 	// Start concurrent request processing
@@ -224,7 +230,8 @@ type workResponse struct {
 
 // worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
 func (rlpc *RateLimitedPrometheusClient) worker() {
-	retryRateLimit := rlpc.retryOnRateLimit
+	retryOpts := rlpc.rateLimitRetry
+	retryRateLimit := retryOpts != nil
 
 	for {
 		// blocks until there is an item available
@@ -262,15 +269,17 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 			// * If we couldn't determine how long to wait for a retry, use 1 second by default
 			if retryRateLimit {
 				var status []*RateLimitResponseStatus
-				var retries int = 5
+				var retries int = retryOpts.MaxRetries
+				var defaultWait time.Duration = retryOpts.DefaultRetryWait
 
 				for httputil.IsRateLimited(res, body) && retries > 0 {
+					// calculate amount of time to wait before retry, in the event the default wait is used,
+					// an exponential backoff is applied based on the number of times we've retried.
+					retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
 					retries--
 
-					// calculate amount of time to wait before retry, default to 1s
-					retryAfter := httputil.RateLimitedRetryFor(res, time.Second)
 					status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
-					log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %.2f seconds. Retries Remaining: %d", retryAfter.Seconds(), retries)
+					log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)
 
 					// To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
 					// to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
@@ -340,14 +349,14 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 
 // PrometheusClientConfig contains all configurable options for creating a new prometheus client
 type PrometheusClientConfig struct {
-	Timeout                  time.Duration
-	KeepAlive                time.Duration
-	TLSHandshakeTimeout      time.Duration
-	TLSInsecureSkipVerify    bool
-	RetryOnRateLimitResponse bool
-	Auth                     *ClientAuth
-	QueryConcurrency         int
-	QueryLogFile             string
+	Timeout               time.Duration
+	KeepAlive             time.Duration
+	TLSHandshakeTimeout   time.Duration
+	TLSInsecureSkipVerify bool
+	RateLimitRetryOpts    *RateLimitRetryOpts
+	Auth                  *ClientAuth
+	QueryConcurrency      int
+	QueryLogFile          string
 }
 
 // NewPrometheusClient creates a new rate limited client which limits by outbound concurrent requests.
@@ -379,7 +388,7 @@ func NewPrometheusClient(address string, config *PrometheusClientConfig) (promet
 		config.QueryConcurrency,
 		config.Auth,
 		nil,
-		config.RetryOnRateLimitResponse,
+		config.RateLimitRetryOpts,
 		config.QueryLogFile,
 	)
 }

+ 40 - 8
pkg/prom/ratelimitedclient_test.go

@@ -6,12 +6,14 @@ import (
 	"io"
 	"math"
 	"net/http"
+
 	"net/url"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/util"
+	"github.com/kubecost/cost-model/pkg/util/httputil"
 	prometheus "github.com/prometheus/client_golang/api"
 )
 
@@ -115,6 +117,13 @@ func newHackyAmazonRateLimitedResponse() *ResponseAndBody {
 	}
 }
 
+func newTestRetryOpts() *RateLimitRetryOpts {
+	return &RateLimitRetryOpts{
+		MaxRetries:       5,
+		DefaultRetryWait: 100 * time.Millisecond,
+	}
+}
+
 func TestRateLimitedOnceAndSuccess(t *testing.T) {
 	t.Parallel()
 
@@ -131,7 +140,7 @@ func TestRateLimitedOnceAndSuccess(t *testing.T) {
 		1,
 		nil,
 		nil,
-		true,
+		newTestRetryOpts(),
 		"",
 	)
 
@@ -172,7 +181,7 @@ func TestRateLimitedOnceAndFail(t *testing.T) {
 		1,
 		nil,
 		nil,
-		true,
+		newTestRetryOpts(),
 		"",
 	)
 
@@ -218,7 +227,7 @@ func TestRateLimitedResponses(t *testing.T) {
 		1,
 		nil,
 		nil,
-		true,
+		newTestRetryOpts(),
 		"",
 	)
 
@@ -266,14 +275,14 @@ func TestRateLimitedResponses(t *testing.T) {
 
 	// check 1s wait
 	seconds = rateLimitRetries[2].WaitTime.Seconds()
-	if !util.IsApproximately(seconds, 1.0) {
-		t.Fatalf("Expected 1.0 seconds. Got %.2f", seconds)
+	if !util.IsApproximately(seconds, 0.4) {
+		t.Fatalf("Expected 0.4 seconds. Got %.2f", seconds)
 	}
 
 	// check 1s wait
 	seconds = rateLimitRetries[3].WaitTime.Seconds()
-	if !util.IsApproximately(seconds, 1.0) {
-		t.Fatalf("Expected 1.0 seconds. Got %.2f", seconds)
+	if !util.IsApproximately(seconds, 0.8) {
+		t.Fatalf("Expected 0.8 seconds. Got %.2f", seconds)
 	}
 
 	// check 3s wait
@@ -284,6 +293,29 @@ func TestRateLimitedResponses(t *testing.T) {
 
 }
 
+// AssertDurationEqual fails the test immediately if actual does not equal expected.
+func AssertDurationEqual(t *testing.T, expected, actual time.Duration) {
+	if actual != expected {
+		t.Fatalf("Expected: %dms, Got: %dms", expected.Milliseconds(), actual.Milliseconds())
+	}
+}
+
+func TestExponentialBackOff(t *testing.T) {
+	var ExpectedResults = []time.Duration{
+		100 * time.Millisecond,
+		200 * time.Millisecond,
+		400 * time.Millisecond,
+		800 * time.Millisecond,
+		1600 * time.Millisecond,
+	}
+
+	w := 100 * time.Millisecond
+
+	for retry := 0; retry < 5; retry++ {
+		AssertDurationEqual(t, ExpectedResults[retry], httputil.ExponentialBackoffWaitFor(w, retry))
+	}
+}
+
 func TestConcurrentRateLimiting(t *testing.T) {
 	t.Parallel()
 
@@ -309,7 +341,7 @@ func TestConcurrentRateLimiting(t *testing.T) {
 		QueryConcurrency,
 		nil,
 		nil,
-		true,
+		newTestRetryOpts(),
 		"",
 	)
 

+ 1 - 1
pkg/thanos/thanos.go

@@ -102,7 +102,7 @@ func NewThanosClient(address string, config *prom.PrometheusClientConfig) (prome
 		config.QueryConcurrency,
 		config.Auth,
 		maxSourceDecorator,
-		config.RetryOnRateLimitResponse,
+		config.RateLimitRetryOpts,
 		config.QueryLogFile,
 	)
 }

+ 10 - 2
pkg/util/httputil/httputil.go

@@ -3,6 +3,7 @@ package httputil
 import (
 	"context"
 	"fmt"
+	"math"
 	"net/http"
 	"net/url"
 	"strconv"
@@ -101,10 +102,11 @@ func IsRateLimited(resp *http.Response, body []byte) bool {
 // RateLimitedRetryFor returns the parsed Retry-After header relative to the
 // current time. If the response carries no headers at all, an exponential backoff
 // based on defaultWait and retry is returned; otherwise a missing or unparsable
 // Retry-After header yields the defaultWait parameter.
-func RateLimitedRetryFor(resp *http.Response, defaultWait time.Duration) time.Duration {
+func RateLimitedRetryFor(resp *http.Response, defaultWait time.Duration, retry int) time.Duration {
 	if resp.Header == nil {
-		return defaultWait
+		return ExponentialBackoffWaitFor(defaultWait, retry)
 	}
+
 	// Retry-After is either the number of seconds to wait or a target datetime (RFC1123)
 	value := resp.Header.Get("Retry-After")
 	if value == "" {
@@ -131,6 +133,12 @@ func RateLimitedRetryFor(resp *http.Response, defaultWait time.Duration) time.Du
 	return defaultWait
 }
 
+// ExponentialBackoffWaitFor accepts a default wait duration and the current retry count,
+// and returns the default wait scaled by 2^retry (exponential backoff).
+func ExponentialBackoffWaitFor(defaultWait time.Duration, retry int) time.Duration {
+	return time.Duration(math.Pow(2, float64(retry))*float64(defaultWait.Milliseconds())) * time.Millisecond
+}
+
 // IsRateLimitedResponse returns true if the status code is a 429 (TooManyRequests)
 func IsRateLimitedResponse(resp *http.Response) bool {
 	return resp.StatusCode == http.StatusTooManyRequests