Procházet zdrojové kódy

Merge pull request #1861 from AndrewChubatiuk/optional-scrape-endpoint

added a separate client for prometheus scrape config endpoint
Cliff Colvin před 2 roky
rodič
revize
6f3ae331e0
4 změnil soubory, kde provedl 158 přidání a 115 odebrání
  1. 40 31
      pkg/cmd/agent/agent.go
  2. 2 2
      pkg/costmodel/allocation.go
  3. 74 61
      pkg/costmodel/router.go
  4. 42 21
      pkg/env/costmodelenv.go

+ 40 - 31
pkg/cmd/agent/agent.go

@@ -65,11 +65,16 @@ func newKubernetesClusterCache() (kubernetes.Interface, clustercache.ClusterCach
 }
 
 func newPrometheusClient() (prometheus.Client, error) {
-	address := env.GetPrometheusServerEndpoint()
-	if address == "" {
+	promAddrs := env.GetPrometheusEndpoints()
+	if promAddrs[env.Server] == "" {
 		return nil, fmt.Errorf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
 	}
 
+	scrapeKey := env.Scrape
+	if _, ok := promAddrs[env.Scrape]; !ok {
+		scrapeKey = env.Server
+	}
+
 	queryConcurrency := env.GetMaxQueryConcurrency()
 	log.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
 
@@ -85,44 +90,48 @@ func newPrometheusClient() (prometheus.Client, error) {
 		}
 	}
 
-	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
-		Timeout:               timeout,
-		KeepAlive:             keepAlive,
-		TLSHandshakeTimeout:   tlsHandshakeTimeout,
-		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
-		RateLimitRetryOpts:    rateLimitRetryOpts,
-		Auth: &prom.ClientAuth{
-			Username:    env.GetDBBasicAuthUsername(),
-			Password:    env.GetDBBasicAuthUserPassword(),
-			BearerToken: env.GetDBBearerToken(),
-		},
-		QueryConcurrency: queryConcurrency,
-		QueryLogFile:     "",
-	})
-	if err != nil {
-		return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
-	}
-
-	m, err := prom.Validate(promCli)
-	if err != nil || !m.Running {
+	promClis := map[env.PrometheusType]prometheus.Client{}
+	var err error
+	for clientType, addr := range promAddrs {
+		promClis[clientType], err = prom.NewPrometheusClient(addr, &prom.PrometheusClientConfig{
+			Timeout:               timeout,
+			KeepAlive:             keepAlive,
+			TLSHandshakeTimeout:   tlsHandshakeTimeout,
+			TLSInsecureSkipVerify: env.GetInsecureSkipVerify(env.Server),
+			RateLimitRetryOpts:    rateLimitRetryOpts,
+			Auth: &prom.ClientAuth{
+				Username:    env.GetDBBasicAuthUsername(clientType),
+				Password:    env.GetDBBasicAuthUserPassword(clientType),
+				BearerToken: env.GetDBBearerToken(clientType),
+			},
+			QueryConcurrency: queryConcurrency,
+			QueryLogFile:     "",
+		})
 		if err != nil {
-			log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
-		} else if !m.Running {
-			log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
+			return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
+		}
+
+		m, err := prom.Validate(promClis[clientType])
+		if err != nil || !m.Running {
+			if err != nil {
+				log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", addr, err.Error(), prom.PrometheusTroubleshootingURL)
+			} else if !m.Running {
+				log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", addr, prom.PrometheusTroubleshootingURL)
+			}
+		} else {
+			log.Infof("Success: retrieved the 'up' query against prometheus at: %s", addr)
 		}
-	} else {
-		log.Infof("Success: retrieved the 'up' query against prometheus at: %s", address)
 	}
 
-	api := prometheusAPI.NewAPI(promCli)
+	api := prometheusAPI.NewAPI(promClis[scrapeKey])
 	_, err = api.Config(context.Background())
 	if err != nil {
-		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
+		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", promAddrs[scrapeKey], err.Error(), prom.PrometheusTroubleshootingURL)
 	} else {
-		log.Infof("Retrieved a prometheus config file from: %s", address)
+		log.Infof("Retrieved a prometheus config file from: %s", promAddrs[scrapeKey])
 	}
 
-	return promCli, nil
+	return promClis[env.Server], nil
 }
 
 func Execute(opts *AgentOpts) error {

+ 2 - 2
pkg/costmodel/allocation.go

@@ -17,8 +17,8 @@ const (
 	// upstream KSM has implementation change vs OC internal KSM - it sets metric to 0 when pod goes down
 	// VS OC implementation which stops emitting it
 	// by adding != 0 filter, we keep just the active times in the prom result
-	queryFmtPods                        = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%s]`
-	queryFmtPodsUID                     = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%s]`
+	queryFmtPods    = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%s]`
+	queryFmtPodsUID = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%s]`
 
 	queryFmtRAMBytesAllocated           = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s, provider_id)`
 	queryFmtRAMRequests                 = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`

+ 74 - 61
pkg/costmodel/router.go

@@ -86,6 +86,7 @@ var (
 type Accesses struct {
 	Router                   *httprouter.Router
 	PrometheusClient         prometheus.Client
+	PrometheusScrapeClient   prometheus.Client
 	ThanosClient             prometheus.Client
 	KubeClientSet            kubernetes.Interface
 	ClusterCache             clustercache.ClusterCache
@@ -1132,7 +1133,7 @@ func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ ht
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
 	pConfig := map[string]string{
-		"address": env.GetPrometheusServerEndpoint(),
+		"address": env.GetPrometheusEndpoints()[env.Scrape],
 	}
 
 	body, err := json.Marshal(pConfig)
@@ -1407,15 +1408,19 @@ func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.P
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	promServer := env.GetPrometheusServerEndpoint()
+	endpoints := env.GetPrometheusEndpoints()
+	scrapeServer, ok := endpoints[env.Scrape]
+	if !ok {
+		scrapeServer = endpoints[env.Server]
+	}
 
-	api := prometheusAPI.NewAPI(a.PrometheusClient)
+	api := prometheusAPI.NewAPI(a.PrometheusScrapeClient)
 	result, err := api.Config(r.Context())
 	if err != nil {
-		fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
+		fmt.Fprintf(w, "Using Prometheus at "+scrapeServer+". Error: "+err.Error())
 	} else {
 
-		fmt.Fprintf(w, "Using Prometheus at "+promServer+". PrometheusConfig: "+result.YAML)
+		fmt.Fprintf(w, "Using Prometheus at "+scrapeServer+". PrometheusConfig: "+result.YAML)
 	}
 }
 
@@ -1506,8 +1511,8 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		}
 	}
 
-	address := env.GetPrometheusServerEndpoint()
-	if address == "" {
+	promAddrs := env.GetPrometheusEndpoints()
+	if promAddrs[env.Server] == "" {
 		log.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
 	}
 
@@ -1527,46 +1532,53 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		}
 	}
 
-	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
-		Timeout:               timeout,
-		KeepAlive:             keepAlive,
-		TLSHandshakeTimeout:   tlsHandshakeTimeout,
-		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
-		RateLimitRetryOpts:    rateLimitRetryOpts,
-		Auth: &prom.ClientAuth{
-			Username:    env.GetDBBasicAuthUsername(),
-			Password:    env.GetDBBasicAuthUserPassword(),
-			BearerToken: env.GetDBBearerToken(),
-		},
-		QueryConcurrency:  queryConcurrency,
-		QueryLogFile:      "",
-		HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
-	})
-	if err != nil {
-		log.Fatalf("Failed to create prometheus client, Error: %v", err)
-	}
-
-	m, err := prom.Validate(promCli)
-	if err != nil || !m.Running {
+	promClis := map[env.PrometheusType]prometheus.Client{}
+	for clientName, addr := range promAddrs {
+		promClis[clientName], err = prom.NewPrometheusClient(addr, &prom.PrometheusClientConfig{
+			Timeout:               timeout,
+			KeepAlive:             keepAlive,
+			TLSHandshakeTimeout:   tlsHandshakeTimeout,
+			TLSInsecureSkipVerify: env.GetInsecureSkipVerify(clientName),
+			RateLimitRetryOpts:    rateLimitRetryOpts,
+			Auth: &prom.ClientAuth{
+				Username:    env.GetDBBasicAuthUsername(clientName),
+				Password:    env.GetDBBasicAuthUserPassword(clientName),
+				BearerToken: env.GetDBBearerToken(clientName),
+			},
+			QueryConcurrency:  queryConcurrency,
+			QueryLogFile:      "",
+			HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(clientName),
+		})
 		if err != nil {
-			log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
-		} else if !m.Running {
-			log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
+			log.Fatalf("Failed to create prometheus client, Error: %v", err)
+		}
+		m, err := prom.Validate(promClis[clientName])
+		if err != nil || !m.Running {
+			if err != nil {
+				log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", addr, err.Error(), prom.PrometheusTroubleshootingURL)
+			} else if !m.Running {
+				log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", addr, prom.PrometheusTroubleshootingURL)
+			}
+		} else {
+			log.Infof("Success: retrieved the 'up' query against prometheus at: " + addr)
 		}
-	} else {
-		log.Infof("Success: retrieved the 'up' query against prometheus at: " + address)
 	}
-
-	api := prometheusAPI.NewAPI(promCli)
+	scrapeKey := env.Scrape
+	promScrapeCli, ok := promClis[env.Scrape]
+	if !ok {
+		promScrapeCli = promClis[env.Server]
+		scrapeKey = env.Server
+	}
+	api := prometheusAPI.NewAPI(promScrapeCli)
 	_, err = api.Config(context.Background())
 	if err != nil {
-		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
+		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", promAddrs[scrapeKey], err.Error(), prom.PrometheusTroubleshootingURL)
 	} else {
-		log.Infof("Retrieved a prometheus config file from: %s", address)
+		log.Infof("Retrieved a prometheus config file from: %s", promAddrs[scrapeKey])
 	}
 
 	// Lookup scrape interval for kubecost job, update if found
-	si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
+	si, err := prom.ScrapeIntervalFor(promClis[env.Server], env.GetKubecostJobName())
 	if err == nil {
 		scrapeInterval = si
 	}
@@ -1647,7 +1659,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 				Timeout:               timeout,
 				KeepAlive:             keepAlive,
 				TLSHandshakeTimeout:   tlsHandshakeTimeout,
-				TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+				TLSInsecureSkipVerify: env.GetInsecureSkipVerify(env.Server),
 				RateLimitRetryOpts:    rateLimitRetryOpts,
 				Auth: &prom.ClientAuth{
 					Username:    env.GetMultiClusterBasicAuthUsername(),
@@ -1687,7 +1699,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	if thanosClient != nil {
 		clusterMap = clusters.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
 	} else {
-		clusterMap = clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
+		clusterMap = clusters.NewClusterMap(promClis[env.Server], clusterInfoProvider, 5*time.Minute)
 	}
 
 	// cache responses from model and aggregation for a default of 10 minutes;
@@ -1713,31 +1725,32 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	if thanosClient != nil {
 		pc = thanosClient
 	} else {
-		pc = promCli
+		pc = promClis[env.Server]
 	}
 	costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
-	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
+	metricsEmitter := NewCostModelMetricsEmitter(promClis[env.Server], k8sCache, cloudProvider, clusterInfoProvider, costModel)
 
 	a := &Accesses{
-		Router:                httprouter.New(),
-		PrometheusClient:      promCli,
-		ThanosClient:          thanosClient,
-		KubeClientSet:         kubeClientset,
-		ClusterCache:          k8sCache,
-		ClusterMap:            clusterMap,
-		CloudProvider:         cloudProvider,
-		CloudConfigController: cloudconfig.NewController(cloudProvider),
-		ConfigFileManager:     confManager,
-		ClusterInfoProvider:   clusterInfoProvider,
-		Model:                 costModel,
-		MetricsEmitter:        metricsEmitter,
-		AggregateCache:        aggregateCache,
-		CostDataCache:         costDataCache,
-		ClusterCostsCache:     clusterCostsCache,
-		OutOfClusterCache:     outOfClusterCache,
-		SettingsCache:         settingsCache,
-		CacheExpiration:       cacheExpiration,
-		httpServices:          services.NewCostModelServices(),
+		Router:                 httprouter.New(),
+		PrometheusClient:       promClis[env.Server],
+		PrometheusScrapeClient: promClis[scrapeKey],
+		ThanosClient:           thanosClient,
+		KubeClientSet:          kubeClientset,
+		ClusterCache:           k8sCache,
+		ClusterMap:             clusterMap,
+		CloudProvider:          cloudProvider,
+		CloudConfigController:  cloudconfig.NewController(cloudProvider),
+		ConfigFileManager:      confManager,
+		ClusterInfoProvider:    clusterInfoProvider,
+		Model:                  costModel,
+		MetricsEmitter:         metricsEmitter,
+		AggregateCache:         aggregateCache,
+		CostDataCache:          costDataCache,
+		ClusterCostsCache:      clusterCostsCache,
+		OutOfClusterCache:      outOfClusterCache,
+		SettingsCache:          settingsCache,
+		CacheExpiration:        cacheExpiration,
+		httpServices:           services.NewCostModelServices(),
 	}
 
 	// Use the Accesses instance, itself, as the CostModelAggregator. This is

+ 42 - 21
pkg/env/costmodelenv.go

@@ -26,7 +26,7 @@ const (
 	PodNameEnvVar                  = "POD_NAME"
 	ClusterIDEnvVar                = "CLUSTER_ID"
 	ClusterProfileEnvVar           = "CLUSTER_PROFILE"
-	PrometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
+	PrometheusServerEndpointEnvVar = "%sPROMETHEUS_SERVER_ENDPOINT"
 	MaxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
 	QueryLoggingFileEnvVar         = "QUERY_LOGGING_FILE"
 	RemoteEnabledEnvVar            = "REMOTE_WRITE_ENABLED"
@@ -56,15 +56,15 @@ const (
 	ErrorReportingEnabledEnvVar   = "ERROR_REPORTING_ENABLED"
 	ValuesReportingEnabledEnvVar  = "VALUES_REPORTING_ENABLED"
 
-	DBBasicAuthUsername = "DB_BASIC_AUTH_USERNAME"
-	DBBasicAuthPassword = "DB_BASIC_AUTH_PW"
-	DBBearerToken       = "DB_BEARER_TOKEN"
+	DBBasicAuthUsername = "%sDB_BASIC_AUTH_USERNAME"
+	DBBasicAuthPassword = "%sDB_BASIC_AUTH_PW"
+	DBBearerToken       = "%sDB_BEARER_TOKEN"
 
 	MultiClusterBasicAuthUsername = "MC_BASIC_AUTH_USERNAME"
 	MultiClusterBasicAuthPassword = "MC_BASIC_AUTH_PW"
 	MultiClusterBearerToken       = "MC_BEARER_TOKEN"
 
-	InsecureSkipVerify = "INSECURE_SKIP_VERIFY"
+	InsecureSkipVerify = "%sINSECURE_SKIP_VERIFY"
 
 	KubeConfigPathEnvVar = "KUBECONFIG_PATH"
 
@@ -92,7 +92,7 @@ const (
 	PrometheusRetryOnRateLimitMaxRetriesEnvVar  = "PROMETHEUS_RETRY_ON_RATE_LIMIT_MAX_RETRIES"
 	PrometheusRetryOnRateLimitDefaultWaitEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT_DEFAULT_WAIT"
 
-	PrometheusHeaderXScopeOrgIdEnvVar = "PROMETHEUS_HEADER_X_SCOPE_ORGID"
+	PrometheusHeaderXScopeOrgIdEnv = "%sPROMETHEUS_HEADER_X_SCOPE_ORGID"
 
 	IngestPodUIDEnvVar = "INGEST_POD_UID"
 
@@ -117,6 +117,21 @@ const (
 	CloudCostRunWindowDaysEnvVar    = "CLOUD_COST_RUN_WINDOW_DAYS"
 )
 
+type PrometheusType int64
+
+const (
+	Server PrometheusType = iota
+	Scrape
+)
+
+func (pt PrometheusType) envFormat(envStr string) string {
+	value := ""
+	if pt == Scrape {
+		value = "SCRAPE_"
+	}
+	return fmt.Sprintf(envStr, value)
+}
+
 const DefaultConfigMountPath = "/var/configs"
 
 var offsetRegex = regexp.MustCompile(`^(\+|-)(\d\d):(\d\d)$`)
@@ -182,8 +197,8 @@ func GetPrometheusRetryOnRateLimitDefaultWait() time.Duration {
 // "PROMETHEUS_HEADER_X_SCOPE_ORGID": "my-cluster-name"
 // Then set Prometheus URL to prometheus API endpoint:
 // "PROMETHEUS_SERVER_ENDPOINT": "http://mimir-url/prometheus/"
-func GetPrometheusHeaderXScopeOrgId() string {
-	return Get(PrometheusHeaderXScopeOrgIdEnvVar, "")
+func GetPrometheusHeaderXScopeOrgId(clientType PrometheusType) string {
+	return Get(clientType.envFormat(PrometheusHeaderXScopeOrgIdEnv), "")
 }
 
 // GetPrometheusQueryOffset returns the time.Duration to offset all prometheus queries by. NOTE: This env var is applied
@@ -325,14 +340,21 @@ func GetPromClusterFilter() string {
 	return ""
 }
 
-// GetPrometheusServerEndpoint returns the environment variable value for PrometheusServerEndpointEnvVar which
-// represents the prometheus server endpoint used to execute prometheus queries.
-func GetPrometheusServerEndpoint() string {
-	return Get(PrometheusServerEndpointEnvVar, "")
+// GetPrometheusEndpoints returns the environment variables values for
+// PrometheusServerEndpointEnvVar which represents the prometheus server endpoint used to execute prometheus queries and
+// SCRAPE_PrometheusServerEndpointEnvVar which represents prometheus scrape server endpoint used to get scrape config.
+func GetPrometheusEndpoints() map[PrometheusType]string {
+	output := map[PrometheusType]string{
+		Server: Get(Server.envFormat(PrometheusServerEndpointEnvVar), ""),
+	}
+	if value := Get(Scrape.envFormat(PrometheusServerEndpointEnvVar), ""); value != "" {
+		output[Scrape] = value
+	}
+	return output
 }
 
-func GetInsecureSkipVerify() bool {
-	return GetBool(InsecureSkipVerify, false)
+func GetInsecureSkipVerify(clientType PrometheusType) bool {
+	return GetBool(clientType.envFormat(InsecureSkipVerify), false)
 }
 
 // IsAggregateCostModelCacheDisabled returns the environment variable value for DisableAggregateCostModelCache which
@@ -469,17 +491,16 @@ func GetQueryLoggingFile() string {
 	return Get(QueryLoggingFileEnvVar, "")
 }
 
-func GetDBBasicAuthUsername() string {
-	return Get(DBBasicAuthUsername, "")
+func GetDBBasicAuthUsername(clientType PrometheusType) string {
+	return Get(clientType.envFormat(DBBasicAuthUsername), "")
 }
 
-func GetDBBasicAuthUserPassword() string {
-	return Get(DBBasicAuthPassword, "")
-
+func GetDBBasicAuthUserPassword(clientType PrometheusType) string {
+	return Get(clientType.envFormat(DBBasicAuthPassword), "")
 }
 
-func GetDBBearerToken() string {
-	return Get(DBBearerToken, "")
+func GetDBBearerToken(clientType PrometheusType) string {
+	return Get(clientType.envFormat(DBBearerToken), "")
 }
 
 // GetMultiClusterBasicAuthUsername returns the environment variable value for MultiClusterBasicAuthUsername