Просмотр исходного кода

feat: add mimir support with org id header

Signed-off-by: Ilya Melnikov <me@melnikov.io>
Ilya Melnikov 3 лет назад
Родитель
Сommit
50f7249362
5 измененных файлов с 26 добавлено и 1 удалено
  1. 1 0
      pkg/costmodel/router.go
  2. 7 0
      pkg/env/costmodelenv.go
  3. 13 1
      pkg/prom/prom.go
  4. 4 0
      pkg/prom/ratelimitedclient_test.go
  5. 1 0
      pkg/thanos/thanos.go

+ 1 - 0
pkg/costmodel/router.go

@@ -1535,6 +1535,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		},
 		QueryConcurrency: queryConcurrency,
 		QueryLogFile:     "",
+		MimirHeaderOrgId: env.GetMimirHeaderOrgId(),
 	})
 	if err != nil {
 		log.Fatalf("Failed to create prometheus client, Error: %v", err)

+ 7 - 0
pkg/env/costmodelenv.go

@@ -90,6 +90,8 @@ const (
 	PrometheusRetryOnRateLimitMaxRetriesEnvVar  = "PROMETHEUS_RETRY_ON_RATE_LIMIT_MAX_RETRIES"
 	PrometheusRetryOnRateLimitDefaultWaitEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT_DEFAULT_WAIT"
 
+	MimirHeaderOrgIdEnvVar = "MIMIR_HEADER_ORG_ID"
+
 	IngestPodUIDEnvVar = "INGEST_POD_UID"
 
 	ETLReadOnlyMode = "ETL_READ_ONLY"
@@ -160,6 +162,11 @@ func GetPrometheusRetryOnRateLimitDefaultWait() time.Duration {
 	return GetDuration(PrometheusRetryOnRateLimitDefaultWaitEnvVar, 100*time.Millisecond)
 }
 
+// GetMimirOrgIdHeader returns the default value for X-Scope-OrgID header used for requests in Mimir API.
+func GetMimirHeaderOrgId() string {
+	return Get(MimirHeaderOrgIdEnvVar, "")
+}
+
 // GetPrometheusQueryOffset returns the time.Duration to offset all prometheus queries by. NOTE: This env var is applied
 // to all non-range queries made via our query context. This should only be applied when there is a significant delay in
 // data arriving in the target prom db. For example, if supplying a thanos or cortex querier for the prometheus server, using

+ 13 - 1
pkg/prom/prom.go

@@ -68,6 +68,9 @@ func (auth *ClientAuth) Apply(req *http.Request) {
 // during a retry. This is to prevent starvation on the request threads
 const MaxRetryAfterDuration = 10 * time.Second
 
+// Default header key for Mimir API requests
+const MimirHeader = "X-Scope-OrgID"
+
 // RateLimitRetryOpts contains retry options
 type RateLimitRetryOpts struct {
 	MaxRetries       int
@@ -122,6 +125,7 @@ type RateLimitedPrometheusClient struct {
 	rateLimitRetry *RateLimitRetryOpts
 	outbound       atomic.Int32
 	fileLogger     *golog.Logger
+	mimirOrgId     string
 }
 
 // requestCounter is used to determine if the prometheus client keeps track of
@@ -140,7 +144,8 @@ func NewRateLimitedClient(
 	auth *ClientAuth,
 	decorator QueryParamsDecorator,
 	rateLimitRetryOpts *RateLimitRetryOpts,
-	queryLogFile string) (prometheus.Client, error) {
+	queryLogFile string,
+	mimirHeaderOrgId string) (prometheus.Client, error) {
 
 	queue := collections.NewBlockingQueue[*workRequest]()
 
@@ -176,6 +181,7 @@ func NewRateLimitedClient(
 		rateLimitRetry: rateLimitRetryOpts,
 		auth:           auth,
 		fileLogger:     logger,
+		mimirOrgId:     mimirHeaderOrgId,
 	}
 
 	// Start concurrent request processing
@@ -313,6 +319,10 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 
 // Rate limit and passthrough to prometheus client API
 func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
+	if rlpc.mimirOrgId != "" {
+		req.Header.Set(MimirHeader, rlpc.mimirOrgId)
+	}
+
 	rlpc.auth.Apply(req)
 
 	respChan := make(chan *workResponse)
@@ -353,6 +363,7 @@ type PrometheusClientConfig struct {
 	Auth                  *ClientAuth
 	QueryConcurrency      int
 	QueryLogFile          string
+	MimirHeaderOrgId      string
 }
 
 // NewPrometheusClient creates a new rate limited client which limits by outbound concurrent requests.
@@ -387,6 +398,7 @@ func NewPrometheusClient(address string, config *PrometheusClientConfig) (promet
 		nil,
 		config.RateLimitRetryOpts,
 		config.QueryLogFile,
+		config.MimirHeaderOrgId,
 	)
 }
 

+ 4 - 0
pkg/prom/ratelimitedclient_test.go

@@ -142,6 +142,7 @@ func TestRateLimitedOnceAndSuccess(t *testing.T) {
 		nil,
 		newTestRetryOpts(),
 		"",
+		"",
 	)
 
 	if err != nil {
@@ -183,6 +184,7 @@ func TestRateLimitedOnceAndFail(t *testing.T) {
 		nil,
 		newTestRetryOpts(),
 		"",
+		"",
 	)
 
 	if err != nil {
@@ -229,6 +231,7 @@ func TestRateLimitedResponses(t *testing.T) {
 		nil,
 		newTestRetryOpts(),
 		"",
+		"",
 	)
 
 	if err != nil {
@@ -342,6 +345,7 @@ func TestConcurrentRateLimiting(t *testing.T) {
 		nil,
 		newTestRetryOpts(),
 		"",
+		"",
 	)
 
 	if err != nil {

+ 1 - 0
pkg/thanos/thanos.go

@@ -104,5 +104,6 @@ func NewThanosClient(address string, config *prom.PrometheusClientConfig) (prome
 		maxSourceDecorator,
 		config.RateLimitRetryOpts,
 		config.QueryLogFile,
+		"",
 	)
 }