Kaynağa Gözat

Updated BlockingQueue to leverage generics, updated implementation in Prometheus rate limiter to use generic version.

Matt Bolt 4 yıl önce
ebeveyn
işleme
3e23697cb0
2 değiştirilmiş dosya ile 95 ekleme ve 92 silme
  1. 28 22
      pkg/collections/blockingqueue.go
  2. 67 70
      pkg/prom/prom.go

+ 28 - 22
pkg/collections/blockingqueue.go

@@ -10,20 +10,20 @@ import (
 
 // BlockingQueue is a queue backed by a slice which blocks if dequeueing while empty.
 // This data structure should use a pool of worker goroutines to await work.
-type BlockingQueue interface {
+type BlockingQueue[T any] interface {
 	// Enqueue pushes an item onto the queue
-	Enqueue(item interface{})
+	Enqueue(item T)
 
 	// Dequeue removes the first item from the queue and returns it.
-	Dequeue() interface{}
+	Dequeue() T
 
 	// TryDequeue attempts to remove the first item from the queue and return it. This
 	// method does not block, and instead, returns true if the item was available and false
 	// otherwise
-	TryDequeue() (interface{}, bool)
+	TryDequeue() (T, bool)
 
 	// Each blocks modification and allows iteration of the queue.
-	Each(f func(int, interface{}))
+	Each(f func(int, T))
 
 	// Length returns the length of the queue
 	Length() int
@@ -36,25 +36,25 @@ type BlockingQueue interface {
 }
 
 // blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
-type blockingSliceQueue struct {
-	q        []interface{}
+type blockingSliceQueue[T any] struct {
+	q        []T
 	l        *sync.Mutex
 	nonEmpty *sync.Cond
 }
 
 // NewBlockingQueue returns a new BlockingQueue implementation
-func NewBlockingQueue() BlockingQueue {
+func NewBlockingQueue[T any]() BlockingQueue[T] {
 	l := new(sync.Mutex)
 
-	return &blockingSliceQueue{
-		q:        []interface{}{},
+	return &blockingSliceQueue[T]{
+		q:        []T{},
 		l:        l,
 		nonEmpty: sync.NewCond(l),
 	}
 }
 
 // Enqueue pushes an item onto the queue
-func (q *blockingSliceQueue) Enqueue(item interface{}) {
+func (q *blockingSliceQueue[T]) Enqueue(item T) {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -63,7 +63,7 @@ func (q *blockingSliceQueue) Enqueue(item interface{}) {
 }
 
 // Dequeue removes the first item from the queue and returns it.
-func (q *blockingSliceQueue) Dequeue() interface{} {
+func (q *blockingSliceQueue[T]) Dequeue() T {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -76,7 +76,7 @@ func (q *blockingSliceQueue) Dequeue() interface{} {
 	e := q.q[0]
 
-	// nil 0 index to prevent leak
-	q.q[0] = nil
+	// default 0 index to prevent leak
+	q.q[0] = defaultValue[T]()
 	q.q = q.q[1:]
 	return e
 }
@@ -84,24 +84,24 @@ func (q *blockingSliceQueue) Dequeue() interface{} {
 // TryDequeue attempts to remove the first item from the queue and return it. This
 // method does not block, and instead, returns true if the item was available and false
 // otherwise
-func (q *blockingSliceQueue) TryDequeue() (interface{}, bool) {
+func (q *blockingSliceQueue[T]) TryDequeue() (T, bool) {
 	q.l.Lock()
 	defer q.l.Unlock()
 
 	if len(q.q) == 0 {
-		return nil, false
+		return defaultValue[T](), false
 	}
 
 	e := q.q[0]
 
-	// nil 0 index to prevent leak
-	q.q[0] = nil
+	// default 0 index to prevent leak
+	q.q[0] = defaultValue[T]()
 	q.q = q.q[1:]
 	return e, true
 }
 
 // Each blocks modification and allows iteration of the queue.
-func (q *blockingSliceQueue) Each(f func(int, interface{})) {
+func (q *blockingSliceQueue[T]) Each(f func(int, T)) {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -111,7 +111,7 @@ func (q *blockingSliceQueue) Each(f func(int, interface{})) {
 }
 
 // Length returns the length of the queue
-func (q *blockingSliceQueue) Length() int {
+func (q *blockingSliceQueue[T]) Length() int {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -119,12 +119,12 @@ func (q *blockingSliceQueue) Length() int {
 }
 
 // IsEmpty returns true if the queue is empty
-func (q *blockingSliceQueue) IsEmpty() bool {
+func (q *blockingSliceQueue[T]) IsEmpty() bool {
 	return q.Length() == 0
 }
 
 // Clear empties the queue
-func (q *blockingSliceQueue) Clear() {
+func (q *blockingSliceQueue[T]) Clear() {
 	q.l.Lock()
 	defer q.l.Unlock()
 
@@ -132,5 +132,11 @@ func (q *blockingSliceQueue) Clear() {
 	// avoid capacity ballooning, but does feel like an implementation
 	// specific detail -- we can revisit if there are other relevant
 	// use-cases
-	q.q = []interface{}{}
+	q.q = []T{}
+}
+
+// defaultValue returns the zero value for type T.
+func defaultValue[T any]() T {
+	var t T
+	return t
 }

+ 67 - 70
pkg/prom/prom.go

@@ -114,7 +114,7 @@ type RateLimitedPrometheusClient struct {
 	id             string
 	client         prometheus.Client
 	auth           *ClientAuth
-	queue          collections.BlockingQueue
+	queue          collections.BlockingQueue[*workRequest]
 	decorator      QueryParamsDecorator
 	rateLimitRetry *RateLimitRetryOpts
 	outbound       *atomic.AtomicInt32
@@ -139,7 +139,7 @@ func NewRateLimitedClient(
 	rateLimitRetryOpts *RateLimitRetryOpts,
 	queryLogFile string) (prometheus.Client, error) {
 
-	queue := collections.NewBlockingQueue()
+	queue := collections.NewBlockingQueue[*workRequest]()
 	outbound := atomic.NewAtomicInt32(0)
 
 	var logger *golog.Logger
@@ -235,83 +235,80 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 
 	for {
 		// blocks until there is an item available
-		item := rlpc.queue.Dequeue()
-
-		// Ensure the dequeued item was a workRequest
-		if we, ok := item.(*workRequest); ok {
-			// if we need to shut down all workers, we'll need to submit sentinel values
-			// that will force the worker to return
-			if we.closer {
-				return
-			}
+		we := rlpc.queue.Dequeue()
 
-			ctx := we.ctx
-			req := we.req
+		// if we need to shut down all workers, we'll need to submit sentinel values
+		// that will force the worker to return
+		if we.closer {
+			return
+		}
 
-			// decorate the raw query parameters
-			if rlpc.decorator != nil {
-				req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
-			}
+		ctx := we.ctx
+		req := we.req
 
-			// measure time in queue
-			timeInQueue := time.Since(we.start)
-
-			// Increment outbound counter
-			rlpc.outbound.Increment()
-
-			// Execute Request
-			roundTripStart := time.Now()
-			res, body, warnings, err := rlpc.client.Do(ctx, req)
-
-			// If retries on rate limited response is enabled:
-			// * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
-			// * Attempt to parse a Retry-After from response headers (common on 429)
-			// * If we couldn't determine how long to wait for a retry, use 1 second by default
-			if res != nil && retryRateLimit {
-				var status []*RateLimitResponseStatus
-				var retries int = retryOpts.MaxRetries
-				var defaultWait time.Duration = retryOpts.DefaultRetryWait
-
-				for httputil.IsRateLimited(res, body) && retries > 0 {
-					// calculate amount of time to wait before retry, in the event the default wait is used,
-					// an exponential backoff is applied based on the number of times we've retried.
-					retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
-					retries--
-
-					status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
-					log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)
-
-					// To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
-					// to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
-					// don't want to block for 10 days.
-					if retryAfter > MaxRetryAfterDuration {
-						retryAfter = MaxRetryAfterDuration
-					}
-
-					// execute wait and retry
-					time.Sleep(retryAfter)
-					res, body, warnings, err = rlpc.client.Do(ctx, req)
-				}
+		// decorate the raw query parameters
+		if rlpc.decorator != nil {
+			req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
+		}
 
-				// if we've broken out of our retry loop and the resp is still rate limited,
-				// then let's generate a meaningful error to pass back
-				if retries == 0 && httputil.IsRateLimited(res, body) {
-					err = &RateLimitedResponseError{RateLimitStatus: status}
+		// measure time in queue
+		timeInQueue := time.Since(we.start)
+
+		// Increment outbound counter
+		rlpc.outbound.Increment()
+
+		// Execute Request
+		roundTripStart := time.Now()
+		res, body, warnings, err := rlpc.client.Do(ctx, req)
+
+		// If retries on rate limited response is enabled:
+		// * Check for a 429 StatusCode OR 400 StatusCode and message containing "ThrottlingException"
+		// * Attempt to parse a Retry-After from response headers (common on 429)
+		// * If we couldn't determine how long to wait for a retry, use 1 second by default
+		if res != nil && retryRateLimit {
+			var status []*RateLimitResponseStatus
+			var retries int = retryOpts.MaxRetries
+			var defaultWait time.Duration = retryOpts.DefaultRetryWait
+
+			for httputil.IsRateLimited(res, body) && retries > 0 {
+				// calculate amount of time to wait before retry, in the event the default wait is used,
+				// an exponential backoff is applied based on the number of times we've retried.
+				retryAfter := httputil.RateLimitedRetryFor(res, defaultWait, retryOpts.MaxRetries-retries)
+				retries--
+
+				status = append(status, &RateLimitResponseStatus{RetriesRemaining: retries, WaitTime: retryAfter})
+				log.DedupedInfof(50, "Rate Limited Prometheus Request. Waiting for: %d ms. Retries Remaining: %d", retryAfter.Milliseconds(), retries)
+
+				// To prevent total starvation of request threads, hard limit wait time to 10s. We also want quota limits/throttles
+				// to eventually pass through as an error. For example, if some quota is reached with 10 days left, we clearly
+				// don't want to block for 10 days.
+				if retryAfter > MaxRetryAfterDuration {
+					retryAfter = MaxRetryAfterDuration
 				}
-			}
 
-			// Decrement outbound counter
-			rlpc.outbound.Decrement()
-			LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
+				// execute wait and retry
+				time.Sleep(retryAfter)
+				res, body, warnings, err = rlpc.client.Do(ctx, req)
+			}
 
-			// Pass back response data over channel to caller
-			we.respChan <- &workResponse{
-				res:      res,
-				body:     body,
-				warnings: warnings,
-				err:      err,
+			// if we've broken out of our retry loop and the resp is still rate limited,
+			// then let's generate a meaningful error to pass back
+			if retries == 0 && httputil.IsRateLimited(res, body) {
+				err = &RateLimitedResponseError{RateLimitStatus: status}
 			}
 		}
+
+		// Decrement outbound counter
+		rlpc.outbound.Decrement()
+		LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
+
+		// Pass back response data over channel to caller
+		we.respChan <- &workResponse{
+			res:      res,
+			body:     body,
+			warnings: warnings,
+			err:      err,
+		}
 	}
 }