Browse Source

Merge pull request #1928 from melnikovio/develop

Add mimir support with org id header
Niko Kovacevic 2 years ago
parent
commit
dfe384ab5c
5 changed files with 47 additions and 18 deletions
  1. 3 2
      pkg/costmodel/router.go
  2. 11 0
      pkg/env/costmodelenv.go
  3. 28 16
      pkg/prom/prom.go
  4. 4 0
      pkg/prom/ratelimitedclient_test.go
  5. 1 0
      pkg/thanos/thanos.go

+ 3 - 2
pkg/costmodel/router.go

@@ -1533,8 +1533,9 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 			Password:    env.GetDBBasicAuthUserPassword(),
 			BearerToken: env.GetDBBearerToken(),
 		},
-		QueryConcurrency: queryConcurrency,
-		QueryLogFile:     "",
+		QueryConcurrency:  queryConcurrency,
+		QueryLogFile:      "",
+		HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
 	})
 	if err != nil {
 		log.Fatalf("Failed to create prometheus client, Error: %v", err)

+ 11 - 0
pkg/env/costmodelenv.go

@@ -92,6 +92,8 @@ const (
 	PrometheusRetryOnRateLimitMaxRetriesEnvVar  = "PROMETHEUS_RETRY_ON_RATE_LIMIT_MAX_RETRIES"
 	PrometheusRetryOnRateLimitDefaultWaitEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT_DEFAULT_WAIT"
 
+	PrometheusHeaderXScopeOrgIdEnvVar = "PROMETHEUS_HEADER_X_SCOPE_ORGID"
+
 	IngestPodUIDEnvVar = "INGEST_POD_UID"
 
 	ETLReadOnlyMode = "ETL_READ_ONLY"
@@ -162,6 +164,15 @@ func GetPrometheusRetryOnRateLimitDefaultWait() time.Duration {
 	return GetDuration(PrometheusRetryOnRateLimitDefaultWaitEnvVar, 100*time.Millisecond)
 }
 
+// GetPrometheusHeaderXScopeOrgId returns the default value for X-Scope-OrgID header used for requests in Mimir/Cortex-Tenant API.
+// To use Mimir(or Cortex-Tenant) instead of Prometheus add variable from cluster settings:
+// "PROMETHEUS_HEADER_X_SCOPE_ORGID": "my-cluster-name"
+// Then set Prometheus URL to prometheus API endpoint:
+// "PROMETHEUS_SERVER_ENDPOINT": "http://mimir-url/prometheus/"
+func GetPrometheusHeaderXScopeOrgId() string {
+	return Get(PrometheusHeaderXScopeOrgIdEnvVar, "")
+}
+
 // GetPrometheusQueryOffset returns the time.Duration to offset all prometheus queries by. NOTE: This env var is applied
 // to all non-range queries made via our query context. This should only be applied when there is a significant delay in
 // data arriving in the target prom db. For example, if supplying a thanos or cortex querier for the prometheus server, using

+ 28 - 16
pkg/prom/prom.go

@@ -68,6 +68,9 @@ func (auth *ClientAuth) Apply(req *http.Request) {
 // during a retry. This is to prevent starvation on the request threads
 const MaxRetryAfterDuration = 10 * time.Second
 
+// Default header key for Mimir/Cortex-Tenant API requests
+const HeaderXScopeOrgId = "X-Scope-OrgID"
+
 // RateLimitRetryOpts contains retry options
 type RateLimitRetryOpts struct {
 	MaxRetries       int
@@ -114,14 +117,15 @@ func (rlre *RateLimitedResponseError) Error() string {
 // RateLimitedPrometheusClient is a prometheus client which limits the total number of
 // concurrent outbound requests allowed at a given moment.
 type RateLimitedPrometheusClient struct {
-	id             string
-	client         prometheus.Client
-	auth           *ClientAuth
-	queue          collections.BlockingQueue[*workRequest]
-	decorator      QueryParamsDecorator
-	rateLimitRetry *RateLimitRetryOpts
-	outbound       atomic.Int32
-	fileLogger     *golog.Logger
+	id                string
+	client            prometheus.Client
+	auth              *ClientAuth
+	queue             collections.BlockingQueue[*workRequest]
+	decorator         QueryParamsDecorator
+	rateLimitRetry    *RateLimitRetryOpts
+	outbound          atomic.Int32
+	fileLogger        *golog.Logger
+	headerXScopeOrgId string
 }
 
 // requestCounter is used to determine if the prometheus client keeps track of
@@ -140,7 +144,8 @@ func NewRateLimitedClient(
 	auth *ClientAuth,
 	decorator QueryParamsDecorator,
 	rateLimitRetryOpts *RateLimitRetryOpts,
-	queryLogFile string) (prometheus.Client, error) {
+	queryLogFile string,
+	headerXScopeOrgId string) (prometheus.Client, error) {
 
 	queue := collections.NewBlockingQueue[*workRequest]()
 
@@ -169,13 +174,14 @@ func NewRateLimitedClient(
 	}
 
 	rlpc := &RateLimitedPrometheusClient{
-		id:             id,
-		client:         client,
-		queue:          queue,
-		decorator:      decorator,
-		rateLimitRetry: rateLimitRetryOpts,
-		auth:           auth,
-		fileLogger:     logger,
+		id:                id,
+		client:            client,
+		queue:             queue,
+		decorator:         decorator,
+		rateLimitRetry:    rateLimitRetryOpts,
+		auth:              auth,
+		fileLogger:        logger,
+		headerXScopeOrgId: headerXScopeOrgId,
 	}
 
 	// Start concurrent request processing
@@ -313,6 +319,10 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 
 // Rate limit and passthrough to prometheus client API
 func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, error) {
+	if rlpc.headerXScopeOrgId != "" {
+		req.Header.Set(HeaderXScopeOrgId, rlpc.headerXScopeOrgId)
+	}
+
 	rlpc.auth.Apply(req)
 
 	respChan := make(chan *workResponse)
@@ -353,6 +363,7 @@ type PrometheusClientConfig struct {
 	Auth                  *ClientAuth
 	QueryConcurrency      int
 	QueryLogFile          string
+	HeaderXScopeOrgId     string
 }
 
 // NewPrometheusClient creates a new rate limited client which limits by outbound concurrent requests.
@@ -387,6 +398,7 @@ func NewPrometheusClient(address string, config *PrometheusClientConfig) (promet
 		nil,
 		config.RateLimitRetryOpts,
 		config.QueryLogFile,
+		config.HeaderXScopeOrgId,
 	)
 }
 

+ 4 - 0
pkg/prom/ratelimitedclient_test.go

@@ -142,6 +142,7 @@ func TestRateLimitedOnceAndSuccess(t *testing.T) {
 		nil,
 		newTestRetryOpts(),
 		"",
+		"",
 	)
 
 	if err != nil {
@@ -183,6 +184,7 @@ func TestRateLimitedOnceAndFail(t *testing.T) {
 		nil,
 		newTestRetryOpts(),
 		"",
+		"",
 	)
 
 	if err != nil {
@@ -229,6 +231,7 @@ func TestRateLimitedResponses(t *testing.T) {
 		nil,
 		newTestRetryOpts(),
 		"",
+		"",
 	)
 
 	if err != nil {
@@ -342,6 +345,7 @@ func TestConcurrentRateLimiting(t *testing.T) {
 		nil,
 		newTestRetryOpts(),
 		"",
+		"",
 	)
 
 	if err != nil {

+ 1 - 0
pkg/thanos/thanos.go

@@ -104,5 +104,6 @@ func NewThanosClient(address string, config *prom.PrometheusClientConfig) (prome
 		maxSourceDecorator,
 		config.RateLimitRetryOpts,
 		config.QueryLogFile,
+		"",
 	)
 }