2
0
Эх сурвалжийг харах

Replace unfair semaphore throttling with fair blocking queue implementation for prometheus queries.

Matt Bolt 5 жил өмнө
parent
commit
14921a283f
2 өөрчлөгдсөн 202 нэмэгдсэн, 24 устгасан
  1. 121 24
      pkg/prom/prom.go
  2. 81 0
      pkg/util/blockingqueue.go

+ 121 - 24
pkg/prom/prom.go

@@ -4,6 +4,10 @@ import (
 	"context"
 	"net/http"
 	"net/url"
+	"os"
+	"time"
+
+	golog "log"
 
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
@@ -15,41 +19,63 @@ import (
 type RateLimitedPrometheusClient struct {
 	client      prometheus.Client
 	limiter     *util.Semaphore
-	requests    *util.AtomicInt32
+	queue       util.BlockingQueue
 	outbound    *util.AtomicInt32
 	username    string
 	password    string
 	bearerToken string
+	fileLogger  *golog.Logger
 }
 
 // requestCounter is used to determine if the prometheus client keeps track of
 // the concurrent outbound requests
 type requestCounter interface {
-	TotalRequests() int32
-	TotalOutboundRequests() int32
+	TotalRequests() int
+	TotalOutboundRequests() int
 }
 
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
-func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password, bearerToken string) (prometheus.Client, error) {
+func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password, bearerToken string, queryLogFile string) (prometheus.Client, error) {
 	c, err := prometheus.NewClient(config)
 	if err != nil {
 		return nil, err
 	}
 
-	limiter := util.NewSemaphore(maxConcurrency)
-	requests := util.NewAtomicInt32(0)
+	queue := util.NewBlockingQueue()
 	outbound := util.NewAtomicInt32(0)
 
-	return &RateLimitedPrometheusClient{
+	var logger *golog.Logger
+	if queryLogFile != "" {
+		exists, err := util.FileExists(queryLogFile)
+		if exists {
+			os.Remove(queryLogFile)
+		}
+
+		f, err := os.OpenFile(queryLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+		if err != nil {
+			log.Infof("Failed to open queryLogFile: %s for query logging: %s", queryLogFile, err)
+		} else {
+			logger = golog.New(f, "query-log", golog.LstdFlags)
+		}
+	}
+
+	rlpc := &RateLimitedPrometheusClient{
 		client:      c,
-		limiter:     limiter,
-		requests:    requests,
+		queue:       queue,
 		outbound:    outbound,
 		username:    username,
 		password:    password,
 		bearerToken: bearerToken,
-	}, nil
+		fileLogger:  logger,
+	}
+
+	// Start concurrent request processing
+	for i := 0; i < maxConcurrency; i++ {
+		go rlpc.worker()
+	}
+
+	return rlpc, nil
 }
 
 // LogPrometheusClientState logs the current state, with respect to outbound requests, if that
@@ -64,16 +90,27 @@ func LogPrometheusClientState(client prometheus.Client) {
 	}
 }
 
+// LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
+func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
+	if l == nil {
+		return
+	}
+	qp := util.NewQueryParams(req.URL.Query())
+	query := qp.Get("query", "<Unknown>")
+
+	l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
+}
+
 // TotalRequests returns the total number of requests that are either waiting to be sent and/or
 // are currently outbound.
-func (rlpc *RateLimitedPrometheusClient) TotalRequests() int32 {
-	return rlpc.requests.Get()
+func (rlpc *RateLimitedPrometheusClient) TotalRequests() int {
+	return rlpc.queue.Length()
 }
 
 // TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
 // sent to the server and are awaiting response.
-func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int32 {
-	return rlpc.outbound.Get()
+func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int {
+	return int(rlpc.outbound.Get())
 }
 
 // Passthrough to the prometheus client API
@@ -81,6 +118,66 @@ func (rlpc *RateLimitedPrometheusClient) URL(ep string, args map[string]string)
 	return rlpc.client.URL(ep, args)
 }
 
+// workRequest is used to queue requests
+type workRequest struct {
+	ctx      context.Context
+	req      *http.Request
+	start    time.Time
+	respChan chan *workResponse
+	// used as a sentinel value to close the worker goroutine
+	closer bool
+}
+
+// workResponse is the response payload returned to the Do method
+type workResponse struct {
+	res      *http.Response
+	body     []byte
+	warnings prometheus.Warnings
+	err      error
+}
+
+// worker is used as a consumer goroutine to pull workRequest from the blocking queue and execute them
+func (rlpc *RateLimitedPrometheusClient) worker() {
+	for {
+		// blocks until there is an item available
+		item := rlpc.queue.Dequeue()
+
+		// Ensure the dequeued item was a workRequest
+		if we, ok := item.(*workRequest); ok {
+			// if we need to shut down all workers, we'll need to submit sentinel values
+			// that will force the worker to return
+			if we.closer {
+				return
+			}
+
+			ctx := we.ctx
+			req := we.req
+
+			// measure time in queue
+			timeInQueue := time.Since(we.start)
+
+			// Increment outbound counter
+			rlpc.outbound.Increment()
+
+			// Execute Request
+			roundTripStart := time.Now()
+			res, body, warnings, err := rlpc.client.Do(ctx, req)
+
+			// Decrement outbound counter
+			rlpc.outbound.Decrement()
+			LogQueryRequest(rlpc.fileLogger, req, timeInQueue, time.Since(roundTripStart))
+
+			// Pass back response data over channel to caller
+			we.respChan <- &workResponse{
+				res:      res,
+				body:     body,
+				warnings: warnings,
+				err:      err,
+			}
+		}
+	}
+}
+
 // Rate limit and passthrough to prometheus client API
 func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
 	if rlpc.username != "" {
@@ -90,17 +187,17 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 		token := "Bearer " + rlpc.bearerToken
 		req.Header.Add("Authorization", token)
 	}
-	// Increment the total request counter first
-	rlpc.requests.Increment()
-	defer rlpc.requests.Decrement()
 
-	// Acquire mutex based on concurrency limiter
-	rlpc.limiter.Acquire()
-	defer rlpc.limiter.Return()
+	respChan := make(chan *workResponse)
+	defer close(respChan)
 
-	// Increment outbound once mutex acquired
-	rlpc.outbound.Increment()
-	defer rlpc.outbound.Decrement()
+	rlpc.queue.Enqueue(&workRequest{
+		ctx:      ctx,
+		req:      req,
+		respChan: respChan,
+		closer:   false,
+	})
 
-	return rlpc.client.Do(ctx, req)
+	workRes := <-respChan
+	return workRes.res, workRes.body, workRes.warnings, workRes.err
 }

+ 81 - 0
pkg/util/blockingqueue.go

@@ -0,0 +1,81 @@
+package util
+
+import (
+	"sync"
+)
+
+//--------------------------------------------------------------------------
+//  BlockingQueue
+//--------------------------------------------------------------------------
+
+// BlockingQueue is a queue backed by a slice which blocks if dequeueing while empty.
+// This data structure should use a pool of worker goroutines to await work.
+type BlockingQueue interface {
+	// Enqueue pushes an item onto the queue
+	Enqueue(item interface{})
+
+	// Dequeue removes the first item from the queue and returns it.
+	Dequeue() interface{}
+
+	// Length returns the length of the queue
+	Length() int
+
+	// IsEmpty returns true if the queue is empty
+	IsEmpty() bool
+}
+
+// blockingSliceQueue is an implementation of BlockingQueue which uses a slice for storage.
+type blockingSliceQueue struct {
+	q        []interface{}
+	l        *sync.Mutex
+	nonEmpty *sync.Cond
+}
+
+// NewBlockingQueue returns a new BlockingQueue implementation
+func NewBlockingQueue() BlockingQueue {
+	l := new(sync.Mutex)
+
+	return &blockingSliceQueue{
+		q:        []interface{}{},
+		l:        l,
+		nonEmpty: sync.NewCond(l),
+	}
+}
+
+// Enqueue pushes an item onto the queue
+func (q *blockingSliceQueue) Enqueue(item interface{}) {
+	q.l.Lock()
+	defer q.l.Unlock()
+
+	q.q = append(q.q, item)
+	q.nonEmpty.Broadcast()
+}
+
+// Dequeue removes the first item from the queue and returns it.
+func (q *blockingSliceQueue) Dequeue() interface{} {
+	q.l.Lock()
+	defer q.l.Unlock()
+
+	// need to tight loop here to ensure only one thread wins and
+	// others wait again
+	for len(q.q) == 0 {
+		q.nonEmpty.Wait()
+	}
+
+	e := q.q[0]
+	q.q = q.q[1:]
+	return e
+}
+
+// Length returns the length of the queue
+func (q *blockingSliceQueue) Length() int {
+	q.l.Lock()
+	defer q.l.Unlock()
+
+	return len(q.q)
+}
+
+// IsEmpty returns true if the queue is empty
+func (q *blockingSliceQueue) IsEmpty() bool {
+	return q.Length() == 0
+}