ソースを参照

Merge pull request #592 from kubecost/bolt/mthanos-max-source-res

Max Source Res
Matt Bolt 5 年 前
コミット
838305b2b9
3 ファイル変更56 行追加6 行削除
  1. 23 3
      pkg/env/costmodelenv.go
  2. 17 2
      pkg/prom/prom.go
  3. 16 1
      pkg/thanos/thanos.go

+ 23 - 3
pkg/env/costmodelenv.go

@@ -22,9 +22,10 @@ const (
 	ConfigPathEnvVar               = "CONFIG_PATH"
 	ConfigPathEnvVar               = "CONFIG_PATH"
 	CloudProviderAPIKeyEnvVar      = "CLOUD_PROVIDER_API_KEY"
 	CloudProviderAPIKeyEnvVar      = "CLOUD_PROVIDER_API_KEY"
 
 
-	ThanosEnabledEnvVar  = "THANOS_ENABLED"
-	ThanosQueryUrlEnvVar = "THANOS_QUERY_URL"
-	ThanosOffsetEnvVar   = "THANOS_QUERY_OFFSET"
+	ThanosEnabledEnvVar      = "THANOS_ENABLED"
+	ThanosQueryUrlEnvVar     = "THANOS_QUERY_URL"
+	ThanosOffsetEnvVar       = "THANOS_QUERY_OFFSET"
+	ThanosMaxSourceResEnvVar = "THANOS_MAX_SOURCE_RESOLUTION"
 
 
 	LogCollectionEnabledEnvVar    = "LOG_COLLECTION_ENABLED"
 	LogCollectionEnabledEnvVar    = "LOG_COLLECTION_ENABLED"
 	ProductAnalyticsEnabledEnvVar = "PRODUCT_ANALYTICS_ENABLED"
 	ProductAnalyticsEnabledEnvVar = "PRODUCT_ANALYTICS_ENABLED"
@@ -168,6 +169,25 @@ func GetThanosOffset() string {
 	return Get(ThanosOffsetEnvVar, "3h")
 	return Get(ThanosOffsetEnvVar, "3h")
 }
 }
 
 
+// GetThanosMaxSourceResolution returns the environment variable value for ThanosMaxSourceResEnvVar which represents
+// the max source resolution to use when querying thanos.
+func GetThanosMaxSourceResolution() string {
+	res := Get(ThanosMaxSourceResEnvVar, "raw")
+
+	switch res {
+	case "raw":
+		return "0s"
+	case "0s":
+		fallthrough
+	case "5m":
+		fallthrough
+	case "1h":
+		return res
+	default:
+		return "0s"
+	}
+}
+
 // IsLogCollectionEnabled returns the environment variable value for LogCollectionEnabledEnvVar which represents
 // IsLogCollectionEnabled returns the environment variable value for LogCollectionEnabledEnvVar which represents
 // whether or not log collection has been enabled for kubecost deployments.
 // whether or not log collection has been enabled for kubecost deployments.
 func IsLogCollectionEnabled() bool {
 func IsLogCollectionEnabled() bool {

+ 17 - 2
pkg/prom/prom.go

@@ -18,6 +18,14 @@ import (
 	prometheus "github.com/prometheus/client_golang/api"
 	prometheus "github.com/prometheus/client_golang/api"
 )
 )
 
 
+//--------------------------------------------------------------------------
+//  QueryParamsDecorator
+//--------------------------------------------------------------------------
+
+// QueryParamsDecorator is used to decorate and return query parameters for
+// outgoing requests
+type QueryParamsDecorator = func(path string, values url.Values) url.Values
+
 //--------------------------------------------------------------------------
 //--------------------------------------------------------------------------
 //  ClientAuth
 //  ClientAuth
 //--------------------------------------------------------------------------
 //--------------------------------------------------------------------------
@@ -56,6 +64,7 @@ type RateLimitedPrometheusClient struct {
 	client     prometheus.Client
 	client     prometheus.Client
 	auth       *ClientAuth
 	auth       *ClientAuth
 	queue      util.BlockingQueue
 	queue      util.BlockingQueue
+	decorator  QueryParamsDecorator
 	outbound   *util.AtomicInt32
 	outbound   *util.AtomicInt32
 	fileLogger *golog.Logger
 	fileLogger *golog.Logger
 }
 }
@@ -69,7 +78,7 @@ type requestCounter interface {
 
 
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
 // prometheus requests.
-func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, queryLogFile string) (prometheus.Client, error) {
+func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, decorator QueryParamsDecorator, queryLogFile string) (prometheus.Client, error) {
 	c, err := prometheus.NewClient(config)
 	c, err := prometheus.NewClient(config)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -97,6 +106,7 @@ func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency in
 		id:         id,
 		id:         id,
 		client:     c,
 		client:     c,
 		queue:      queue,
 		queue:      queue,
+		decorator:  decorator,
 		outbound:   outbound,
 		outbound:   outbound,
 		auth:       auth,
 		auth:       auth,
 		fileLogger: logger,
 		fileLogger: logger,
@@ -167,6 +177,11 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 			ctx := we.ctx
 			ctx := we.ctx
 			req := we.req
 			req := we.req
 
 
+			// decorate the raw query parameters
+			if rlpc.decorator != nil {
+				req.URL.RawQuery = rlpc.decorator(req.URL.Path, req.URL.Query()).Encode()
+			}
+
 			// measure time in queue
 			// measure time in queue
 			timeInQueue := time.Since(we.start)
 			timeInQueue := time.Since(we.start)
 
 
@@ -238,7 +253,7 @@ func NewPrometheusClient(address string, timeout, keepAlive time.Duration, query
 		BearerToken: env.GetDBBearerToken(),
 		BearerToken: env.GetDBBearerToken(),
 	}
 	}
 
 
-	return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, queryLogFile)
+	return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, nil, queryLogFile)
 }
 }
 
 
 // LogPrometheusClientState logs the current state, with respect to outbound requests, if that
 // LogPrometheusClientState logs the current state, with respect to outbound requests, if that

+ 16 - 1
pkg/thanos/thanos.go

@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"fmt"
 	"net"
 	"net"
 	"net/http"
 	"net/http"
+	"net/url"
+	"strings"
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
@@ -14,11 +16,16 @@ import (
 	prometheus "github.com/prometheus/client_golang/api"
 	prometheus "github.com/prometheus/client_golang/api"
 )
 )
 
 
+// MaxSourceResulution is the query parameter key used to designate the resolution
+// to use when executing a query.
+const MaxSourceResulution = "max_source_resolution"
+
 var (
 var (
 	lock           = new(sync.Mutex)
 	lock           = new(sync.Mutex)
 	enabled        = env.IsThanosEnabled()
 	enabled        = env.IsThanosEnabled()
 	queryUrl       = env.GetThanosQueryUrl()
 	queryUrl       = env.GetThanosQueryUrl()
 	offset         = env.GetThanosOffset()
 	offset         = env.GetThanosOffset()
+	maxSourceRes   = env.GetThanosMaxSourceResolution()
 	offsetDuration *time.Duration
 	offsetDuration *time.Duration
 	queryOffset    = fmt.Sprintf(" offset %s", offset)
 	queryOffset    = fmt.Sprintf(" offset %s", offset)
 )
 )
@@ -82,5 +89,13 @@ func NewThanosClient(address string, timeout, keepAlive time.Duration, queryConc
 		BearerToken: env.GetMultiClusterBearerToken(),
 		BearerToken: env.GetMultiClusterBearerToken(),
 	}
 	}
 
 
-	return prom.NewRateLimitedClient(prom.ThanosClientID, tc, queryConcurrency, auth, queryLogFile)
+	// max source resolution decorator
+	maxSourceDecorator := func(path string, queryParams url.Values) url.Values {
+		if strings.Contains(path, "query") {
+			queryParams.Set(MaxSourceResulution, maxSourceRes)
+		}
+		return queryParams
+	}
+
+	return prom.NewRateLimitedClient(prom.ThanosClientID, tc, queryConcurrency, auth, maxSourceDecorator, queryLogFile)
 }
 }