2
0
Эх сурвалжийг харах

Add profiling queries to query.go and add request counts to RateLimitedPrometheusClient and a function to log the current request counts

Matt Bolt 5 жил өмнө
parent
commit
e0e82ef8dc
3 өөрчлөгдсөн 163 нэмэгдсэн , 27 устгасан
  1. 54 7
      pkg/prom/prom.go
  2. 72 20
      pkg/prom/query.go
  3. 37 0
      pkg/util/atomic.go

+ 54 - 7
pkg/prom/prom.go

@@ -5,10 +5,29 @@ import (
 	"net/http"
 	"net/url"
 
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 )
 
+// Creates a new prometheus client which limits the total number of concurrent outbound requests
+// allowed at a given moment.
+type RateLimitedPrometheusClient struct {
+	client   prometheus.Client
+	limiter  *util.Semaphore
+	requests *util.AtomicInt32
+	outbound *util.AtomicInt32
+	username string
+	password string
+}
+
+// requestCounter is used to determine if the prometheus client keeps track of
+// the concurrent outbound requests
+type requestCounter interface {
+	TotalRequests() int32
+	TotalOutboundRequests() int32
+}
+
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
 func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password string) (prometheus.Client, error) {
@@ -18,22 +37,41 @@ func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username
 	}
 
 	limiter := util.NewSemaphore(maxConcurrency)
+	requests := util.NewAtomicInt32(0)
+	outbound := util.NewAtomicInt32(0)
 
 	return &RateLimitedPrometheusClient{
 		client:   c,
 		limiter:  limiter,
+		requests: requests,
+		outbound: outbound,
 		username: username,
 		password: password,
 	}, nil
 }
 
-// Creates a new prometheus client which limits the total number of concurrent outbound requests
-// allowed at a given moment.
-type RateLimitedPrometheusClient struct {
-	client   prometheus.Client
-	limiter  *util.Semaphore
-	username string
-	password string
+// LogPrometheusClientState logs the current state, with respect to outbound requests, if that
+// information is available.
+func LogPrometheusClientState(client prometheus.Client) {
+	if rc, ok := client.(requestCounter); ok {
+		total := rc.TotalRequests()
+		outbound := rc.TotalOutboundRequests()
+		queued := total - outbound
+
+		log.Infof("Outbound Requests: %d, Queued Requests: %d, Total Requests: %d", outbound, queued, total)
+	}
+}
+
+// TotalRequests returns the total number of requests that are either waiting to be sent and/or
+// are currently outbound.
+func (rlpc *RateLimitedPrometheusClient) TotalRequests() int32 {
+	return rlpc.requests.Get()
+}
+
+// TotalOutboundRequests returns the total number of concurrent outbound requests, which have been
+// sent to the server and are awaiting response.
+func (rlpc *RateLimitedPrometheusClient) TotalOutboundRequests() int32 {
+	return rlpc.outbound.Get()
 }
 
 // Passthrough to the prometheus client API
@@ -46,8 +84,17 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 	if rlpc.username != "" {
 		req.SetBasicAuth(rlpc.username, rlpc.password)
 	}
+	// Increment the total request counter first
+	rlpc.requests.Increment()
+	defer rlpc.requests.Decrement()
+
+	// Acquire mutex based on concurrency limiter
 	rlpc.limiter.Acquire()
 	defer rlpc.limiter.Return()
 
+	// Increment outbound once mutex acquired
+	rlpc.outbound.Increment()
+	defer rlpc.outbound.Decrement()
+
 	return rlpc.client.Do(ctx, req)
 }

+ 72 - 20
pkg/prom/query.go

@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
@@ -54,19 +55,18 @@ func (ctx *Context) HasErrors() bool {
 func (ctx *Context) Query(query string) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go func(ctx *Context, resCh QueryResultsChan) {
-		defer errors.HandlePanic()
+	go runQuery(query, ctx, resCh, "")
 
-		raw, promErr := ctx.query(query)
-		ctx.ErrorCollector.Report(promErr)
+	return resCh
+}
 
-		results := NewQueryResults(query, raw)
-		if results.Error != nil {
-			ctx.ErrorCollector.Report(results.Error)
-		}
+// ProfileQuery returns a QueryResultsChan, then runs the given query with a profile
+// label and sends the results on the provided channel. Receiver is responsible for closing the
+// channel, preferably using the Read method.
+func (ctx *Context) ProfileQuery(query string, profileLabel string) QueryResultsChan {
+	resCh := make(QueryResultsChan)
 
-		resCh <- results
-	}(ctx, resCh)
+	go runQuery(query, ctx, resCh, profileLabel)
 
 	return resCh
 }
@@ -85,6 +85,20 @@ func (ctx *Context) QueryAll(queries ...string) []QueryResultsChan {
 	return resChs
 }
 
+// ProfileQueryAll returns one QueryResultsChan for each query provided, then runs
+// each ProfileQuery concurrently and returns results on each channel, respectively,
+// in the order they were provided; i.e. the response to queries[1] will be
+// sent on channel resChs[1].
+func (ctx *Context) ProfileQueryAll(queries ...string) []QueryResultsChan {
+	resChs := []QueryResultsChan{}
+
+	for _, q := range queries {
+		resChs = append(resChs, ctx.ProfileQuery(q, fmt.Sprintf("Query #%d", len(resChs)+1)))
+	}
+
+	return resChs
+}
+
 func (ctx *Context) QuerySync(query string) ([]*QueryResult, error) {
 	raw, err := ctx.query(query)
 	if err != nil {
@@ -104,6 +118,27 @@ func (ctx *Context) QueryURL() *url.URL {
 	return ctx.Client.URL(epQuery, nil)
 }
 
+// runQuery executes the prometheus query asynchronously, collects results and
+// errors, and passes them through the results channel.
+func runQuery(query string, ctx *Context, resCh QueryResultsChan, profileLabel string) {
+	defer errors.HandlePanic()
+	startQuery := time.Now()
+
+	raw, promErr := ctx.query(query)
+	ctx.ErrorCollector.Report(promErr)
+
+	results := NewQueryResults(query, raw)
+	if results.Error != nil {
+		ctx.ErrorCollector.Report(results.Error)
+	}
+
+	if profileLabel != "" {
+		log.Profile(startQuery, profileLabel)
+	}
+
+	resCh <- results
+}
+
 func (ctx *Context) query(query string) (interface{}, error) {
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
@@ -139,19 +174,15 @@ func (ctx *Context) query(query string) (interface{}, error) {
 func (ctx *Context) QueryRange(query string, start, end time.Time, step time.Duration) QueryResultsChan {
 	resCh := make(QueryResultsChan)
 
-	go func(ctx *Context, resCh QueryResultsChan) {
-		defer errors.HandlePanic()
+	go runQueryRange(query, start, end, step, ctx, resCh, "")
 
-		raw, promErr := ctx.queryRange(query, start, end, step)
-		ctx.ErrorCollector.Report(promErr)
+	return resCh
+}
 
-		results := NewQueryResults(query, raw)
-		if results.Error != nil {
-			ctx.ErrorCollector.Report(results.Error)
-		}
+func (ctx *Context) ProfileQueryRange(query string, start, end time.Time, step time.Duration, profileLabel string) QueryResultsChan {
+	resCh := make(QueryResultsChan)
 
-		resCh <- results
-	}(ctx, resCh)
+	go runQueryRange(query, start, end, step, ctx, resCh, profileLabel)
 
 	return resCh
 }
@@ -175,6 +206,27 @@ func (ctx *Context) QueryRangeURL() *url.URL {
 	return ctx.Client.URL(epQueryRange, nil)
 }
 
+// runQueryRange executes the prometheus queryRange asynchronously, collects results and
+// errors, and passes them through the results channel.
+func runQueryRange(query string, start, end time.Time, step time.Duration, ctx *Context, resCh QueryResultsChan, profileLabel string) {
+	defer errors.HandlePanic()
+	startQuery := time.Now()
+
+	raw, promErr := ctx.queryRange(query, start, end, step)
+	ctx.ErrorCollector.Report(promErr)
+
+	results := NewQueryResults(query, raw)
+	if results.Error != nil {
+		ctx.ErrorCollector.Report(results.Error)
+	}
+
+	if profileLabel != "" {
+		log.Profile(startQuery, profileLabel)
+	}
+
+	resCh <- results
+}
+
 func (ctx *Context) queryRange(query string, start, end time.Time, step time.Duration) (interface{}, error) {
 	u := ctx.Client.URL(epQueryRange, nil)
 	q := u.Query()

+ 37 - 0
pkg/util/atomic.go

@@ -0,0 +1,37 @@
+package util
+
+import "sync/atomic"
+
+type AtomicInt32 int32
+
+// NewAtomicInt32 creates a new atomic int32 instance.
+func NewAtomicInt32(value int32) *AtomicInt32 {
+	ai := new(AtomicInt32)
+	ai.Set(value)
+	return ai
+}
+
+// Loads the bool value atomically
+func (ai *AtomicInt32) Get() int32 {
+	return atomic.LoadInt32((*int32)(ai))
+}
+
+// Sets the bool value atomically
+func (ai *AtomicInt32) Set(value int32) {
+	atomic.StoreInt32((*int32)(ai), value)
+}
+
+// Increments the atomic int and returns the new value
+func (ai *AtomicInt32) Increment() int32 {
+	return atomic.AddInt32((*int32)(ai), 1)
+}
+
+// Decrements the atomint int and returns the new value
+func (ai *AtomicInt32) Decrement() int32 {
+	return atomic.AddInt32((*int32)(ai), -1)
+}
+
+// CompareAndSet sets value to new if current is equal to the current value
+func (ai *AtomicInt32) CompareAndSet(current, new int32) bool {
+	return atomic.CompareAndSwapInt32((*int32)(ai), current, new)
+}