
Add better identification methods for determining which type of prometheus.Client you're using.
Add ClusterMap, which is populated from the kubecost_cluster_info metric.

Matt Bolt 5 years ago
parent
commit
4bffe5119a

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+.idea

+ 234 - 0
pkg/costmodel/clusters/clustermap.go

@@ -0,0 +1,234 @@
+package clusters
+
+import (
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/thanos"
+
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
+const (
+	LoadRetries    int           = 6
+	LoadRetryDelay time.Duration = 10 * time.Second
+)
+
+type ClusterInfo struct {
+	ID          string
+	Name        string
+	Profile     string
+	Provider    string
+	Provisioner string
+}
+
+type ClusterMap interface {
+	// InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
+	// doesn't exist
+	InfoFor(clusterID string) *ClusterInfo
+
+	// NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
+	// assigned name. Otherwise, just the clusterID is returned.
+	NameIDFor(clusterID string) string
+
+	// SplitNameID splits the nameID back into separate id and name fields
+	SplitNameID(nameID string) (id string, name string)
+
+	// StopRefresh stops the automatic internal map refresh
+	StopRefresh()
+}
+
+// PrometheusClusterMap is a ClusterMap implementation that keeps records of all known cost-model clusters.
+type PrometheusClusterMap struct {
+	lock     *sync.RWMutex
+	client   prometheus.Client
+	clusters map[string]*ClusterInfo
+	stop     chan struct{}
+}
+
+// NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
+func NewClusterMap(client prometheus.Client, refresh time.Duration) ClusterMap {
+	stop := make(chan struct{})
+
+	cm := &PrometheusClusterMap{
+		lock:     new(sync.RWMutex),
+		client:   client,
+		clusters: make(map[string]*ClusterInfo),
+		stop:     stop,
+	}
+
+	// Run an updater to ensure cluster data stays relevant over time
+	go func() {
+		// Immediately attempt to refresh the clusters
+		cm.refreshClusters()
+
+		// Tick on interval and refresh clusters
+		ticker := time.NewTicker(refresh)
+		for {
+			select {
+			case <-ticker.C:
+				cm.refreshClusters()
+			case <-cm.stop:
+				log.Infof("ClusterMap refresh stopped.")
+				return
+			}
+		}
+	}()
+
+	return cm
+}
+
+// clusterInfoQuery returns the query string to load cluster info
+func clusterInfoQuery(offset string) string {
+	return fmt.Sprintf("kubecost_cluster_info%s", offset)
+}
+
+// loadClusters loads all of the cluster info into a map keyed by cluster ID
+func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
+	var offset string = ""
+	if prom.IsThanos(pcm.client) {
+		offset = thanos.QueryOffset()
+	}
+
+	// Execute Query
+	tryQuery := func() ([]*prom.QueryResult, error) {
+		ctx := prom.NewContext(pcm.client)
+		return ctx.QuerySync(clusterInfoQuery(offset))
+	}
+
+	var qr []*prom.QueryResult
+	var err error
+
+	// Retry on failure
+	delay := LoadRetryDelay
+	for r := LoadRetries; r > 0; r-- {
+		qr, err = tryQuery()
+
+		// non-error breaks out of loop
+		if err == nil {
+			break
+		}
+
+		// wait the delay
+		time.Sleep(delay)
+
+		// add some random jitter to the backoff delay
+		jitter := time.Duration(rand.Int63n(int64(delay)))
+		delay = delay + jitter/2
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	clusters := make(map[string]*ClusterInfo)
+
+	// Load the query results. Critical fields are id and name.
+	for _, result := range qr {
+		id, err := result.GetString("id")
+		if err != nil {
+			log.Warningf("Failed to load 'id' field for ClusterInfo")
+			continue
+		}
+
+		name, err := result.GetString("name")
+		if err != nil {
+			log.Warningf("Failed to load 'name' field for ClusterInfo")
+			continue
+		}
+
+		profile, err := result.GetString("clusterprofile")
+		if err != nil {
+			profile = ""
+		}
+
+		provider, err := result.GetString("provider")
+		if err != nil {
+			provider = ""
+		}
+
+		provisioner, err := result.GetString("provisioner")
+		if err != nil {
+			provisioner = ""
+		}
+
+		clusters[id] = &ClusterInfo{
+			ID:          id,
+			Name:        name,
+			Profile:     profile,
+			Provider:    provider,
+			Provisioner: provisioner,
+		}
+	}
+
+	return clusters, nil
+}
+
+// refreshClusters loads the clusters and updates the internal map
+func (pcm *PrometheusClusterMap) refreshClusters() {
+	updated, err := pcm.loadClusters()
+	if err != nil {
+		log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
+		return
+	}
+
+	pcm.lock.Lock()
+	pcm.clusters = updated
+	pcm.lock.Unlock()
+}
+
+// InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
+// doesn't exist
+func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
+	pcm.lock.RLock()
+	defer pcm.lock.RUnlock()
+
+	if info, ok := pcm.clusters[clusterID]; ok {
+		return info
+	}
+
+	return nil
+}
+
+// NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
+// assigned name. Otherwise, just the clusterID is returned.
+func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
+	pcm.lock.RLock()
+	defer pcm.lock.RUnlock()
+
+	if info, ok := pcm.clusters[clusterID]; ok {
+		if info.Name == "" {
+			return clusterID
+		}
+
+		return fmt.Sprintf("%s/%s", info.Name, clusterID)
+	}
+
+	return clusterID
+}
+
+// SplitNameID splits the nameID back into separate id and name fields
+func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
+	if !strings.Contains(nameID, "/") {
+		id = nameID
+		name = ""
+		return
+	}
+
+	split := strings.Split(nameID, "/")
+	name = split[0]
+	id = split[1]
+	return
+}
+
+// StopRefresh stops the automatic internal map refresh
+func (pcm *PrometheusClusterMap) StopRefresh() {
+	if pcm.stop != nil {
+		close(pcm.stop)
+		pcm.stop = nil
+	}
+}
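
A minimal usage sketch of the new ClusterMap (the address, concurrency, and refresh interval below are illustrative assumptions, not values taken from this change):

package main

import (
	"fmt"
	"time"

	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
	"github.com/kubecost/cost-model/pkg/prom"
)

func main() {
	// Illustrative client setup using the helper added in pkg/prom/prom.go.
	client, err := prom.NewPrometheusClient("http://localhost:9090", 120*time.Second, 120*time.Second, 5, "")
	if err != nil {
		panic(err)
	}

	// Populate from kubecost_cluster_info and refresh every 5 minutes.
	cm := clusters.NewClusterMap(client, 5*time.Minute)
	defer cm.StopRefresh()

	// NameIDFor yields "<name>/<id>" when the cluster has a name; SplitNameID reverses it.
	nameID := cm.NameIDFor("cluster-one")
	id, name := cm.SplitNameID(nameID)
	fmt.Println(nameID, id, name)
}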

+ 31 - 76
pkg/costmodel/costmodel.go

@@ -10,10 +10,10 @@ import (
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
-	"github.com/kubecost/cost-model/pkg/thanos"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	v1 "k8s.io/api/core/v1"
@@ -48,15 +48,17 @@ var isCron = regexp.MustCompile(`^(.+)-\d{10}$`)
 
 type CostModel struct {
 	Cache        clustercache.ClusterCache
+	ClusterMap   clusters.ClusterMap
 	RequestGroup *singleflight.Group
 }
 
-func NewCostModel(cache clustercache.ClusterCache) *CostModel {
+func NewCostModel(cache clustercache.ClusterCache, clusterMap clusters.ClusterMap) *CostModel {
 	// request grouping to prevent over-requesting the same data prior to caching
 	requestGroup := new(singleflight.Group)
 
 	return &CostModel{
 		Cache:        cache,
+		ClusterMap:   clusterMap,
 		RequestGroup: requestGroup,
 	}
 }
@@ -223,58 +225,6 @@ const (
 	kubecostUpMinsPerHourStr  = `max(count_over_time(node_cpu_hourly_cost[%s:1m])) / %f`
 )
 
-type PrometheusMetadata struct {
-	Running            bool `json:"running"`
-	KubecostDataExists bool `json:"kubecostDataExists"`
-}
-
-// ValidatePrometheus tells the model what data prometheus has on it.
-func ValidatePrometheus(cli prometheusClient.Client, isThanos bool) (*PrometheusMetadata, error) {
-	q := "up"
-	if isThanos {
-		q += thanos.QueryOffset()
-	}
-
-	ctx := prom.NewContext(cli)
-
-	resUp, err := ctx.QuerySync(q)
-	if err != nil {
-		return &PrometheusMetadata{
-			Running:            false,
-			KubecostDataExists: false,
-		}, err
-	}
-
-	if len(resUp) == 0 {
-		return &PrometheusMetadata{
-			Running:            false,
-			KubecostDataExists: false,
-		}, fmt.Errorf("no running jobs on Prometheus at %s", ctx.QueryURL().Path)
-	}
-
-	for _, result := range resUp {
-		job, err := result.GetString("job")
-		if err != nil {
-			return &PrometheusMetadata{
-				Running:            false,
-				KubecostDataExists: false,
-			}, fmt.Errorf("up query does not have job names")
-		}
-
-		if job == "kubecost" {
-			return &PrometheusMetadata{
-				Running:            true,
-				KubecostDataExists: true,
-			}, err
-		}
-	}
-
-	return &PrometheusMetadata{
-		Running:            true,
-		KubecostDataExists: false,
-	}, nil
-}
-
 func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, window, offset, window, offset)
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset)
@@ -514,27 +464,27 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 				RAMReqV, ok := RAMReqMap[newKey]
 				if !ok {
 					klog.V(4).Info("no RAM requests for " + newKey)
-					RAMReqV = []*util.Vector{&util.Vector{}}
+					RAMReqV = []*util.Vector{{}}
 				}
 				RAMUsedV, ok := RAMUsedMap[newKey]
 				if !ok {
 					klog.V(4).Info("no RAM usage for " + newKey)
-					RAMUsedV = []*util.Vector{&util.Vector{}}
+					RAMUsedV = []*util.Vector{{}}
 				}
 				CPUReqV, ok := CPUReqMap[newKey]
 				if !ok {
 					klog.V(4).Info("no CPU requests for " + newKey)
-					CPUReqV = []*util.Vector{&util.Vector{}}
+					CPUReqV = []*util.Vector{{}}
 				}
 				GPUReqV, ok := GPUReqMap[newKey]
 				if !ok {
 					klog.V(4).Info("no GPU requests for " + newKey)
-					GPUReqV = []*util.Vector{&util.Vector{}}
+					GPUReqV = []*util.Vector{{}}
 				}
 				CPUUsedV, ok := CPUUsedMap[newKey]
 				if !ok {
 					klog.V(4).Info("no CPU usage for " + newKey)
-					CPUUsedV = []*util.Vector{&util.Vector{}}
+					CPUUsedV = []*util.Vector{{}}
 				}
 
 				var pvReq []*PersistentVolumeClaimData
@@ -564,7 +514,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 					NetworkData:     netReq,
 					Labels:          podLabels,
 					NamespaceLabels: nsLabels,
-					ClusterID:       clusterID,
+					ClusterID:       cm.ClusterMap.NameIDFor(clusterID),
 				}
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed, "CPU")
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed, "RAM")
@@ -584,27 +534,27 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 			RAMReqV, ok := RAMReqMap[key]
 			if !ok {
 				klog.V(4).Info("no RAM requests for " + key)
-				RAMReqV = []*util.Vector{&util.Vector{}}
+				RAMReqV = []*util.Vector{{}}
 			}
 			RAMUsedV, ok := RAMUsedMap[key]
 			if !ok {
 				klog.V(4).Info("no RAM usage for " + key)
-				RAMUsedV = []*util.Vector{&util.Vector{}}
+				RAMUsedV = []*util.Vector{{}}
 			}
 			CPUReqV, ok := CPUReqMap[key]
 			if !ok {
 				klog.V(4).Info("no CPU requests for " + key)
-				CPUReqV = []*util.Vector{&util.Vector{}}
+				CPUReqV = []*util.Vector{{}}
 			}
 			GPUReqV, ok := GPUReqMap[key]
 			if !ok {
 				klog.V(4).Info("no GPU requests for " + key)
-				GPUReqV = []*util.Vector{&util.Vector{}}
+				GPUReqV = []*util.Vector{{}}
 			}
 			CPUUsedV, ok := CPUUsedMap[key]
 			if !ok {
 				klog.V(4).Info("no CPU usage for " + key)
-				CPUUsedV = []*util.Vector{&util.Vector{}}
+				CPUUsedV = []*util.Vector{{}}
 			}
 
 			node, ok := nodes[c.NodeName]
@@ -634,7 +584,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 				CPUUsed:         CPUUsedV,
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
-				ClusterID:       c.ClusterID,
+				ClusterID:       cm.ClusterMap.NameIDFor(c.ClusterID),
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed, "CPU")
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed, "RAM")
@@ -649,7 +599,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	}
 	// Use unmounted pvs to create a mapping of "Unmounted-<Namespace>" containers
 	// to pass along the cost data
-	unmounted := findUnmountedPVCostData(unmountedPVs, namespaceLabelsMapping)
+	unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping)
 	for k, costs := range unmounted {
 		klog.V(4).Infof("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
 
@@ -672,7 +622,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	return containerNameCost, err
 }
 
-func findUnmountedPVCostData(unmountedPVs map[string][]*PersistentVolumeClaimData, namespaceLabelsMapping map[string]map[string]string) map[string]*CostData {
+func findUnmountedPVCostData(clusterMap clusters.ClusterMap, unmountedPVs map[string][]*PersistentVolumeClaimData, namespaceLabelsMapping map[string]map[string]string) map[string]*CostData {
 	costs := make(map[string]*CostData)
 	if len(unmountedPVs) == 0 {
 		return costs
@@ -706,7 +656,7 @@ func findUnmountedPVCostData(unmountedPVs map[string][]*PersistentVolumeClaimDat
 				Namespace:       ns,
 				NamespaceLabels: namespacelabels,
 				Labels:          namespacelabels,
-				ClusterID:       clusterID,
+				ClusterID:       clusterMap.NameIDFor(clusterID),
 				PVCData:         pv,
 			}
 		} else {
@@ -1440,15 +1390,20 @@ func sliceToSet(s []string) map[string]bool {
 
 func setToSlice(m map[string]bool) []string {
 	var result []string
-	for k, _ := range m {
+	for k := range m {
 		result = append(result, k)
 	}
 	return result
 }
 
-func costDataPassesFilters(costs *CostData, namespace string, cluster string) bool {
+func costDataPassesFilters(cm clusters.ClusterMap, costs *CostData, namespace string, cluster string) bool {
 	passesNamespace := namespace == "" || costs.Namespace == namespace
-	passesCluster := cluster == "" || costs.ClusterID == cluster
+
+	passesCluster := true
+	if cluster != "" {
+		id, name := cm.SplitNameID(costs.ClusterID)
+		passesCluster = id == cluster || name == cluster
+	}
 
 	return passesNamespace && passesCluster
 }
@@ -2061,10 +2016,10 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			NamespaceLabels: namespaceLabels,
 			PVCData:         podPVs,
 			NetworkData:     podNetCosts,
-			ClusterID:       c.ClusterID,
+			ClusterID:       cm.ClusterMap.NameIDFor(c.ClusterID),
 		}
 
-		if costDataPassesFilters(costs, filterNamespace, filterCluster) {
+		if costDataPassesFilters(cm.ClusterMap, costs, filterNamespace, filterCluster) {
 			containerNameCost[key] = costs
 			missingContainers[key] = costs
 		}
@@ -2072,11 +2027,11 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): build CostData map", durHrs))
 
-	unmounted := findUnmountedPVCostData(unmountedPVs, namespaceLabelsMapping)
+	unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping)
 	for k, costs := range unmounted {
 		klog.V(4).Infof("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
 
-		if costDataPassesFilters(costs, filterNamespace, filterCluster) {
+		if costDataPassesFilters(cm.ClusterMap, costs, filterNamespace, filterCluster) {
 			containerNameCost[k] = costs
 		}
 	}
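
The costDataPassesFilters change above lets a cluster filter match either half of the composite "name/id" identifier. A standalone sketch of that matching logic (the split is inlined here to keep the example self-contained):

package main

import (
	"fmt"
	"strings"
)

// splitNameID mirrors ClusterMap.SplitNameID: "<name>/<id>" -> (id, name).
func splitNameID(nameID string) (id, name string) {
	if !strings.Contains(nameID, "/") {
		return nameID, ""
	}
	split := strings.Split(nameID, "/")
	return split[1], split[0]
}

func main() {
	costClusterID := "prod-east/cluster-one" // as written by NameIDFor

	for _, filter := range []string{"cluster-one", "prod-east", "other"} {
		id, name := splitNameID(costClusterID)
		passes := id == filter || name == filter
		fmt.Printf("filter %q passes: %v\n", filter, passes)
	}
}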

+ 54 - 74
pkg/costmodel/router.go

@@ -2,11 +2,9 @@ package costmodel
 
 import (
 	"context"
-	"crypto/tls"
 	"encoding/json"
 	"flag"
 	"fmt"
-	"net"
 	"net/http"
 	"reflect"
 	"strconv"
@@ -22,6 +20,7 @@ import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	cm "github.com/kubecost/cost-model/pkg/clustermanager"
+	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/prom"
@@ -46,13 +45,7 @@ const (
 
 var (
 	// gitCommit is set by the build system
-	gitCommit                       string
-	dbBasicAuthUsername             string = env.GetDBBasicAuthUsername()
-	dbBasicAuthPW                   string = env.GetDBBasicAuthUserPassword()
-	dbBearerToken                   string = env.GetDBBearerToken()
-	multiclusterDBBasicAuthUsername string = env.GetMultiClusterBasicAuthUsername()
-	multiclusterDBBasicAuthPW       string = env.GetMultiClusterBasicAuthPassword()
-	multiClusterBearerToken         string = env.GetMultiClusterBearerToken()
+	gitCommit string
 )
 
 var Router = httprouter.New()
@@ -63,6 +56,7 @@ type Accesses struct {
 	ThanosClient                  prometheusClient.Client
 	KubeClientSet                 kubernetes.Interface
 	ClusterManager                *cm.ClusterManager
+	ClusterMap                    clusters.ClusterMap
 	Cloud                         costAnalyzerCloud.Provider
 	CPUPriceRecorder              *prometheus.GaugeVec
 	RAMPriceRecorder              *prometheus.GaugeVec
@@ -637,7 +631,8 @@ func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Reques
 func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	w.Write(WrapData(ValidatePrometheus(p.PrometheusClient, false)))
+
+	w.Write(WrapData(prom.Validate(p.PrometheusClient)))
 }
 
 // Creates a new ClusterManager instance using a boltdb storage. If that fails,
@@ -730,30 +725,18 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	queryConcurrency := env.GetMaxQueryConcurrency()
 	klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
 
-	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
-	var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
-		Proxy: http.ProxyFromEnvironment,
-		DialContext: (&net.Dialer{
-			Timeout:   120 * time.Second,
-			KeepAlive: 120 * time.Second,
-		}).DialContext,
-		TLSHandshakeTimeout: 10 * time.Second,
-		TLSClientConfig:     tlsConfig,
-	}
-
-	pc := prometheusClient.Config{
-		Address:      address,
-		RoundTripper: LongTimeoutRoundTripper,
-	}
-	promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency, dbBasicAuthUsername, dbBasicAuthPW, dbBearerToken, env.GetQueryLoggingFile())
+	timeout := 120 * time.Second
+	keepAlive := 120 * time.Second
 
-	m, err := ValidatePrometheus(promCli, false)
+	promCli, _ := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
+	m, err := prom.Validate(promCli)
 	if err != nil || m.Running == false {
 		if err != nil {
 			klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
 		} else if m.Running == false {
 			klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
 		}
+
 		api := prometheusAPI.NewAPI(promCli)
 		_, err = api.Config(context.Background())
 		if err != nil {
@@ -808,6 +791,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 			}
 		}
 	}
+
 	kubecostNamespace := env.GetKubecostNamespace()
 	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
 	configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
@@ -934,30 +918,6 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
 
-	A = Accesses{
-		PrometheusClient:              promCli,
-		KubeClientSet:                 kubeClientset,
-		ClusterManager:                clusterManager,
-		Cloud:                         cloudProvider,
-		CPUPriceRecorder:              cpuGv,
-		RAMPriceRecorder:              ramGv,
-		GPUPriceRecorder:              gpuGv,
-		NodeTotalPriceRecorder:        totalGv,
-		NodeSpotRecorder:              spotGv,
-		RAMAllocationRecorder:         RAMAllocation,
-		CPUAllocationRecorder:         CPUAllocation,
-		GPUAllocationRecorder:         GPUAllocation,
-		PVAllocationRecorder:          PVAllocation,
-		NetworkZoneEgressRecorder:     NetworkZoneEgressRecorder,
-		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
-		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
-		PersistentVolumePriceRecorder: pvGv,
-		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
-		LBCostRecorder:                LBCostRecorder,
-		Model:                         NewCostModel(k8sCache),
-		OutOfClusterCache:             outOfClusterCache,
-	}
-
 	remoteEnabled := env.IsRemoteEnabled()
 	if remoteEnabled {
 		info, err := cloudProvider.ClusterInfo()
@@ -972,35 +932,21 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	}
 
 	// Thanos Client
+	var thanosClient prometheusClient.Client
 	if thanos.IsEnabled() {
-		thanosUrl := thanos.QueryURL()
-
-		if thanosUrl != "" {
-			var thanosRT http.RoundTripper = &http.Transport{
-				Proxy: http.ProxyFromEnvironment,
-				DialContext: (&net.Dialer{
-					Timeout:   120 * time.Second,
-					KeepAlive: 120 * time.Second,
-				}).DialContext,
-				TLSHandshakeTimeout: 10 * time.Second,
-				TLSClientConfig:     tlsConfig,
-			}
-
-			thanosConfig := prometheusClient.Config{
-				Address:      thanosUrl,
-				RoundTripper: thanosRT,
-			}
+		thanosAddress := thanos.QueryURL()
 
-			thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency, multiclusterDBBasicAuthUsername, multiclusterDBBasicAuthPW, multiClusterBearerToken, env.GetQueryLoggingFile())
+		if thanosAddress != "" {
+			thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
 
-			_, err = ValidatePrometheus(thanosCli, true)
+			_, err = prom.Validate(thanosCli)
 			if err != nil {
-				klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
-				A.ThanosClient = thanosCli
+				klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
+				thanosClient = thanosCli
 			} else {
-				klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
+				klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
 
-				A.ThanosClient = thanosCli
+				thanosClient = thanosCli
 			}
 
 		} else {
@@ -1008,6 +954,40 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		}
 	}
 
+	// Initialize ClusterMap for maintaining ClusterInfo by ClusterID
+	var clusterMap clusters.ClusterMap
+	if thanosClient != nil {
+		clusterMap = clusters.NewClusterMap(thanosClient, 10*time.Minute)
+	} else {
+		clusterMap = clusters.NewClusterMap(promCli, 5*time.Minute)
+	}
+
+	A = Accesses{
+		PrometheusClient:              promCli,
+		ThanosClient:                  thanosClient,
+		KubeClientSet:                 kubeClientset,
+		ClusterManager:                clusterManager,
+		ClusterMap:                    clusterMap,
+		Cloud:                         cloudProvider,
+		CPUPriceRecorder:              cpuGv,
+		RAMPriceRecorder:              ramGv,
+		GPUPriceRecorder:              gpuGv,
+		NodeTotalPriceRecorder:        totalGv,
+		NodeSpotRecorder:              spotGv,
+		RAMAllocationRecorder:         RAMAllocation,
+		CPUAllocationRecorder:         CPUAllocation,
+		GPUAllocationRecorder:         GPUAllocation,
+		PVAllocationRecorder:          PVAllocation,
+		NetworkZoneEgressRecorder:     NetworkZoneEgressRecorder,
+		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
+		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
+		PersistentVolumePriceRecorder: pvGv,
+		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
+		LBCostRecorder:                LBCostRecorder,
+		Model:                         NewCostModel(k8sCache, clusterMap),
+		OutOfClusterCache:             outOfClusterCache,
+	}
+
 	err = A.Cloud.DownloadPricingData()
 	if err != nil {
 		klog.V(1).Info("Failed to download pricing data: " + err.Error())

+ 47 - 0
pkg/prom/ids.go

@@ -0,0 +1,47 @@
+package prom
+
+import (
+	"strings"
+
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
+const (
+	// PrometheusClientID is the identifier used when creating the client that
+	// targets prometheus. This can be used to check a specific client instance
+	// by calling prom.IsClientID(client, prom.PrometheusClientID)
+	PrometheusClientID string = "Prometheus"
+
+	// ThanosClientID is the identifier used when creating the client that
+	// targets thanos. This can be used to check a specific client instance
+	// by calling prom.IsClientID(client, prom.ThanosClientID)
+	ThanosClientID string = "Thanos"
+)
+
+// identityClient provides an interface for extracting an identifier from client objects
+type identityClient interface {
+	ID() string
+}
+
+// IsClientID returns true if the client has an identifier of the specific type.
+func IsClientID(cli prometheus.Client, id string) bool {
+	if cli == nil {
+		return false
+	}
+
+	if idClient, ok := cli.(identityClient); ok {
+		return strings.EqualFold(idClient.ID(), id)
+	}
+
+	return false
+}
+
+// IsPrometheus returns true if the client provided is used to target prometheus
+func IsPrometheus(cli prometheus.Client) bool {
+	return IsClientID(cli, PrometheusClientID)
+}
+
+// IsThanos returns true if the client provided is used to target thanos
+func IsThanos(cli prometheus.Client) bool {
+	return IsClientID(cli, ThanosClientID)
+}
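
A short sketch of how these identifiers are intended to be used to branch on client type, mirroring the offset handling in clustermap.go (the package and function name upQueryFor are hypothetical):

package example // hypothetical

import (
	"fmt"

	"github.com/kubecost/cost-model/pkg/prom"
	"github.com/kubecost/cost-model/pkg/thanos"

	prometheus "github.com/prometheus/client_golang/api"
)

// upQueryFor appends the thanos query offset only when the client targets thanos.
func upQueryFor(cli prometheus.Client) string {
	if prom.IsThanos(cli) {
		return fmt.Sprintf("up%s", thanos.QueryOffset())
	}
	return "up"
}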

+ 109 - 48
pkg/prom/prom.go

@@ -2,29 +2,62 @@ package prom
 
 import (
 	"context"
+	"crypto/tls"
+	"net"
 	"net/http"
 	"net/url"
 	"os"
 	"time"
 
-	golog "log"
-
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
+
+	golog "log"
+
 	prometheus "github.com/prometheus/client_golang/api"
 )
 
-// Creates a new prometheus client which limits the total number of concurrent outbound requests
-// allowed at a given moment.
+//--------------------------------------------------------------------------
+//  ClientAuth
+//--------------------------------------------------------------------------
+
+// ClientAuth is used to authenticate outgoing client requests.
+type ClientAuth struct {
+	Username    string
+	Password    string
+	BearerToken string
+}
+
+// Apply applies the authentication data to the request headers
+func (auth *ClientAuth) Apply(req *http.Request) {
+	if auth == nil {
+		return
+	}
+
+	if auth.Username != "" {
+		req.SetBasicAuth(auth.Username, auth.Password)
+	}
+
+	if auth.BearerToken != "" {
+		token := "Bearer " + auth.BearerToken
+		req.Header.Add("Authorization", token)
+	}
+}
+
+//--------------------------------------------------------------------------
+//  RateLimitedPrometheusClient
+//--------------------------------------------------------------------------
+
+// RateLimitedPrometheusClient is a prometheus client which limits the total number of
+// concurrent outbound requests allowed at a given moment.
 type RateLimitedPrometheusClient struct {
-	client      prometheus.Client
-	limiter     *util.Semaphore
-	queue       util.BlockingQueue
-	outbound    *util.AtomicInt32
-	username    string
-	password    string
-	bearerToken string
-	fileLogger  *golog.Logger
+	id         string
+	client     prometheus.Client
+	auth       *ClientAuth
+	queue      util.BlockingQueue
+	outbound   *util.AtomicInt32
+	fileLogger *golog.Logger
 }
 
 // requestCounter is used to determine if the prometheus client keeps track of
@@ -36,7 +69,7 @@ type requestCounter interface {
 
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
-func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password, bearerToken string, queryLogFile string) (prometheus.Client, error) {
+func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, queryLogFile string) (prometheus.Client, error) {
 	c, err := prometheus.NewClient(config)
 	if err != nil {
 		return nil, err
@@ -61,13 +94,12 @@ func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username
 	}
 
 	rlpc := &RateLimitedPrometheusClient{
-		client:      c,
-		queue:       queue,
-		outbound:    outbound,
-		username:    username,
-		password:    password,
-		bearerToken: bearerToken,
-		fileLogger:  logger,
+		id:         id,
+		client:     c,
+		queue:      queue,
+		outbound:   outbound,
+		auth:       auth,
+		fileLogger: logger,
 	}
 
 	// Start concurrent request processing
@@ -78,27 +110,9 @@ func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username
 	return rlpc, nil
 }
 
-// LogPrometheusClientState logs the current state, with respect to outbound requests, if that
-// information is available.
-func LogPrometheusClientState(client prometheus.Client) {
-	if rc, ok := client.(requestCounter); ok {
-		total := rc.TotalRequests()
-		outbound := rc.TotalOutboundRequests()
-		queued := total - outbound
-
-		log.Infof("Outbound Requests: %d, Queued Requests: %d, Total Requests: %d", outbound, queued, total)
-	}
-}
-
-// LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
-func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
-	if l == nil {
-		return
-	}
-	qp := util.NewQueryParams(req.URL.Query())
-	query := qp.Get("query", "<Unknown>")
-
-	l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
+// ID is used to identify the type of client
+func (rlpc *RateLimitedPrometheusClient) ID() string {
+	return rlpc.id
 }
 
 // TotalRequests returns the total number of requests that are either waiting to be sent and/or
@@ -180,13 +194,7 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 
 // Rate limit and passthrough to prometheus client API
 func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
-	if rlpc.username != "" {
-		req.SetBasicAuth(rlpc.username, rlpc.password)
-	}
-	if rlpc.bearerToken != "" {
-		token := "Bearer " + rlpc.bearerToken
-		req.Header.Add("Authorization", token)
-	}
+	rlpc.auth.Apply(req)
 
 	respChan := make(chan *workResponse)
 	defer close(respChan)
@@ -202,3 +210,56 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 	workRes := <-respChan
 	return workRes.res, workRes.body, workRes.warnings, workRes.err
 }
+
+//--------------------------------------------------------------------------
+//  Client Helpers
+//--------------------------------------------------------------------------
+
+// NewPrometheusClient creates a new rate-limited client which targets a prometheus instance.
+func NewPrometheusClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
+	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
+
+	// may be necessary for long prometheus queries. TODO: make this configurable
+	pc := prometheus.Config{
+		Address: address,
+		RoundTripper: &http.Transport{
+			Proxy: http.ProxyFromEnvironment,
+			DialContext: (&net.Dialer{
+				Timeout:   timeout,
+				KeepAlive: keepAlive,
+			}).DialContext,
+			TLSHandshakeTimeout: 10 * time.Second,
+			TLSClientConfig:     tlsConfig,
+		},
+	}
+
+	auth := &ClientAuth{
+		Username:    env.GetDBBasicAuthUsername(),
+		Password:    env.GetDBBasicAuthUserPassword(),
+		BearerToken: env.GetDBBearerToken(),
+	}
+
+	return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, queryLogFile)
+}
+
+// LogPrometheusClientState logs the current state, with respect to outbound requests, if that
+// information is available.
+func LogPrometheusClientState(client prometheus.Client) {
+	if rc, ok := client.(requestCounter); ok {
+		total := rc.TotalRequests()
+		outbound := rc.TotalOutboundRequests()
+		queued := total - outbound
+
+		log.Infof("Outbound Requests: %d, Queued Requests: %d, Total Requests: %d", outbound, queued, total)
+	}
+}
+
+// LogQueryRequest logs the query that was sent to prom/thanos with the time spent in queue and the total time after being sent
+func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
+	if l == nil {
+		return
+	}
+	qp := util.NewQueryParams(req.URL.Query())
+	query := qp.Get("query", "<Unknown>")
+
+	l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
+}
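
For reference, a sketch of building a rate-limited client directly with explicit auth instead of going through the NewPrometheusClient helper (the credentials are placeholders; the real values come from env):

package example // hypothetical

import (
	"github.com/kubecost/cost-model/pkg/prom"

	prometheus "github.com/prometheus/client_golang/api"
)

func newAuthedClient(address string, concurrency int) (prometheus.Client, error) {
	cfg := prometheus.Config{Address: address}

	// Placeholder credentials; NewPrometheusClient reads these from env instead.
	auth := &prom.ClientAuth{
		Username:    "user",
		Password:    "pass",
		BearerToken: "",
	}

	return prom.NewRateLimitedClient(prom.PrometheusClientID, cfg, concurrency, auth, "")
}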

+ 72 - 0
pkg/prom/validate.go

@@ -0,0 +1,72 @@
+package prom
+
+import (
+	"fmt"
+
+	"github.com/kubecost/cost-model/pkg/env"
+
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
+var (
+	prometheusValidateQuery string = "up"
+	thanosValidateQuery     string = fmt.Sprintf("up offset %s", env.GetThanosOffset())
+)
+
+// PrometheusMetadata represents the result of validating that prometheus/thanos is
+// running and contains kubecost data.
+type PrometheusMetadata struct {
+	Running            bool `json:"running"`
+	KubecostDataExists bool `json:"kubecostDataExists"`
+}
+
+// Validate reports whether prometheus is running and whether kubecost data exists on it.
+func Validate(cli prometheus.Client) (*PrometheusMetadata, error) {
+	if IsClientID(cli, ThanosClientID) {
+		return validate(cli, thanosValidateQuery)
+	}
+
+	return validate(cli, prometheusValidateQuery)
+}
+
+// validate runs the provided query against the client and inspects the 'up' results for a kubecost job
+func validate(cli prometheus.Client, q string) (*PrometheusMetadata, error) {
+	ctx := NewContext(cli)
+
+	resUp, err := ctx.QuerySync(q)
+	if err != nil {
+		return &PrometheusMetadata{
+			Running:            false,
+			KubecostDataExists: false,
+		}, err
+	}
+
+	if len(resUp) == 0 {
+		return &PrometheusMetadata{
+			Running:            false,
+			KubecostDataExists: false,
+		}, fmt.Errorf("no running jobs on Prometheus at %s", ctx.QueryURL().Path)
+	}
+
+	for _, result := range resUp {
+		job, err := result.GetString("job")
+		if err != nil {
+			return &PrometheusMetadata{
+				Running:            false,
+				KubecostDataExists: false,
+			}, fmt.Errorf("up query does not have job names")
+		}
+
+		if job == "kubecost" {
+			return &PrometheusMetadata{
+				Running:            true,
+				KubecostDataExists: true,
+			}, err
+		}
+	}
+
+	return &PrometheusMetadata{
+		Running:            true,
+		KubecostDataExists: false,
+	}, nil
+}
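
A hedged sketch of calling the relocated validation from application code, mirroring the startup check in router.go (checkPrometheus is a hypothetical caller):

package example // hypothetical

import (
	"log"

	"github.com/kubecost/cost-model/pkg/prom"

	prometheus "github.com/prometheus/client_golang/api"
)

// checkPrometheus logs the outcome of the validation query.
func checkPrometheus(cli prometheus.Client) {
	m, err := prom.Validate(cli)
	if err != nil {
		log.Printf("validation query failed: %v", err)
		return
	}
	if !m.Running {
		log.Printf("prometheus is not running")
	} else if !m.KubecostDataExists {
		log.Printf("prometheus is running, but no kubecost job data was found")
	}
}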

+ 31 - 0
pkg/thanos/thanos.go

@@ -1,11 +1,17 @@
 package thanos
 
 import (
+	"crypto/tls"
 	"fmt"
+	"net"
+	"net/http"
 	"sync"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/prom"
+
+	prometheus "github.com/prometheus/client_golang/api"
 )
 
 var (
@@ -53,3 +59,28 @@ func OffsetDuration() time.Duration {
 func QueryOffset() string {
 	return queryOffset
 }
+
+// NewThanosClient creates a new rate-limited client which targets a thanos query endpoint.
+func NewThanosClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
+	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
+
+	tc := prometheus.Config{
+		Address: address,
+		RoundTripper: &http.Transport{
+			Proxy: http.ProxyFromEnvironment,
+			DialContext: (&net.Dialer{
+				Timeout:   timeout,
+				KeepAlive: keepAlive,
+			}).DialContext,
+			TLSHandshakeTimeout: 10 * time.Second,
+			TLSClientConfig:     tlsConfig,
+		},
+	}
+
+	auth := &prom.ClientAuth{
+		Username:    env.GetMultiClusterBasicAuthUsername(),
+		Password:    env.GetMultiClusterBasicAuthPassword(),
+		BearerToken: env.GetMultiClusterBearerToken(),
+	}
+
+	return prom.NewRateLimitedClient(prom.ThanosClientID, tc, queryConcurrency, auth, queryLogFile)
+}
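
A sketch of the client-selection pattern this enables, mirroring the wiring in router.go (maybeThanosClient and the timeout values are illustrative):

package example // hypothetical

import (
	"time"

	"github.com/kubecost/cost-model/pkg/thanos"

	prometheus "github.com/prometheus/client_golang/api"
)

// maybeThanosClient builds a thanos client only when thanos is enabled
// and a query URL is configured; otherwise it returns nil.
func maybeThanosClient(concurrency int) (prometheus.Client, error) {
	if !thanos.IsEnabled() {
		return nil, nil
	}
	address := thanos.QueryURL()
	if address == "" {
		return nil, nil
	}
	return thanos.NewThanosClient(address, 120*time.Second, 120*time.Second, concurrency, "")
}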

+ 3 - 0
pkg/util/blockingqueue.go

@@ -63,6 +63,9 @@ func (q *blockingSliceQueue) Dequeue() interface{} {
 	}
 
 	e := q.q[0]
+
+	// nil 0 index to prevent leak
+	q.q[0] = nil
 	q.q = q.q[1:]
 	return e
 }
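
Why the new line matters: re-slicing with q.q[1:] keeps the same backing array, so without zeroing, slot 0 would keep the dequeued element reachable. A standalone illustration of the idiom:

package main

import "fmt"

func main() {
	q := []*int{new(int), new(int), new(int)}

	head := q[0]
	q[0] = nil // drop the backing array's reference so the element can be collected
	q = q[1:]

	fmt.Println(head != nil, len(q)) // true 2
}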