2
0
Эх сурвалжийг харах

added a separate client for prometheus scrape config endpoint

Signed-off-by: Andrew Chubatiuk <andrew.chubatiuk@motional.com>
Andrew Chubatiuk 2 жил өмнө
parent
commit
14ec7221fa

+ 40 - 31
pkg/cmd/agent/agent.go

@@ -65,11 +65,16 @@ func newKubernetesClusterCache() (kubernetes.Interface, clustercache.ClusterCach
 }
 
 func newPrometheusClient() (prometheus.Client, error) {
-	address := env.GetPrometheusServerEndpoint()
-	if address == "" {
+	promAddrs := env.GetPrometheusEndpoints()
+	if promAddrs[env.Server] == "" {
 		return nil, fmt.Errorf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
 	}
 
+	scrapeKey := env.Scrape
+	if _, ok := promAddrs[env.Scrape]; !ok {
+		scrapeKey = env.Server
+	}
+
 	queryConcurrency := env.GetMaxQueryConcurrency()
 	log.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
 
@@ -85,44 +90,48 @@ func newPrometheusClient() (prometheus.Client, error) {
 		}
 	}
 
-	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
-		Timeout:               timeout,
-		KeepAlive:             keepAlive,
-		TLSHandshakeTimeout:   tlsHandshakeTimeout,
-		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
-		RateLimitRetryOpts:    rateLimitRetryOpts,
-		Auth: &prom.ClientAuth{
-			Username:    env.GetDBBasicAuthUsername(),
-			Password:    env.GetDBBasicAuthUserPassword(),
-			BearerToken: env.GetDBBearerToken(),
-		},
-		QueryConcurrency: queryConcurrency,
-		QueryLogFile:     "",
-	})
-	if err != nil {
-		return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
-	}
-
-	m, err := prom.Validate(promCli)
-	if err != nil || !m.Running {
+	promClis := map[env.PrometheusType]prometheus.Client{}
+	var err error
+	for clientType, addr := range promAddrs {
+		promClis[clientType], err = prom.NewPrometheusClient(addr, &prom.PrometheusClientConfig{
+			Timeout:               timeout,
+			KeepAlive:             keepAlive,
+			TLSHandshakeTimeout:   tlsHandshakeTimeout,
+			TLSInsecureSkipVerify: env.GetInsecureSkipVerify(clientType),
+			RateLimitRetryOpts:    rateLimitRetryOpts,
+			Auth: &prom.ClientAuth{
+				Username:    env.GetDBBasicAuthUsername(clientType),
+				Password:    env.GetDBBasicAuthUserPassword(clientType),
+				BearerToken: env.GetDBBearerToken(clientType),
+			},
+			QueryConcurrency: queryConcurrency,
+			QueryLogFile:     "",
+		})
 		if err != nil {
-			log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
-		} else if !m.Running {
-			log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
+			return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
+		}
+
+		m, err := prom.Validate(promClis[clientType])
+		if err != nil || !m.Running {
+			if err != nil {
+				log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", addr, err.Error(), prom.PrometheusTroubleshootingURL)
+			} else if !m.Running {
+				log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", addr, prom.PrometheusTroubleshootingURL)
+			}
+		} else {
+			log.Infof("Success: retrieved the 'up' query against prometheus at: %s", addr)
 		}
-	} else {
-		log.Infof("Success: retrieved the 'up' query against prometheus at: %s", address)
 	}
 
-	api := prometheusAPI.NewAPI(promCli)
+	api := prometheusAPI.NewAPI(promClis[scrapeKey])
 	_, err = api.Config(context.Background())
 	if err != nil {
-		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
+		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", promAddrs[scrapeKey], err.Error(), prom.PrometheusTroubleshootingURL)
 	} else {
-		log.Infof("Retrieved a prometheus config file from: %s", address)
+		log.Infof("Retrieved a prometheus config file from: %s", promAddrs[scrapeKey])
 	}
 
-	return promCli, nil
+	return promClis[env.Server], nil
 }
 
 func Execute(opts *AgentOpts) error {

+ 90 - 77
pkg/costmodel/router.go

@@ -82,23 +82,24 @@ var (
 // Accesses defines a singleton application instance, providing access to
 // Prometheus, Kubernetes, the cloud provider, and caches.
 type Accesses struct {
-	Router              *httprouter.Router
-	PrometheusClient    prometheus.Client
-	ThanosClient        prometheus.Client
-	KubeClientSet       kubernetes.Interface
-	ClusterCache        clustercache.ClusterCache
-	ClusterMap          clusters.ClusterMap
-	CloudProvider       models.Provider
-	ConfigFileManager   *config.ConfigFileManager
-	ClusterInfoProvider clusters.ClusterInfoProvider
-	Model               *CostModel
-	MetricsEmitter      *CostModelMetricsEmitter
-	OutOfClusterCache   *cache.Cache
-	AggregateCache      *cache.Cache
-	CostDataCache       *cache.Cache
-	ClusterCostsCache   *cache.Cache
-	CacheExpiration     map[time.Duration]time.Duration
-	AggAPI              Aggregator
+	Router                 *httprouter.Router
+	PrometheusClient       prometheus.Client
+	PrometheusScrapeClient prometheus.Client
+	ThanosClient           prometheus.Client
+	KubeClientSet          kubernetes.Interface
+	ClusterCache           clustercache.ClusterCache
+	ClusterMap             clusters.ClusterMap
+	CloudProvider          models.Provider
+	ConfigFileManager      *config.ConfigFileManager
+	ClusterInfoProvider    clusters.ClusterInfoProvider
+	Model                  *CostModel
+	MetricsEmitter         *CostModelMetricsEmitter
+	OutOfClusterCache      *cache.Cache
+	AggregateCache         *cache.Cache
+	CostDataCache          *cache.Cache
+	ClusterCostsCache      *cache.Cache
+	CacheExpiration        map[time.Duration]time.Duration
+	AggAPI                 Aggregator
 	// SettingsCache stores current state of app settings
 	SettingsCache *cache.Cache
 	// settingsSubscribers tracks channels through which changes to different
@@ -1127,7 +1128,7 @@ func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ ht
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
 	pConfig := map[string]string{
-		"address": env.GetPrometheusServerEndpoint(),
+		"address": env.GetPrometheusEndpoints()[env.Scrape],
 	}
 
 	body, err := json.Marshal(pConfig)
@@ -1402,15 +1403,19 @@ func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.P
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	promServer := env.GetPrometheusServerEndpoint()
+	endpoints := env.GetPrometheusEndpoints()
+	scrapeServer, ok := endpoints[env.Scrape]
+	if !ok {
+		scrapeServer = endpoints[env.Server]
+	}
 
-	api := prometheusAPI.NewAPI(a.PrometheusClient)
+	api := prometheusAPI.NewAPI(a.PrometheusScrapeClient)
 	result, err := api.Config(r.Context())
 	if err != nil {
-		fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
+		fmt.Fprintf(w, "Using Prometheus at %s. Error: %s", scrapeServer, err.Error())
 	} else {
 
-		fmt.Fprintf(w, "Using Prometheus at "+promServer+". PrometheusConfig: "+result.YAML)
+		fmt.Fprintf(w, "Using Prometheus at %s. PrometheusConfig: %s", scrapeServer, result.YAML)
 	}
 }
 
@@ -1501,8 +1506,8 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		}
 	}
 
-	address := env.GetPrometheusServerEndpoint()
-	if address == "" {
+	promAddrs := env.GetPrometheusEndpoints()
+	if promAddrs[env.Server] == "" {
 		log.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
 	}
 
@@ -1522,46 +1527,53 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		}
 	}
 
-	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
-		Timeout:               timeout,
-		KeepAlive:             keepAlive,
-		TLSHandshakeTimeout:   tlsHandshakeTimeout,
-		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
-		RateLimitRetryOpts:    rateLimitRetryOpts,
-		Auth: &prom.ClientAuth{
-			Username:    env.GetDBBasicAuthUsername(),
-			Password:    env.GetDBBasicAuthUserPassword(),
-			BearerToken: env.GetDBBearerToken(),
-		},
-		QueryConcurrency:  queryConcurrency,
-		QueryLogFile:      "",
-		HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
-	})
-	if err != nil {
-		log.Fatalf("Failed to create prometheus client, Error: %v", err)
-	}
-
-	m, err := prom.Validate(promCli)
-	if err != nil || !m.Running {
+	promClis := map[env.PrometheusType]prometheus.Client{}
+	for clientName, addr := range promAddrs {
+		promClis[clientName], err = prom.NewPrometheusClient(addr, &prom.PrometheusClientConfig{
+			Timeout:               timeout,
+			KeepAlive:             keepAlive,
+			TLSHandshakeTimeout:   tlsHandshakeTimeout,
+			TLSInsecureSkipVerify: env.GetInsecureSkipVerify(clientName),
+			RateLimitRetryOpts:    rateLimitRetryOpts,
+			Auth: &prom.ClientAuth{
+				Username:    env.GetDBBasicAuthUsername(clientName),
+				Password:    env.GetDBBasicAuthUserPassword(clientName),
+				BearerToken: env.GetDBBearerToken(clientName),
+			},
+			QueryConcurrency:  queryConcurrency,
+			QueryLogFile:      "",
+			HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(clientName),
+		})
 		if err != nil {
-			log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
-		} else if !m.Running {
-			log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
+			log.Fatalf("Failed to create prometheus client, Error: %v", err)
+		}
+		m, err := prom.Validate(promClis[clientName])
+		if err != nil || !m.Running {
+			if err != nil {
+				log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", addr, err.Error(), prom.PrometheusTroubleshootingURL)
+			} else if !m.Running {
+				log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", addr, prom.PrometheusTroubleshootingURL)
+			}
+		} else {
+			log.Infof("Success: retrieved the 'up' query against prometheus at: %s", addr)
 		}
-	} else {
-		log.Infof("Success: retrieved the 'up' query against prometheus at: " + address)
 	}
-
-	api := prometheusAPI.NewAPI(promCli)
+	scrapeKey := env.Scrape
+	promScrapeCli, ok := promClis[env.Scrape]
+	if !ok {
+		promScrapeCli = promClis[env.Server]
+		scrapeKey = env.Server
+	}
+	api := prometheusAPI.NewAPI(promScrapeCli)
 	_, err = api.Config(context.Background())
 	if err != nil {
-		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
+		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", promAddrs[scrapeKey], err.Error(), prom.PrometheusTroubleshootingURL)
 	} else {
-		log.Infof("Retrieved a prometheus config file from: %s", address)
+		log.Infof("Retrieved a prometheus config file from: %s", promAddrs[scrapeKey])
 	}
 
 	// Lookup scrape interval for kubecost job, update if found
-	si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
+	si, err := prom.ScrapeIntervalFor(promClis[env.Server], env.GetKubecostJobName())
 	if err == nil {
 		scrapeInterval = si
 	}
@@ -1642,7 +1654,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 				Timeout:               timeout,
 				KeepAlive:             keepAlive,
 				TLSHandshakeTimeout:   tlsHandshakeTimeout,
-				TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+				TLSInsecureSkipVerify: env.GetInsecureSkipVerify(env.Server),
 				RateLimitRetryOpts:    rateLimitRetryOpts,
 				Auth: &prom.ClientAuth{
 					Username:    env.GetMultiClusterBasicAuthUsername(),
@@ -1682,7 +1694,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	if thanosClient != nil {
 		clusterMap = clusters.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
 	} else {
-		clusterMap = clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
+		clusterMap = clusters.NewClusterMap(promClis[env.Server], clusterInfoProvider, 5*time.Minute)
 	}
 
 	// cache responses from model and aggregation for a default of 10 minutes;
@@ -1708,30 +1720,31 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	if thanosClient != nil {
 		pc = thanosClient
 	} else {
-		pc = promCli
+		pc = promClis[env.Server]
 	}
 	costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
-	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
+	metricsEmitter := NewCostModelMetricsEmitter(promClis[env.Server], k8sCache, cloudProvider, clusterInfoProvider, costModel)
 
 	a := &Accesses{
-		Router:              httprouter.New(),
-		PrometheusClient:    promCli,
-		ThanosClient:        thanosClient,
-		KubeClientSet:       kubeClientset,
-		ClusterCache:        k8sCache,
-		ClusterMap:          clusterMap,
-		CloudProvider:       cloudProvider,
-		ConfigFileManager:   confManager,
-		ClusterInfoProvider: clusterInfoProvider,
-		Model:               costModel,
-		MetricsEmitter:      metricsEmitter,
-		AggregateCache:      aggregateCache,
-		CostDataCache:       costDataCache,
-		ClusterCostsCache:   clusterCostsCache,
-		OutOfClusterCache:   outOfClusterCache,
-		SettingsCache:       settingsCache,
-		CacheExpiration:     cacheExpiration,
-		httpServices:        services.NewCostModelServices(),
+		Router:                 httprouter.New(),
+		PrometheusClient:       promClis[env.Server],
+		PrometheusScrapeClient: promClis[scrapeKey],
+		ThanosClient:           thanosClient,
+		KubeClientSet:          kubeClientset,
+		ClusterCache:           k8sCache,
+		ClusterMap:             clusterMap,
+		CloudProvider:          cloudProvider,
+		ConfigFileManager:      confManager,
+		ClusterInfoProvider:    clusterInfoProvider,
+		Model:                  costModel,
+		MetricsEmitter:         metricsEmitter,
+		AggregateCache:         aggregateCache,
+		CostDataCache:          costDataCache,
+		ClusterCostsCache:      clusterCostsCache,
+		OutOfClusterCache:      outOfClusterCache,
+		SettingsCache:          settingsCache,
+		CacheExpiration:        cacheExpiration,
+		httpServices:           services.NewCostModelServices(),
 	}
 	// Use the Accesses instance, itself, as the CostModelAggregator. This is
 	// confusing and unconventional, but necessary so that we can swap it

+ 42 - 21
pkg/env/costmodelenv.go

@@ -26,7 +26,7 @@ const (
 	PodNameEnvVar                  = "POD_NAME"
 	ClusterIDEnvVar                = "CLUSTER_ID"
 	ClusterProfileEnvVar           = "CLUSTER_PROFILE"
-	PrometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
+	PrometheusServerEndpointEnvVar = "%sPROMETHEUS_SERVER_ENDPOINT"
 	MaxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
 	QueryLoggingFileEnvVar         = "QUERY_LOGGING_FILE"
 	RemoteEnabledEnvVar            = "REMOTE_WRITE_ENABLED"
@@ -56,15 +56,15 @@ const (
 	ErrorReportingEnabledEnvVar   = "ERROR_REPORTING_ENABLED"
 	ValuesReportingEnabledEnvVar  = "VALUES_REPORTING_ENABLED"
 
-	DBBasicAuthUsername = "DB_BASIC_AUTH_USERNAME"
-	DBBasicAuthPassword = "DB_BASIC_AUTH_PW"
-	DBBearerToken       = "DB_BEARER_TOKEN"
+	DBBasicAuthUsername = "%sDB_BASIC_AUTH_USERNAME"
+	DBBasicAuthPassword = "%sDB_BASIC_AUTH_PW"
+	DBBearerToken       = "%sDB_BEARER_TOKEN"
 
 	MultiClusterBasicAuthUsername = "MC_BASIC_AUTH_USERNAME"
 	MultiClusterBasicAuthPassword = "MC_BASIC_AUTH_PW"
 	MultiClusterBearerToken       = "MC_BEARER_TOKEN"
 
-	InsecureSkipVerify = "INSECURE_SKIP_VERIFY"
+	InsecureSkipVerify = "%sINSECURE_SKIP_VERIFY"
 
 	KubeConfigPathEnvVar = "KUBECONFIG_PATH"
 
@@ -92,7 +92,7 @@ const (
 	PrometheusRetryOnRateLimitMaxRetriesEnvVar  = "PROMETHEUS_RETRY_ON_RATE_LIMIT_MAX_RETRIES"
 	PrometheusRetryOnRateLimitDefaultWaitEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT_DEFAULT_WAIT"
 
-	PrometheusHeaderXScopeOrgIdEnvVar = "PROMETHEUS_HEADER_X_SCOPE_ORGID"
+	PrometheusHeaderXScopeOrgIdEnv = "%sPROMETHEUS_HEADER_X_SCOPE_ORGID"
 
 	IngestPodUIDEnvVar = "INGEST_POD_UID"
 
@@ -109,6 +109,21 @@ const (
 	ExportCSVMaxDays    = "EXPORT_CSV_MAX_DAYS"
 )
 
+type PrometheusType int64
+
+const (
+	Server PrometheusType = iota
+	Scrape
+)
+
+func (pt PrometheusType) envFormat(envStr string) string {
+	value := ""
+	if pt == Scrape {
+		value = "SCRAPE_"
+	}
+	return fmt.Sprintf(envStr, value)
+}
+
 const DefaultConfigMountPath = "/var/configs"
 
 var offsetRegex = regexp.MustCompile(`^(\+|-)(\d\d):(\d\d)$`)
@@ -174,8 +189,8 @@ func GetPrometheusRetryOnRateLimitDefaultWait() time.Duration {
 // "PROMETHEUS_HEADER_X_SCOPE_ORGID": "my-cluster-name"
 // Then set Prometheus URL to prometheus API endpoint:
 // "PROMETHEUS_SERVER_ENDPOINT": "http://mimir-url/prometheus/"
-func GetPrometheusHeaderXScopeOrgId() string {
-	return Get(PrometheusHeaderXScopeOrgIdEnvVar, "")
+func GetPrometheusHeaderXScopeOrgId(clientType PrometheusType) string {
+	return Get(clientType.envFormat(PrometheusHeaderXScopeOrgIdEnv), "")
 }
 
 // GetPrometheusQueryOffset returns the time.Duration to offset all prometheus queries by. NOTE: This env var is applied
@@ -317,14 +332,21 @@ func GetPromClusterFilter() string {
 	return ""
 }
 
-// GetPrometheusServerEndpoint returns the environment variable value for PrometheusServerEndpointEnvVar which
-// represents the prometheus server endpoint used to execute prometheus queries.
-func GetPrometheusServerEndpoint() string {
-	return Get(PrometheusServerEndpointEnvVar, "")
+// GetPrometheusEndpoints returns the prometheus endpoints read from the environment, keyed by PrometheusType.
+// Server is always present (possibly empty) and holds PROMETHEUS_SERVER_ENDPOINT, the endpoint used to execute queries;
+// Scrape holds SCRAPE_PROMETHEUS_SERVER_ENDPOINT, the endpoint used to get scrape config, and is present only when non-empty.
+func GetPrometheusEndpoints() map[PrometheusType]string {
+	output := map[PrometheusType]string{
+		Server: Get(Server.envFormat(PrometheusServerEndpointEnvVar), ""),
+	}
+	if value := Get(Scrape.envFormat(PrometheusServerEndpointEnvVar), ""); value != "" {
+		output[Scrape] = value
+	}
+	return output
 }
 
-func GetInsecureSkipVerify() bool {
-	return GetBool(InsecureSkipVerify, false)
+func GetInsecureSkipVerify(clientType PrometheusType) bool {
+	return GetBool(clientType.envFormat(InsecureSkipVerify), false)
 }
 
 // IsAggregateCostModelCacheDisabled returns the environment variable value for DisableAggregateCostModelCache which
@@ -461,17 +483,16 @@ func GetQueryLoggingFile() string {
 	return Get(QueryLoggingFileEnvVar, "")
 }
 
-func GetDBBasicAuthUsername() string {
-	return Get(DBBasicAuthUsername, "")
+func GetDBBasicAuthUsername(clientType PrometheusType) string {
+	return Get(clientType.envFormat(DBBasicAuthUsername), "")
 }
 
-func GetDBBasicAuthUserPassword() string {
-	return Get(DBBasicAuthPassword, "")
-
+func GetDBBasicAuthUserPassword(clientType PrometheusType) string {
+	return Get(clientType.envFormat(DBBasicAuthPassword), "")
 }
 
-func GetDBBearerToken() string {
-	return Get(DBBearerToken, "")
+func GetDBBearerToken(clientType PrometheusType) string {
+	return Get(clientType.envFormat(DBBearerToken), "")
 }
 
 // GetMultiClusterBasicAuthUsername returns the environment variable value for MultiClusterBasicAuthUsername