|
|
@@ -3,14 +3,15 @@ package prom
|
|
|
import (
|
|
|
"context"
|
|
|
"crypto/tls"
|
|
|
+ "fmt"
|
|
|
"net"
|
|
|
"net/http"
|
|
|
"net/url"
|
|
|
"os"
|
|
|
+ "strings"
|
|
|
"time"
|
|
|
|
|
|
"github.com/kubecost/cost-model/pkg/collections"
|
|
|
- "github.com/kubecost/cost-model/pkg/env"
|
|
|
"github.com/kubecost/cost-model/pkg/log"
|
|
|
"github.com/kubecost/cost-model/pkg/util/atomic"
|
|
|
"github.com/kubecost/cost-model/pkg/util/fileutil"
|
|
|
@@ -56,6 +57,42 @@ func (auth *ClientAuth) Apply(req *http.Request) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+//--------------------------------------------------------------------------
|
|
|
+// Rate Limited Error
|
|
|
+//--------------------------------------------------------------------------
|
|
|
+
|
|
|
+type RateLimitResponseStatus struct {
|
|
|
+ RetriesRemaining int
|
|
|
+ WaitTime time.Duration
|
|
|
+}
|
|
|
+
|
|
|
+// String creates a string representation of the rate limit status
|
|
|
+func (rtrs *RateLimitResponseStatus) String() string {
|
|
|
+ return fmt.Sprintf("Wait Time: %.2f seconds, Retries Remaining: %d", rtrs.WaitTime.Seconds(), rtrs.RetriesRemaining)
|
|
|
+}
|
|
|
+
|
|
|
+// RateLimitedError contains a list of retry statuses that occurred during
|
|
|
+// retries on a rate limited response
|
|
|
+type RateLimitedResponseError struct {
|
|
|
+ RateLimitStatus []*RateLimitResponseStatus
|
|
|
+}
|
|
|
+
|
|
|
+// Error returns a string representation of the error, including the rate limit
|
|
|
+// status reports
|
|
|
+func (rlre *RateLimitedResponseError) Error() string {
|
|
|
+ var sb strings.Builder
|
|
|
+
|
|
|
+ sb.WriteString("Request was Rate Limited and Retries Exhausted:\n")
|
|
|
+
|
|
|
+ for _, rls := range rlre.RateLimitStatus {
|
|
|
+ sb.WriteString(" * ")
|
|
|
+ sb.WriteString(rls.String())
|
|
|
+ sb.WriteString("\n")
|
|
|
+ }
|
|
|
+
|
|
|
+ return sb.String()
|
|
|
+}
|
|
|
+
|
|
|
//--------------------------------------------------------------------------
|
|
|
// RateLimitedPrometheusClient
|
|
|
//--------------------------------------------------------------------------
|
|
|
@@ -63,13 +100,14 @@ func (auth *ClientAuth) Apply(req *http.Request) {
|
|
|
// RateLimitedPrometheusClient is a prometheus client which limits the total number of
|
|
|
// concurrent outbound requests allowed at a given moment.
|
|
|
type RateLimitedPrometheusClient struct {
|
|
|
- id string
|
|
|
- client prometheus.Client
|
|
|
- auth *ClientAuth
|
|
|
- queue collections.BlockingQueue
|
|
|
- decorator QueryParamsDecorator
|
|
|
- outbound *atomic.AtomicInt32
|
|
|
- fileLogger *golog.Logger
|
|
|
+ id string
|
|
|
+ client prometheus.Client
|
|
|
+ auth *ClientAuth
|
|
|
+ queue collections.BlockingQueue
|
|
|
+ decorator QueryParamsDecorator
|
|
|
+ retryOnRateLimit bool
|
|
|
+ outbound *atomic.AtomicInt32
|
|
|
+ fileLogger *golog.Logger
|
|
|
}
|
|
|
|
|
|
// requestCounter is used to determine if the prometheus client keeps track of
|
|
|
@@ -81,11 +119,14 @@ type requestCounter interface {
|
|
|
|
|
|
// NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
|
|
|
// prometheus requests.
|
|
|
-func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, decorator QueryParamsDecorator, queryLogFile string) (prometheus.Client, error) {
|
|
|
- c, err := prometheus.NewClient(config)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
+func NewRateLimitedClient(
|
|
|
+ id string,
|
|
|
+ client prometheus.Client,
|
|
|
+ maxConcurrency int,
|
|
|
+ auth *ClientAuth,
|
|
|
+ decorator QueryParamsDecorator,
|
|
|
+ retryOnRateLimit bool,
|
|
|
+ queryLogFile string) (prometheus.Client, error) {
|
|
|
|
|
|
queue := collections.NewBlockingQueue()
|
|
|
outbound := atomic.NewAtomicInt32(0)
|
|
|
@@ -105,14 +146,24 @@ func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency in
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // default authentication
|
|
|
+ if auth == nil {
|
|
|
+ auth = &ClientAuth{
|
|
|
+ Username: "",
|
|
|
+ Password: "",
|
|
|
+ BearerToken: "",
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
rlpc := &RateLimitedPrometheusClient{
|
|
|
- id: id,
|
|
|
- client: c,
|
|
|
- queue: queue,
|
|
|
- decorator: decorator,
|
|
|
- outbound: outbound,
|
|
|
- auth: auth,
|
|
|
- fileLogger: logger,
|
|
|
+ id: id,
|
|
|
+ client: client,
|
|
|
+ queue: queue,
|
|
|
+ decorator: decorator,
|
|
|
+ retryOnRateLimit: retryOnRateLimit,
|
|
|
+ outbound: outbound,
|
|
|
+ auth: auth,
|
|
|
+ fileLogger: logger,
|
|
|
}
|
|
|
|
|
|
// Start concurrent request processing
|
|
|
@@ -168,6 +219,8 @@ type workResponse struct {
|
|
|
|
|
|
// worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
|
|
|
func (rlpc *RateLimitedPrometheusClient) worker() {
|
|
|
+ retryRateLimit := rlpc.retryOnRateLimit
|
|
|
+
|
|
|
for {
|
|
|
// blocks until there is an item available
|
|
|
item := rlpc.queue.Dequeue()
|
|
|
@@ -198,6 +251,34 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
|
|
|
roundTripStart := time.Now()
|
|
|
res, body, warnings, err := rlpc.client.Do(ctx, req)
|
|
|
|
|
|
+ // If retries on rate limited response is enabled:
|
|
|
+ // * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
|
|
|
+ // * Attempt to parse a Retry-After from response headers (common on 429)
|
|
|
+ // * If we couldn't determine how long to wait for a retry, use 1 second by default
|
|
|
+ if retryRateLimit {
|
|
|
+ var status []*RateLimitResponseStatus
|
|
|
+ var retries int = 5
|
|
|
+
|
|
|
+ for httputil.IsRateLimited(res, body) && retries > 0 {
|
|
|
+ retries--
|
|
|
+
|
|
|
+ // calculate amount of time to wait before retry, default to 1s
|
|
|
+ retryAfter := httputil.RateLimitedRetryFor(res, time.Second)
|
|
|
+ status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
|
|
|
+ log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %.2f seconds. Retries Remaining: %d", retryAfter.Seconds(), retries)
|
|
|
+
|
|
|
+ // execute wait and retry
|
|
|
+ time.Sleep(retryAfter)
|
|
|
+ res, body, warnings, err = rlpc.client.Do(ctx, req)
|
|
|
+ }
|
|
|
+
|
|
|
+ // if we've broken out of our retry loop and the resp is still rate limited,
|
|
|
+ // then let's generate a meaningful error to pass back
|
|
|
+ if retries == 0 && httputil.IsRateLimited(res, body) {
|
|
|
+ err = &RateLimitedResponseError{RateLimitStatus: status}
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Decrement outbound counter
|
|
|
rlpc.outbound.Decrement()
|
|
|
LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
|
|
|
@@ -245,30 +326,50 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
|
|
|
// Client Helpers
|
|
|
//--------------------------------------------------------------------------
|
|
|
|
|
|
-func NewPrometheusClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
|
|
|
- tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
|
|
|
+// PrometheusClientConfig contains all configurable options for creating a new prometheus client
|
|
|
+type PrometheusClientConfig struct {
|
|
|
+ Timeout time.Duration
|
|
|
+ KeepAlive time.Duration
|
|
|
+ TLSHandshakeTimeout time.Duration
|
|
|
+ TLSInsecureSkipVerify bool
|
|
|
+ RetryOnRateLimitResponse bool
|
|
|
+ Auth *ClientAuth
|
|
|
+ QueryConcurrency int
|
|
|
+ QueryLogFile string
|
|
|
+}
|
|
|
|
|
|
- // may be necessary for long prometheus queries. TODO: make this configurable
|
|
|
+// NewPrometheusClient creates a new rate limited client which limits by outbound concurrent requests.
|
|
|
+func NewPrometheusClient(address string, config *PrometheusClientConfig) (prometheus.Client, error) {
|
|
|
+ // may be necessary for long prometheus queries
|
|
|
pc := prometheus.Config{
|
|
|
Address: address,
|
|
|
RoundTripper: &http.Transport{
|
|
|
Proxy: http.ProxyFromEnvironment,
|
|
|
DialContext: (&net.Dialer{
|
|
|
- Timeout: timeout,
|
|
|
- KeepAlive: keepAlive,
|
|
|
+ Timeout: config.Timeout,
|
|
|
+ KeepAlive: config.KeepAlive,
|
|
|
}).DialContext,
|
|
|
- TLSHandshakeTimeout: 10 * time.Second,
|
|
|
- TLSClientConfig: tlsConfig,
|
|
|
+ TLSHandshakeTimeout: config.TLSHandshakeTimeout,
|
|
|
+ TLSClientConfig: &tls.Config{
|
|
|
+ InsecureSkipVerify: config.TLSInsecureSkipVerify,
|
|
|
+ },
|
|
|
},
|
|
|
}
|
|
|
|
|
|
- auth := &ClientAuth{
|
|
|
- Username: env.GetDBBasicAuthUsername(),
|
|
|
- Password: env.GetDBBasicAuthUserPassword(),
|
|
|
- BearerToken: env.GetDBBearerToken(),
|
|
|
+ client, err := prometheus.NewClient(pc)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
- return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, nil, queryLogFile)
|
|
|
+ return NewRateLimitedClient(
|
|
|
+ PrometheusClientID,
|
|
|
+ client,
|
|
|
+ config.QueryConcurrency,
|
|
|
+ config.Auth,
|
|
|
+ nil,
|
|
|
+ config.RetryOnRateLimitResponse,
|
|
|
+ config.QueryLogFile,
|
|
|
+ )
|
|
|
}
|
|
|
|
|
|
// LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
|