Sfoglia il codice sorgente

fix merge conflicts, add variables to env

Ajay Tripathy 5 anni fa
parent
commit
19a3baaa7a

+ 22 - 28
pkg/cloud/awsprovider.go

@@ -19,6 +19,7 @@ import (
 	"k8s.io/klog"
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
@@ -37,8 +38,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 )
 
-const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
-const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
 const awsReservedInstancePricePerHour = 0.0287
 const supportedSpotFeedVersion = "1"
 const SpotInfoUpdateType = "spotinfo"
@@ -386,9 +385,8 @@ func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			}
 		}
 
-		remoteEnabled := os.Getenv(remoteEnabled)
-		if remoteEnabled == "true" {
-			err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
+		if env.IsRemoteEnabled() {
+			err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
 			if err != nil {
 				return err
 			}
@@ -966,17 +964,13 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		return nil, err
 	}
 
-	remote := os.Getenv(remoteEnabled)
-	remoteEnabled := false
-	if os.Getenv(remote) == "true" {
-		remoteEnabled = true
-	}
+	remoteEnabled := env.IsRemoteEnabled()
 
 	if c.ClusterName != "" {
 		m := make(map[string]string)
 		m["name"] = c.ClusterName
 		m["provider"] = "AWS"
-		m["id"] = os.Getenv(clusterIDKey)
+		m["id"] = env.GetClusterID()
 		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 		return m, nil
 	}
@@ -985,12 +979,12 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		m := make(map[string]string)
 		m["name"] = clusterName
 		m["provider"] = "AWS"
-		m["id"] = os.Getenv(clusterIDKey)
+		m["id"] = env.GetClusterID()
 		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 		return m, nil
 	}
 
-	maybeClusterId := os.Getenv(ClusterIdEnvVar)
+	maybeClusterId := env.GetAWSClusterID()
 	if len(maybeClusterId) != 0 {
 		return makeStructure(maybeClusterId)
 	}
@@ -1049,7 +1043,7 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 			}
 		}
 	}*/
-	klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
+	klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
 	return makeStructure(defaultClusterName)
 }
 
@@ -1067,7 +1061,7 @@ func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string)
 	}
 
 	// 3. Fall back to env vars
-	return os.Getenv(awsAccessKeyIDEnvVar), os.Getenv(awsAccessKeySecretEnvVar)
+	return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
 }
 
 // Load once and cache the result (even on failure). This is an install time secret, so
@@ -1103,11 +1097,11 @@ func (aws *AWS) configureAWSAuth() error {
 	accessKeyID := aws.ServiceKeyName
 	accessKeySecret := aws.ServiceKeySecret
 	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
-		err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
+		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
 		if err != nil {
 			return err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
 		if err != nil {
 			return err
 		}
@@ -1138,7 +1132,7 @@ func getClusterConfig(ccFile string) (map[string]string, error) {
 // a new AWS Session are set.
 func (a *AWS) SetKeyEnv() error {
 	// TODO add this to the helm chart, mirroring the cost-model
-	// configPath := os.Getenv("CONFIG_PATH")
+	// configPath := env.GetConfigPath()
 	configPath := defaultConfigPath
 	path := configPath + "aws.json"
 
@@ -1165,8 +1159,8 @@ func (a *AWS) SetKeyEnv() error {
 	keySecret := configMap["awsServiceKeySecret"]
 
 	// These are required before calling NewEnvCredentials below
-	os.Setenv("AWS_ACCESS_KEY_ID", keyName)
-	os.Setenv("AWS_SECRET_ACCESS_KEY", keySecret)
+	env.Set(env.AWSAccessKeyIDEnvVar, keyName)
+	env.Set(env.AWSAccessKeySecretEnvVar, keySecret)
 
 	return nil
 }
@@ -1420,11 +1414,11 @@ func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutpu
 		return nil, err
 	}
 	if customPricing.ServiceKeyName != "" {
-		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
 		if err != nil {
 			return nil, err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
 		if err != nil {
 			return nil, err
 		}
@@ -1612,11 +1606,11 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 	klog.V(3).Infof("Running Query: %s", query)
 
 	if customPricing.ServiceKeyName != "" {
-		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
 		if err != nil {
 			return nil, err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
 		if err != nil {
 			return nil, err
 		}
@@ -1724,11 +1718,11 @@ func (a *AWS) QuerySQL(query string) ([]byte, error) {
 		return nil, err
 	}
 	if customPricing.ServiceKeyName != "" {
-		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
 		if err != nil {
 			return nil, err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
 		if err != nil {
 			return nil, err
 		}
@@ -1850,11 +1844,11 @@ func (f fnames) Less(i, j int) bool {
 func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
 	// credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
 	if accessKeyID != "" && accessKeySecret != "" {
-		err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
+		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
 		if err != nil {
 			return nil, err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
 		if err != nil {
 			return nil, err
 		}

+ 5 - 10
pkg/cloud/azureprovider.go

@@ -6,13 +6,13 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"os"
 	"regexp"
 	"strconv"
 	"strings"
 	"sync"
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-09-01/skus"
@@ -672,11 +672,7 @@ func (*Azure) GetDisks() ([]byte, error) {
 }
 
 func (az *Azure) ClusterInfo() (map[string]string, error) {
-	remote := os.Getenv(remoteEnabled)
-	remoteEnabled := false
-	if os.Getenv(remote) == "true" {
-		remoteEnabled = true
-	}
+	remoteEnabled := env.IsRemoteEnabled()
 
 	m := make(map[string]string)
 	m["name"] = "Azure Cluster #1"
@@ -689,7 +685,7 @@ func (az *Azure) ClusterInfo() (map[string]string, error) {
 	}
 	m["provider"] = "azure"
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
-	m["id"] = os.Getenv(clusterIDKey)
+	m["id"] = env.GetClusterID()
 	return m, nil
 
 }
@@ -725,9 +721,8 @@ func (az *Azure) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, e
 			}
 		}
 
-		remoteEnabled := os.Getenv(remoteEnabled)
-		if remoteEnabled == "true" {
-			err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
+		if env.IsRemoteEnabled() {
+			err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
 			if err != nil {
 				return err
 			}

+ 3 - 1
pkg/cloud/csvprovider.go

@@ -10,6 +10,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/kubecost/cost-model/pkg/env"
+
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/s3"
@@ -60,7 +62,7 @@ func (c *CSVProvider) DownloadPricingData() error {
 	var csvr io.Reader
 	var csverr error
 	if strings.HasPrefix(c.CSVLocation, "s3://") {
-		region := os.Getenv("CSV_REGION")
+		region := env.GetCSVRegion()
 		conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
 		s3Client := s3.New(session.New(conf))
 		bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")

+ 3 - 2
pkg/cloud/customprovider.go

@@ -3,12 +3,13 @@ package cloud
 import (
 	"encoding/json"
 	"io"
-	"os"
 	"strconv"
 	"strings"
 	"sync"
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
+
 	v1 "k8s.io/api/core/v1"
 )
 
@@ -106,7 +107,7 @@ func (cp *CustomProvider) ClusterInfo() (map[string]string, error) {
 		m["name"] = conf.ClusterName
 	}
 	m["provider"] = "custom"
-	m["id"] = os.Getenv(clusterIDKey)
+	m["id"] = env.GetClusterID()
 	return m, nil
 }
 

+ 10 - 18
pkg/cloud/gcpprovider.go

@@ -8,7 +8,6 @@ import (
 	"io/ioutil"
 	"math"
 	"net/http"
-	"os"
 	"regexp"
 	"strconv"
 	"strings"
@@ -19,8 +18,11 @@ import (
 
 	"cloud.google.com/go/bigquery"
 	"cloud.google.com/go/compute/metadata"
+
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
+
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2/google"
 	compute "google.golang.org/api/compute/v1"
@@ -185,10 +187,7 @@ func (gcp *GCP) GetManagementPlatform() (string, error) {
 
 // Attempts to load a GCP auth secret and copy the contents to the key file.
 func (*GCP) loadGCPAuthSecret() {
-	path := os.Getenv("CONFIG_PATH")
-	if path == "" {
-		path = "/models/"
-	}
+	path := env.GetConfigPathWithDefault("/models/")
 
 	keyPath := path + "key.json"
 	keyExists, _ := util.FileExists(keyPath)
@@ -242,10 +241,7 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 					return err
 				}
 
-				path := os.Getenv("CONFIG_PATH")
-				if path == "" {
-					path = "/models/"
-				}
+				path := env.GetConfigPathWithDefault("/models/")
 
 				keyPath := path + "key.json"
 				err = ioutil.WriteFile(keyPath, j, 0644)
@@ -292,9 +288,8 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			}
 		}
 
-		remoteEnabled := os.Getenv(remoteEnabled)
-		if remoteEnabled == "true" {
-			err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
+		if env.IsRemoteEnabled() {
+			err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
 			if err != nil {
 				return err
 			}
@@ -469,11 +464,8 @@ func (gcp *GCP) QuerySQL(query string) ([]*OutOfClusterAllocation, error) {
 
 // ClusterName returns the name of a GKE cluster, as provided by metadata.
 func (gcp *GCP) ClusterInfo() (map[string]string, error) {
-	remote := os.Getenv(remoteEnabled)
-	remoteEnabled := false
-	if os.Getenv(remote) == "true" {
-		remoteEnabled = true
-	}
+	remoteEnabled := env.IsRemoteEnabled()
+
 	metadataClient := metadata.NewClient(&http.Client{Transport: userAgentTransport{
 		userAgent: "kubecost",
 		base:      http.DefaultTransport,
@@ -495,7 +487,7 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	m := make(map[string]string)
 	m["name"] = attribute
 	m["provider"] = "GCP"
-	m["id"] = os.Getenv(clusterIDKey)
+	m["id"] = env.GetClusterID()
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 	return m, nil
 }

+ 13 - 16
pkg/cloud/provider.go

@@ -5,21 +5,18 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"os"
 	"strings"
 
 	"k8s.io/klog"
 
 	"cloud.google.com/go/compute/metadata"
+
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 
 	v1 "k8s.io/api/core/v1"
 )
 
-const clusterIDKey = "CLUSTER_ID"
-const remoteEnabled = "REMOTE_WRITE_ENABLED"
-const remotePW = "REMOTE_WRITE_PASSWORD"
-const sqlAddress = "SQL_ADDRESS"
 const authSecretPath = "/var/secrets/service-key.json"
 
 var createTableStatements = []string{
@@ -186,12 +183,12 @@ type Provider interface {
 func ClusterName(p Provider) string {
 	info, err := p.ClusterInfo()
 	if err != nil {
-		return os.Getenv(clusterIDKey)
+		return env.GetClusterID()
 	}
 
 	name, ok := info["name"]
 	if !ok {
-		return os.Getenv(clusterIDKey)
+		return env.GetClusterID()
 	}
 
 	return name
@@ -238,8 +235,8 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 
 	provider := strings.ToLower(nodes[0].Spec.ProviderID)
 
-	if os.Getenv("USE_CSV_PROVIDER") == "true" {
-		klog.Infof("Using CSV Provider with CSV at %s", os.Getenv("CSV_PATH"))
+	if env.IsUseCSVProvider() {
+		klog.Infof("Using CSV Provider with CSV at %s", env.GetCSVPath())
 		configFileName := ""
 		if metadata.OnGCE() {
 			configFileName = "gcp.json"
@@ -252,7 +249,7 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 			configFileName = "default.json"
 		}
 		return &CSVProvider{
-			CSVLocation: os.Getenv("CSV_PATH"),
+			CSVLocation: env.GetCSVPath(),
 			CustomProvider: &CustomProvider{
 				Clientset: cache,
 				Config:    NewProviderConfig(configFileName),
@@ -293,8 +290,8 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 }
 
 func UpdateClusterMeta(cluster_id, cluster_name string) error {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
+	pw := env.GetRemotePW()
+	address := env.GetSQLAddress()
 	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
 	db, err := sql.Open("postgres", connStr)
 	if err != nil {
@@ -310,8 +307,8 @@ func UpdateClusterMeta(cluster_id, cluster_name string) error {
 }
 
 func CreateClusterMeta(cluster_id, cluster_name string) error {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
+	pw := env.GetRemotePW()
+	address := env.GetSQLAddress()
 	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
 	db, err := sql.Open("postgres", connStr)
 	if err != nil {
@@ -333,8 +330,8 @@ func CreateClusterMeta(cluster_id, cluster_name string) error {
 }
 
 func GetClusterMeta(cluster_id string) (string, string, error) {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
+	pw := env.GetRemotePW()
+	address := env.GetSQLAddress()
 	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
 	db, err := sql.Open("postgres", connStr)
 	defer db.Close()

+ 2 - 5
pkg/cloud/providerconfig.go

@@ -4,12 +4,12 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
-	"os"
 	"reflect"
 	"strconv"
 	"strings"
 	"sync"
 
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
 
 	"k8s.io/klog"
@@ -210,9 +210,6 @@ func fileExists(filename string) (bool, error) {
 
 // Returns the configuration directory concatenated with a specific config file name
 func configPathFor(filename string) string {
-	path := os.Getenv("CONFIG_PATH")
-	if path == "" {
-		path = "/models/"
-	}
+	path := env.GetConfigPathWithDefault("/models/")
 	return path + filename
 }

+ 2 - 2
pkg/clustercache/clustercache.go

@@ -1,9 +1,9 @@
 package clustercache
 
 import (
-	"os"
 	"sync"
 
+	"github.com/kubecost/cost-model/pkg/env"
 	"k8s.io/klog"
 
 	appsv1 "k8s.io/api/apps/v1"
@@ -88,7 +88,7 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	appsRestClient := client.AppsV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 
-	kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
+	kubecostNamespace := env.GetKubecostNamespace()
 	klog.Infof("NAMESPACE: %s", kubecostNamespace)
 
 	kcc := &KubernetesClusterCache{

+ 7 - 30
pkg/costmodel/cluster.go

@@ -2,14 +2,13 @@ package costmodel
 
 import (
 	"fmt"
-	"os"
-	"sync"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/cloud"
-	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
+
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 )
@@ -38,31 +37,6 @@ const (
 	queryNodes = `sum(avg(node_total_hourly_cost) by (node, cluster_id)) * 730 %s`
 )
 
-// TODO move this to a package-accessible helper
-type PromQueryContext struct {
-	Client         prometheus.Client
-	ErrorCollector *errors.ErrorCollector
-	WaitGroup      *sync.WaitGroup
-}
-
-// TODO move this to a package-accessible helper function once dependencies are able to
-// be extricated from costmodel package (PromQueryResult -> util.Vector). Otherwise, circular deps.
-func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
-	if ctx.WaitGroup != nil {
-		defer ctx.WaitGroup.Done()
-	}
-
-	defer errors.HandlePanic()
-
-	raw, promErr := Query(ctx.Client, query)
-	ctx.ErrorCollector.Report(promErr)
-
-	results, parseErr := NewQueryResults(raw)
-	ctx.ErrorCollector.Report(parseErr)
-
-	resultCh <- results
-}
-
 // Costs represents cumulative and monthly cluster costs over a given duration. Costs
 // are broken down by cores, memory, and storage.
 type ClusterCosts struct {
@@ -242,7 +216,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		resChs = append(resChs, bdResChs...)
 	}
 
-	defaultClusterID := os.Getenv(clusterIDKey)
+	defaultClusterID := env.GetClusterID()
 
 	dataMinsByCluster := map[string]float64{}
 	for _, result := range resChs[0].Await() {
@@ -412,11 +386,14 @@ type Totals struct {
 }
 
 func resultToTotals(qr interface{}) ([][]string, error) {
-	results, err := NewQueryResults(qr)
+	// TODO: Provide an actual query instead of resultToTotals
+	qResults, err := prom.NewQueryResults("resultToTotals", qr)
 	if err != nil {
 		return nil, err
 	}
 
+	results := qResults.Results
+
 	if len(results) == 0 {
 		return [][]string{}, fmt.Errorf("Not enough data available in the selected time range")
 	}

+ 212 - 0
pkg/costmodel/containerkeys.go

@@ -0,0 +1,212 @@
+package costmodel
+
+import (
+	"errors"
+	"strings"
+
+	"github.com/kubecost/cost-model/pkg/log"
+	"k8s.io/api/core/v1"
+)
+
+var (
+	// Static KeyTuple Errors
+	NewKeyTupleErr = errors.New("NewKeyTuple() Provided key not containing exactly 3 components.")
+
+	// Static Errors for ContainerMetric creation
+	InvalidKeyErr error = errors.New("Not a valid key")
+	NoContainerErr error = errors.New("Prometheus vector does not have container name")
+	NoContainerNameErr error = errors.New("Prometheus vector does not have string container name")
+	NoPodErr error = errors.New("Prometheus vector does not have pod name")
+	NoPodNameErr error = errors.New("Prometheus vector does not have string pod name")
+	NoNamespaceErr error = errors.New("Prometheus vector does not have namespace")
+	NoNamespaceNameErr error = errors.New("Prometheus vector does not have string namespace")
+	NoNodeNameErr error = errors.New("Prometheus vector does not have string node")
+	NoClusterIDErr error = errors.New("Prometheus vector does not have string cluster_id")
+)
+
+//--------------------------------------------------------------------------
+//  KeyTuple
+//--------------------------------------------------------------------------
+
+// KeyTuple contains is a utility which parses Namespace, Key, and ClusterID from a 
+// comma delimitted string.
+type KeyTuple struct {
+	key    string
+	kIndex int
+	cIndex int
+}
+
+// Namespace returns the the namespace from the string key.
+func (kt *KeyTuple) Namespace() string {
+	return kt.key[0 : kt.kIndex-1]
+}
+
+// Key returns the identifier from the string key.
+func (kt *KeyTuple) Key() string {
+	return kt.key[kt.kIndex : kt.cIndex-1]
+}
+
+// ClusterID returns the cluster identifier from the string key.
+func (kt *KeyTuple) ClusterID() string {
+	return kt.key[kt.cIndex:]
+}
+
+// NewKeyTuple creates a new KeyTuple instance by determining the exact indices of each tuple
+// entry. When each component is requested, a string slice is returned using the boundaries.
+func NewKeyTuple(key string) (*KeyTuple, error) {
+	kIndex := strings.IndexRune(key, ',')
+	if kIndex < 0 {
+		return nil, NewKeyTupleErr
+	}
+	kIndex += 1
+
+	subIndex := strings.IndexRune(key[kIndex:], ',')
+	if subIndex < 0 {
+		return nil, NewKeyTupleErr
+	}
+	cIndex := kIndex + subIndex + 1
+
+	if strings.ContainsRune(key[cIndex:], ',') {
+		return nil, NewKeyTupleErr
+	}
+
+	return &KeyTuple{
+		key:    key,
+		kIndex: kIndex,
+		cIndex: cIndex,
+	}, nil
+}
+
+//--------------------------------------------------------------------------
+//  ContainerMetric
+//--------------------------------------------------------------------------
+
+// ContainerMetric contains a set of identifiers specific to a kubernetes container including
+// a unique string key
+type ContainerMetric struct {
+	Namespace     string
+	PodName       string
+	ContainerName string
+	NodeName      string
+	ClusterID     string
+	key           string
+}
+
+// Key returns a unique string key that can be used in map[string]interface{}
+func (c *ContainerMetric) Key() string {
+	return c.key
+}
+
+// containerMetricKey creates a unique string key, a comma delimitted list of the provided
+// parameters.
+func containerMetricKey(ns, podName, containerName, nodeName, clusterID string) string {
+	return ns + "," + podName + "," + containerName + "," + nodeName + "," + clusterID
+}
+
+// NewContainerMetricFromKey creates a new ContainerMetric instance using a provided comma delimitted 
+// string key. 
+func NewContainerMetricFromKey(key string) (*ContainerMetric, error) {
+	s := strings.Split(key, ",")
+	if len(s) == 5 {
+		return &ContainerMetric{
+			Namespace:     s[0],
+			PodName:       s[1],
+			ContainerName: s[2],
+			NodeName:      s[3],
+			ClusterID:     s[4],
+			key:           key,
+		}, nil
+	}
+	return nil, InvalidKeyErr
+}
+
+// NewContainerMetricFromValues creates a new ContainerMetric instance using the provided string parameters.
+func NewContainerMetricFromValues(ns, podName, containerName, nodeName, clusterId string) *ContainerMetric {
+	return &ContainerMetric{
+		Namespace:     ns,
+		PodName:       podName,
+		ContainerName: containerName,
+		NodeName:      nodeName,
+		ClusterID:     clusterId,
+		key:           containerMetricKey(ns, podName, containerName, nodeName, clusterId),
+	}
+}
+
+// NewContainerMetricsFromPod creates a slice of ContainerMetric instances for each container in the 
+// provided Pod.
+func NewContainerMetricsFromPod(pod *v1.Pod, clusterID string) ([]*ContainerMetric, error) {
+	podName := pod.GetObjectMeta().GetName()
+	ns := pod.GetObjectMeta().GetNamespace()
+	node := pod.Spec.NodeName
+
+	var cs []*ContainerMetric
+	for _, container := range pod.Spec.Containers {
+		containerName := container.Name
+		cs = append(cs, &ContainerMetric{
+			Namespace:     ns,
+			PodName:       podName,
+			ContainerName: containerName,
+			NodeName:      node,
+			ClusterID:     clusterID,
+			key:           containerMetricKey(ns, podName, containerName, node, clusterID),
+		})
+	}
+	return cs, nil
+}
+
+// NewContainerMetricFromPrometheus accepts the metrics map from a QueryResult and returns a new ContainerMetric
+// instance
+func NewContainerMetricFromPrometheus(metrics map[string]interface{}, defaultClusterID string) (*ContainerMetric, error) {
+	// TODO: Can we use *prom.QueryResult.GetString() here?
+	cName, ok := metrics["container_name"]
+	if !ok {
+		return nil, NoContainerErr
+	}
+	containerName, ok := cName.(string)
+	if !ok {
+		return nil, NoContainerNameErr
+	}
+	pName, ok := metrics["pod_name"]
+	if !ok {
+		return nil, NoPodErr
+	}
+	podName, ok := pName.(string)
+	if !ok {
+		return nil, NoPodNameErr
+	}
+	ns, ok := metrics["namespace"]
+	if !ok {
+		return nil, NoNamespaceErr
+	}
+	namespace, ok := ns.(string)
+	if !ok {
+		return nil, NoNamespaceNameErr
+	}
+	node, ok := metrics["node"]
+	if !ok {
+		log.Debugf("Prometheus vector does not have node name")
+		node = ""
+	}
+	nodeName, ok := node.(string)
+	if !ok {
+		return nil, NoNodeNameErr
+	}
+	cid, ok := metrics["cluster_id"]
+	if !ok {
+		log.Debugf("Prometheus vector does not have cluster id")
+		cid = defaultClusterID
+	}
+	clusterID, ok := cid.(string)
+	if !ok {
+		return nil, NoClusterIDErr
+	}
+
+	return &ContainerMetric{
+		ContainerName: containerName,
+		PodName:       podName,
+		Namespace:     namespace,
+		NodeName:      nodeName,
+		ClusterID:     clusterID,
+		key:           containerMetricKey(namespace, podName, containerName, nodeName, clusterID),
+	}, nil
+}

+ 57 - 189
pkg/costmodel/costmodel.go

@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"math"
 	"net/http"
-	"os"
 	"strconv"
 	"strings"
 	"sync"
@@ -14,9 +13,11 @@ import (
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/thanos"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	v1 "k8s.io/api/core/v1"
@@ -46,11 +47,6 @@ const (
 	epCleanTombstones = apiPrefix + "/admin/tsdb/clean_tombstones"
 	epConfig          = apiPrefix + "/status/config"
 	epFlags           = apiPrefix + "/status/flags"
-
-	clusterIDKey   = "CLUSTER_ID"
-	remoteEnabled  = "REMOTE_WRITE_ENABLED"
-	thanosEnabled  = "THANOS_ENABLED"
-	thanosQueryUrl = "THANOS_QUERY_URL"
 )
 
 type CostModel struct {
@@ -238,7 +234,7 @@ type PrometheusMetadata struct {
 func ValidatePrometheus(cli prometheusClient.Client, isThanos bool) (*PrometheusMetadata, error) {
 	q := "up"
 	if isThanos {
-		q += " offset 3h"
+		q += thanos.QueryOffset()
 	}
 	data, err := Query(cli, q)
 	if err != nil {
@@ -331,7 +327,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	normalization := fmt.Sprintf(normalizationStr, window, offset)
 
 	// Cluster ID is specific to the source cluster
-	clusterID := os.Getenv(clusterIDKey)
+	clusterID := env.GetClusterID()
 
 	var wg sync.WaitGroup
 	wg.Add(11)
@@ -577,7 +573,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 		if pod.Status.Phase != v1.PodRunning {
 			continue
 		}
-		cs, err := newContainerMetricsFromPod(*pod, clusterID)
+		cs, err := NewContainerMetricsFromPod(pod, clusterID)
 		if err != nil {
 			return nil, err
 		}
@@ -661,7 +657,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 				containerName := container.Name
 
 				// recreate the key and look up data for this container
-				newKey := newContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName, clusterID).Key()
+				newKey := NewContainerMetricFromValues(ns, podName, containerName, pod.Spec.NodeName, clusterID).Key()
 
 				RAMReqV, ok := RAMReqMap[newKey]
 				if !ok {
@@ -847,7 +843,7 @@ func findUnmountedPVCostData(unmountedPVs map[string][]*PersistentVolumeClaimDat
 		// Should be a unique "Unmounted" cost data type
 		name := "unmounted-pvs"
 
-		metric := newContainerMetricFromValues(ns, name, name, "", clusterID)
+		metric := NewContainerMetricFromValues(ns, name, name, "", clusterID)
 		key := metric.Key()
 
 		if costData, ok := costs[key]; !ok {
@@ -1729,7 +1725,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
 		return nil, err
 	}
-	clusterID := os.Getenv(clusterIDKey)
+	clusterID := env.GetClusterID()
 
 	durHrs := end.Sub(start).Hours() + 1
 
@@ -2588,12 +2584,14 @@ type PersistentVolumeClaimData struct {
 
 func getCost(qr interface{}) (map[string][]*util.Vector, error) {
 	toReturn := make(map[string][]*util.Vector)
-	result, err := NewQueryResults(qr)
+
+	// TODO: Pass actual query instead of getCost
+	result, err := prom.NewQueryResults("getCost", qr)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		instance, err := val.GetString("instance")
 		if err != nil {
 			return toReturn, err
@@ -2621,18 +2619,28 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 
 	resp, body, warnings, err := cli.Do(context.Background(), req)
 	for _, w := range warnings {
-		klog.V(3).Infof("[Warning] '%s' fetching query '%s'", w, query)
+		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
 	}
 	if err != nil {
-		return nil, fmt.Errorf("[Error] %s fetching query %s", err.Error(), query)
+		if resp == nil {
+			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
+		}
+
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), body, err.Error(), query)
+	}
+
+	// Unsuccessful Status Code, log body and status
+	statusCode := resp.StatusCode
+	statusText := http.StatusText(statusCode)
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, fmt.Errorf("%d (%s) Headers: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
 	}
 
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("[Error] %d %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
-
 	return toReturn, nil
 }
 
@@ -2653,26 +2661,37 @@ func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 	}
 	if err != nil {
 		if resp == nil {
-			return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
 		}
 
-		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), body, err.Error(), query)
 	}
+
+	// Unsuccessful Status Code, log body and status
+	statusCode := resp.StatusCode
+	statusText := http.StatusText(statusCode)
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, fmt.Errorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
+	}
+
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
 	return toReturn, nil
 }
 
 //todo: don't cast, implement unmarshaler interface
 func getNormalization(qr interface{}) (float64, error) {
-	queryResults, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of getNormalization
+	qResults, err := prom.NewQueryResults("getNormalization", qr)
 	if err != nil {
 		return 0, err
 	}
 
+	queryResults := qResults.Results
+
 	if len(queryResults) > 0 {
 		values := queryResults[0].Values
 
@@ -2686,11 +2705,14 @@ func getNormalization(qr interface{}) (float64, error) {
 
 //todo: don't cast, implement unmarshaler interface
 func getNormalizations(qr interface{}) ([]*util.Vector, error) {
-	queryResults, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of getNormalizations
+	qResults, err := prom.NewQueryResults("getNormalizations", qr)
 	if err != nil {
 		return nil, err
 	}
 
+	queryResults := qResults.Results
+
 	if len(queryResults) > 0 {
 		vectors := []*util.Vector{}
 		for _, value := range queryResults {
@@ -2701,172 +2723,16 @@ func getNormalizations(qr interface{}) ([]*util.Vector, error) {
 	return nil, fmt.Errorf("normalization data is empty: time window may be invalid or kube-state-metrics or node-exporter may not be running")
 }
 
-type ContainerMetric struct {
-	Namespace     string
-	PodName       string
-	ContainerName string
-	NodeName      string
-	ClusterID     string
-	key           string
-}
-
-func (c *ContainerMetric) Key() string {
-	return c.key
-}
-
-func containerMetricKey(ns, podName, containerName, nodeName, clusterID string) string {
-	return ns + "," + podName + "," + containerName + "," + nodeName + "," + clusterID
-}
-
-func NewContainerMetricFromKey(key string) (*ContainerMetric, error) {
-	s := strings.Split(key, ",")
-	if len(s) == 5 {
-		return &ContainerMetric{
-			Namespace:     s[0],
-			PodName:       s[1],
-			ContainerName: s[2],
-			NodeName:      s[3],
-			ClusterID:     s[4],
-			key:           key,
-		}, nil
-	}
-	return nil, fmt.Errorf("Not a valid key")
-}
-
-func newContainerMetricFromValues(ns string, podName string, containerName string, nodeName string, clusterId string) *ContainerMetric {
-	return &ContainerMetric{
-		Namespace:     ns,
-		PodName:       podName,
-		ContainerName: containerName,
-		NodeName:      nodeName,
-		ClusterID:     clusterId,
-		key:           containerMetricKey(ns, podName, containerName, nodeName, clusterId),
-	}
-}
-
-func newContainerMetricsFromPod(pod v1.Pod, clusterID string) ([]*ContainerMetric, error) {
-	podName := pod.GetObjectMeta().GetName()
-	ns := pod.GetObjectMeta().GetNamespace()
-	node := pod.Spec.NodeName
-	var cs []*ContainerMetric
-	for _, container := range pod.Spec.Containers {
-		containerName := container.Name
-		cs = append(cs, &ContainerMetric{
-			Namespace:     ns,
-			PodName:       podName,
-			ContainerName: containerName,
-			NodeName:      node,
-			ClusterID:     clusterID,
-			key:           containerMetricKey(ns, podName, containerName, node, clusterID),
-		})
-	}
-	return cs, nil
-}
-
-func newContainerMetricFromPrometheus(metrics map[string]interface{}, defaultClusterID string) (*ContainerMetric, error) {
-	cName, ok := metrics["container_name"]
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have container name")
-	}
-	containerName, ok := cName.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string container name")
-	}
-	pName, ok := metrics["pod_name"]
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have pod name")
-	}
-	podName, ok := pName.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string pod name")
-	}
-	ns, ok := metrics["namespace"]
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have namespace")
-	}
-	namespace, ok := ns.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string namespace")
-	}
-	node, ok := metrics["node"]
-	if !ok {
-		klog.V(4).Info("Prometheus vector does not have node name")
-		node = ""
-	}
-	nodeName, ok := node.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string node")
-	}
-	cid, ok := metrics["cluster_id"]
-	if !ok {
-		klog.V(4).Info("Prometheus vector does not have cluster id")
-		cid = defaultClusterID
-	}
-	clusterID, ok := cid.(string)
-	if !ok {
-		return nil, fmt.Errorf("Prometheus vector does not have string cluster_id")
-	}
-	return &ContainerMetric{
-		ContainerName: containerName,
-		PodName:       podName,
-		Namespace:     namespace,
-		NodeName:      nodeName,
-		ClusterID:     clusterID,
-		key:           containerMetricKey(namespace, podName, containerName, nodeName, clusterID),
-	}, nil
-}
-
-type KeyTuple struct {
-	key    string
-	kIndex int
-	cIndex int
-}
-
-func (kt *KeyTuple) Namespace() string {
-	return kt.key[0 : kt.kIndex-1]
-}
-
-func (kt *KeyTuple) Key() string {
-	return kt.key[kt.kIndex : kt.cIndex-1]
-}
-
-func (kt *KeyTuple) ClusterID() string {
-	return kt.key[kt.cIndex:]
-}
-
-func NewKeyTuple(key string) (*KeyTuple, error) {
-	kIndex := strings.IndexRune(key, ',')
-	if kIndex < 0 {
-		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
-	}
-	kIndex += 1
-
-	subIndex := strings.IndexRune(key[kIndex:], ',')
-	if subIndex < 0 {
-		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
-	}
-	cIndex := kIndex + subIndex + 1
-
-	if strings.ContainsRune(key[cIndex:], ',') {
-		return nil, fmt.Errorf("NewKeyTuple() Provided key not containing exactly 3 components.")
-	}
-
-	return &KeyTuple{
-		key:    key,
-		kIndex: kIndex,
-		cIndex: cIndex,
-	}, nil
-}
-
 func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue float64, defaultClusterID string) (map[string][]*util.Vector, error) {
-	result, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of ContainerMetricVector
+	result, err := prom.NewQueryResults("ContainerMetricVector", qr)
 	if err != nil {
 		return nil, err
 	}
 
 	containerData := make(map[string][]*util.Vector)
-	for _, val := range result {
-		containerMetric, err := newContainerMetricFromPrometheus(val.Metric, defaultClusterID)
+	for _, val := range result.Results {
+		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}
@@ -2882,14 +2748,15 @@ func GetContainerMetricVector(qr interface{}, normalize bool, normalizationValue
 }
 
 func GetContainerMetricVectors(qr interface{}, defaultClusterID string) (map[string][]*util.Vector, error) {
-	result, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of ContainerMetricVectors
+	result, err := prom.NewQueryResults("ContainerMetricVectors", qr)
 	if err != nil {
 		return nil, err
 	}
 
 	containerData := make(map[string][]*util.Vector)
-	for _, val := range result {
-		containerMetric, err := newContainerMetricFromPrometheus(val.Metric, defaultClusterID)
+	for _, val := range result.Results {
+		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}
@@ -2899,14 +2766,15 @@ func GetContainerMetricVectors(qr interface{}, defaultClusterID string) (map[str
 }
 
 func GetNormalizedContainerMetricVectors(qr interface{}, normalizationValues []*util.Vector, defaultClusterID string) (map[string][]*util.Vector, error) {
-	result, err := NewQueryResults(qr)
+	// TODO: Pass actual query instead of NormalizedContainerMetricVectors
+	result, err := prom.NewQueryResults("NormalizedContainerMetricVectors", qr)
 	if err != nil {
 		return nil, err
 	}
 
 	containerData := make(map[string][]*util.Vector)
-	for _, val := range result {
-		containerMetric, err := newContainerMetricFromPrometheus(val.Metric, defaultClusterID)
+	for _, val := range result.Results {
+		containerMetric, err := NewContainerMetricFromPrometheus(val.Metric, defaultClusterID)
 		if err != nil {
 			return nil, err
 		}

+ 7 - 4
pkg/costmodel/networkcosts.go

@@ -2,8 +2,9 @@ package costmodel
 
 import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
-	"k8s.io/klog"
 )
 
 // NetworkUsageVNetworkUsageDataector contains the network usage values for egress network traffic
@@ -138,12 +139,14 @@ func GetNetworkCost(usage *NetworkUsageData, cloud costAnalyzerCloud.Provider) (
 
 func getNetworkUsage(qr interface{}, defaultClusterID string) (map[string]*NetworkUsageVector, error) {
 	ncdmap := make(map[string]*NetworkUsageVector)
-	result, err := NewQueryResults(qr)
+
+	// TODO: Pass actual query instead of NetworkUsage
+	result, err := prom.NewQueryResults("NetworkUsage", qr)
 	if err != nil {
 		return nil, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		podName, err := val.GetString("pod_name")
 		if err != nil {
 			return nil, err
@@ -156,7 +159,7 @@ func getNetworkUsage(qr interface{}, defaultClusterID string) (map[string]*Netwo
 
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
-			klog.V(4).Info("Prometheus vector does not have cluster id")
+			log.Debugf("Prometheus vector does not have cluster id")
 			clusterID = defaultClusterID
 		}
 

+ 40 - 193
pkg/costmodel/promparsers.go

@@ -2,182 +2,22 @@ package costmodel
 
 import (
 	"fmt"
-	"math"
-	"strconv"
-	"strings"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
-	"github.com/kubecost/cost-model/pkg/util"
-	"k8s.io/klog"
 )
 
-// PromQueryResult contains a single result from a prometheus query
-type PromQueryResult struct {
-	Metric map[string]interface{}
-	Values []*util.Vector
-}
-
-func (pqr *PromQueryResult) GetString(field string) (string, error) {
-	f, ok := pqr.Metric[field]
-	if !ok {
-		return "", fmt.Errorf("%s field does not exist in data result vector", field)
-	}
-
-	strField, ok := f.(string)
-	if !ok {
-		return "", fmt.Errorf("%s field is improperly formatted", field)
-	}
-
-	return strField, nil
-}
-
-func (pqr *PromQueryResult) GetLabels() map[string]string {
-	result := make(map[string]string)
-
-	// Find All keys with prefix label_, remove prefix, add to labels
-	for k, v := range pqr.Metric {
-		if !strings.HasPrefix(k, "label_") {
-			continue
-		}
-
-		label := k[6:]
-		value, ok := v.(string)
-		if !ok {
-			klog.V(3).Infof("Failed to parse label value for label: %s", label)
-			continue
-		}
-
-		result[label] = value
-	}
-
-	return result
-}
-
-// NewQueryResults accepts the raw prometheus query result and returns an array of
-// PromQueryResult objects
-func NewQueryResults(queryResult interface{}) ([]*PromQueryResult, error) {
-	var result []*PromQueryResult
-	if queryResult == nil {
-		return nil, prom.NewCommError("nil queryResult")
-	}
-	data, ok := queryResult.(map[string]interface{})["data"]
-	if !ok {
-		e, err := wrapPrometheusError(queryResult)
-		if err != nil {
-			return nil, err
-		}
-		return nil, fmt.Errorf(e)
-	}
-
-	// Deep Check for proper formatting
-	d, ok := data.(map[string]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
-	}
-	resultData, ok := d["result"]
-	if !ok {
-		return nil, fmt.Errorf("Result field not present in prometheus response")
-	}
-	resultsData, ok := resultData.([]interface{})
-	if !ok {
-		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
-	}
-
-	// Scan Results
-	for _, val := range resultsData {
-		resultInterface, ok := val.(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Result is improperly formatted")
-		}
-
-		metricInterface, ok := resultInterface["metric"]
-		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
-		}
-		metricMap, ok := metricInterface.(map[string]interface{})
-		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
-		}
-
-		// Wrap execution of this lazily in case the data is not used
-		labels := func() string { return labelsForMetric(metricMap) }
-
-		// Determine if the result is a ranged data set or single value
-		_, isRange := resultInterface["values"]
-
-		var vectors []*util.Vector
-		if !isRange {
-			dataPoint, ok := resultInterface["value"]
-			if !ok {
-				return nil, fmt.Errorf("Value field does not exist in data result vector")
-			}
-
-			v, err := parseDataPoint(dataPoint, labels)
-			if err != nil {
-				return nil, err
-			}
-			vectors = append(vectors, v)
-		} else {
-			values, ok := resultInterface["values"].([]interface{})
-			if !ok {
-				return nil, fmt.Errorf("Values field is improperly formatted")
-			}
-
-			for _, value := range values {
-				v, err := parseDataPoint(value, labels)
-				if err != nil {
-					return nil, err
-				}
-
-				vectors = append(vectors, v)
-			}
-		}
-
-		result = append(result, &PromQueryResult{
-			Metric: metricMap,
-			Values: vectors,
-		})
-	}
-
-	return result, nil
-}
-
-func parseDataPoint(dataPoint interface{}, labels func() string) (*util.Vector, error) {
-	value, ok := dataPoint.([]interface{})
-	if !ok || len(value) != 2 {
-		return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
-	}
-
-	strVal := value[1].(string)
-	v, err := strconv.ParseFloat(strVal, 64)
-	if err != nil {
-		return nil, err
-	}
-
-	// Test for +Inf and -Inf (sign: 0), Test for NaN
-	if math.IsInf(v, 0) {
-		klog.V(1).Infof("[Warning] Found Inf value parsing vector data point for metric: %s", labels())
-		v = 0.0
-	} else if math.IsNaN(v) {
-		klog.V(1).Infof("[Warning] Found NaN value parsing vector data point for metric: %s", labels())
-		v = 0.0
-	}
-
-	return &util.Vector{
-		Timestamp: math.Round(value[0].(float64)/10) * 10,
-		Value:     v,
-	}, nil
-}
-
 func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string]*PersistentVolumeClaimData)
-	result, err := NewQueryResults(qr)
+
+	// TODO: Pass actual query instead of PVInfo
+	result, err := prom.NewQueryResults("PVInfo", qr)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -195,14 +35,14 @@ func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentV
 
 		volumeName, err := val.GetString("volumename")
 		if err != nil {
-			klog.V(4).Infof("[Warning] Unfulfilled claim %s: volumename field does not exist in data result vector", pvcName)
+			log.Debugf("Unfulfilled claim %s: volumename field does not exist in data result vector", pvcName)
 			volumeName = ""
 		}
 
 		pvClass, err := val.GetString("storageclass")
 		if err != nil {
 			// TODO: We need to look up the actual PV and PV capacity. For now just proceed with "".
-			klog.V(2).Infof("[Warning] Storage Class not found for claim \"%s/%s\".", ns, pvcName)
+			log.Warningf("Storage Class not found for claim \"%s/%s\".", ns, pvcName)
 			pvClass = ""
 		}
 
@@ -222,12 +62,14 @@ func GetPVInfo(qr interface{}, defaultClusterID string) (map[string]*PersistentV
 
 func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (map[string][]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string][]*PersistentVolumeClaimData)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of PVAllocationMetrics
+	result, err := prom.NewQueryResults("PVAllocationMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -250,7 +92,7 @@ func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (m
 
 		pvName, err := val.GetString("persistentvolume")
 		if err != nil {
-			klog.Infof("persistentvolume field does not exist for pv %s", pvcName) // This is possible for an unfulfilled claim
+			log.Warningf("persistentvolume field does not exist for pv %s", pvcName) // This is possible for an unfulfilled claim
 			continue
 		}
 
@@ -272,12 +114,14 @@ func GetPVAllocationMetrics(queryResult interface{}, defaultClusterID string) (m
 
 func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[string]*costAnalyzerCloud.PV, error) {
 	toReturn := make(map[string]*costAnalyzerCloud.PV)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of PVCostMetrics
+	result, err := prom.NewQueryResults("PVCostMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		clusterID, err := val.GetString("cluster_id")
 		if clusterID == "" {
 			clusterID = defaultClusterID
@@ -299,12 +143,14 @@ func GetPVCostMetrics(queryResult interface{}, defaultClusterID string) (map[str
 
 func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of NamespaceLabelsMetrics
+	result, err := prom.NewQueryResults("NamespaceLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Namespace and ClusterID for key generation purposes
 		ns, err := val.GetString("namespace")
 		if err != nil {
@@ -330,12 +176,14 @@ func GetNamespaceLabelsMetrics(queryResult interface{}, defaultClusterID string)
 
 func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of PodLabelsMetrics
+	result, err := prom.NewQueryResults("PodLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Pod, Namespace and ClusterID for key generation purposes
 		pod, err := val.GetString("pod")
 		if err != nil {
@@ -368,12 +216,14 @@ func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[
 
 func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of StatefulsetMatchLabelsMetrics
+	result, err := prom.NewQueryResults("StatefulsetMatchLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Statefulset, Namespace and ClusterID for key generation purposes
 		ss, err := val.GetString("statefulSet")
 		if err != nil {
@@ -399,11 +249,13 @@ func GetStatefulsetMatchLabelsMetrics(queryResult interface{}, defaultClusterID
 
 func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID string) (map[string]string, error) {
 	toReturn := make(map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of PodDaemonsetsWithMetrics
+	result, err := prom.NewQueryResults("PodDaemonsetsWithMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
-	for _, val := range result {
+	for _, val := range result.Results {
 		ds, err := val.GetString("owner_name")
 		if err != nil {
 			return toReturn, err
@@ -433,12 +285,14 @@ func GetPodDaemonsetsWithMetrics(queryResult interface{}, defaultClusterID strin
 
 func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of DeploymentMatchLabelsMetrics
+	result, err := prom.NewQueryResults("DeploymentMatchLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Deployment, Namespace and ClusterID for key generation purposes
 		deployment, err := val.GetString("deployment")
 		if err != nil {
@@ -464,12 +318,14 @@ func GetDeploymentMatchLabelsMetrics(queryResult interface{}, defaultClusterID s
 
 func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[string]map[string]string, error) {
 	toReturn := make(map[string]map[string]string)
-	result, err := NewQueryResults(queryResult)
+
+	// TODO: Pass actual query instead of ServiceSelectorLabelsMetrics
+	result, err := prom.NewQueryResults("ServiceSelectorLabelsMetrics", queryResult)
 	if err != nil {
 		return toReturn, err
 	}
 
-	for _, val := range result {
+	for _, val := range result.Results {
 		// We want Service, Namespace and ClusterID for key generation purposes
 		service, err := val.GetString("service")
 		if err != nil {
@@ -492,12 +348,3 @@ func GetServiceSelectorLabelsMetrics(queryResult interface{}, defaultClusterID s
 
 	return toReturn, nil
 }
-
-func labelsForMetric(metricMap map[string]interface{}) string {
-	var pairs []string
-	for k, v := range metricMap {
-		pairs = append(pairs, fmt.Sprintf("%s: %+v", k, v))
-	}
-
-	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
-}

+ 29 - 48
pkg/costmodel/router.go

@@ -8,7 +8,6 @@ import (
 	"math"
 	"net"
 	"net/http"
-	"os"
 	"reflect"
 	"strconv"
 	"strings"
@@ -23,9 +22,10 @@ import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	cm "github.com/kubecost/cost-model/pkg/clustermanager"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
-	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/thanos"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
@@ -39,25 +39,19 @@ import (
 )
 
 const (
-	logCollectionEnvVar            = "LOG_COLLECTION_ENABLED"
-	productAnalyticsEnvVar         = "PRODUCT_ANALYTICS_ENABLED"
-	errorReportingEnvVar           = "ERROR_REPORTING_ENABLED"
-	valuesReportingEnvVar          = "VALUES_REPORTING_ENABLED"
-	maxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
-	prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
-	prometheusTroubleshootingEp    = "http://docs.kubecost.com/custom-prom#troubleshoot"
-	RFC3339Milli                   = "2006-01-02T15:04:05.000Z"
+	prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
+	RFC3339Milli                = "2006-01-02T15:04:05.000Z"
 )
 
 var (
 	// gitCommit is set by the build system
 	gitCommit                       string
-	logCollectionEnabled            bool   = strings.EqualFold(os.Getenv(logCollectionEnvVar), "true")
-	productAnalyticsEnabled         bool   = strings.EqualFold(os.Getenv(productAnalyticsEnvVar), "true")
-	errorReportingEnabled           bool   = strings.EqualFold(os.Getenv(errorReportingEnvVar), "true")
-	valuesReportingEnabled          bool   = strings.EqualFold(os.Getenv(valuesReportingEnvVar), "true")
-	multiclusterDBBasicAuthUsername string = os.Getenv("MC_BASIC_AUTH_USERNAME")
-	multiclusterDBBasicAuthPW       string = os.Getenv("MC_BASIC_AUTH_PW")
+	logCollectionEnabled            bool   = env.IsLogCollectionEnabled()
+	productAnalyticsEnabled         bool   = env.IsProductAnalyticsEnabled()
+	errorReportingEnabled           bool   = env.IsErrorReportingEnabled()
+	valuesReportingEnabled          bool   = env.IsValuesReportingEnabled()
+	multiclusterDBBasicAuthUsername string = env.GetMultiClusterBasicAuthUsername()
+	multiclusterDBBasicAuthPW       string = env.GetMultiClusterBasicAuthPassword()
 )
 
 var Router = httprouter.New()
@@ -173,22 +167,6 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
-// Parses the max query concurrency environment variable
-func maxQueryConcurrency() int {
-	v := os.Getenv(maxQueryConcurrencyEnvVar)
-	if v == "" {
-		return 5
-	}
-
-	result, err := strconv.Atoi(v)
-	if err != nil {
-		log.Warningf("Failed to parse MAX_QUERY_CONCURRENCY. Defaulting to 5 - %s", err)
-		return 5
-	}
-
-	return result
-}
-
 // writeReportingFlags writes the reporting flags to the cluster info map
 func writeReportingFlags(clusterInfo map[string]string) {
 	clusterInfo["logCollection"] = fmt.Sprintf("%t", logCollectionEnabled)
@@ -392,11 +370,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	cluster := r.URL.Query().Get("cluster")
 	remote := r.URL.Query().Get("remote")
 
-	remoteAvailable := os.Getenv(remoteEnabled)
-	remoteEnabled := false
-	if remoteAvailable == "true" && remote != "false" {
-		remoteEnabled = true
-	}
+	remoteEnabled := env.IsRemoteEnabled() && remote != "false"
 
 	// Use Thanos Client if it exists (enabled) and remote flag set
 	var pClient prometheusClient.Client
@@ -654,6 +628,11 @@ func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httpro
 	// Include Product Reporting Flags with Cluster Info
 	writeReportingFlags(data)
 
+	// Include Thanos Offset Duration if Applicable
+	if thanos.IsEnabled() {
+		data["thanosOffset"] = thanos.Offset()
+	}
+
 	w.Write(WrapData(data, err))
 }
 
@@ -884,7 +863,7 @@ func newClusterManager() *cm.ClusterManager {
 	// NOTE: configmap approach is currently the "persistent" source (entries are read-only
 	// NOTE: on the backend), we don't currently need to store on disk.
 	/*
-		path := os.Getenv("CONFIG_PATH")
+		path := env.GetConfigPath()
 		db, err := bolt.Open(path+"costmodel.db", 0600, nil)
 		if err != nil {
 			klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
@@ -954,12 +933,13 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		}
 	}
 
-	address := os.Getenv(prometheusServerEndpointEnvVar)
+	address := env.GetPrometheusServerEndpoint()
 	if address == "" {
-		klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
+		klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
 	}
 
-	queryConcurrency := maxQueryConcurrency()
+	queryConcurrency := env.GetMaxQueryConcurrency()
+	klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
 
 	var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
 		Proxy: http.ProxyFromEnvironment,
@@ -1008,7 +988,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
 	k8sCache.Run()
 
-	cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
+	cloudProviderKey := env.GetCloudProviderAPIKey()
 	cloudProvider, err := costAnalyzerCloud.NewProvider(k8sCache, cloudProviderKey)
 	if err != nil {
 		panic(err.Error())
@@ -1031,7 +1011,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 			}
 		}
 	}
-	kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
+	kubecostNamespace := env.GetKubecostNamespace()
 	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
 	configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
 	if err != nil {
@@ -1158,8 +1138,8 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		OutOfClusterCache:             outOfClusterCache,
 	}
 
-	remoteEnabled := os.Getenv(remoteEnabled)
-	if remoteEnabled == "true" {
+	remoteEnabled := env.IsRemoteEnabled()
+	if remoteEnabled {
 		info, err := cloudProvider.ClusterInfo()
 		klog.Infof("Saving cluster  with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
 		if err != nil {
@@ -1172,8 +1152,9 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	}
 
 	// Thanos Client
-	if os.Getenv(thanosEnabled) == "true" {
-		thanosUrl := os.Getenv(thanosQueryUrl)
+	if thanos.IsEnabled() {
+		thanosUrl := thanos.QueryURL()
+
 		if thanosUrl != "" {
 			var thanosRT http.RoundTripper = &http.Transport{
 				Proxy: http.ProxyFromEnvironment,
@@ -1202,7 +1183,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 			}
 
 		} else {
-			klog.Infof("Error resolving environment variable: $%s", thanosQueryUrl)
+			klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
 		}
 	}
 

+ 6 - 8
pkg/costmodel/sql.go

@@ -4,19 +4,17 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
-	"os"
 	"time"
 
 	"k8s.io/klog"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
+
 	_ "github.com/lib/pq"
 )
 
-const remotePW = "REMOTE_WRITE_PASSWORD"
-const sqlAddress = "SQL_ADDRESS"
-
 func getPVCosts(db *sql.DB) (map[string]*costAnalyzerCloud.PV, error) {
 	pvs := make(map[string]*costAnalyzerCloud.PV)
 	query := `SELECT name, avg(value),labels->>'volumename' AS volumename, labels->>'cluster_id' AS clusterid
@@ -94,8 +92,8 @@ func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
 }
 
 func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
+	pw := env.GetRemotePW()
+	address := env.GetSQLAddress()
 	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
 	db, err := sql.Open("postgres", connStr)
 	defer db.Close()
@@ -140,7 +138,7 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 			return nil, err
 		}
 
-		k := newContainerMetricFromValues(namespace, pod, container, instance, clusterid)
+		k := NewContainerMetricFromValues(namespace, pod, container, instance, clusterid)
 		key := k.Key()
 		allocationVector := &util.Vector{
 			Timestamp: float64(t.Unix()),
@@ -211,7 +209,7 @@ func CostDataRangeFromSQL(field string, value string, window string, start strin
 			return nil, err
 		}
 
-		k := newContainerMetricFromValues(namespace, pod, container, instance, clusterid)
+		k := NewContainerMetricFromValues(namespace, pod, container, instance, clusterid)
 		key := k.Key()
 		allocationVector := &util.Vector{
 			Timestamp: float64(t.Unix()),

+ 176 - 0
pkg/env/costmodelenv.go

@@ -0,0 +1,176 @@
+package env
+
+const (
+	AWSAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
+	AWSAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
+	AWSClusterIDEnvVar       = "AWS_CLUSTER_ID"
+
+	KubecostNamespaceEnvVar        = "KUBECOST_NAMESPACE"
+	ClusterIDEnvVar                = "CLUSTER_ID"
+	PrometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
+	MaxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
+	RemoteEnabledEnvVar            = "REMOTE_WRITE_ENABLED"
+	RemotePWEnvVar                 = "REMOTE_WRITE_PASSWORD"
+	SQLAddressEnvVar               = "SQL_ADDRESS"
+	UseCSVProviderEnvVar           = "USE_CSV_PROVIDER"
+	CSVRegionEnvVar                = "CSV_REGION"
+	CSVPathEnvVar                  = "CSV_PATH"
+	ConfigPathEnvVar               = "CONFIG_PATH"
+	CloudProviderAPIKeyEnvVar      = "CLOUD_PROVIDER_API_KEY"
+
+	ThanosEnabledEnvVar  = "THANOS_ENABLED"
+	ThanosQueryUrlEnvVar = "THANOS_QUERY_URL"
+	ThanosOffsetEnvVar   = "THANOS_QUERY_OFFSET"
+
+	LogCollectionEnabledEnvVar    = "LOG_COLLECTION_ENABLED"
+	ProductAnalyticsEnabledEnvVar = "PRODUCT_ANALYTICS_ENABLED"
+	ErrorReportingEnabledEnvVar   = "ERROR_REPORTING_ENABLED"
+	ValuesReportingEnabledEnvVar  = "VALUES_REPORTING_ENABLED"
+
+	MultiClusterBasicAuthUsername = "MC_BASIC_AUTH_USERNAME"
+	MultiClusterBasicAuthPassword = "MC_BASIC_AUTH_PW"
+)
+
+// GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
+// the AWS access key for authentication
+func GetAWSAccessKeyID() string {
+	return Get(AWSAccessKeyIDEnvVar, "")
+}
+
+// GetAWSAccessKeySecret returns the environment variable value for AWSAccessKeySecretEnvVar which represents
+// the AWS access key secret for authentication
+func GetAWSAccessKeySecret() string {
+	return Get(AWSAccessKeySecretEnvVar, "")
+}
+
+// GetAWSClusterID returns the environment variable value for AWSClusterIDEnvVar which represents
+// an AWS specific cluster identifier.
+func GetAWSClusterID() string {
+	return Get(AWSClusterIDEnvVar, "")
+}
+
+// GetKubecostNamespace returns the environment variable value for KubecostNamespaceEnvVar which
+// represents the namespace the cost model exists in.
+func GetKubecostNamespace() string {
+	return Get(KubecostNamespaceEnvVar, "kubecost")
+}
+
+// GetClusterID returns the environment variable value for ClusterIDEnvVar which represents the
+// configurable identifier used for multi-cluster metric emission.
+func GetClusterID() string {
+	return Get(ClusterIDEnvVar, "")
+}
+
+// GetPrometheusServerEndpoint returns the environment variable value for PrometheusServerEndpointEnvVar which
+// represents the prometheus server endpoint used to execute prometheus queries.
+func GetPrometheusServerEndpoint() string {
+	return Get(PrometheusServerEndpointEnvVar, "")
+}
+
+// IsRemoteEnabled returns the environment variable value for RemoteEnabledEnvVar which represents whether
+// or not remote write is enabled for prometheus for use with SQL backed persistent storage.
+func IsRemoteEnabled() bool {
+	return GetBool(RemoteEnabledEnvVar, false)
+}
+
+// GetRemotePW returns the environment variable value for RemotePWEnvVar which represents the remote
+// persistent storage password.
+func GetRemotePW() string {
+	return Get(RemotePWEnvVar, "")
+}
+
+// GetSQLAddress returns the environment variable value for SQLAddressEnvVar which represents the SQL
+// database address used with remote persistent storage.
+func GetSQLAddress() string {
+	return Get(SQLAddressEnvVar, "")
+}
+
+// IsUseCSVProvider returns the environment variable value for UseCSVProviderEnvVar which represents
+// whether or not the use of a CSV cost provider is enabled.
+func IsUseCSVProvider() bool {
+	return GetBool(UseCSVProviderEnvVar, false)
+}
+
+// GetCSVRegion returns the environment variable value for CSVRegionEnvVar which represents the
+// region configured for a CSV provider.
+func GetCSVRegion() string {
+	return Get(CSVRegionEnvVar, "")
+}
+
+// GetCSVPath returns the environment variable value for CSVPathEnvVar which represents the key path
+// configured for a CSV provider.
+func GetCSVPath() string {
+	return Get(CSVPathEnvVar, "")
+}
+
+// GetConfigPath returns the environment variable value for ConfigPathEnvVar which represents the cost
+// model configuration path
+func GetConfigPath() string {
+	return Get(ConfigPathEnvVar, "")
+}
+
+// GetConfigPathWithDefault returns the environment variable value for ConfigPathEnvVar which
+// represents the cost model configuration path, falling back to defaultValue when unset
+func GetConfigPathWithDefault(defaultValue string) string {
+	return Get(ConfigPathEnvVar, defaultValue)
+}
+
+// GetCloudProviderAPIKey returns the environment variable value for CloudProviderAPIKeyEnvVar
+// which represents the API key provided for the cloud provider.
+func GetCloudProviderAPIKey() string {
+	return Get(CloudProviderAPIKeyEnvVar, "")
+}
+
+// IsThanosEnabled returns the environment variable value for ThanosEnabledEnvVar which represents whether
+// or not thanos is enabled.
+func IsThanosEnabled() bool {
+	return GetBool(ThanosEnabledEnvVar, false)
+}
+
+// GetThanosQueryUrl returns the environment variable value for ThanosQueryUrlEnvVar which represents the
+// target query endpoint for hitting thanos.
+func GetThanosQueryUrl() string {
+	return Get(ThanosQueryUrlEnvVar, "")
+}
+
+// GetThanosOffset returns the environment variable value for ThanosOffsetEnvVar which represents the total
+// amount of time to offset all queries made to thanos.
+func GetThanosOffset() string {
+	return Get(ThanosOffsetEnvVar, "3h")
+}
+
+// IsLogCollectionEnabled returns the environment variable value for LogCollectionEnabledEnvVar which represents
+// whether or not log collection has been enabled for kubecost deployments.
+func IsLogCollectionEnabled() bool {
+	return GetBool(LogCollectionEnabledEnvVar, true)
+}
+
+// IsProductAnalyticsEnabled returns the environment variable value for ProductAnalyticsEnabledEnvVar
+func IsProductAnalyticsEnabled() bool {
+	return GetBool(ProductAnalyticsEnabledEnvVar, true)
+}
+
+// IsErrorReportingEnabled returns the environment variable value for ErrorReportingEnabledEnvVar
+func IsErrorReportingEnabled() bool {
+	return GetBool(ErrorReportingEnabledEnvVar, true)
+}
+
+// IsValuesReportingEnabled returns the environment variable value for ValuesReportingEnabledEnvVar
+func IsValuesReportingEnabled() bool {
+	return GetBool(ValuesReportingEnabledEnvVar, true)
+}
+
+// GetMaxQueryConcurrency returns the environment variable value for MaxQueryConcurrencyEnvVar
+func GetMaxQueryConcurrency() int {
+	return GetInt(MaxQueryConcurrencyEnvVar, 5)
+}
+
+// GetMultiClusterBasicAuthUsername returns the environment variable value for MultiClusterBasicAuthUsername
+func GetMultiClusterBasicAuthUsername() string {
+	return Get(MultiClusterBasicAuthUsername, "")
+}
+
+// GetMultiClusterBasicAuthPassword returns the environment variable value for MultiClusterBasicAuthPassword
+func GetMultiClusterBasicAuthPassword() string {
+	return Get(MultiClusterBasicAuthPassword, "")
+}

+ 233 - 0
pkg/env/env.go

@@ -0,0 +1,233 @@
+package env
+
+import (
+	"os"
+	"strconv"
+)
+
+// Get parses a string from the environment variable key parameter. If the environment
+// variable is empty, the defaultValue parameter is returned.
+func Get(key string, defaultValue string) string {
+	r := os.Getenv(key)
+	if r == "" {
+		return defaultValue
+	}
+
+	return r
+}
+
+// GetInt parses an int from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt(key string, defaultValue int) int {
+	r := os.Getenv(key)
+	i, err := strconv.Atoi(r)
+	if err != nil {
+		return defaultValue
+	}
+
+	return i
+}
+
+// GetInt8 parses an int8 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt8(key string, defaultValue int8) int8 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseInt(r, 10, 8)
+	if err != nil {
+		return defaultValue
+	}
+
+	return int8(i)
+}
+
+// GetInt16 parses an int16 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt16(key string, defaultValue int16) int16 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseInt(r, 10, 16)
+	if err != nil {
+		return defaultValue
+	}
+
+	return int16(i)
+}
+
+// GetInt32 parses an int32 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt32(key string, defaultValue int32) int32 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseInt(r, 10, 32)
+	if err != nil {
+		return defaultValue
+	}
+
+	return int32(i)
+}
+
+// GetInt64 parses an int64 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt64(key string, defaultValue int64) int64 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseInt(r, 10, 64)
+	if err != nil {
+		return defaultValue
+	}
+
+	return i
+}
+
+// GetUInt parses a uint from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt(key string, defaultValue uint) uint {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 32)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint(i)
+}
+
+// GetUInt8 parses a uint8 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt8(key string, defaultValue uint8) uint8 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 8)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint8(i)
+}
+
+// GetUInt16 parses a uint16 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt16(key string, defaultValue uint16) uint16 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 16)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint16(i)
+}
+
+// GetUInt32 parses a uint32 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt32(key string, defaultValue uint32) uint32 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 32)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint32(i)
+}
+
+// GetUInt64 parses a uint64 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt64(key string, defaultValue uint64) uint64 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 64)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint64(i)
+}
+
+// GetFloat32 parses a float32 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetFloat32(key string, defaultValue float32) float32 {
+	r := os.Getenv(key)
+	f, err := strconv.ParseFloat(r, 32)
+	if err != nil {
+		return defaultValue
+	}
+
+	return float32(f)
+}
+
+// GetFloat64 parses a float64 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetFloat64(key string, defaultValue float64) float64 {
+	r := os.Getenv(key)
+	f, err := strconv.ParseFloat(r, 64)
+	if err != nil {
+		return defaultValue
+	}
+
+	return f
+}
+
+// GetBool parses a bool from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetBool(key string, defaultValue bool) bool {
+	r := os.Getenv(key)
+	b, err := strconv.ParseBool(r)
+	if err != nil {
+		return defaultValue
+	}
+
+	return b
+}
+
+// Set sets the environment variable for the key provided using the value provided.
+func Set(key string, value string) error {
+	return os.Setenv(key, value)
+}
+
+// SetInt sets the environment variable to a string formatted int value
+func SetInt(key string, value int) error {
+	return os.Setenv(key, strconv.Itoa(value))
+}
+
+// SetInt8 sets the environment variable to a string formatted int8 value.
+func SetInt8(key string, value int8) error {
+	return os.Setenv(key, strconv.FormatInt(int64(value), 10))
+}
+
+// SetInt16 sets the environment variable to a string formatted int16 value.
+func SetInt16(key string, value int16) error {
+	return os.Setenv(key, strconv.FormatInt(int64(value), 10))
+}
+
+// SetInt32 sets the environment variable to a string formatted int32 value.
+func SetInt32(key string, value int32) error {
+	return os.Setenv(key, strconv.FormatInt(int64(value), 10))
+}
+
+// SetInt64 sets the environment variable to a string formatted int64 value.
+func SetInt64(key string, value int64) error {
+	return os.Setenv(key, strconv.FormatInt(value, 10))
+}
+
+// SetUInt sets the environment variable to a string formatted uint value
+func SetUInt(key string, value uint) error {
+	return os.Setenv(key, strconv.FormatUint(uint64(value), 10))
+}
+
+// SetUInt8 sets the environment variable to a string formatted uint8 value
+func SetUInt8(key string, value uint8) error {
+	return os.Setenv(key, strconv.FormatUint(uint64(value), 10))
+}
+
+// SetUInt16 sets the environment variable to a string formatted uint16 value
+func SetUInt16(key string, value uint16) error {
+	return os.Setenv(key, strconv.FormatUint(uint64(value), 10))
+}
+
+// SetUInt32 sets the environment variable to a string formatted uint32 value
+func SetUInt32(key string, value uint32) error {
+	return os.Setenv(key, strconv.FormatUint(uint64(value), 10))
+}
+
+// SetUInt64 sets the environment variable to a string formatted uint64 value
+func SetUInt64(key string, value uint64) error {
+	return os.Setenv(key, strconv.FormatUint(value, 10))
+}
+
+// SetBool sets the environment variable to a string formatted bool value.
+func SetBool(key string, value bool) error {
+	return os.Setenv(key, strconv.FormatBool(value))
+}

+ 13 - 4
pkg/prom/query.go

@@ -7,6 +7,7 @@ import (
 	"net/http"
 
 	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 )
@@ -66,7 +67,7 @@ func (ctx *Context) Query(query string) QueryResultsChan {
 		raw, promErr := ctx.query(query)
 		ctx.ErrorCollector.Report(promErr)
 
-		results, parseErr := NewQueryResults(raw)
+		results, parseErr := NewQueryResults(query, raw)
 		ctx.ErrorCollector.Report(parseErr)
 
 		resCh <- results
@@ -92,15 +93,23 @@ func (ctx *Context) query(query string) (interface{}, error) {
 	}
 	if err != nil {
 		if resp == nil {
-			return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
 		}
 
-		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), err.Error(), body, query)
 	}
+
+	// Unsuccessful Status Code, log body and status
+	statusCode := resp.StatusCode
+	statusText := http.StatusText(statusCode)
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, fmt.Errorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
+	}
+
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
 	return toReturn, nil
 }

+ 86 - 30
pkg/prom/result.go

@@ -1,23 +1,31 @@
 package prom
 
 import (
+	"errors"
 	"fmt"
 	"math"
 	"strconv"
 	"strings"
 
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util"
-	"k8s.io/klog"
 )
 
 // QueryResultsChan is a channel of query results
-type QueryResultsChan chan []*QueryResult
+type QueryResultsChan chan *QueryResults
 
 // Await returns query results, blocking until they are made available, and
 // deferring the closure of the underlying channel
 func (qrc QueryResultsChan) Await() []*QueryResult {
 	defer close(qrc)
-	return <-qrc
+	results := <-qrc
+
+	// Possible that the returned results are nil
+	if results == nil {
+		return nil
+	}
+
+	return results.Results
 }
 
 // QueryResult contains a single result from a prometheus query. It's common
@@ -27,13 +35,38 @@ type QueryResult struct {
 	Values []*util.Vector
 }
 
+// QueryResults contains all of the query results and the source query string.
+type QueryResults struct {
+	Query   string
+	Results []*QueryResult
+}
+
+var (
+	// Static Warnings for data point parsing
+	InfWarning warning = newWarning("Found Inf value parsing vector data point for metric")
+	NaNWarning warning = newWarning("Found NaN value parsing vector data point for metric")
+
+	// Static Errors for query result parsing
+	QueryResultNilErr          error = NewCommError("nil queryResult")
+	PromUnexpectedResponseErr  error = errors.New("Unexpected response from Prometheus")
+	DataFieldFormatErr         error = errors.New("Data field improperly formatted in prometheus response")
+	ResultFieldDoesNotExistErr error = errors.New("Result field does not exist in prometheus response")
+	ResultFieldFormatErr       error = errors.New("Result field improperly formatted in prometheus response")
+	ResultFormatErr            error = errors.New("Result is improperly formatted")
+	MetricFieldDoesNotExistErr error = errors.New("Metric field does not exist in data result vector")
+	MetricFieldFormatErr       error = errors.New("Metric field is improperly formatted")
+	ValueFieldDoesNotExistErr  error = errors.New("Value field does not exist in data result vector")
+	ValueFieldFormatErr        error = errors.New("Values field is improperly formatted")
+	DataPointFormatErr         error = errors.New("Improperly formatted datapoint from Prometheus")
+)
+
 // NewQueryResults accepts the raw prometheus query result and returns an array of
 // QueryResult objects
-func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
-	var result []*QueryResult
+func NewQueryResults(query string, queryResult interface{}) (*QueryResults, error) {
 	if queryResult == nil {
-		return nil, NewCommError("nil queryResult")
+		return nil, QueryResultNilErr
 	}
+
 	data, ok := queryResult.(map[string]interface{})["data"]
 	if !ok {
 		e, err := wrapPrometheusError(queryResult)
@@ -46,35 +79,39 @@ func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
 	// Deep Check for proper formatting
 	d, ok := data.(map[string]interface{})
 	if !ok {
-		return nil, fmt.Errorf("Data field improperly formatted in prometheus repsonse")
+		return nil, DataFieldFormatErr
 	}
 	resultData, ok := d["result"]
 	if !ok {
-		return nil, fmt.Errorf("Result field not present in prometheus response")
+		return nil, ResultFieldDoesNotExistErr
 	}
 	resultsData, ok := resultData.([]interface{})
 	if !ok {
-		return nil, fmt.Errorf("Result field improperly formatted in prometheus response")
+		return nil, ResultFieldFormatErr
 	}
 
-	// Scan Results
+	// Result vectors from the query
+	var results []*QueryResult
+
+	// Parse raw results and into QueryResults
 	for _, val := range resultsData {
 		resultInterface, ok := val.(map[string]interface{})
 		if !ok {
-			return nil, fmt.Errorf("Result is improperly formatted")
+			return nil, ResultFormatErr
 		}
 
 		metricInterface, ok := resultInterface["metric"]
 		if !ok {
-			return nil, fmt.Errorf("Metric field does not exist in data result vector")
+			return nil, MetricFieldDoesNotExistErr
 		}
 		metricMap, ok := metricInterface.(map[string]interface{})
 		if !ok {
-			return nil, fmt.Errorf("Metric field is improperly formatted")
+			return nil, MetricFieldFormatErr
 		}
 
-		// Wrap execution of this lazily in case the data is not used
-		labels := func() string { return labelsForMetric(metricMap) }
+		// Define label string for values to ensure that we only run labelsForMetric once
+		// if we receive multiple warnings.
+		var labelString string = ""
 
 		// Determine if the result is a ranged data set or single value
 		_, isRange := resultInterface["values"]
@@ -83,13 +120,18 @@ func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
 		if !isRange {
 			dataPoint, ok := resultInterface["value"]
 			if !ok {
-				return nil, fmt.Errorf("Value field does not exist in data result vector")
+				return nil, ValueFieldDoesNotExistErr
 			}
 
-			v, err := parseDataPoint(dataPoint, labels)
+			// Append new data point, log warnings
+			v, warn, err := parseDataPoint(dataPoint)
 			if err != nil {
 				return nil, err
 			}
+			if warn != nil {
+				log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelsForMetric(metricMap))
+			}
+
 			vectors = append(vectors, v)
 		} else {
 			values, ok := resultInterface["values"].([]interface{})
@@ -97,35 +139,45 @@ func NewQueryResults(queryResult interface{}) ([]*QueryResult, error) {
 				return nil, fmt.Errorf("Values field is improperly formatted")
 			}
 
+			// Append new data points, log warnings
 			for _, value := range values {
-				v, err := parseDataPoint(value, labels)
+				v, warn, err := parseDataPoint(value)
 				if err != nil {
 					return nil, err
 				}
+				if warn != nil {
+					if labelString == "" {
+						labelString = labelsForMetric(metricMap)
+					}
+					log.Warningf("%s\nQuery: %s\nLabels: %s", warn.Message(), query, labelString)
+				}
 
 				vectors = append(vectors, v)
 			}
 		}
 
-		result = append(result, &QueryResult{
+		results = append(results, &QueryResult{
 			Metric: metricMap,
 			Values: vectors,
 		})
 	}
 
-	return result, nil
+	return &QueryResults{
+		Query:   query,
+		Results: results,
+	}, nil
 }
 
 // GetString returns the requested field, or an error if it does not exist
 func (qr *QueryResult) GetString(field string) (string, error) {
 	f, ok := qr.Metric[field]
 	if !ok {
-		return "", fmt.Errorf("%s field does not exist in data result vector", field)
+		return "", fmt.Errorf("'%s' field does not exist in data result vector", field)
 	}
 
 	strField, ok := f.(string)
 	if !ok {
-		return "", fmt.Errorf("%s field is improperly formatted", field)
+		return "", fmt.Errorf("'%s' field is improperly formatted", field)
 	}
 
 	return strField, nil
@@ -144,7 +196,7 @@ func (qr *QueryResult) GetLabels() map[string]string {
 		label := k[6:]
 		value, ok := v.(string)
 		if !ok {
-			klog.V(3).Infof("Failed to parse label value for label: %s", label)
+			log.Warningf("Failed to parse label value for label: '%s'", label)
 			continue
 		}
 
@@ -154,31 +206,35 @@ func (qr *QueryResult) GetLabels() map[string]string {
 	return result
 }
 
-func parseDataPoint(dataPoint interface{}, labels func() string) (*util.Vector, error) {
+// parseDataPoint parses a data point from raw prometheus query results and returns
+// a new Vector instance containing the parsed data along with any warnings or errors.
+func parseDataPoint(dataPoint interface{}) (*util.Vector, warning, error) {
+	var w warning = nil
+
 	value, ok := dataPoint.([]interface{})
 	if !ok || len(value) != 2 {
-		return nil, fmt.Errorf("Improperly formatted datapoint from Prometheus")
+		return nil, w, DataPointFormatErr
 	}
 
 	strVal := value[1].(string)
 	v, err := strconv.ParseFloat(strVal, 64)
 	if err != nil {
-		return nil, err
+		return nil, w, err
 	}
 
 	// Test for +Inf and -Inf (sign: 0), Test for NaN
 	if math.IsInf(v, 0) {
-		klog.V(1).Infof("[Warning] Found Inf value parsing vector data point for metric: %s", labels())
+		w = InfWarning
 		v = 0.0
 	} else if math.IsNaN(v) {
-		klog.V(1).Infof("[Warning] Found NaN value parsing vector data point for metric: %s", labels())
+		w = NaNWarning
 		v = 0.0
 	}
 
 	return &util.Vector{
 		Timestamp: math.Round(value[0].(float64)/10) * 10,
 		Value:     v,
-	}, nil
+	}, w, nil
 }
 
 func labelsForMetric(metricMap map[string]interface{}) string {
@@ -193,7 +249,7 @@ func labelsForMetric(metricMap map[string]interface{}) string {
 func wrapPrometheusError(qr interface{}) (string, error) {
 	e, ok := qr.(map[string]interface{})["error"]
 	if !ok {
-		return "", fmt.Errorf("Unexpected response from Prometheus")
+		return "", PromUnexpectedResponseErr
 	}
 	eStr, ok := e.(string)
 	return eStr, nil

+ 26 - 0
pkg/prom/warning.go

@@ -0,0 +1,26 @@
+package prom
+
+// warning represents an unexpected result that occurs but doesn't halt processing
+type warning interface {
+	Message() string
+}
+
+// defaultWarning is a simple implementation for warning
+type defaultWarning struct {
+	message string
+}
+
+// Message returns the message for the warning
+func (dw *defaultWarning) Message() string {
+	return dw.message
+}
+
+// Stringer implementation
+func (dw *defaultWarning) String() string {
+	return dw.message
+}
+
+// Creates a warning for the prom package. NOTE: We can make this less prom-centric if desirable.
+func newWarning(msg string) warning {
+	return &defaultWarning{msg}
+}

+ 55 - 0
pkg/thanos/thanos.go

@@ -0,0 +1,55 @@
+package thanos
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/env"
+)
+
+var (
+	lock           = new(sync.Mutex)
+	enabled        = env.IsThanosEnabled()
+	queryUrl       = env.GetThanosQueryUrl()
+	offset         = env.GetThanosOffset()
+	offsetDuration *time.Duration
+	queryOffset    = fmt.Sprintf(" offset %s", offset)
+)
+
+// IsEnabled returns true if Thanos is enabled.
+func IsEnabled() bool {
+	return enabled
+}
+
+// QueryURL returns the configured Thanos query URL endpoint.
+func QueryURL() string {
+	return queryUrl
+}
+
+// Offset returns the duration string for the query offset that should be applied to thanos
+func Offset() string {
+	return offset
+}
+
+// OffsetDuration returns the Offset as a parsed duration
+func OffsetDuration() time.Duration {
+	lock.Lock()
+	defer lock.Unlock()
+
+	if offsetDuration == nil {
+		d, err := time.ParseDuration(offset)
+		if err != nil {
+			d = 0
+		}
+
+		offsetDuration = &d
+	}
+
+	return *offsetDuration
+}
+
+// QueryOffset returns a string in the format: " offset %s" substituting in the Offset() string.
+func QueryOffset() string {
+	return queryOffset
+}

+ 33 - 0
pkg/util/http.go

@@ -0,0 +1,33 @@
+package util
+
+import (
+	"fmt"
+	"net/http"
+	"strings"
+)
+
+// HeaderString writes the request/response http.Header to a string.
+func HeaderString(h http.Header) string {
+	var sb strings.Builder
+	var first bool = true
+	sb.WriteString("{ ")
+
+	for k, vs := range h {
+		if first {
+			first = false
+		} else {
+			sb.WriteString(", ")
+		}
+		fmt.Fprintf(&sb, "%s: [ ", k)
+		for idx, v := range vs {
+			sb.WriteString(v)
+			if idx != len(vs)-1 {
+				sb.WriteString(", ")
+			}
+		}
+		sb.WriteString(" ]")
+	}
+	sb.WriteString(" }")
+
+	return sb.String()
+}

+ 48 - 0
test/util_test.go

@@ -0,0 +1,48 @@
+package test
+
+import (
+	"net/http"
+	"testing"
+
+	"github.com/kubecost/cost-model/pkg/util"
+)
+
+func TestHeaderString(t *testing.T) {
+	h := make(http.Header)
+	h.Add("foo", "abc")
+	h.Add("foo", "123")
+	h.Add("bar", "foo")
+	h.Add("Content-Type", "application/octet-stream")
+
+	s := util.HeaderString(h)
+	if len(s) == 0 {
+		t.Errorf("Header String failed to produce a valid output")
+		return
+	}
+
+	t.Logf("Result: %s\n", s)
+}
+
+func TestEmptyHeader(t *testing.T) {
+	h := make(http.Header)
+
+	s := util.HeaderString(h)
+	if len(s) == 0 {
+		t.Errorf("Header String failed to produce a valid output")
+		return
+	}
+
+	t.Logf("Result: %s\n", s)
+}
+
+func TestNilHeader(t *testing.T) {
+	var h http.Header
+
+	s := util.HeaderString(h)
+	if len(s) == 0 {
+		t.Errorf("Header String failed to produce a valid output")
+		return
+	}
+
+	t.Logf("Result: %s\n", s)
+}