فهرست منبع

Merge pull request #462 from kubecost/bolt/max-query-concurrency

Rate Limited Prometheus Client
Matt Bolt 5 سال پیش
والد
کامیت
4ca6056b6a
3فایلهای تغییر یافته به همراه69 افزوده شده و 11 حذف شده
  1. 23 2
      pkg/costmodel/router.go
  2. 46 0
      pkg/prom/prom.go
  3. 0 9
      pkg/prom/query.go

+ 23 - 2
pkg/costmodel/router.go

@@ -24,6 +24,8 @@ import (
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	cm "github.com/kubecost/cost-model/pkg/clustermanager"
 	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/prom"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
@@ -40,6 +42,7 @@ const (
 	logCollectionEnvVar            = "LOG_COLLECTION_ENABLED"
 	productAnalyticsEnvVar         = "PRODUCT_ANALYTICS_ENABLED"
 	errorReportingEnvVar           = "ERROR_REPORTING_ENABLED"
+	maxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
 	prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
 	prometheusTroubleshootingEp    = "http://docs.kubecost.com/custom-prom#troubleshoot"
 	RFC3339Milli                   = "2006-01-02T15:04:05.000Z"
@@ -163,6 +166,22 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
+// Parses the max query concurrency environment variable
+func maxQueryConcurrency() int {
+	v := os.Getenv(maxQueryConcurrencyEnvVar)
+	if v == "" {
+		return 5
+	}
+
+	result, err := strconv.Atoi(v)
+	if err != nil {
+		log.Warningf("Failed to parse MAX_QUERY_CONCURRENCY. Defaulting to 5 - %s", err)
+		return 5
+	}
+
+	return result
+}
+
 // writeReportingFlags writes the reporting flags to the cluster info map
 func writeReportingFlags(clusterInfo map[string]string) {
 	clusterInfo["logCollection"] = fmt.Sprintf("%t", logCollectionEnabled)
@@ -928,6 +947,8 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
 	}
 
+	queryConcurrency := maxQueryConcurrency()
+
 	var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
 		Proxy: http.ProxyFromEnvironment,
 		DialContext: (&net.Dialer{
@@ -941,7 +962,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		Address:      address,
 		RoundTripper: LongTimeoutRoundTripper,
 	}
-	promCli, _ := prometheusClient.NewClient(pc)
+	promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency)
 
 	m, err := ValidatePrometheus(promCli, false)
 	if err != nil || m.Running == false {
@@ -1155,7 +1176,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 				Address:      thanosUrl,
 				RoundTripper: thanosRT,
 			}
-			thanosCli, _ := prometheusClient.NewClient(thanosConfig)
+			thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency)
 
 			_, err = ValidatePrometheus(thanosCli, true)
 			if err != nil {

+ 46 - 0
pkg/prom/prom.go

@@ -0,0 +1,46 @@
+package prom
+
+import (
+	"context"
+	"net/http"
+	"net/url"
+
+	"github.com/kubecost/cost-model/pkg/util"
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
+// NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
+// prometheus requests.
+func NewRateLimitedClient(config prometheus.Config, maxConcurrency int) (prometheus.Client, error) {
+	c, err := prometheus.NewClient(config)
+	if err != nil {
+		return nil, err
+	}
+
+	limiter := util.NewSemaphore(maxConcurrency)
+
+	return &RateLimitedPrometheusClient{
+		client:  c,
+		limiter: limiter,
+	}, nil
+}
+
+// Creates a new prometheus client which limits the total number of concurrent outbound requests
+// allowed at a given moment.
+type RateLimitedPrometheusClient struct {
+	client  prometheus.Client
+	limiter *util.Semaphore
+}
+
+// Passthrough to the prometheus client API
+func (rlpc *RateLimitedPrometheusClient) URL(ep string, args map[string]string) *url.URL {
+	return rlpc.URL(ep, args)
+}
+
+// Rate limit and passthrough to prometheus client API
+func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
+	rlpc.limiter.Acquire()
+	defer rlpc.limiter.Return()
+
+	return rlpc.client.Do(ctx, req)
+}

+ 0 - 9
pkg/prom/query.go

@@ -7,7 +7,6 @@ import (
 	"net/http"
 
 	"github.com/kubecost/cost-model/pkg/errors"
-	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 )
@@ -22,20 +21,15 @@ const (
 type Context struct {
 	Client         prometheus.Client
 	ErrorCollector *errors.ErrorCollector
-	semaphore      *util.Semaphore
 }
 
 // NewContext creates a new Promethues querying context from the given client
 func NewContext(client prometheus.Client) *Context {
 	var ec errors.ErrorCollector
 
-	// By deafult, allow 20 concurrent queries, which is the Prometheus default
-	sem := util.NewSemaphore(20)
-
 	return &Context{
 		Client:         client,
 		ErrorCollector: &ec,
-		semaphore:      sem,
 	}
 }
 
@@ -82,9 +76,6 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 }
 
 func (ctx *Context) query(query string) (interface{}, error) {
-	ctx.semaphore.Acquire()
-	defer ctx.semaphore.Return()
-
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
 	q.Set("query", query)