Просмотр исходного кода

Add a query parameter decorator to allow specific query parameters to be added to prometheus/thanos queries.

(cherry picked from commit fb230ee4fd8b756fad1fc200eab4b605d130f006)
Matt Bolt 5 лет назад
Родитель
Сommit
1728a1b784
2 измененных файлов с 28 добавлено и 3 удалено
  1. 17 2
      pkg/prom/prom.go
  2. 11 1
      pkg/thanos/thanos.go

+ 17 - 2
pkg/prom/prom.go

@@ -18,6 +18,14 @@ import (
 	prometheus "github.com/prometheus/client_golang/api"
 )
 
+//--------------------------------------------------------------------------
+//  QueryParamsDecorator
+//--------------------------------------------------------------------------
+
+// QueryParamsDecorator is used to decorate and return query parameters for
+// outgoing requests
+type QueryParamsDecorator = func(path string, values url.Values) url.Values
+
 //--------------------------------------------------------------------------
 //  ClientAuth
 //--------------------------------------------------------------------------
@@ -56,6 +64,7 @@ type RateLimitedPrometheusClient struct {
 	client     prometheus.Client
 	auth       *ClientAuth
 	queue      util.BlockingQueue
+	decorator  QueryParamsDecorator
 	outbound   *util.AtomicInt32
 	fileLogger *golog.Logger
 }
@@ -69,7 +78,7 @@ type requestCounter interface {
 
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
-func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, queryLogFile string) (prometheus.Client, error) {
+func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, decorator QueryParamsDecorator, queryLogFile string) (prometheus.Client, error) {
 	c, err := prometheus.NewClient(config)
 	if err != nil {
 		return nil, err
@@ -97,6 +106,7 @@ func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency in
 		id:         id,
 		client:     c,
 		queue:      queue,
+		decorator:  decorator,
 		outbound:   outbound,
 		auth:       auth,
 		fileLogger: logger,
@@ -167,6 +177,11 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 			ctx := we.ctx
 			req := we.req
 
+			// decorate the raw query parameters
+			if rlpc.decorator != nil {
+				req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
+			}
+
 			// measure time in queue
 			timeInQueue := time.Since(we.start)
 
@@ -238,7 +253,7 @@ func NewPrometheusClient(address string, timeout, keepAlive time.Duration, query
 		BearerToken: env.GetDBBearerToken(),
 	}
 
-	return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, queryLogFile)
+	return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, nil, queryLogFile)
 }
 
 // LogPrometheusClientState logs the current state, with respect to outbound requests, if that

+ 11 - 1
pkg/thanos/thanos.go

@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"net"
 	"net/http"
+	"net/url"
+	"strings"
 	"sync"
 	"time"
 
@@ -82,5 +84,13 @@ func NewThanosClient(address string, timeout, keepAlive time.Duration, queryConc
 		BearerToken: env.GetMultiClusterBearerToken(),
 	}
 
-	return prom.NewRateLimitedClient(prom.ThanosClientID, tc, queryConcurrency, auth, queryLogFile)
+	// max source resolution decorator
+	maxSourceDecorator := func(path string, queryParams url.Values) url.Values {
+		if strings.Contains(path, "query_range") {
+			queryParams.Set("max_source_resolution", "5m")
+		}
+		return queryParams
+	}
+
+	return prom.NewRateLimitedClient(prom.ThanosClientID, tc, queryConcurrency, auth, maxSourceDecorator, queryLogFile)
 }