Просмотр исходного кода

Merge pull request #580 from kubecost/develop

Merge develop into master 1.68.0
Ajay Tripathy 5 лет назад
Родитель
Commit
fcc4ee49be

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+.idea

+ 50 - 22
pkg/costmodel/cluster.go

@@ -424,7 +424,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeRAMBytes := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost * %d.0 / 60.0) by (cluster_id, node, provider_id))[%s:%dm]%s)`, minsPerResolution, durationStr, minsPerResolution, offsetStr)
 	queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
-	queryNodeCPUModePct := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryActiveMins := fmt.Sprintf(`node_total_hourly_cost[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
@@ -435,7 +435,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resChNodeRAMBytes := ctx.Query(queryNodeRAMBytes)
 	resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
 	resChNodeLabels := ctx.Query(queryNodeLabels)
-	resChNodeCPUModePct := ctx.Query(queryNodeCPUModePct)
+	resChNodeCPUModeTotal := ctx.Query(queryNodeCPUModeTotal)
 	resChNodeRAMSystemPct := ctx.Query(queryNodeRAMSystemPct)
 	resChNodeRAMUserPct := ctx.Query(queryNodeRAMUserPct)
 	resChActiveMins := ctx.Query(queryActiveMins)
@@ -446,7 +446,7 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resNodeRAMCost, _ := resChNodeRAMCost.Await()
 	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
 	resNodeLabels, _ := resChNodeLabels.Await()
-	resNodeCPUModePct, _ := resChNodeCPUModePct.Await()
+	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
 	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
 	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
 	resActiveMins, _ := resChActiveMins.Await()
@@ -621,13 +621,20 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		}
 	}
 
-	for _, result := range resNodeCPUModePct {
+	// Mapping of cluster/node=cpu for computing resource efficiency
+	clusterNodeCPUTotal := map[string]float64{}
+	// Mapping of cluster/node:mode=cpu for computing resource efficiency
+	clusterNodeModeCPUTotal := map[string]map[string]float64{}
+
+	// Build intermediate structures for CPU usage by (cluster, node) and by
+	// (cluster, node, mode) for computing resource efficiency
+	for _, result := range resNodeCPUModeTotal {
 		cluster, err := result.GetString("cluster_id")
 		if err != nil {
 			cluster = env.GetClusterID()
 		}
 
-		name, err := result.GetString("kubernetes_node")
+		node, err := result.GetString("kubernetes_node")
 		if err != nil {
 			log.DedupedWarningf(5, "ClusterNodes: CPU mode data missing node")
 			continue
@@ -639,23 +646,43 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 			mode = "other"
 		}
 
-		pct := result.Values[0].Value
+		key := fmt.Sprintf("%s/%s", cluster, node)
 
-		key := fmt.Sprintf("%s/%s", cluster, name)
-		if _, ok := nodeMap[key]; !ok {
-			log.Warningf("ClusterNodes: CPU mode data for unidentified node")
-			continue
+		total := result.Values[0].Value
+
+		// Increment total
+		clusterNodeCPUTotal[key] += total
+
+		// Increment mode
+		if _, ok := clusterNodeModeCPUTotal[key]; !ok {
+			clusterNodeModeCPUTotal[key] = map[string]float64{}
 		}
+		clusterNodeModeCPUTotal[key][mode] += total
+	}
+
+	// Compute resource efficiency from intermediate structures
+	for key, total := range clusterNodeCPUTotal {
+		if modeTotals, ok := clusterNodeModeCPUTotal[key]; ok {
+			for mode, subtotal := range modeTotals {
+				// Compute percentage for the current cluster, node, mode
+				pct := subtotal / total
+
+				if _, ok := nodeMap[key]; !ok {
+					log.Warningf("ClusterNodes: CPU mode data for unidentified node")
+					continue
+				}
 
-		switch mode {
-		case "idle":
-			nodeMap[key].CPUBreakdown.Idle += pct
-		case "system":
-			nodeMap[key].CPUBreakdown.System += pct
-		case "user":
-			nodeMap[key].CPUBreakdown.User += pct
-		default:
-			nodeMap[key].CPUBreakdown.Other += pct
+				switch mode {
+				case "idle":
+					nodeMap[key].CPUBreakdown.Idle += pct
+				case "system":
+					nodeMap[key].CPUBreakdown.System += pct
+				case "user":
+					nodeMap[key].CPUBreakdown.User += pct
+				default:
+					nodeMap[key].CPUBreakdown.Other += pct
+				}
+			}
 		}
 	}
 
@@ -781,7 +808,8 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 		// TODO take RI into account
 		node.Discount = cp.CombinedDiscountForNode(node.NodeType, node.Preemptible, discount, negotiatedDiscount)
 
-		// Apply all remaining RAM to Idle
+		// Apply all remaining resources to Idle
+		node.CPUBreakdown.Idle = 1.0 - (node.CPUBreakdown.System + node.CPUBreakdown.Other + node.CPUBreakdown.User)
 		node.RAMBreakdown.Idle = 1.0 - (node.RAMBreakdown.System + node.RAMBreakdown.Other + node.RAMBreakdown.User)
 	}
 
@@ -815,8 +843,8 @@ func ClusterLoadBalancers(cp cloud.Provider, client prometheus.Client, duration,
 	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
 
 	ctx := prom.NewContext(client)
-	queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
-	queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
+	queryLBCost := fmt.Sprintf(`sum_over_time((avg(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
+	queryActiveMins := fmt.Sprintf(`count(kubecost_load_balancer_cost) by (namespace, service_name, cluster_id)[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
 	resChLBCost := ctx.Query(queryLBCost)
 	resChActiveMins := ctx.Query(queryActiveMins)

+ 296 - 0
pkg/costmodel/clusters/clustermap.go

@@ -0,0 +1,296 @@
+package clusters
+
+import (
+	"fmt"
+	"math/rand"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/thanos"
+
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
const (
	// LoadRetries is the maximum number of attempts made when querying cluster
	// info before giving up and reporting an error.
	LoadRetries    int           = 6
	// LoadRetryDelay is the base delay between cluster info query retries; random
	// jitter is added to it after each failed attempt.
	LoadRetryDelay time.Duration = 10 * time.Second
)
+
// ClusterInfo holds the identifying metadata reported by a cost-model cluster.
type ClusterInfo struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Profile     string `json:"profile"`
	Provider    string `json:"provider"`
	Provisioner string `json:"provisioner"`
}

// Clone creates a copy of ClusterInfo and returns it. A nil receiver yields nil.
func (ci *ClusterInfo) Clone() *ClusterInfo {
	if ci == nil {
		return nil
	}

	// All fields are plain strings, so a value copy is a deep copy.
	dup := *ci
	return &dup
}
+
// ClusterMap provides a view of all known cost-model clusters, keyed by cluster ID.
type ClusterMap interface {
	// GetClusterIDs returns a slice containing all of the cluster identifiers.
	GetClusterIDs() []string

	// AsMap returns the cluster map as a standard go map
	AsMap() map[string]*ClusterInfo

	// InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
	// doesn't exist
	InfoFor(clusterID string) *ClusterInfo

	// NameFor returns the name of the cluster provided the clusterID.
	NameFor(clusterID string) string

	// NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
	// assigned name. Otherwise, just the clusterID is returned.
	NameIDFor(clusterID string) string

	// SplitNameID splits the nameID back into a separate id and name field
	SplitNameID(nameID string) (id string, name string)

	// StopRefresh stops the automatic internal map refresh
	StopRefresh()
}
+
// PrometheusClusterMap is a ClusterMap implementation that keeps records of all
// known cost-model clusters, populated by querying prometheus (or thanos) and
// refreshed periodically by a background goroutine.
type PrometheusClusterMap struct {
	lock     *sync.RWMutex // guards clusters
	client   prometheus.Client
	clusters map[string]*ClusterInfo // cluster ID -> info; replaced wholesale on refresh
	stop     chan struct{} // closed by StopRefresh to halt the refresh goroutine
}
+
+// NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
+func NewClusterMap(client prometheus.Client, refresh time.Duration) ClusterMap {
+	stop := make(chan struct{})
+
+	cm := &PrometheusClusterMap{
+		lock:     new(sync.RWMutex),
+		client:   client,
+		clusters: make(map[string]*ClusterInfo),
+		stop:     stop,
+	}
+
+	// Run an updater to ensure cluster data stays relevant over time
+	go func() {
+		// Immediately Attempt to refresh the clusters
+		cm.refreshClusters()
+
+		// Tick on interval and refresh clusters
+		ticker := time.NewTicker(refresh)
+		for {
+			select {
+			case <-ticker.C:
+				cm.refreshClusters()
+			case <-cm.stop:
+				log.Infof("ClusterMap refresh stopped.")
+				return
+			}
+		}
+	}()
+
+	return cm
+}
+
// clusterInfoQuery returns the query string used to load cluster info, with the
// provided offset (possibly empty) appended.
func clusterInfoQuery(offset string) string {
	return "kubecost_cluster_info" + offset
}
+
+// loadClusters loads all the cluster info to map
+func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error) {
+	var offset string = ""
+	if prom.IsThanos(pcm.client) {
+		offset = thanos.QueryOffset()
+	}
+
+	// Execute Query
+	tryQuery := func() ([]*prom.QueryResult, error) {
+		ctx := prom.NewContext(pcm.client)
+		return ctx.QuerySync(clusterInfoQuery(offset))
+	}
+
+	var qr []*prom.QueryResult
+	var err error
+
+	// Retry on failure
+	delay := LoadRetryDelay
+	for r := LoadRetries; r > 0; r-- {
+		qr, err = tryQuery()
+
+		// non-error breaks out of loop
+		if err == nil {
+			break
+		}
+
+		// wait the delay
+		time.Sleep(delay)
+
+		// add some random backoff
+		jitter := time.Duration(rand.Int63n(int64(delay)))
+		delay = delay + jitter/2
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	clusters := make(map[string]*ClusterInfo)
+
+	// Load the query results. Critical fields are id and name.
+	for _, result := range qr {
+		id, err := result.GetString("id")
+		if err != nil {
+			log.Warningf("Failed to load 'id' field for ClusterInfo")
+			continue
+		}
+
+		name, err := result.GetString("name")
+		if err != nil {
+			log.Warningf("Failed to load 'name' field for ClusterInfo")
+			continue
+		}
+
+		profile, err := result.GetString("clusterprofile")
+		if err != nil {
+			profile = ""
+		}
+
+		provider, err := result.GetString("provider")
+		if err != nil {
+			provider = ""
+		}
+
+		provisioner, err := result.GetString("provisioner")
+		if err != nil {
+			provisioner = ""
+		}
+
+		clusters[id] = &ClusterInfo{
+			ID:          id,
+			Name:        name,
+			Profile:     profile,
+			Provider:    provider,
+			Provisioner: provisioner,
+		}
+	}
+
+	return clusters, nil
+}
+
+// refreshClusters loads the clusters and updates the internal map
+func (pcm *PrometheusClusterMap) refreshClusters() {
+	updated, err := pcm.loadClusters()
+	if err != nil {
+		log.Errorf("Failed to load cluster info via query after %d retries", LoadRetries)
+		return
+	}
+
+	pcm.lock.Lock()
+	pcm.clusters = updated
+	pcm.lock.Unlock()
+}
+
+// GetClusterIDs returns a slice containing all of the cluster identifiers.
+func (pcm *PrometheusClusterMap) GetClusterIDs() []string {
+	pcm.lock.RLock()
+	defer pcm.lock.RUnlock()
+
+	var clusterIDs []string
+	for id := range pcm.clusters {
+		clusterIDs = append(clusterIDs, id)
+	}
+
+	return clusterIDs
+}
+
+// AsMap returns the cluster map as a standard go map
+func (pcm *PrometheusClusterMap) AsMap() map[string]*ClusterInfo {
+	pcm.lock.RLock()
+	defer pcm.lock.RUnlock()
+
+	m := make(map[string]*ClusterInfo)
+	for k, v := range pcm.clusters {
+		m[k] = v.Clone()
+	}
+
+	return m
+}
+
+// InfoFor returns the ClusterInfo entry for the provided clusterID or nil if it
+// doesn't exist
+func (pcm *PrometheusClusterMap) InfoFor(clusterID string) *ClusterInfo {
+	pcm.lock.RLock()
+	defer pcm.lock.RUnlock()
+
+	if info, ok := pcm.clusters[clusterID]; ok {
+		return info.Clone()
+	}
+
+	return nil
+}
+
+// NameFor returns the name of the cluster provided the clusterID.
+func (pcm *PrometheusClusterMap) NameFor(clusterID string) string {
+	pcm.lock.RLock()
+	defer pcm.lock.RUnlock()
+
+	if info, ok := pcm.clusters[clusterID]; ok {
+		return info.Name
+	}
+
+	return ""
+}
+
+// NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
+// assigned name. Otherwise, just the clusterID is returned.
+func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
+	pcm.lock.RLock()
+	defer pcm.lock.RUnlock()
+
+	if info, ok := pcm.clusters[clusterID]; ok {
+		if info.Name == "" {
+			return clusterID
+		}
+
+		return fmt.Sprintf("%s/%s", info.Name, clusterID)
+	}
+
+	return clusterID
+}
+
+func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
+	if !strings.Contains(nameID, "/") {
+		id = nameID
+		name = ""
+		return
+	}
+
+	split := strings.Split(nameID, "/")
+	name = split[0]
+	id = split[1]
+	return
+}
+
+// StopRefresh stops the automatic internal map refresh
+func (pcm *PrometheusClusterMap) StopRefresh() {
+	if pcm.stop != nil {
+		close(pcm.stop)
+		pcm.stop = nil
+	}
+}

+ 35 - 105
pkg/costmodel/costmodel.go

@@ -10,10 +10,10 @@ import (
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
-	"github.com/kubecost/cost-model/pkg/thanos"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	v1 "k8s.io/api/core/v1"
@@ -48,15 +48,17 @@ var isCron = regexp.MustCompile(`^(.+)-\d{10}$`)
 
 type CostModel struct {
 	Cache        clustercache.ClusterCache
+	ClusterMap   clusters.ClusterMap
 	RequestGroup *singleflight.Group
 }
 
-func NewCostModel(cache clustercache.ClusterCache) *CostModel {
+func NewCostModel(cache clustercache.ClusterCache, clusterMap clusters.ClusterMap) *CostModel {
 	// request grouping to prevent over-requesting the same data prior to caching
 	requestGroup := new(singleflight.Group)
 
 	return &CostModel{
 		Cache:        cache,
+		ClusterMap:   clusterMap,
 		RequestGroup: requestGroup,
 	}
 }
@@ -84,6 +86,7 @@ type CostData struct {
 	Labels          map[string]string            `json:"labels,omitempty"`
 	NamespaceLabels map[string]string            `json:"namespaceLabels,omitempty"`
 	ClusterID       string                       `json:"clusterId"`
+	ClusterName     string                       `json:"clusterName"`
 }
 
 func (cd *CostData) String() string {
@@ -223,58 +226,6 @@ const (
 	kubecostUpMinsPerHourStr  = `max(count_over_time(node_cpu_hourly_cost[%s:1m])) / %f`
 )
 
-type PrometheusMetadata struct {
-	Running            bool `json:"running"`
-	KubecostDataExists bool `json:"kubecostDataExists"`
-}
-
-// ValidatePrometheus tells the model what data prometheus has on it.
-func ValidatePrometheus(cli prometheusClient.Client, isThanos bool) (*PrometheusMetadata, error) {
-	q := "up"
-	if isThanos {
-		q += thanos.QueryOffset()
-	}
-
-	ctx := prom.NewContext(cli)
-
-	resUp, err := ctx.QuerySync(q)
-	if err != nil {
-		return &PrometheusMetadata{
-			Running:            false,
-			KubecostDataExists: false,
-		}, err
-	}
-
-	if len(resUp) == 0 {
-		return &PrometheusMetadata{
-			Running:            false,
-			KubecostDataExists: false,
-		}, fmt.Errorf("no running jobs on Prometheus at %s", ctx.QueryURL().Path)
-	}
-
-	for _, result := range resUp {
-		job, err := result.GetString("job")
-		if err != nil {
-			return &PrometheusMetadata{
-				Running:            false,
-				KubecostDataExists: false,
-			}, fmt.Errorf("up query does not have job names")
-		}
-
-		if job == "kubecost" {
-			return &PrometheusMetadata{
-				Running:            true,
-				KubecostDataExists: true,
-			}, err
-		}
-	}
-
-	return &PrometheusMetadata{
-		Running:            true,
-		KubecostDataExists: false,
-	}, nil
-}
-
 func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, window, offset, window, offset)
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, window, offset, window, offset)
@@ -514,27 +465,27 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 				RAMReqV, ok := RAMReqMap[newKey]
 				if !ok {
 					klog.V(4).Info("no RAM requests for " + newKey)
-					RAMReqV = []*util.Vector{&util.Vector{}}
+					RAMReqV = []*util.Vector{{}}
 				}
 				RAMUsedV, ok := RAMUsedMap[newKey]
 				if !ok {
 					klog.V(4).Info("no RAM usage for " + newKey)
-					RAMUsedV = []*util.Vector{&util.Vector{}}
+					RAMUsedV = []*util.Vector{{}}
 				}
 				CPUReqV, ok := CPUReqMap[newKey]
 				if !ok {
 					klog.V(4).Info("no CPU requests for " + newKey)
-					CPUReqV = []*util.Vector{&util.Vector{}}
+					CPUReqV = []*util.Vector{{}}
 				}
 				GPUReqV, ok := GPUReqMap[newKey]
 				if !ok {
 					klog.V(4).Info("no GPU requests for " + newKey)
-					GPUReqV = []*util.Vector{&util.Vector{}}
+					GPUReqV = []*util.Vector{{}}
 				}
 				CPUUsedV, ok := CPUUsedMap[newKey]
 				if !ok {
 					klog.V(4).Info("no CPU usage for " + newKey)
-					CPUUsedV = []*util.Vector{&util.Vector{}}
+					CPUUsedV = []*util.Vector{{}}
 				}
 
 				var pvReq []*PersistentVolumeClaimData
@@ -565,6 +516,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 					Labels:          podLabels,
 					NamespaceLabels: nsLabels,
 					ClusterID:       clusterID,
+					ClusterName:     cm.ClusterMap.NameFor(clusterID),
 				}
 				costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed, "CPU")
 				costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed, "RAM")
@@ -584,27 +536,27 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 			RAMReqV, ok := RAMReqMap[key]
 			if !ok {
 				klog.V(4).Info("no RAM requests for " + key)
-				RAMReqV = []*util.Vector{&util.Vector{}}
+				RAMReqV = []*util.Vector{{}}
 			}
 			RAMUsedV, ok := RAMUsedMap[key]
 			if !ok {
 				klog.V(4).Info("no RAM usage for " + key)
-				RAMUsedV = []*util.Vector{&util.Vector{}}
+				RAMUsedV = []*util.Vector{{}}
 			}
 			CPUReqV, ok := CPUReqMap[key]
 			if !ok {
 				klog.V(4).Info("no CPU requests for " + key)
-				CPUReqV = []*util.Vector{&util.Vector{}}
+				CPUReqV = []*util.Vector{{}}
 			}
 			GPUReqV, ok := GPUReqMap[key]
 			if !ok {
 				klog.V(4).Info("no GPU requests for " + key)
-				GPUReqV = []*util.Vector{&util.Vector{}}
+				GPUReqV = []*util.Vector{{}}
 			}
 			CPUUsedV, ok := CPUUsedMap[key]
 			if !ok {
 				klog.V(4).Info("no CPU usage for " + key)
-				CPUUsedV = []*util.Vector{&util.Vector{}}
+				CPUUsedV = []*util.Vector{{}}
 			}
 
 			node, ok := nodes[c.NodeName]
@@ -635,6 +587,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 				GPUReq:          GPUReqV,
 				NamespaceLabels: namespacelabels,
 				ClusterID:       c.ClusterID,
+				ClusterName:     cm.ClusterMap.NameFor(c.ClusterID),
 			}
 			costs.CPUAllocation = getContainerAllocation(costs.CPUReq, costs.CPUUsed, "CPU")
 			costs.RAMAllocation = getContainerAllocation(costs.RAMReq, costs.RAMUsed, "RAM")
@@ -649,7 +602,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	}
 	// Use unmounted pvs to create a mapping of "Unmounted-<Namespace>" containers
 	// to pass along the cost data
-	unmounted := findUnmountedPVCostData(unmountedPVs, namespaceLabelsMapping)
+	unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping)
 	for k, costs := range unmounted {
 		klog.V(4).Infof("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
 
@@ -672,7 +625,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	return containerNameCost, err
 }
 
-func findUnmountedPVCostData(unmountedPVs map[string][]*PersistentVolumeClaimData, namespaceLabelsMapping map[string]map[string]string) map[string]*CostData {
+func findUnmountedPVCostData(clusterMap clusters.ClusterMap, unmountedPVs map[string][]*PersistentVolumeClaimData, namespaceLabelsMapping map[string]map[string]string) map[string]*CostData {
 	costs := make(map[string]*CostData)
 	if len(unmountedPVs) == 0 {
 		return costs
@@ -707,6 +660,7 @@ func findUnmountedPVCostData(unmountedPVs map[string][]*PersistentVolumeClaimDat
 				NamespaceLabels: namespacelabels,
 				Labels:          namespacelabels,
 				ClusterID:       clusterID,
+				ClusterName:     clusterMap.NameFor(clusterID),
 				PVCData:         pv,
 			}
 		} else {
@@ -736,7 +690,6 @@ func findDeletedPodInfo(cli prometheusClient.Client, missingContainers map[strin
 			cm, _ := NewContainerMetricFromKey(key)
 			labels, ok := podLabels[cm.PodName]
 			if !ok {
-				log.Errorf("unable to find historical data for pod '%s'", cm.PodName)
 				labels = make(map[string]string)
 			}
 			for k, v := range costData.NamespaceLabels {
@@ -1440,15 +1393,15 @@ func sliceToSet(s []string) map[string]bool {
 
 func setToSlice(m map[string]bool) []string {
 	var result []string
-	for k, _ := range m {
+	for k := range m {
 		result = append(result, k)
 	}
 	return result
 }
 
-func costDataPassesFilters(costs *CostData, namespace string, cluster string) bool {
+func costDataPassesFilters(cm clusters.ClusterMap, costs *CostData, namespace string, cluster string) bool {
 	passesNamespace := namespace == "" || costs.Namespace == namespace
-	passesCluster := cluster == "" || costs.ClusterID == cluster
+	passesCluster := cluster == "" || costs.ClusterID == cluster || costs.ClusterName == cluster
 
 	return passesNamespace && passesCluster
 }
@@ -1686,10 +1639,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	normalizationValue, err := getNormalizations(resNormalization)
 	if err != nil {
 		msg := fmt.Sprintf("error computing normalization for start=%s, end=%s, window=%s, res=%f", start, end, window, resolutionHours*60*60)
-		if pce, ok := err.(prom.CommError); ok {
-			return nil, pce.Wrap(msg)
-		}
-		return nil, fmt.Errorf("%s: %s", msg, err)
+		return nil, prom.WrapError(err, msg)
 	}
 
 	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): compute normalizations", durHrs))
@@ -1805,10 +1755,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	RAMReqMap, err := GetNormalizedContainerMetricVectors(resRAMRequests, normalizationValue, clusterID)
 	if err != nil {
-		if pce, ok := err.(prom.CommError); ok {
-			return nil, pce.Wrap("GetNormalizedContainerMetricVectors(RAMRequests)")
-		}
-		return nil, fmt.Errorf("GetNormalizedContainerMetricVectors(RAMRequests): %s", err)
+		return nil, prom.WrapError(err, "GetNormalizedContainerMetricVectors(RAMRequests)")
 	}
 	for key := range RAMReqMap {
 		containers[key] = true
@@ -1816,10 +1763,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	RAMUsedMap, err := GetNormalizedContainerMetricVectors(resRAMUsage, normalizationValue, clusterID)
 	if err != nil {
-		if pce, ok := err.(prom.CommError); ok {
-			return nil, pce.Wrap("GetNormalizedContainerMetricVectors(RAMUsage)")
-		}
-		return nil, fmt.Errorf("GetNormalizedContainerMetricVectors(RAMUsage): %s", err)
+		return nil, prom.WrapError(err, "GetNormalizedContainerMetricVectors(RAMUsage)")
 	}
 	for key := range RAMUsedMap {
 		containers[key] = true
@@ -1827,10 +1771,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	CPUReqMap, err := GetNormalizedContainerMetricVectors(resCPURequests, normalizationValue, clusterID)
 	if err != nil {
-		if pce, ok := err.(prom.CommError); ok {
-			return nil, pce.Wrap("GetNormalizedContainerMetricVectors(CPURequests)")
-		}
-		return nil, fmt.Errorf("GetNormalizedContainerMetricVectors(CPURequests): %s", err)
+		return nil, prom.WrapError(err, "GetNormalizedContainerMetricVectors(CPURequests)")
 	}
 	for key := range CPUReqMap {
 		containers[key] = true
@@ -1840,10 +1781,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	// rate(container_cpu_usage_seconds_total) which properly accounts for normalized rates
 	CPUUsedMap, err := GetContainerMetricVectors(resCPUUsage, clusterID)
 	if err != nil {
-		if pce, ok := err.(prom.CommError); ok {
-			return nil, pce.Wrap("GetContainerMetricVectors(CPUUsage)")
-		}
-		return nil, fmt.Errorf("GetContainerMetricVectors(CPUUsage): %s", err)
+		return nil, prom.WrapError(err, "GetContainerMetricVectors(CPUUsage)")
 	}
 	for key := range CPUUsedMap {
 		containers[key] = true
@@ -1851,10 +1789,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	RAMAllocMap, err := GetContainerMetricVectors(resRAMAlloc, clusterID)
 	if err != nil {
-		if pce, ok := err.(prom.CommError); ok {
-			return nil, pce.Wrap("GetContainerMetricVectors(RAMAllocations)")
-		}
-		return nil, fmt.Errorf("GetContainerMetricVectors(RAMAllocations): %s", err)
+		return nil, prom.WrapError(err, "GetContainerMetricVectors(RAMAllocations)")
 	}
 	for key := range RAMAllocMap {
 		containers[key] = true
@@ -1862,10 +1797,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	CPUAllocMap, err := GetContainerMetricVectors(resCPUAlloc, clusterID)
 	if err != nil {
-		if pce, ok := err.(prom.CommError); ok {
-			return nil, pce.Wrap("GetContainerMetricVectors(CPUAllocations)")
-		}
-		return nil, fmt.Errorf("GetContainerMetricVectors(CPUAllocations): %s", err)
+		return nil, prom.WrapError(err, "GetContainerMetricVectors(CPUAllocations)")
 	}
 	for key := range CPUAllocMap {
 		containers[key] = true
@@ -1873,10 +1805,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	GPUReqMap, err := GetNormalizedContainerMetricVectors(resGPURequests, normalizationValue, clusterID)
 	if err != nil {
-		if pce, ok := err.(prom.CommError); ok {
-			return nil, pce.Wrap("GetContainerMetricVectors(GPURequests)")
-		}
-		return nil, fmt.Errorf("GetContainerMetricVectors(GPURequests): %s", err)
+		return nil, prom.WrapError(err, "GetContainerMetricVectors(GPURequests)")
 	}
 	for key := range GPUReqMap {
 		containers[key] = true
@@ -2062,9 +1991,10 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			PVCData:         podPVs,
 			NetworkData:     podNetCosts,
 			ClusterID:       c.ClusterID,
+			ClusterName:     cm.ClusterMap.NameFor(c.ClusterID),
 		}
 
-		if costDataPassesFilters(costs, filterNamespace, filterCluster) {
+		if costDataPassesFilters(cm.ClusterMap, costs, filterNamespace, filterCluster) {
 			containerNameCost[key] = costs
 			missingContainers[key] = costs
 		}
@@ -2072,11 +2002,11 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): build CostData map", durHrs))
 
-	unmounted := findUnmountedPVCostData(unmountedPVs, namespaceLabelsMapping)
+	unmounted := findUnmountedPVCostData(cm.ClusterMap, unmountedPVs, namespaceLabelsMapping)
 	for k, costs := range unmounted {
 		klog.V(4).Infof("Unmounted PVs in Namespace/ClusterID: %s/%s", costs.Namespace, costs.ClusterID)
 
-		if costDataPassesFilters(costs, filterNamespace, filterCluster) {
+		if costDataPassesFilters(cm.ClusterMap, costs, filterNamespace, filterCluster) {
 			containerNameCost[k] = costs
 		}
 	}

+ 3 - 3
pkg/costmodel/promparsers.go

@@ -392,10 +392,10 @@ func getCost(qrs []*prom.QueryResult) (map[string][]*util.Vector, error) {
 // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
 func getNormalization(qrs []*prom.QueryResult) (float64, error) {
 	if len(qrs) == 0 {
-		return 0.0, prom.NoDataErr
+		return 0.0, prom.NoDataErr("getNormalization")
 	}
 	if len(qrs[0].Values) == 0 {
-		return 0.0, prom.NoDataErr
+		return 0.0, prom.NoDataErr("getNormalization")
 	}
 	return qrs[0].Values[0].Value, nil
 }
@@ -404,7 +404,7 @@ func getNormalization(qrs []*prom.QueryResult) (float64, error) {
 // normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running
 func getNormalizations(qrs []*prom.QueryResult) ([]*util.Vector, error) {
 	if len(qrs) == 0 {
-		return nil, prom.NoDataErr
+		return nil, prom.NoDataErr("getNormalizations")
 	}
 
 	return qrs[0].Values, nil

+ 67 - 75
pkg/costmodel/router.go

@@ -2,11 +2,9 @@ package costmodel
 
 import (
 	"context"
-	"crypto/tls"
 	"encoding/json"
 	"flag"
 	"fmt"
-	"net"
 	"net/http"
 	"reflect"
 	"strconv"
@@ -22,6 +20,7 @@ import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	cm "github.com/kubecost/cost-model/pkg/clustermanager"
+	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/prom"
@@ -46,13 +45,7 @@ const (
 
 var (
 	// gitCommit is set by the build system
-	gitCommit                       string
-	dbBasicAuthUsername             string = env.GetDBBasicAuthUsername()
-	dbBasicAuthPW                   string = env.GetDBBasicAuthUserPassword()
-	dbBearerToken                   string = env.GetDBBearerToken()
-	multiclusterDBBasicAuthUsername string = env.GetMultiClusterBasicAuthUsername()
-	multiclusterDBBasicAuthPW       string = env.GetMultiClusterBasicAuthPassword()
-	multiClusterBearerToken         string = env.GetMultiClusterBearerToken()
+	gitCommit string
 )
 
 var Router = httprouter.New()
@@ -63,6 +56,7 @@ type Accesses struct {
 	ThanosClient                  prometheusClient.Client
 	KubeClientSet                 kubernetes.Interface
 	ClusterManager                *cm.ClusterManager
+	ClusterMap                    clusters.ClusterMap
 	Cloud                         costAnalyzerCloud.Provider
 	CPUPriceRecorder              *prometheus.GaugeVec
 	RAMPriceRecorder              *prometheus.GaugeVec
@@ -627,6 +621,15 @@ func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httpro
 	w.Write(WrapData(data, nil))
 }
 
+func (p *Accesses) GetClusterInfoMap(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	data := p.ClusterMap.AsMap()
+
+	w.Write(WrapData(data, nil))
+}
+
 func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -637,7 +640,8 @@ func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Reques
 func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	w.Write(WrapData(ValidatePrometheus(p.PrometheusClient, false)))
+
+	w.Write(WrapData(prom.Validate(p.PrometheusClient)))
 }
 
 // Creates a new ClusterManager instance using a boltdb storage. If that fails,
@@ -730,30 +734,18 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	queryConcurrency := env.GetMaxQueryConcurrency()
 	klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
 
-	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
-	var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
-		Proxy: http.ProxyFromEnvironment,
-		DialContext: (&net.Dialer{
-			Timeout:   120 * time.Second,
-			KeepAlive: 120 * time.Second,
-		}).DialContext,
-		TLSHandshakeTimeout: 10 * time.Second,
-		TLSClientConfig:     tlsConfig,
-	}
+	timeout := 120 * time.Second
+	keepAlive := 120 * time.Second
 
-	pc := prometheusClient.Config{
-		Address:      address,
-		RoundTripper: LongTimeoutRoundTripper,
-	}
-	promCli, _ := prom.NewRateLimitedClient(pc, queryConcurrency, dbBasicAuthUsername, dbBasicAuthPW, dbBearerToken, env.GetQueryLoggingFile())
-
-	m, err := ValidatePrometheus(promCli, false)
+	promCli, _ := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
+	m, err := prom.Validate(promCli)
 	if err != nil || m.Running == false {
 		if err != nil {
 			klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
 		} else if m.Running == false {
 			klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
 		}
+
 		api := prometheusAPI.NewAPI(promCli)
 		_, err = api.Config(context.Background())
 		if err != nil {
@@ -808,6 +800,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 			}
 		}
 	}
+
 	kubecostNamespace := env.GetKubecostNamespace()
 	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
 	configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
@@ -934,30 +927,6 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	// cache responses from model for a default of 5 minutes; clear expired responses every 10 minutes
 	outOfClusterCache := cache.New(time.Minute*5, time.Minute*10)
 
-	A = Accesses{
-		PrometheusClient:              promCli,
-		KubeClientSet:                 kubeClientset,
-		ClusterManager:                clusterManager,
-		Cloud:                         cloudProvider,
-		CPUPriceRecorder:              cpuGv,
-		RAMPriceRecorder:              ramGv,
-		GPUPriceRecorder:              gpuGv,
-		NodeTotalPriceRecorder:        totalGv,
-		NodeSpotRecorder:              spotGv,
-		RAMAllocationRecorder:         RAMAllocation,
-		CPUAllocationRecorder:         CPUAllocation,
-		GPUAllocationRecorder:         GPUAllocation,
-		PVAllocationRecorder:          PVAllocation,
-		NetworkZoneEgressRecorder:     NetworkZoneEgressRecorder,
-		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
-		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
-		PersistentVolumePriceRecorder: pvGv,
-		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
-		LBCostRecorder:                LBCostRecorder,
-		Model:                         NewCostModel(k8sCache),
-		OutOfClusterCache:             outOfClusterCache,
-	}
-
 	remoteEnabled := env.IsRemoteEnabled()
 	if remoteEnabled {
 		info, err := cloudProvider.ClusterInfo()
@@ -972,35 +941,21 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	}
 
 	// Thanos Client
+	var thanosClient prometheusClient.Client
 	if thanos.IsEnabled() {
-		thanosUrl := thanos.QueryURL()
-
-		if thanosUrl != "" {
-			var thanosRT http.RoundTripper = &http.Transport{
-				Proxy: http.ProxyFromEnvironment,
-				DialContext: (&net.Dialer{
-					Timeout:   120 * time.Second,
-					KeepAlive: 120 * time.Second,
-				}).DialContext,
-				TLSHandshakeTimeout: 10 * time.Second,
-				TLSClientConfig:     tlsConfig,
-			}
-
-			thanosConfig := prometheusClient.Config{
-				Address:      thanosUrl,
-				RoundTripper: thanosRT,
-			}
+		thanosAddress := thanos.QueryURL()
 
-			thanosCli, _ := prom.NewRateLimitedClient(thanosConfig, queryConcurrency, multiclusterDBBasicAuthUsername, multiclusterDBBasicAuthPW, multiClusterBearerToken, env.GetQueryLoggingFile())
+		if thanosAddress != "" {
+			thanosCli, _ := thanos.NewThanosClient(thanosAddress, timeout, keepAlive, queryConcurrency, env.GetQueryLoggingFile())
 
-			_, err = ValidatePrometheus(thanosCli, true)
+			_, err = prom.Validate(thanosCli)
 			if err != nil {
-				klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosUrl, err.Error())
-				A.ThanosClient = thanosCli
+				klog.V(1).Infof("[Warning] Failed to query Thanos at %s. Error: %s.", thanosAddress, err.Error())
+				thanosClient = thanosCli
 			} else {
-				klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosUrl)
+				klog.V(1).Info("Success: retrieved the 'up' query against Thanos at: " + thanosAddress)
 
-				A.ThanosClient = thanosCli
+				thanosClient = thanosCli
 			}
 
 		} else {
@@ -1008,6 +963,40 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		}
 	}
 
+	// Initialize ClusterMap for maintaining ClusterInfo by ClusterID
+	var clusterMap clusters.ClusterMap
+	if thanosClient != nil {
+		clusterMap = clusters.NewClusterMap(thanosClient, 10*time.Minute)
+	} else {
+		clusterMap = clusters.NewClusterMap(promCli, 5*time.Minute)
+	}
+
+	A = Accesses{
+		PrometheusClient:              promCli,
+		ThanosClient:                  thanosClient,
+		KubeClientSet:                 kubeClientset,
+		ClusterManager:                clusterManager,
+		ClusterMap:                    clusterMap,
+		Cloud:                         cloudProvider,
+		CPUPriceRecorder:              cpuGv,
+		RAMPriceRecorder:              ramGv,
+		GPUPriceRecorder:              gpuGv,
+		NodeTotalPriceRecorder:        totalGv,
+		NodeSpotRecorder:              spotGv,
+		RAMAllocationRecorder:         RAMAllocation,
+		CPUAllocationRecorder:         CPUAllocation,
+		GPUAllocationRecorder:         GPUAllocation,
+		PVAllocationRecorder:          PVAllocation,
+		NetworkZoneEgressRecorder:     NetworkZoneEgressRecorder,
+		NetworkRegionEgressRecorder:   NetworkRegionEgressRecorder,
+		NetworkInternetEgressRecorder: NetworkInternetEgressRecorder,
+		PersistentVolumePriceRecorder: pvGv,
+		ClusterManagementCostRecorder: ClusterManagementCostRecorder,
+		LBCostRecorder:                LBCostRecorder,
+		Model:                         NewCostModel(k8sCache, clusterMap),
+		OutOfClusterCache:             outOfClusterCache,
+	}
+
 	err = A.Cloud.DownloadPricingData()
 	if err != nil {
 		klog.V(1).Info("Failed to download pricing data: " + err.Error())
@@ -1028,8 +1017,11 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	Router.GET("/validatePrometheus", A.GetPrometheusMetadata)
 	Router.GET("/managementPlatform", A.ManagementPlatform)
 	Router.GET("/clusterInfo", A.ClusterInfo)
-	Router.GET("/clusters", managerEndpoints.GetAllClusters)
+	Router.GET("/clusterInfoMap", A.GetClusterInfoMap)
 	Router.GET("/serviceAccountStatus", A.GetServiceAccountStatus)
+
+	// cluster manager endpoints
+	Router.GET("/clusters", managerEndpoints.GetAllClusters)
 	Router.PUT("/clusters", managerEndpoints.PutCluster)
 	Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
 }

+ 31 - 0
pkg/log/counter.go

@@ -0,0 +1,31 @@
+package log
+
+import "sync"
+
+type counter struct {
+	mu   sync.RWMutex
+	seen map[string]int
+}
+
+func newCounter() *counter {
+	return &counter{seen: map[string]int{}}
+}
+
+func (ctr *counter) count(key string) int {
+	ctr.mu.RLock()
+	defer ctr.mu.RUnlock()
+	return ctr.seen[key]
+}
+
+func (ctr *counter) delete(key string) {
+	ctr.mu.Lock()
+	delete(ctr.seen, key)
+	ctr.mu.Unlock()
+}
+
+func (ctr *counter) increment(key string) int {
+	ctr.mu.Lock()
+	defer ctr.mu.Unlock()
+	ctr.seen[key]++
+	return ctr.seen[key]
+}

+ 68 - 0
pkg/log/counter_test.go

@@ -0,0 +1,68 @@
+package log
+
+import (
+	"sync"
+	"testing"
+)
+
+func TestCounter_Ops(t *testing.T) {
+	ctr := newCounter()
+
+	var num int
+
+	// Should return 0 if never seen
+	num = ctr.count("something")
+	if num != 0 {
+		t.Fatalf("counter: count: expected %d; found %d", 0, num)
+	}
+
+	// Should return 1 if seen once
+	num = ctr.increment("something")
+	if num != 1 {
+		t.Fatalf("counter: count: expected %d; found %d", 1, num)
+	}
+
+	// Should still return 1 if seen only once
+	num = ctr.count("something")
+	if num != 1 {
+		t.Fatalf("counter: count: expected %d; found %d", 1, num)
+	}
+
+	// Should return 1 if seen once
+	for i := 2; i <= 234; i++ {
+		num = ctr.increment("something")
+		if num != i {
+			t.Fatalf("counter: count: expected %d; found %d", i, num)
+		}
+	}
+
+	ctr.delete("something")
+	num = ctr.count("something")
+	if num != 0 {
+		t.Fatalf("counter: count: expected %d; found %d", 0, num)
+	}
+}
+
+func TestCounter_Threadsafety(t *testing.T) {
+	var wg sync.WaitGroup
+
+	// Run 1000 goroutines, logging 10000 times each as fast as they can
+	for i := 1; i <= 1000; i++ {
+		wg.Add(1)
+		go func() {
+			for j := 1; j <= 10000; j++ {
+				DedupedInfof(10, "this log seen %d times", j)
+			}
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+}
+
+func TestDeduping(t *testing.T) {
+	// Should log 10 times, then stop
+	for i := 1; i <= 234; i++ {
+		DedupedInfof(10, "this log seen %d times", i)
+	}
+}

+ 23 - 94
pkg/log/log.go

@@ -7,25 +7,24 @@ import (
 	"k8s.io/klog"
 )
 
-var seen = make(map[string]int)
+// TODO for deduped functions, if timeLogged > logTypeLimit, should we log once
+// every... 100 (?) times so we don't lose track entirely?
+
+// concurrency-safe counter
+var ctr = newCounter()
 
 func Errorf(format string, a ...interface{}) {
 	klog.Errorf(fmt.Sprintf("[Error] %s", format), a...)
 }
 
 func DedupedErrorf(logTypeLimit int, format string, a ...interface{}) {
-	timesLogged, ok := seen[format]
-	if !ok {
-		seen[format] = 1
+	timesLogged := ctr.increment(format)
+
+	if timesLogged < logTypeLimit {
+		Errorf(format, a...)
 	} else if timesLogged == logTypeLimit {
-		seen[format]++
-		f := fmt.Sprintf("[Error] %s", format)
-		klog.Errorf("%s seen %d times, suppressing future logs", f, logTypeLimit)
-	} else if timesLogged > logTypeLimit {
-		seen[format]++
-	} else {
-		seen[format]++
-		klog.Errorf(fmt.Sprintf("[Error] %s", format), a...)
+		Errorf(format, a...)
+		Infof("%s logged %d times: suppressing future logs", format, logTypeLimit)
 	}
 }
 
@@ -34,18 +33,13 @@ func Warningf(format string, a ...interface{}) {
 }
 
 func DedupedWarningf(logTypeLimit int, format string, a ...interface{}) {
-	timesLogged, ok := seen[format]
-	if !ok {
-		seen[format] = 1
+	timesLogged := ctr.increment(format)
+
+	if timesLogged < logTypeLimit {
+		Warningf(format, a...)
 	} else if timesLogged == logTypeLimit {
-		seen[format]++
-		f := fmt.Sprintf("[Warning] %s", format)
-		klog.Errorf("%s seen %d times, suppressing future logs", f, logTypeLimit)
-	} else if timesLogged > logTypeLimit {
-		seen[format]++
-	} else {
-		seen[format]++
-		klog.V(2).Infof(fmt.Sprintf("[Warning] %s", format), a...)
+		Warningf(format, a...)
+		Infof("%s logged %d times: suppressing future logs", format, logTypeLimit)
 	}
 }
 
@@ -54,18 +48,13 @@ func Infof(format string, a ...interface{}) {
 }
 
 func DedupedInfof(logTypeLimit int, format string, a ...interface{}) {
-	timesLogged, ok := seen[format]
-	if !ok {
-		seen[format] = 1
+	timesLogged := ctr.increment(format)
+
+	if timesLogged < logTypeLimit {
+		Infof(format, a...)
 	} else if timesLogged == logTypeLimit {
-		seen[format]++
-		f := fmt.Sprintf("[Info] %s", format)
-		klog.Errorf("%s seen %d times, suppressing future logs", f, logTypeLimit)
-	} else if timesLogged > logTypeLimit {
-		seen[format]++
-	} else {
-		seen[format]++
-		klog.V(3).Infof(fmt.Sprintf("[Info] %s", format), a...)
+		Infof(format, a...)
+		Infof("%s logged %d times: suppressing future logs", format, logTypeLimit)
 	}
 }
 
@@ -88,63 +77,3 @@ func ProfileWithThreshold(start time.Time, threshold time.Duration, name string)
 		Profilef("%s: %s", elapsed, name)
 	}
 }
-
-type Profiler struct {
-	profiles map[string]time.Duration
-	starts   map[string]time.Time
-}
-
-func NewProfiler() *Profiler {
-	return &Profiler{
-		profiles: map[string]time.Duration{},
-		starts:   map[string]time.Time{},
-	}
-}
-
-func (p *Profiler) Start(name string) {
-	if p == nil {
-		return
-	}
-	p.starts[name] = time.Now()
-}
-
-func (p *Profiler) Stop(name string) time.Duration {
-	if p == nil {
-		return 0
-	}
-	if start, ok := p.starts[name]; ok {
-		elapsed := time.Since(start)
-		p.profiles[name] += elapsed
-		return elapsed
-	}
-	return 0
-}
-
-func (p *Profiler) Log(name string) {
-	if p == nil {
-		return
-	}
-	Profilef("%s: %s", p.profiles[name], name)
-}
-
-func (p *Profiler) LogAll() {
-	if p == nil {
-		return
-	}
-
-	// Print profiles, largest to smallest. (Inefficienct, but shouldn't matter.)
-	print := map[string]time.Duration{}
-	for name, value := range p.profiles {
-		print[name] = value
-	}
-	for len(print) > 0 {
-		largest := ""
-		for name := range print {
-			if print[name] > print[largest] {
-				largest = name
-			}
-		}
-		Profilef("%s: %s", print[largest], largest)
-		delete(print, largest)
-	}
-}

+ 63 - 0
pkg/log/profiler.go

@@ -0,0 +1,63 @@
+package log
+
+import "time"
+
+type Profiler struct {
+	profiles map[string]time.Duration
+	starts   map[string]time.Time
+}
+
+func NewProfiler() *Profiler {
+	return &Profiler{
+		profiles: map[string]time.Duration{},
+		starts:   map[string]time.Time{},
+	}
+}
+
+func (p *Profiler) Start(name string) {
+	if p == nil {
+		return
+	}
+	p.starts[name] = time.Now()
+}
+
+func (p *Profiler) Stop(name string) time.Duration {
+	if p == nil {
+		return 0
+	}
+	if start, ok := p.starts[name]; ok {
+		elapsed := time.Since(start)
+		p.profiles[name] += elapsed
+		return elapsed
+	}
+	return 0
+}
+
+func (p *Profiler) Log(name string) {
+	if p == nil {
+		return
+	}
+	Profilef("%s: %s", p.profiles[name], name)
+}
+
+func (p *Profiler) LogAll() {
+	if p == nil {
+		return
+	}
+
+	// Print profiles, largest to smallest. (Inefficient, but shouldn't matter.)
+	print := map[string]time.Duration{}
+	for name, value := range p.profiles {
+		print[name] = value
+	}
+	for len(print) > 0 {
+		largest := ""
+		for name := range print {
+			if print[name] > print[largest] {
+				largest = name
+			}
+		}
+		Profilef("%s: %s", print[largest], largest)
+		delete(print, largest)
+	}
+}

+ 51 - 0
pkg/prom/error.go

@@ -5,19 +5,70 @@ import (
 	"strings"
 )
 
+// WrapError wraps the given error with the given message, usually for adding
+// context, but persists the existing type of error.
+func WrapError(err error, msg string) error {
+	switch e := err.(type) {
+	case CommError:
+		return e.Wrap(msg)
+	case NoDataError:
+		return e.Wrap(msg)
+	default:
+		return fmt.Errorf("%s: %s", msg, err)
+	}
+}
+
+// CommError describes an error communicating with Prometheus
 type CommError struct {
 	messages []string
 }
 
+// NewCommError creates a new CommError
 func NewCommError(messages ...string) CommError {
 	return CommError{messages: messages}
 }
 
+// IsCommError returns true if the given error is a CommError
+func IsCommError(err error) bool {
+	_, ok := err.(CommError)
+	return ok
+}
+
+// Error prints the error as a string
 func (pce CommError) Error() string {
 	return fmt.Sprintf("Prometheus communication error: %s", strings.Join(pce.messages, ": "))
 }
 
+// Wrap wraps the error with the given message, but persists the error type.
 func (pce CommError) Wrap(message string) CommError {
 	pce.messages = append([]string{message}, pce.messages...)
 	return pce
 }
+
+// NoDataError indicates that no data was returned by Prometheus. This should
+// be treated like an EOF error, in that it may be expected.
+type NoDataError struct {
+	messages []string
+}
+
+// NewNoDataError creates a new NoDataError
+func NewNoDataError(messages ...string) NoDataError {
+	return NoDataError{messages: messages}
+}
+
+// IsNoDataError returns true if the given error is a NoDataError
+func IsNoDataError(err error) bool {
+	_, ok := err.(NoDataError)
+	return ok
+}
+
+// Error prints the error as a string
+func (nde NoDataError) Error() string {
+	return fmt.Sprintf("No data error: %s", strings.Join(nde.messages, ": "))
+}
+
+// Wrap wraps the error with the given message, but persists the error type.
+func (nde NoDataError) Wrap(message string) NoDataError {
+	nde.messages = append([]string{message}, nde.messages...)
+	return nde
+}

+ 47 - 0
pkg/prom/ids.go

@@ -0,0 +1,47 @@
+package prom
+
+import (
+	"strings"
+
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
+const (
+	// PrometheusClientID is the identifier used when creating the client that
+	// targets prometheus. This can be used to check a specific client instance
+	// by calling prom.IsClientID(client, prom.PrometheusClientID)
+	PrometheusClientID string = "Prometheus"
+
+	// ThanosClientID is the identifier used when creating the client that
+	// targets thanos. This can be used to check a specific client instance
+	// by calling prom.IsClientID(client, prom.ThanosClientID)
+	ThanosClientID string = "Thanos"
+)
+
+// identityClient provides an interface for extracting an identifier from the client objects
+type identityClient interface {
+	ID() string
+}
+
+// IsClientID returns true if the client has an identifier of the specific type.
+func IsClientID(cli prometheus.Client, id string) bool {
+	if cli == nil {
+		return false
+	}
+
+	if idClient, ok := cli.(identityClient); ok {
+		return strings.EqualFold(idClient.ID(), id)
+	}
+
+	return false
+}
+
+// IsPrometheus returns true if the client provided is used to target prometheus
+func IsPrometheus(cli prometheus.Client) bool {
+	return IsClientID(cli, PrometheusClientID)
+}
+
+// IsThanos returns true if the client provided is used to target thanos
+func IsThanos(cli prometheus.Client) bool {
+	return IsClientID(cli, ThanosClientID)
+}

+ 109 - 48
pkg/prom/prom.go

@@ -2,29 +2,62 @@ package prom
 
 import (
 	"context"
+	"crypto/tls"
+	"net"
 	"net/http"
 	"net/url"
 	"os"
 	"time"
 
-	golog "log"
-
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
+
+	golog "log"
+
 	prometheus "github.com/prometheus/client_golang/api"
 )
 
-// Creates a new prometheus client which limits the total number of concurrent outbound requests
-// allowed at a given moment.
+//--------------------------------------------------------------------------
+//  ClientAuth
+//--------------------------------------------------------------------------
+
+// ClientAuth is used to authenticate outgoing client requests.
+type ClientAuth struct {
+	Username    string
+	Password    string
+	BearerToken string
+}
+
+// Apply applies the authentication data to the request headers
+func (auth *ClientAuth) Apply(req *http.Request) {
+	if auth == nil {
+		return
+	}
+
+	if auth.Username != "" {
+		req.SetBasicAuth(auth.Username, auth.Password)
+	}
+
+	if auth.BearerToken != "" {
+		token := "Bearer " + auth.BearerToken
+		req.Header.Add("Authorization", token)
+	}
+}
+
+//--------------------------------------------------------------------------
+//  RateLimitedPrometheusClient
+//--------------------------------------------------------------------------
+
+// RateLimitedPrometheusClient is a prometheus client which limits the total number of
+// concurrent outbound requests allowed at a given moment.
 type RateLimitedPrometheusClient struct {
-	client      prometheus.Client
-	limiter     *util.Semaphore
-	queue       util.BlockingQueue
-	outbound    *util.AtomicInt32
-	username    string
-	password    string
-	bearerToken string
-	fileLogger  *golog.Logger
+	id         string
+	client     prometheus.Client
+	auth       *ClientAuth
+	queue      util.BlockingQueue
+	outbound   *util.AtomicInt32
+	fileLogger *golog.Logger
 }
 
 // requestCounter is used to determine if the prometheus client keeps track of
@@ -36,7 +69,7 @@ type requestCounter interface {
 
 // NewRateLimitedClient creates a prometheus client which limits the number of concurrent outbound
 // prometheus requests.
-func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username, password, bearerToken string, queryLogFile string) (prometheus.Client, error) {
+func NewRateLimitedClient(id string, config prometheus.Config, maxConcurrency int, auth *ClientAuth, queryLogFile string) (prometheus.Client, error) {
 	c, err := prometheus.NewClient(config)
 	if err != nil {
 		return nil, err
@@ -61,13 +94,12 @@ func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username
 	}
 
 	rlpc := &RateLimitedPrometheusClient{
-		client:      c,
-		queue:       queue,
-		outbound:    outbound,
-		username:    username,
-		password:    password,
-		bearerToken: bearerToken,
-		fileLogger:  logger,
+		id:         id,
+		client:     c,
+		queue:      queue,
+		outbound:   outbound,
+		auth:       auth,
+		fileLogger: logger,
 	}
 
 	// Start concurrent request processing
@@ -78,27 +110,9 @@ func NewRateLimitedClient(config prometheus.Config, maxConcurrency int, username
 	return rlpc, nil
 }
 
-// LogPrometheusClientState logs the current state, with respect to outbound requests, if that
-// information is available.
-func LogPrometheusClientState(client prometheus.Client) {
-	if rc, ok := client.(requestCounter); ok {
-		total := rc.TotalRequests()
-		outbound := rc.TotalOutboundRequests()
-		queued := total - outbound
-
-		log.Infof("Outbound Requests: %d, Queued Requests: %d, Total Requests: %d", outbound, queued, total)
-	}
-}
-
-// LogQueryRequest logs the query that was send to prom/thanos with the time in queue and total time after being sent
-func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
-	if l == nil {
-		return
-	}
-	qp := util.NewQueryParams(req.URL.Query())
-	query := qp.Get("query", "<Unknown>")
-
-	l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
+// ID is used to identify the type of client
+func (rlpc *RateLimitedPrometheusClient) ID() string {
+	return rlpc.id
 }
 
 // TotalRequests returns the total number of requests that are either waiting to be sent and/or
@@ -180,13 +194,7 @@ func (rlpc *RateLimitedPrometheusClient) worker() {
 
 // Rate limit and passthrough to prometheus client API
 func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Request) (*http.Response, []byte, prometheus.Warnings, error) {
-	if rlpc.username != "" {
-		req.SetBasicAuth(rlpc.username, rlpc.password)
-	}
-	if rlpc.bearerToken != "" {
-		token := "Bearer " + rlpc.bearerToken
-		req.Header.Add("Authorization", token)
-	}
+	rlpc.auth.Apply(req)
 
 	respChan := make(chan *workResponse)
 	defer close(respChan)
@@ -202,3 +210,56 @@ func (rlpc *RateLimitedPrometheusClient) Do(ctx context.Context, req *http.Reque
 	workRes := <-respChan
 	return workRes.res, workRes.body, workRes.warnings, workRes.err
 }
+
+//--------------------------------------------------------------------------
+//  Client Helpers
+//--------------------------------------------------------------------------
+
+func NewPrometheusClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
+	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
+
+	// may be necessary for long prometheus queries. TODO: make this configurable
+	pc := prometheus.Config{
+		Address: address,
+		RoundTripper: &http.Transport{
+			Proxy: http.ProxyFromEnvironment,
+			DialContext: (&net.Dialer{
+				Timeout:   timeout,
+				KeepAlive: keepAlive,
+			}).DialContext,
+			TLSHandshakeTimeout: 10 * time.Second,
+			TLSClientConfig:     tlsConfig,
+		},
+	}
+
+	auth := &ClientAuth{
+		Username:    env.GetDBBasicAuthUsername(),
+		Password:    env.GetDBBasicAuthUserPassword(),
+		BearerToken: env.GetDBBearerToken(),
+	}
+
+	return NewRateLimitedClient(PrometheusClientID, pc, queryConcurrency, auth, queryLogFile)
+}
+
+// LogPrometheusClientState logs the current state, with respect to outbound requests, if that
+// information is available.
+func LogPrometheusClientState(client prometheus.Client) {
+	if rc, ok := client.(requestCounter); ok {
+		total := rc.TotalRequests()
+		outbound := rc.TotalOutboundRequests()
+		queued := total - outbound
+
+		log.Infof("Outbound Requests: %d, Queued Requests: %d, Total Requests: %d", outbound, queued, total)
+	}
+}
+
+// LogQueryRequest logs the query that was sent to prom/thanos with the time in queue and total time after being sent
+func LogQueryRequest(l *golog.Logger, req *http.Request, queueTime time.Duration, sendTime time.Duration) {
+	if l == nil {
+		return
+	}
+	qp := util.NewQueryParams(req.URL.Query())
+	query := qp.Get("query", "<Unknown>")
+
+	l.Printf("[Queue: %fs, Outbound: %fs][Query: %s]\n", queueTime.Seconds(), sendTime.Seconds(), query)
+}

+ 5 - 6
pkg/prom/query.go

@@ -13,7 +13,6 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
-	"k8s.io/klog"
 )
 
 const (
@@ -152,20 +151,20 @@ func (ctx *Context) query(query string) (interface{}, error) {
 
 	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
 	for _, w := range warnings {
-		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
+		log.Warningf("fetching query '%s': %s", query, w)
 	}
 	if err != nil {
 		if resp == nil {
-			return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+			return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
 		}
 
-		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("query error %d: '%s' fetching query '%s'", resp.StatusCode, err.Error(), query)
 	}
 
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+		return nil, fmt.Errorf("query error: '%s' fetching query '%s'", err.Error(), query)
 	}
 
 	return toReturn, nil
@@ -243,7 +242,7 @@ func (ctx *Context) queryRange(query string, start, end time.Time, step time.Dur
 
 	resp, body, warnings, err := ctx.Client.Do(context.Background(), req)
 	for _, w := range warnings {
-		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
+		log.Warningf("fetching query '%s': %s", query, w)
 	}
 	if err != nil {
 		if resp == nil {

+ 64 - 31
pkg/prom/result.go

@@ -1,7 +1,6 @@
 package prom
 
 import (
-	"errors"
 	"fmt"
 	"math"
 	"strconv"
@@ -15,22 +14,56 @@ var (
 	// Static Warnings for data point parsing
 	InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
 	NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
-
-	// Static Errors for query result parsing
-	DataFieldFormatErr         error = errors.New("Data field improperly formatted in prometheus repsonse")
-	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
-	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
-	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
-	NoDataErr                  error = errors.New("No data")
-	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
-	QueryResultNilErr          error = NewCommError("nil queryResult")
-	ResultFieldDoesNotExistErr error = errors.New("Result field not does not exist in prometheus response")
-	ResultFieldFormatErr       error = errors.New("Result field improperly formatted in prometheus response")
-	ResultFormatErr            error = errors.New("Result is improperly formatted")
-	ValueFieldDoesNotExistErr  error = errors.New("Value field does not exist in data result vector")
-	ValueFieldFormatErr        error = errors.New("Values field is improperly formatted")
 )
 
+func DataFieldFormatErr(query string) error {
+	return fmt.Errorf("Data field improperly formatted in prometheus response fetching query '%s'", query)
+}
+
+func DataPointFormatErr(query string) error {
+	return fmt.Errorf("Improperly formatted datapoint from Prometheus fetching query '%s'", query)
+}
+
+func MetricFieldDoesNotExistErr(query string) error {
+	return fmt.Errorf("Metric field does not exist in data result vector fetching query '%s'", query)
+}
+
+func MetricFieldFormatErr(query string) error {
+	return fmt.Errorf("Metric field is improperly formatted fetching query '%s'", query)
+}
+
+func NoDataErr(query string) error {
+	return NewNoDataError(query)
+}
+
+func PromUnexpectedResponseErr(query string) error {
+	return fmt.Errorf("Unexpected response from Prometheus fetching query '%s'", query)
+}
+
+func QueryResultNilErr(query string) error {
+	return NewCommError(query)
+}
+
+func ResultFieldDoesNotExistErr(query string) error {
+	return fmt.Errorf("Result field does not exist in prometheus response fetching query '%s'", query)
+}
+
+func ResultFieldFormatErr(query string) error {
+	return fmt.Errorf("Result field improperly formatted in prometheus response fetching query '%s'", query)
+}
+
+func ResultFormatErr(query string) error {
+	return fmt.Errorf("Result is improperly formatted fetching query '%s'", query)
+}
+
+func ValueFieldDoesNotExistErr(query string) error {
+	return fmt.Errorf("Value field does not exist in data result vector fetching query '%s'", query)
+}
+
+func ValueFieldFormatErr(query string) error {
+	return fmt.Errorf("Values field is improperly formatted fetching query '%s'", query)
+}
+
 // QueryResultsChan is a channel of query results
 type QueryResultsChan chan *QueryResults
 
@@ -67,13 +100,13 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 	qrs := &QueryResults{Query: query}
 
 	if queryResult == nil {
-		qrs.Error = QueryResultNilErr
+		qrs.Error = QueryResultNilErr(query)
 		return qrs
 	}
 
 	data, ok := queryResult.(map[string]interface{})["data"]
 	if !ok {
-		e, err := wrapPrometheusError(queryResult)
+		e, err := wrapPrometheusError(query, queryResult)
 		if err != nil {
 			qrs.Error = err
 			return qrs
@@ -85,17 +118,17 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 	// Deep Check for proper formatting
 	d, ok := data.(map[string]interface{})
 	if !ok {
-		qrs.Error = DataFieldFormatErr
+		qrs.Error = DataFieldFormatErr(query)
 		return qrs
 	}
 	resultData, ok := d["result"]
 	if !ok {
-		qrs.Error = ResultFieldDoesNotExistErr
+		qrs.Error = ResultFieldDoesNotExistErr(query)
 		return qrs
 	}
 	resultsData, ok := resultData.([]interface{})
 	if !ok {
-		qrs.Error = ResultFieldFormatErr
+		qrs.Error = ResultFieldFormatErr(query)
 		return qrs
 	}
 
@@ -106,18 +139,18 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 	for _, val := range resultsData {
 		resultInterface, ok := val.(map[string]interface{})
 		if !ok {
-			qrs.Error = ResultFormatErr
+			qrs.Error = ResultFormatErr(query)
 			return qrs
 		}
 
 		metricInterface, ok := resultInterface["metric"]
 		if !ok {
-			qrs.Error = MetricFieldDoesNotExistErr
+			qrs.Error = MetricFieldDoesNotExistErr(query)
 			return qrs
 		}
 		metricMap, ok := metricInterface.(map[string]interface{})
 		if !ok {
-			qrs.Error = MetricFieldFormatErr
+			qrs.Error = MetricFieldFormatErr(query)
 			return qrs
 		}
 
@@ -132,12 +165,12 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 		if !isRange {
 			dataPoint, ok := resultInterface["value"]
 			if !ok {
-				qrs.Error = ValueFieldDoesNotExistErr
+				qrs.Error = ValueFieldDoesNotExistErr(query)
 				return qrs
 			}
 
 			// Append new data point, log warnings
-			v, warn, err := parseDataPoint(dataPoint)
+			v, warn, err := parseDataPoint(query, dataPoint)
 			if err != nil {
 				qrs.Error = err
 				return qrs
@@ -156,7 +189,7 @@ func NewQueryResults(query string, queryResult interface{}) *QueryResults {
 
 			// Append new data points, log warnings
 			for _, value := range values {
-				v, warn, err := parseDataPoint(value)
+				v, warn, err := parseDataPoint(query, value)
 				if err != nil {
 					qrs.Error = err
 					return qrs
@@ -222,12 +255,12 @@ func (qr *QueryResult) GetLabels() map[string]string {
 
 // parseDataPoint parses a data point from raw prometheus query results and returns
 // a new Vector instance containing the parsed data along with any warnings or errors.
-func parseDataPoint(dataPoint interface{}) (*util.Vector, warning, error) {
+func parseDataPoint(query string, dataPoint interface{}) (*util.Vector, warning, error) {
 	var w warning = nil
 
 	value, ok := dataPoint.([]interface{})
 	if !ok || len(value) != 2 {
-		return nil, w, DataPointFormatErr
+		return nil, w, DataPointFormatErr(query)
 	}
 
 	strVal := value[1].(string)
@@ -260,11 +293,11 @@ func labelsForMetric(metricMap map[string]interface{}) string {
 	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
 }
 
-func wrapPrometheusError(qr interface{}) (string, error) {
+// wrapPrometheusError extracts the "error" field from a raw Prometheus
+// response map and annotates it with the query that produced it. Returns a
+// non-nil error only when the response has no "error" field at all.
+func wrapPrometheusError(query string, qr interface{}) (string, error) {
 	e, ok := qr.(map[string]interface{})["error"]
 	if !ok {
-		return "", PromUnexpectedResponseErr
+		return "", PromUnexpectedResponseErr(query)
 	}
+	// NOTE(review): the ok from this type assertion is never checked; if the
+	// "error" field is not a string, eStr is "" — confirm that is acceptable.
 	eStr, ok := e.(string)
-	return eStr, nil
+	return fmt.Sprintf("'%s' parsing query '%s'", eStr, query), nil
 }

+ 72 - 0
pkg/prom/validate.go

@@ -0,0 +1,72 @@
+package prom
+
+import (
+	"fmt"
+
+	"github.com/kubecost/cost-model/pkg/env"
+
+	prometheus "github.com/prometheus/client_golang/api"
+)
+
+// Liveness queries used by Validate: plain "up" for Prometheus, and "up"
+// offset by the configured Thanos offset for Thanos (presumably to allow for
+// ingestion lag — confirm against env.GetThanosOffset usage).
+var (
+	prometheusValidateQuery string = "up"
+	thanosValidateQuery     string = fmt.Sprintf("up offset %s", env.GetThanosOffset())
+)
+
+// PrometheusMetadata represents a validation result for prometheus/thanos running
+// kubecost.
+type PrometheusMetadata struct {
+	Running            bool `json:"running"`            // backend answered the validation query with at least one series
+	KubecostDataExists bool `json:"kubecostDataExists"` // an "up" series with job="kubecost" was found
+}
+
+// Validate tells the model what data prometheus has on it.
+// It runs the "up" liveness query — offset by the configured Thanos offset
+// when the client is a Thanos client (see IsThanos) — and reports whether the
+// backend is reachable and whether kubecost-scraped data exists.
+func Validate(cli prometheus.Client) (*PrometheusMetadata, error) {
+	if IsThanos(cli) {
+		return validate(cli, thanosValidateQuery)
+	}
+
+	return validate(cli, prometheusValidateQuery)
+}
+
+// validate executes the given liveness query q against the provided client
+// and interprets the resulting "up" series.
+//
+// Returns:
+//   - Running=false when the query errors or returns no series;
+//   - Running=true, KubecostDataExists=true when a series with job="kubecost"
+//     is present;
+//   - Running=true, KubecostDataExists=false otherwise.
+func validate(cli prometheus.Client, q string) (*PrometheusMetadata, error) {
+	ctx := NewContext(cli)
+
+	resUp, err := ctx.QuerySync(q)
+	if err != nil {
+		return &PrometheusMetadata{
+			Running:            false,
+			KubecostDataExists: false,
+		}, err
+	}
+
+	if len(resUp) == 0 {
+		return &PrometheusMetadata{
+			Running:            false,
+			KubecostDataExists: false,
+		}, fmt.Errorf("no running jobs on Prometheus at %s", ctx.QueryURL().Path)
+	}
+
+	for _, result := range resUp {
+		job, err := result.GetString("job")
+		if err != nil {
+			return &PrometheusMetadata{
+				Running:            false,
+				KubecostDataExists: false,
+			}, fmt.Errorf("up query does not have job names")
+		}
+
+		if job == "kubecost" {
+			// err is guaranteed nil here (checked just above); return nil
+			// explicitly rather than the stale err variable.
+			return &PrometheusMetadata{
+				Running:            true,
+				KubecostDataExists: true,
+			}, nil
+		}
+	}
+
+	return &PrometheusMetadata{
+		Running:            true,
+		KubecostDataExists: false,
+	}, nil
+}

+ 31 - 0
pkg/thanos/thanos.go

@@ -1,11 +1,17 @@
 package thanos
 
 import (
+	"crypto/tls"
 	"fmt"
+	"net"
+	"net/http"
 	"sync"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/prom"
+
+	prometheus "github.com/prometheus/client_golang/api"
 )
 
 var (
@@ -53,3 +59,28 @@ func OffsetDuration() time.Duration {
 func QueryOffset() string {
 	return queryOffset
 }
+
+// NewThanosClient builds a rate-limited Prometheus API client pointed at the
+// given Thanos query endpoint.
+//
+// Parameters:
+//   - address: base URL of the Thanos query service.
+//   - timeout, keepAlive: dialer settings for the underlying HTTP transport.
+//   - queryConcurrency: concurrency bound passed to prom.NewRateLimitedClient.
+//   - queryLogFile: passed through to NewRateLimitedClient (presumably a file
+//     queries are logged to — confirm against the prom package).
+//
+// TLS certificate verification is skipped when env.GetInsecureSkipVerify()
+// is true; multi-cluster basic-auth/bearer credentials are read from env.
+func NewThanosClient(address string, timeout, keepAlive time.Duration, queryConcurrency int, queryLogFile string) (prometheus.Client, error) {
+	tlsConfig := &tls.Config{InsecureSkipVerify: env.GetInsecureSkipVerify()}
+
+	tc := prometheus.Config{
+		Address: address,
+		RoundTripper: &http.Transport{
+			Proxy: http.ProxyFromEnvironment,
+			DialContext: (&net.Dialer{
+				Timeout:   timeout,
+				KeepAlive: keepAlive,
+			}).DialContext,
+			TLSHandshakeTimeout: 10 * time.Second,
+			TLSClientConfig:     tlsConfig,
+		},
+	}
+
+	auth := &prom.ClientAuth{
+		Username:    env.GetMultiClusterBasicAuthUsername(),
+		Password:    env.GetMultiClusterBasicAuthPassword(),
+		BearerToken: env.GetMultiClusterBearerToken(),
+	}
+
+	return prom.NewRateLimitedClient(prom.ThanosClientID, tc, queryConcurrency, auth, queryLogFile)
+}

+ 3 - 0
pkg/util/blockingqueue.go

@@ -63,6 +63,9 @@ func (q *blockingSliceQueue) Dequeue() interface{} {
 	}
 
 	e := q.q[0]
+
+	// nil 0 index to prevent leak
+	q.q[0] = nil
 	q.q = q.q[1:]
 	return e
 }