2
0
Эх сурвалжийг харах

Revert "added a separate client for prometheus scrape config endpoint"

Cliff Colvin 2 жил өмнө
parent
commit
f8ec2f7380

+ 31 - 40
pkg/cmd/agent/agent.go

@@ -65,16 +65,11 @@ func newKubernetesClusterCache() (kubernetes.Interface, clustercache.ClusterCach
 }
 
 func newPrometheusClient() (prometheus.Client, error) {
-	promAddrs := env.GetPrometheusEndpoints()
-	if promAddrs[env.Server] == "" {
+	address := env.GetPrometheusServerEndpoint()
+	if address == "" {
 		return nil, fmt.Errorf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
 	}
 
-	scrapeKey := env.Scrape
-	if _, ok := promAddrs[env.Scrape]; !ok {
-		scrapeKey = env.Server
-	}
-
 	queryConcurrency := env.GetMaxQueryConcurrency()
 	log.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
 
@@ -90,48 +85,44 @@ func newPrometheusClient() (prometheus.Client, error) {
 		}
 	}
 
-	promClis := map[env.PrometheusType]prometheus.Client{}
-	var err error
-	for clientType, addr := range promAddrs {
-		promClis[clientType], err = prom.NewPrometheusClient(addr, &prom.PrometheusClientConfig{
-			Timeout:               timeout,
-			KeepAlive:             keepAlive,
-			TLSHandshakeTimeout:   tlsHandshakeTimeout,
-			TLSInsecureSkipVerify: env.GetInsecureSkipVerify(env.Server),
-			RateLimitRetryOpts:    rateLimitRetryOpts,
-			Auth: &prom.ClientAuth{
-				Username:    env.GetDBBasicAuthUsername(clientType),
-				Password:    env.GetDBBasicAuthUserPassword(clientType),
-				BearerToken: env.GetDBBearerToken(clientType),
-			},
-			QueryConcurrency: queryConcurrency,
-			QueryLogFile:     "",
-		})
-		if err != nil {
-			return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
-		}
+	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
+		Timeout:               timeout,
+		KeepAlive:             keepAlive,
+		TLSHandshakeTimeout:   tlsHandshakeTimeout,
+		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+		RateLimitRetryOpts:    rateLimitRetryOpts,
+		Auth: &prom.ClientAuth{
+			Username:    env.GetDBBasicAuthUsername(),
+			Password:    env.GetDBBasicAuthUserPassword(),
+			BearerToken: env.GetDBBearerToken(),
+		},
+		QueryConcurrency: queryConcurrency,
+		QueryLogFile:     "",
+	})
+	if err != nil {
+		return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
+	}
 
-		m, err := prom.Validate(promClis[clientType])
-		if err != nil || !m.Running {
-			if err != nil {
-				log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", addr, err.Error(), prom.PrometheusTroubleshootingURL)
-			} else if !m.Running {
-				log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", addr, prom.PrometheusTroubleshootingURL)
-			}
-		} else {
-			log.Infof("Success: retrieved the 'up' query against prometheus at: %s", addr)
+	m, err := prom.Validate(promCli)
+	if err != nil || !m.Running {
+		if err != nil {
+			log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
+		} else if !m.Running {
+			log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
 		}
+	} else {
+		log.Infof("Success: retrieved the 'up' query against prometheus at: %s", address)
 	}
 
-	api := prometheusAPI.NewAPI(promClis[scrapeKey])
+	api := prometheusAPI.NewAPI(promCli)
 	_, err = api.Config(context.Background())
 	if err != nil {
-		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", promAddrs[scrapeKey], err.Error(), prom.PrometheusTroubleshootingURL)
+		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
 	} else {
-		log.Infof("Retrieved a prometheus config file from: %s", promAddrs[scrapeKey])
+		log.Infof("Retrieved a prometheus config file from: %s", address)
 	}
 
-	return promClis[env.Server], nil
+	return promCli, nil
 }
 
 func Execute(opts *AgentOpts) error {

+ 2 - 2
pkg/costmodel/allocation.go

@@ -17,8 +17,8 @@ const (
 	// upstream KSM has implementation change vs OC internal KSM - it sets metric to 0 when pod goes down
 	// VS OC implementation which stops emitting it
 	// by adding != 0 filter, we keep just the active times in the prom result
-	queryFmtPods    = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%s]`
-	queryFmtPodsUID = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%s]`
+	queryFmtPods                        = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, %s)[%s:%s]`
+	queryFmtPodsUID                     = `avg(kube_pod_container_status_running{%s} != 0) by (pod, namespace, uid, %s)[%s:%s]`
 
 	queryFmtRAMBytesAllocated           = `avg(avg_over_time(container_memory_allocation_bytes{container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s, provider_id)`
 	queryFmtRAMRequests                 = `avg(avg_over_time(kube_pod_container_resource_requests{resource="memory", unit="byte", container!="", container!="POD", node!="", %s}[%s])) by (container, pod, namespace, node, %s)`

+ 61 - 74
pkg/costmodel/router.go

@@ -86,7 +86,6 @@ var (
 type Accesses struct {
 	Router                   *httprouter.Router
 	PrometheusClient         prometheus.Client
-	PrometheusScrapeClient   prometheus.Client
 	ThanosClient             prometheus.Client
 	KubeClientSet            kubernetes.Interface
 	ClusterCache             clustercache.ClusterCache
@@ -1133,7 +1132,7 @@ func (a *Accesses) PrometheusConfig(w http.ResponseWriter, r *http.Request, _ ht
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
 	pConfig := map[string]string{
-		"address": env.GetPrometheusEndpoints()[env.Scrape],
+		"address": env.GetPrometheusServerEndpoint(),
 	}
 
 	body, err := json.Marshal(pConfig)
@@ -1408,19 +1407,15 @@ func (a *Accesses) Status(w http.ResponseWriter, r *http.Request, _ httprouter.P
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	endpoints := env.GetPrometheusEndpoints()
-	scrapeServer, ok := endpoints[env.Scrape]
-	if !ok {
-		scrapeServer = endpoints[env.Server]
-	}
+	promServer := env.GetPrometheusServerEndpoint()
 
-	api := prometheusAPI.NewAPI(a.PrometheusScrapeClient)
+	api := prometheusAPI.NewAPI(a.PrometheusClient)
 	result, err := api.Config(r.Context())
 	if err != nil {
-		fmt.Fprintf(w, "Using Prometheus at "+scrapeServer+". Error: "+err.Error())
+		fmt.Fprintf(w, "Using Prometheus at "+promServer+". Error: "+err.Error())
 	} else {
 
-		fmt.Fprintf(w, "Using Prometheus at "+scrapeServer+". PrometheusConfig: "+result.YAML)
+		fmt.Fprintf(w, "Using Prometheus at "+promServer+". PrometheusConfig: "+result.YAML)
 	}
 }
 
@@ -1511,8 +1506,8 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		}
 	}
 
-	promAddrs := env.GetPrometheusEndpoints()
-	if promAddrs[env.Server] == "" {
+	address := env.GetPrometheusServerEndpoint()
+	if address == "" {
 		log.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
 	}
 
@@ -1532,53 +1527,46 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		}
 	}
 
-	promClis := map[env.PrometheusType]prometheus.Client{}
-	for clientName, addr := range promAddrs {
-		promClis[clientName], err = prom.NewPrometheusClient(addr, &prom.PrometheusClientConfig{
-			Timeout:               timeout,
-			KeepAlive:             keepAlive,
-			TLSHandshakeTimeout:   tlsHandshakeTimeout,
-			TLSInsecureSkipVerify: env.GetInsecureSkipVerify(clientName),
-			RateLimitRetryOpts:    rateLimitRetryOpts,
-			Auth: &prom.ClientAuth{
-				Username:    env.GetDBBasicAuthUsername(clientName),
-				Password:    env.GetDBBasicAuthUserPassword(clientName),
-				BearerToken: env.GetDBBearerToken(clientName),
-			},
-			QueryConcurrency:  queryConcurrency,
-			QueryLogFile:      "",
-			HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(clientName),
-		})
+	promCli, err := prom.NewPrometheusClient(address, &prom.PrometheusClientConfig{
+		Timeout:               timeout,
+		KeepAlive:             keepAlive,
+		TLSHandshakeTimeout:   tlsHandshakeTimeout,
+		TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
+		RateLimitRetryOpts:    rateLimitRetryOpts,
+		Auth: &prom.ClientAuth{
+			Username:    env.GetDBBasicAuthUsername(),
+			Password:    env.GetDBBasicAuthUserPassword(),
+			BearerToken: env.GetDBBearerToken(),
+		},
+		QueryConcurrency:  queryConcurrency,
+		QueryLogFile:      "",
+		HeaderXScopeOrgId: env.GetPrometheusHeaderXScopeOrgId(),
+	})
+	if err != nil {
+		log.Fatalf("Failed to create prometheus client, Error: %v", err)
+	}
+
+	m, err := prom.Validate(promCli)
+	if err != nil || !m.Running {
 		if err != nil {
-			log.Fatalf("Failed to create prometheus client, Error: %v", err)
+			log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
+		} else if !m.Running {
+			log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
 		}
-		m, err := prom.Validate(promClis[clientName])
-		if err != nil || !m.Running {
-			if err != nil {
-				log.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", addr, err.Error(), prom.PrometheusTroubleshootingURL)
-			} else if !m.Running {
-				log.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", addr, prom.PrometheusTroubleshootingURL)
-			}
-		} else {
-			log.Infof("Success: retrieved the 'up' query against prometheus at: " + addr)
-		}
-	}
-	scrapeKey := env.Scrape
-	promScrapeCli, ok := promClis[env.Scrape]
-	if !ok {
-		promScrapeCli = promClis[env.Server]
-		scrapeKey = env.Server
+	} else {
+		log.Infof("Success: retrieved the 'up' query against prometheus at: " + address)
 	}
-	api := prometheusAPI.NewAPI(promScrapeCli)
+
+	api := prometheusAPI.NewAPI(promCli)
 	_, err = api.Config(context.Background())
 	if err != nil {
-		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", promAddrs[scrapeKey], err.Error(), prom.PrometheusTroubleshootingURL)
+		log.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/mimir/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
 	} else {
-		log.Infof("Retrieved a prometheus config file from: %s", promAddrs[scrapeKey])
+		log.Infof("Retrieved a prometheus config file from: %s", address)
 	}
 
 	// Lookup scrape interval for kubecost job, update if found
-	si, err := prom.ScrapeIntervalFor(promClis[env.Server], env.GetKubecostJobName())
+	si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
 	if err == nil {
 		scrapeInterval = si
 	}
@@ -1659,7 +1647,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 				Timeout:               timeout,
 				KeepAlive:             keepAlive,
 				TLSHandshakeTimeout:   tlsHandshakeTimeout,
-				TLSInsecureSkipVerify: env.GetInsecureSkipVerify(env.Server),
+				TLSInsecureSkipVerify: env.GetInsecureSkipVerify(),
 				RateLimitRetryOpts:    rateLimitRetryOpts,
 				Auth: &prom.ClientAuth{
 					Username:    env.GetMultiClusterBasicAuthUsername(),
@@ -1699,7 +1687,7 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	if thanosClient != nil {
 		clusterMap = clusters.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
 	} else {
-		clusterMap = clusters.NewClusterMap(promClis[env.Server], clusterInfoProvider, 5*time.Minute)
+		clusterMap = clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
 	}
 
 	// cache responses from model and aggregation for a default of 10 minutes;
@@ -1725,32 +1713,31 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	if thanosClient != nil {
 		pc = thanosClient
 	} else {
-		pc = promClis[env.Server]
+		pc = promCli
 	}
 	costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
-	metricsEmitter := NewCostModelMetricsEmitter(promClis[env.Server], k8sCache, cloudProvider, clusterInfoProvider, costModel)
+	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
 
 	a := &Accesses{
-		Router:                 httprouter.New(),
-		PrometheusClient:       promClis[env.Server],
-		PrometheusScrapeClient: promClis[scrapeKey],
-		ThanosClient:           thanosClient,
-		KubeClientSet:          kubeClientset,
-		ClusterCache:           k8sCache,
-		ClusterMap:             clusterMap,
-		CloudProvider:          cloudProvider,
-		CloudConfigController:  cloudconfig.NewController(cloudProvider),
-		ConfigFileManager:      confManager,
-		ClusterInfoProvider:    clusterInfoProvider,
-		Model:                  costModel,
-		MetricsEmitter:         metricsEmitter,
-		AggregateCache:         aggregateCache,
-		CostDataCache:          costDataCache,
-		ClusterCostsCache:      clusterCostsCache,
-		OutOfClusterCache:      outOfClusterCache,
-		SettingsCache:          settingsCache,
-		CacheExpiration:        cacheExpiration,
-		httpServices:           services.NewCostModelServices(),
+		Router:                httprouter.New(),
+		PrometheusClient:      promCli,
+		ThanosClient:          thanosClient,
+		KubeClientSet:         kubeClientset,
+		ClusterCache:          k8sCache,
+		ClusterMap:            clusterMap,
+		CloudProvider:         cloudProvider,
+		CloudConfigController: cloudconfig.NewController(cloudProvider),
+		ConfigFileManager:     confManager,
+		ClusterInfoProvider:   clusterInfoProvider,
+		Model:                 costModel,
+		MetricsEmitter:        metricsEmitter,
+		AggregateCache:        aggregateCache,
+		CostDataCache:         costDataCache,
+		ClusterCostsCache:     clusterCostsCache,
+		OutOfClusterCache:     outOfClusterCache,
+		SettingsCache:         settingsCache,
+		CacheExpiration:       cacheExpiration,
+		httpServices:          services.NewCostModelServices(),
 	}
 
 	// Use the Accesses instance, itself, as the CostModelAggregator. This is

+ 21 - 42
pkg/env/costmodelenv.go

@@ -26,7 +26,7 @@ const (
 	PodNameEnvVar                  = "POD_NAME"
 	ClusterIDEnvVar                = "CLUSTER_ID"
 	ClusterProfileEnvVar           = "CLUSTER_PROFILE"
-	PrometheusServerEndpointEnvVar = "%sPROMETHEUS_SERVER_ENDPOINT"
+	PrometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
 	MaxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
 	QueryLoggingFileEnvVar         = "QUERY_LOGGING_FILE"
 	RemoteEnabledEnvVar            = "REMOTE_WRITE_ENABLED"
@@ -56,15 +56,15 @@ const (
 	ErrorReportingEnabledEnvVar   = "ERROR_REPORTING_ENABLED"
 	ValuesReportingEnabledEnvVar  = "VALUES_REPORTING_ENABLED"
 
-	DBBasicAuthUsername = "%sDB_BASIC_AUTH_USERNAME"
-	DBBasicAuthPassword = "%sDB_BASIC_AUTH_PW"
-	DBBearerToken       = "%sDB_BEARER_TOKEN"
+	DBBasicAuthUsername = "DB_BASIC_AUTH_USERNAME"
+	DBBasicAuthPassword = "DB_BASIC_AUTH_PW"
+	DBBearerToken       = "DB_BEARER_TOKEN"
 
 	MultiClusterBasicAuthUsername = "MC_BASIC_AUTH_USERNAME"
 	MultiClusterBasicAuthPassword = "MC_BASIC_AUTH_PW"
 	MultiClusterBearerToken       = "MC_BEARER_TOKEN"
 
-	InsecureSkipVerify = "%sINSECURE_SKIP_VERIFY"
+	InsecureSkipVerify = "INSECURE_SKIP_VERIFY"
 
 	KubeConfigPathEnvVar = "KUBECONFIG_PATH"
 
@@ -92,7 +92,7 @@ const (
 	PrometheusRetryOnRateLimitMaxRetriesEnvVar  = "PROMETHEUS_RETRY_ON_RATE_LIMIT_MAX_RETRIES"
 	PrometheusRetryOnRateLimitDefaultWaitEnvVar = "PROMETHEUS_RETRY_ON_RATE_LIMIT_DEFAULT_WAIT"
 
-	PrometheusHeaderXScopeOrgIdEnv = "%sPROMETHEUS_HEADER_X_SCOPE_ORGID"
+	PrometheusHeaderXScopeOrgIdEnvVar = "PROMETHEUS_HEADER_X_SCOPE_ORGID"
 
 	IngestPodUIDEnvVar = "INGEST_POD_UID"
 
@@ -117,21 +117,6 @@ const (
 	CloudCostRunWindowDaysEnvVar    = "CLOUD_COST_RUN_WINDOW_DAYS"
 )
 
-type PrometheusType int64
-
-const (
-	Server PrometheusType = iota
-	Scrape
-)
-
-func (pt PrometheusType) envFormat(envStr string) string {
-	value := ""
-	if pt == Scrape {
-		value = "SCRAPE_"
-	}
-	return fmt.Sprintf(envStr, value)
-}
-
 const DefaultConfigMountPath = "/var/configs"
 
 var offsetRegex = regexp.MustCompile(`^(\+|-)(\d\d):(\d\d)$`)
@@ -197,8 +182,8 @@ func GetPrometheusRetryOnRateLimitDefaultWait() time.Duration {
 // "PROMETHEUS_HEADER_X_SCOPE_ORGID": "my-cluster-name"
 // Then set Prometheus URL to prometheus API endpoint:
 // "PROMETHEUS_SERVER_ENDPOINT": "http://mimir-url/prometheus/"
-func GetPrometheusHeaderXScopeOrgId(clientType PrometheusType) string {
-	return Get(clientType.envFormat(PrometheusHeaderXScopeOrgIdEnv), "")
+func GetPrometheusHeaderXScopeOrgId() string {
+	return Get(PrometheusHeaderXScopeOrgIdEnvVar, "")
 }
 
 // GetPrometheusQueryOffset returns the time.Duration to offset all prometheus queries by. NOTE: This env var is applied
@@ -340,21 +325,14 @@ func GetPromClusterFilter() string {
 	return ""
 }
 
-// GetPrometheusEndpoints returns the environment variables values for
-// PrometheusServerEndpointEnvVar which represents the prometheus server endpoint used to execute prometheus queries and
-// SCRAPE_PrometheusServerEndpointEnvVar which represents prometheus scrape server endpoint used to get scrape config.
-func GetPrometheusEndpoints() map[PrometheusType]string {
-	output := map[PrometheusType]string{
-		Server: Get(Server.envFormat(PrometheusServerEndpointEnvVar), ""),
-	}
-	if value := Get(Scrape.envFormat(PrometheusServerEndpointEnvVar), ""); value != "" {
-		output[Scrape] = value
-	}
-	return output
+// GetPrometheusServerEndpoint returns the environment variable value for PrometheusServerEndpointEnvVar which
+// represents the prometheus server endpoint used to execute prometheus queries.
+func GetPrometheusServerEndpoint() string {
+	return Get(PrometheusServerEndpointEnvVar, "")
 }
 
-func GetInsecureSkipVerify(clientType PrometheusType) bool {
-	return GetBool(clientType.envFormat(InsecureSkipVerify), false)
+func GetInsecureSkipVerify() bool {
+	return GetBool(InsecureSkipVerify, false)
 }
 
 // IsAggregateCostModelCacheDisabled returns the environment variable value for DisableAggregateCostModelCache which
@@ -491,16 +469,17 @@ func GetQueryLoggingFile() string {
 	return Get(QueryLoggingFileEnvVar, "")
 }
 
-func GetDBBasicAuthUsername(clientType PrometheusType) string {
-	return Get(clientType.envFormat(DBBasicAuthUsername), "")
+func GetDBBasicAuthUsername() string {
+	return Get(DBBasicAuthUsername, "")
 }
 
-func GetDBBasicAuthUserPassword(clientType PrometheusType) string {
-	return Get(clientType.envFormat(DBBasicAuthPassword), "")
+func GetDBBasicAuthUserPassword() string {
+	return Get(DBBasicAuthPassword, "")
+
 }
 
-func GetDBBearerToken(clientType PrometheusType) string {
-	return Get(clientType.envFormat(DBBearerToken), "")
+func GetDBBearerToken() string {
+	return Get(DBBearerToken, "")
 }
 
 // GetMultiClusterBasicAuthUsername returns the environment variable value for MultiClusterBasicAuthUsername