Jelajahi Sumber

Merge branch 'develop' into bolt/prom-streamlining

# Conflicts:
#	pkg/costmodel/cluster.go
Matt Bolt 5 tahun lalu
induk
melakukan
385786c262

+ 1 - 0
go.sum

@@ -490,6 +490,7 @@ golang.org/x/tools v0.0.0-20190312170243-e65039ee4138 h1:H3uGjxCR/6Ds0Mjgyp7LMK8
 golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190327201419-c70d86f8b7cf/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/api v0.0.0-20181220000619-583d854617af/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
 google.golang.org/api v0.2.0/go.mod h1:IfRCZScioGtypHNTlz3gFk67J8uePVW7uDTBzXuIkhU=

+ 22 - 28
pkg/cloud/awsprovider.go

@@ -20,6 +20,7 @@ import (
 	"k8s.io/klog"
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/util"
 
@@ -37,8 +38,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 )
 
-const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
-const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
 const awsReservedInstancePricePerHour = 0.0287
 const supportedSpotFeedVersion = "1"
 const SpotInfoUpdateType = "spotinfo"
@@ -386,9 +385,8 @@ func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			}
 		}
 
-		remoteEnabled := os.Getenv(remoteEnabled)
-		if remoteEnabled == "true" {
-			err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
+		if env.IsRemoteEnabled() {
+			err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
 			if err != nil {
 				return err
 			}
@@ -966,17 +964,13 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		return nil, err
 	}
 
-	remote := os.Getenv(remoteEnabled)
-	remoteEnabled := false
-	if os.Getenv(remote) == "true" {
-		remoteEnabled = true
-	}
+	remoteEnabled := env.IsRemoteEnabled()
 
 	if c.ClusterName != "" {
 		m := make(map[string]string)
 		m["name"] = c.ClusterName
 		m["provider"] = "AWS"
-		m["id"] = os.Getenv(clusterIDKey)
+		m["id"] = env.GetClusterID()
 		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 		return m, nil
 	}
@@ -985,12 +979,12 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 		m := make(map[string]string)
 		m["name"] = clusterName
 		m["provider"] = "AWS"
-		m["id"] = os.Getenv(clusterIDKey)
+		m["id"] = env.GetClusterID()
 		m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 		return m, nil
 	}
 
-	maybeClusterId := os.Getenv(ClusterIdEnvVar)
+	maybeClusterId := env.GetAWSClusterID()
 	if len(maybeClusterId) != 0 {
 		return makeStructure(maybeClusterId)
 	}
@@ -1049,7 +1043,7 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 			}
 		}
 	}*/
-	klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", ClusterIdEnvVar)
+	klog.V(2).Infof("Unable to sniff out cluster ID, perhaps set $%s to force one", env.AWSClusterIDEnvVar)
 	return makeStructure(defaultClusterName)
 }
 
@@ -1067,7 +1061,7 @@ func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string)
 	}
 
 	// 3. Fall back to env vars
-	return os.Getenv(awsAccessKeyIDEnvVar), os.Getenv(awsAccessKeySecretEnvVar)
+	return env.GetAWSAccessKeyID(), env.GetAWSAccessKeySecret()
 }
 
 // Load once and cache the result (even on failure). This is an install time secret, so
@@ -1103,11 +1097,11 @@ func (aws *AWS) configureAWSAuth() error {
 	accessKeyID := aws.ServiceKeyName
 	accessKeySecret := aws.ServiceKeySecret
 	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
-		err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
+		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
 		if err != nil {
 			return err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
 		if err != nil {
 			return err
 		}
@@ -1138,7 +1132,7 @@ func getClusterConfig(ccFile string) (map[string]string, error) {
 // a new AWS Session are set.
 func (a *AWS) SetKeyEnv() error {
 	// TODO add this to the helm chart, mirroring the cost-model
-	// configPath := os.Getenv("CONFIG_PATH")
+	// configPath := env.GetConfigPath()
 	configPath := defaultConfigPath
 	path := configPath + "aws.json"
 
@@ -1165,8 +1159,8 @@ func (a *AWS) SetKeyEnv() error {
 	keySecret := configMap["awsServiceKeySecret"]
 
 	// These are required before calling NewEnvCredentials below
-	os.Setenv("AWS_ACCESS_KEY_ID", keyName)
-	os.Setenv("AWS_SECRET_ACCESS_KEY", keySecret)
+	env.Set(env.AWSAccessKeyIDEnvVar, keyName)
+	env.Set(env.AWSAccessKeySecretEnvVar, keySecret)
 
 	return nil
 }
@@ -1420,11 +1414,11 @@ func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutpu
 		return nil, err
 	}
 	if customPricing.ServiceKeyName != "" {
-		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
 		if err != nil {
 			return nil, err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
 		if err != nil {
 			return nil, err
 		}
@@ -1612,11 +1606,11 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 	klog.V(3).Infof("Running Query: %s", query)
 
 	if customPricing.ServiceKeyName != "" {
-		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
 		if err != nil {
 			return nil, err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
 		if err != nil {
 			return nil, err
 		}
@@ -1724,11 +1718,11 @@ func (a *AWS) QuerySQL(query string) ([]byte, error) {
 		return nil, err
 	}
 	if customPricing.ServiceKeyName != "" {
-		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
 		if err != nil {
 			return nil, err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
 		if err != nil {
 			return nil, err
 		}
@@ -1850,11 +1844,11 @@ func (f fnames) Less(i, j int) bool {
 func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
 	// credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
 	if accessKeyID != "" && accessKeySecret != "" {
-		err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
+		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
 		if err != nil {
 			return nil, err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
+		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
 		if err != nil {
 			return nil, err
 		}

+ 5 - 10
pkg/cloud/azureprovider.go

@@ -6,13 +6,13 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"os"
 	"regexp"
 	"strconv"
 	"strings"
 	"sync"
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-09-01/skus"
@@ -672,11 +672,7 @@ func (*Azure) GetDisks() ([]byte, error) {
 }
 
 func (az *Azure) ClusterInfo() (map[string]string, error) {
-	remote := os.Getenv(remoteEnabled)
-	remoteEnabled := false
-	if os.Getenv(remote) == "true" {
-		remoteEnabled = true
-	}
+	remoteEnabled := env.IsRemoteEnabled()
 
 	m := make(map[string]string)
 	m["name"] = "Azure Cluster #1"
@@ -689,7 +685,7 @@ func (az *Azure) ClusterInfo() (map[string]string, error) {
 	}
 	m["provider"] = "azure"
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
-	m["id"] = os.Getenv(clusterIDKey)
+	m["id"] = env.GetClusterID()
 	return m, nil
 
 }
@@ -725,9 +721,8 @@ func (az *Azure) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, e
 			}
 		}
 
-		remoteEnabled := os.Getenv(remoteEnabled)
-		if remoteEnabled == "true" {
-			err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
+		if env.IsRemoteEnabled() {
+			err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
 			if err != nil {
 				return err
 			}

+ 5 - 2
pkg/cloud/csvprovider.go

@@ -10,9 +10,12 @@ import (
 	"sync"
 	"time"
 
+	"github.com/kubecost/cost-model/pkg/env"
+
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/kubecost/cost-model/pkg/log"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/klog"
 
@@ -59,7 +62,7 @@ func (c *CSVProvider) DownloadPricingData() error {
 	var csvr io.Reader
 	var csverr error
 	if strings.HasPrefix(c.CSVLocation, "s3://") {
-		region := os.Getenv("CSV_REGION")
+		region := env.GetCSVRegion()
 		conf := aws.NewConfig().WithRegion(region).WithCredentialsChainVerboseErrors(true)
 		s3Client := s3.New(session.New(conf))
 		bucketAndKey := strings.Split(strings.TrimPrefix(c.CSVLocation, "s3://"), "/")
@@ -138,7 +141,7 @@ func (c *CSVProvider) DownloadPricingData() error {
 		c.Pricing = pricing
 		c.PricingPV = pvpricing
 	} else {
-		klog.Infof("[WARNING] No data received from csv")
+		log.DedupedWarningf(5, "No data received from csv at %s", c.CSVLocation)
 	}
 	time.AfterFunc(refreshMinutes*time.Minute, func() { c.DownloadPricingData() })
 	return nil

+ 3 - 2
pkg/cloud/customprovider.go

@@ -3,12 +3,13 @@ package cloud
 import (
 	"encoding/json"
 	"io"
-	"os"
 	"strconv"
 	"strings"
 	"sync"
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
+
 	v1 "k8s.io/api/core/v1"
 )
 
@@ -106,7 +107,7 @@ func (cp *CustomProvider) ClusterInfo() (map[string]string, error) {
 		m["name"] = conf.ClusterName
 	}
 	m["provider"] = "custom"
-	m["id"] = os.Getenv(clusterIDKey)
+	m["id"] = env.GetClusterID()
 	return m, nil
 }
 

+ 43 - 22
pkg/cloud/gcpprovider.go

@@ -8,7 +8,6 @@ import (
 	"io/ioutil"
 	"math"
 	"net/http"
-	"os"
 	"regexp"
 	"strconv"
 	"strings"
@@ -19,8 +18,11 @@ import (
 
 	"cloud.google.com/go/bigquery"
 	"cloud.google.com/go/compute/metadata"
+
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
+
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2/google"
 	compute "google.golang.org/api/compute/v1"
@@ -53,6 +55,7 @@ type GCP struct {
 	ReservedInstances       []*GCPReservedInstance
 	Config                  *ProviderConfig
 	serviceKeyProvided      bool
+	ValidPricingKeys        map[string]bool
 	*CustomProvider
 }
 
@@ -184,10 +187,7 @@ func (gcp *GCP) GetManagementPlatform() (string, error) {
 
 // Attempts to load a GCP auth secret and copy the contents to the key file.
 func (*GCP) loadGCPAuthSecret() {
-	path := os.Getenv("CONFIG_PATH")
-	if path == "" {
-		path = "/models/"
-	}
+	path := env.GetConfigPathWithDefault("/models/")
 
 	keyPath := path + "key.json"
 	keyExists, _ := util.FileExists(keyPath)
@@ -241,10 +241,7 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 					return err
 				}
 
-				path := os.Getenv("CONFIG_PATH")
-				if path == "" {
-					path = "/models/"
-				}
+				path := env.GetConfigPathWithDefault("/models/")
 
 				keyPath := path + "key.json"
 				err = ioutil.WriteFile(keyPath, j, 0644)
@@ -291,9 +288,8 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			}
 		}
 
-		remoteEnabled := os.Getenv(remoteEnabled)
-		if remoteEnabled == "true" {
-			err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
+		if env.IsRemoteEnabled() {
+			err := UpdateClusterMeta(env.GetClusterID(), c.ClusterName)
 			if err != nil {
 				return err
 			}
@@ -468,11 +464,8 @@ func (gcp *GCP) QuerySQL(query string) ([]*OutOfClusterAllocation, error) {
 
 // ClusterName returns the name of a GKE cluster, as provided by metadata.
 func (gcp *GCP) ClusterInfo() (map[string]string, error) {
-	remote := os.Getenv(remoteEnabled)
-	remoteEnabled := false
-	if os.Getenv(remote) == "true" {
-		remoteEnabled = true
-	}
+	remoteEnabled := env.IsRemoteEnabled()
+
 	metadataClient := metadata.NewClient(&http.Client{Transport: userAgentTransport{
 		userAgent: "kubecost",
 		base:      http.DefaultTransport,
@@ -494,7 +487,7 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	m := make(map[string]string)
 	m["name"] = attribute
 	m["provider"] = "GCP"
-	m["id"] = os.Getenv(clusterIDKey)
+	m["id"] = env.GetClusterID()
 	m["remoteReadEnabled"] = strconv.FormatBool(remoteEnabled)
 	return m, nil
 }
@@ -715,6 +708,10 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 				}
 
 				candidateKeys := []string{}
+				if gcp.ValidPricingKeys == nil {
+					gcp.ValidPricingKeys = make(map[string]bool)
+				}
+
 				for _, region := range product.ServiceRegions {
 					if instanceType == "e2" { // this needs to be done to handle a partial cpu mapping
 						candidateKeys = append(candidateKeys, region+","+"e2micro"+","+usageType)
@@ -731,6 +728,8 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 					instanceType = strings.Split(candidateKey, ",")[1] // we may have overriden this while generating candidate keys
 					region := strings.Split(candidateKey, ",")[0]
 					candidateKeyGPU := candidateKey + ",gpu"
+					gcp.ValidPricingKeys[candidateKey] = true
+					gcp.ValidPricingKeys[candidateKeyGPU] = true
 					if gpuType != "" {
 						lastRateIndex := len(product.PricingInfo[0].PricingExpression.TieredRates) - 1
 						var nanos float64
@@ -1346,15 +1345,37 @@ func (gcp *GCP) AllNodePricing() (interface{}, error) {
 	return gcp.Pricing, nil
 }
 
-// NodePricing returns GCP pricing data for a single node
-func (gcp *GCP) NodePricing(key Key) (*Node, error) {
+func (gcp *GCP) getPricing(key Key) (*GCPPricing, bool) {
 	gcp.DownloadPricingDataLock.RLock()
 	defer gcp.DownloadPricingDataLock.RUnlock()
-	if n, ok := gcp.Pricing[key.Features()]; ok {
+	n, ok := gcp.Pricing[key.Features()]
+	return n, ok
+}
+func (gcp *GCP) isValidPricingKey(key Key) bool {
+	gcp.DownloadPricingDataLock.RLock()
+	defer gcp.DownloadPricingDataLock.RUnlock()
+	_, ok := gcp.ValidPricingKeys[key.Features()]
+	return ok
+}
+
+// NodePricing returns GCP pricing data for a single node
+func (gcp *GCP) NodePricing(key Key) (*Node, error) {
+	if n, ok := gcp.getPricing(key); ok {
 		klog.V(4).Infof("Returning pricing for node %s: %+v from SKU %s", key, n.Node, n.Name)
 		n.Node.BaseCPUPrice = gcp.BaseCPUPrice
 		return n.Node, nil
+	} else if ok := gcp.isValidPricingKey(key); ok {
+		err := gcp.DownloadPricingData()
+		if err != nil {
+			return nil, fmt.Errorf("Download pricing data failed: %s", err.Error())
+		}
+		if n, ok := gcp.getPricing(key); ok {
+			klog.V(4).Infof("Returning pricing for node %s: %+v from SKU %s", key, n.Node, n.Name)
+			n.Node.BaseCPUPrice = gcp.BaseCPUPrice
+			return n.Node, nil
+		}
+		klog.V(1).Infof("[Warning] no pricing data found for %s: %s", key.Features(), key)
+		return nil, fmt.Errorf("Warning: no pricing data found for %s", key)
 	}
-	klog.V(1).Infof("[Warning] no pricing data found for %s: %s", key.Features(), key)
 	return nil, fmt.Errorf("Warning: no pricing data found for %s", key)
 }

+ 13 - 16
pkg/cloud/provider.go

@@ -5,21 +5,18 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"os"
 	"strings"
 
 	"k8s.io/klog"
 
 	"cloud.google.com/go/compute/metadata"
+
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 
 	v1 "k8s.io/api/core/v1"
 )
 
-const clusterIDKey = "CLUSTER_ID"
-const remoteEnabled = "REMOTE_WRITE_ENABLED"
-const remotePW = "REMOTE_WRITE_PASSWORD"
-const sqlAddress = "SQL_ADDRESS"
 const authSecretPath = "/var/secrets/service-key.json"
 
 var createTableStatements = []string{
@@ -186,12 +183,12 @@ type Provider interface {
 func ClusterName(p Provider) string {
 	info, err := p.ClusterInfo()
 	if err != nil {
-		return os.Getenv(clusterIDKey)
+		return env.GetClusterID()
 	}
 
 	name, ok := info["name"]
 	if !ok {
-		return os.Getenv(clusterIDKey)
+		return env.GetClusterID()
 	}
 
 	return name
@@ -238,8 +235,8 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 
 	provider := strings.ToLower(nodes[0].Spec.ProviderID)
 
-	if os.Getenv("USE_CSV_PROVIDER") == "true" {
-		klog.Infof("Using CSV Provider with CSV at %s", os.Getenv("CSV_PATH"))
+	if env.IsUseCSVProvider() {
+		klog.Infof("Using CSV Provider with CSV at %s", env.GetCSVPath())
 		configFileName := ""
 		if metadata.OnGCE() {
 			configFileName = "gcp.json"
@@ -252,7 +249,7 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 			configFileName = "default.json"
 		}
 		return &CSVProvider{
-			CSVLocation: os.Getenv("CSV_PATH"),
+			CSVLocation: env.GetCSVPath(),
 			CustomProvider: &CustomProvider{
 				Clientset: cache,
 				Config:    NewProviderConfig(configFileName),
@@ -293,8 +290,8 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 }
 
 func UpdateClusterMeta(cluster_id, cluster_name string) error {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
+	pw := env.GetRemotePW()
+	address := env.GetSQLAddress()
 	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
 	db, err := sql.Open("postgres", connStr)
 	if err != nil {
@@ -310,8 +307,8 @@ func UpdateClusterMeta(cluster_id, cluster_name string) error {
 }
 
 func CreateClusterMeta(cluster_id, cluster_name string) error {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
+	pw := env.GetRemotePW()
+	address := env.GetSQLAddress()
 	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
 	db, err := sql.Open("postgres", connStr)
 	if err != nil {
@@ -333,8 +330,8 @@ func CreateClusterMeta(cluster_id, cluster_name string) error {
 }
 
 func GetClusterMeta(cluster_id string) (string, string, error) {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
+	pw := env.GetRemotePW()
+	address := env.GetSQLAddress()
 	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
 	db, err := sql.Open("postgres", connStr)
 	defer db.Close()

+ 2 - 5
pkg/cloud/providerconfig.go

@@ -4,12 +4,12 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
-	"os"
 	"reflect"
 	"strconv"
 	"strings"
 	"sync"
 
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
 
 	"k8s.io/klog"
@@ -210,9 +210,6 @@ func fileExists(filename string) (bool, error) {
 
 // Returns the configuration directory concatenated with a specific config file name
 func configPathFor(filename string) string {
-	path := os.Getenv("CONFIG_PATH")
-	if path == "" {
-		path = "/models/"
-	}
+	path := env.GetConfigPathWithDefault("/models/")
 	return path + filename
 }

+ 2 - 2
pkg/clustercache/clustercache.go

@@ -1,9 +1,9 @@
 package clustercache
 
 import (
-	"os"
 	"sync"
 
+	"github.com/kubecost/cost-model/pkg/env"
 	"k8s.io/klog"
 
 	appsv1 "k8s.io/api/apps/v1"
@@ -88,7 +88,7 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	appsRestClient := client.AppsV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 
-	kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
+	kubecostNamespace := env.GetKubecostNamespace()
 	klog.Infof("NAMESPACE: %s", kubecostNamespace)
 
 	kcc := &KubernetesClusterCache{

+ 3 - 2
pkg/costmodel/cluster.go

@@ -2,12 +2,13 @@ package costmodel
 
 import (
 	"fmt"
-	"os"
 	"time"
 
 	"github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
+
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 )
@@ -215,7 +216,7 @@ func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, wind
 		resChs = append(resChs, bdResChs...)
 	}
 
-	defaultClusterID := os.Getenv(clusterIDKey)
+	defaultClusterID := env.GetClusterID()
 
 	dataMinsByCluster := map[string]float64{}
 	for _, result := range resChs[0].Await() {

+ 31 - 17
pkg/costmodel/costmodel.go

@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"math"
 	"net/http"
-	"os"
 	"strconv"
 	"strings"
 	"sync"
@@ -14,9 +13,11 @@ import (
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/thanos"
 	"github.com/kubecost/cost-model/pkg/util"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	v1 "k8s.io/api/core/v1"
@@ -46,11 +47,6 @@ const (
 	epCleanTombstones = apiPrefix + "/admin/tsdb/clean_tombstones"
 	epConfig          = apiPrefix + "/status/config"
 	epFlags           = apiPrefix + "/status/flags"
-
-	clusterIDKey   = "CLUSTER_ID"
-	remoteEnabled  = "REMOTE_WRITE_ENABLED"
-	thanosEnabled  = "THANOS_ENABLED"
-	thanosQueryUrl = "THANOS_QUERY_URL"
 )
 
 type CostModel struct {
@@ -238,7 +234,7 @@ type PrometheusMetadata struct {
 func ValidatePrometheus(cli prometheusClient.Client, isThanos bool) (*PrometheusMetadata, error) {
 	q := "up"
 	if isThanos {
-		q += " offset 3h"
+		q += thanos.QueryOffset()
 	}
 	data, err := Query(cli, q)
 	if err != nil {
@@ -331,7 +327,7 @@ func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kube
 	normalization := fmt.Sprintf(normalizationStr, window, offset)
 
 	// Cluster ID is specific to the source cluster
-	clusterID := os.Getenv(clusterIDKey)
+	clusterID := env.GetClusterID()
 
 	var wg sync.WaitGroup
 	wg.Add(11)
@@ -1140,7 +1136,7 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 
 		cnode, err := cp.NodePricing(cp.GetKey(nodeLabels, n))
 		if err != nil {
-			klog.V(1).Infof("[Warning] Error getting node pricing. Error: " + err.Error())
+			log.DedupedWarningf(10, "Error getting node pricing. Error: %s", err.Error())
 			if cnode != nil {
 				nodes[name] = cnode
 				continue
@@ -1729,7 +1725,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		klog.V(1).Infof("Error parsing time " + windowString + ". Error: " + err.Error())
 		return nil, err
 	}
-	clusterID := os.Getenv(clusterIDKey)
+	clusterID := env.GetClusterID()
 
 	durHrs := end.Sub(start).Hours() + 1
 
@@ -2623,18 +2619,28 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 
 	resp, body, warnings, err := cli.Do(context.Background(), req)
 	for _, w := range warnings {
-		klog.V(3).Infof("[Warning] '%s' fetching query '%s'", w, query)
+		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
 	}
 	if err != nil {
-		return nil, fmt.Errorf("[Error] %s fetching query %s", err.Error(), query)
+		if resp == nil {
+			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
+		}
+
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), body, err.Error(), query)
+	}
+
+	// Unsuccessful Status Code, log body and status
+	statusCode := resp.StatusCode
+	statusText := http.StatusText(statusCode)
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, fmt.Errorf("%d (%s) Headers: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
 	}
 
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("[Error] %d %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
-
 	return toReturn, nil
 }
 
@@ -2655,15 +2661,23 @@ func Query(cli prometheusClient.Client, query string) (interface{}, error) {
 	}
 	if err != nil {
 		if resp == nil {
-			return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
 		}
 
-		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), body, err.Error(), query)
 	}
+
+	// Unsuccessful Status Code, log body and status
+	statusCode := resp.StatusCode
+	statusText := http.StatusText(statusCode)
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, fmt.Errorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
+	}
+
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
 	return toReturn, nil
 }

+ 31 - 44
pkg/costmodel/router.go

@@ -8,7 +8,6 @@ import (
 	"math"
 	"net"
 	"net/http"
-	"os"
 	"reflect"
 	"strconv"
 	"strings"
@@ -23,9 +22,10 @@ import (
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	cm "github.com/kubecost/cost-model/pkg/clustermanager"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
-	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/thanos"
 	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	v1 "k8s.io/api/core/v1"
@@ -39,21 +39,17 @@ import (
 )
 
 const (
-	logCollectionEnvVar            = "LOG_COLLECTION_ENABLED"
-	productAnalyticsEnvVar         = "PRODUCT_ANALYTICS_ENABLED"
-	errorReportingEnvVar           = "ERROR_REPORTING_ENABLED"
-	maxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
-	prometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
-	prometheusTroubleshootingEp    = "http://docs.kubecost.com/custom-prom#troubleshoot"
-	RFC3339Milli                   = "2006-01-02T15:04:05.000Z"
+	prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
+	RFC3339Milli                = "2006-01-02T15:04:05.000Z"
 )
 
 var (
 	// gitCommit is set by the build system
 	gitCommit               string
-	logCollectionEnabled    bool = strings.EqualFold(os.Getenv(logCollectionEnvVar), "true")
-	productAnalyticsEnabled bool = strings.EqualFold(os.Getenv(productAnalyticsEnvVar), "true")
-	errorReportingEnabled   bool = strings.EqualFold(os.Getenv(errorReportingEnvVar), "true")
+	logCollectionEnabled    bool = env.IsLogCollectionEnabled()
+	productAnalyticsEnabled bool = env.IsProductAnalyticsEnabled()
+	errorReportingEnabled   bool = env.IsErrorReportingEnabled()
+	valuesReportingEnabled  bool = env.IsValuesReportingEnabled()
 )
 
 var Router = httprouter.New()
@@ -152,6 +148,9 @@ func filterFields(fields string, data map[string]*CostData) map[string]CostData
 }
 
 func normalizeTimeParam(param string) (string, error) {
+	if param == "" {
+		return "", fmt.Errorf("invalid time param")
+	}
 	// convert days to hours
 	if param[len(param)-1:] == "d" {
 		count := param[:len(param)-1]
@@ -166,27 +165,12 @@ func normalizeTimeParam(param string) (string, error) {
 	return param, nil
 }
 
-// Parses the max query concurrency environment variable
-func maxQueryConcurrency() int {
-	v := os.Getenv(maxQueryConcurrencyEnvVar)
-	if v == "" {
-		return 5
-	}
-
-	result, err := strconv.Atoi(v)
-	if err != nil {
-		log.Warningf("Failed to parse MAX_QUERY_CONCURRENCY. Defaulting to 5 - %s", err)
-		return 5
-	}
-
-	return result
-}
-
 // writeReportingFlags writes the reporting flags to the cluster info map
 func writeReportingFlags(clusterInfo map[string]string) {
 	clusterInfo["logCollection"] = fmt.Sprintf("%t", logCollectionEnabled)
 	clusterInfo["productAnalytics"] = fmt.Sprintf("%t", productAnalyticsEnabled)
 	clusterInfo["errorReporting"] = fmt.Sprintf("%t", errorReportingEnabled)
+	clusterInfo["valuesReporting"] = fmt.Sprintf("%t", valuesReportingEnabled)
 }
 
 // parsePercentString takes a string of expected format "N%" and returns a floating point 0.0N.
@@ -384,11 +368,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	cluster := r.URL.Query().Get("cluster")
 	remote := r.URL.Query().Get("remote")
 
-	remoteAvailable := os.Getenv(remoteEnabled)
-	remoteEnabled := false
-	if remoteAvailable == "true" && remote != "false" {
-		remoteEnabled = true
-	}
+	remoteEnabled := env.IsRemoteEnabled() && remote != "false"
 
 	// Use Thanos Client if it exists (enabled) and remote flag set
 	var pClient prometheusClient.Client
@@ -646,6 +626,11 @@ func (p *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httpro
 	// Include Product Reporting Flags with Cluster Info
 	writeReportingFlags(data)
 
+	// Include Thanos Offset Duration if Applicable
+	if thanos.IsEnabled() {
+		data["thanosOffset"] = thanos.Offset()
+	}
+
 	w.Write(WrapData(data, err))
 }
 
@@ -876,7 +861,7 @@ func newClusterManager() *cm.ClusterManager {
 	// NOTE: configmap approach is currently the "persistent" source (entries are read-only
 	// NOTE: on the backend), we don't currently need to store on disk.
 	/*
-		path := os.Getenv("CONFIG_PATH")
+		path := env.GetConfigPath()
 		db, err := bolt.Open(path+"costmodel.db", 0600, nil)
 		if err != nil {
 			klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
@@ -946,12 +931,13 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		}
 	}
 
-	address := os.Getenv(prometheusServerEndpointEnvVar)
+	address := env.GetPrometheusServerEndpoint()
 	if address == "" {
-		klog.Fatalf("No address for prometheus set in $%s. Aborting.", prometheusServerEndpointEnvVar)
+		klog.Fatalf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
 	}
 
-	queryConcurrency := maxQueryConcurrency()
+	queryConcurrency := env.GetMaxQueryConcurrency()
+	klog.Infof("Prometheus/Thanos Client Max Concurrency set to %d", queryConcurrency)
 
 	var LongTimeoutRoundTripper http.RoundTripper = &http.Transport{ // may be necessary for long prometheus queries. TODO: make this configurable
 		Proxy: http.ProxyFromEnvironment,
@@ -1000,7 +986,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
 	k8sCache.Run()
 
-	cloudProviderKey := os.Getenv("CLOUD_PROVIDER_API_KEY")
+	cloudProviderKey := env.GetCloudProviderAPIKey()
 	cloudProvider, err := costAnalyzerCloud.NewProvider(k8sCache, cloudProviderKey)
 	if err != nil {
 		panic(err.Error())
@@ -1023,7 +1009,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 			}
 		}
 	}
-	kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
+	kubecostNamespace := env.GetKubecostNamespace()
 	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
 	configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get("pricing-configs", metav1.GetOptions{})
 	if err != nil {
@@ -1150,8 +1136,8 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 		OutOfClusterCache:             outOfClusterCache,
 	}
 
-	remoteEnabled := os.Getenv(remoteEnabled)
-	if remoteEnabled == "true" {
+	remoteEnabled := env.IsRemoteEnabled()
+	if remoteEnabled {
 		info, err := cloudProvider.ClusterInfo()
 		klog.Infof("Saving cluster  with id:'%s', and name:'%s' to durable storage", info["id"], info["name"])
 		if err != nil {
@@ -1164,8 +1150,9 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	}
 
 	// Thanos Client
-	if os.Getenv(thanosEnabled) == "true" {
-		thanosUrl := os.Getenv(thanosQueryUrl)
+	if thanos.IsEnabled() {
+		thanosUrl := thanos.QueryURL()
+
 		if thanosUrl != "" {
 			var thanosRT http.RoundTripper = &http.Transport{
 				Proxy: http.ProxyFromEnvironment,
@@ -1193,7 +1180,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 			}
 
 		} else {
-			klog.Infof("Error resolving environment variable: $%s", thanosQueryUrl)
+			klog.Infof("Error resolving environment variable: $%s", env.ThanosQueryUrlEnvVar)
 		}
 	}
 

+ 4 - 6
pkg/costmodel/sql.go

@@ -4,19 +4,17 @@ import (
 	"database/sql"
 	"encoding/json"
 	"fmt"
-	"os"
 	"time"
 
 	"k8s.io/klog"
 
 	costAnalyzerCloud "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/util"
+
 	_ "github.com/lib/pq"
 )
 
-const remotePW = "REMOTE_WRITE_PASSWORD"
-const sqlAddress = "SQL_ADDRESS"
-
 func getPVCosts(db *sql.DB) (map[string]*costAnalyzerCloud.PV, error) {
 	pvs := make(map[string]*costAnalyzerCloud.PV)
 	query := `SELECT name, avg(value),labels->>'volumename' AS volumename, labels->>'cluster_id' AS clusterid
@@ -94,8 +92,8 @@ func getNodeCosts(db *sql.DB) (map[string]*costAnalyzerCloud.Node, error) {
 }
 
 func CostDataRangeFromSQL(field string, value string, window string, start string, end string) (map[string]*CostData, error) {
-	pw := os.Getenv(remotePW)
-	address := os.Getenv(sqlAddress)
+	pw := env.GetRemotePW()
+	address := env.GetSQLAddress()
 	connStr := fmt.Sprintf("postgres://postgres:%s@%s:5432?sslmode=disable", pw, address)
 	db, err := sql.Open("postgres", connStr)
 	defer db.Close()

+ 163 - 0
pkg/env/costmodelenv.go

@@ -0,0 +1,163 @@
+package env
+
+const (
+	AWSAccessKeyIDEnvVar     = "AWS_ACCESS_KEY_ID"
+	AWSAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
+	AWSClusterIDEnvVar       = "AWS_CLUSTER_ID"
+
+	KubecostNamespaceEnvVar        = "KUBECOST_NAMESPACE"
+	ClusterIDEnvVar                = "CLUSTER_ID"
+	PrometheusServerEndpointEnvVar = "PROMETHEUS_SERVER_ENDPOINT"
+	MaxQueryConcurrencyEnvVar      = "MAX_QUERY_CONCURRENCY"
+	RemoteEnabledEnvVar            = "REMOTE_WRITE_ENABLED"
+	RemotePWEnvVar                 = "REMOTE_WRITE_PASSWORD"
+	SQLAddressEnvVar               = "SQL_ADDRESS"
+	UseCSVProviderEnvVar           = "USE_CSV_PROVIDER"
+	CSVRegionEnvVar                = "CSV_REGION"
+	CSVPathEnvVar                  = "CSV_PATH"
+	ConfigPathEnvVar               = "CONFIG_PATH"
+	CloudProviderAPIKeyEnvVar      = "CLOUD_PROVIDER_API_KEY"
+
+	ThanosEnabledEnvVar  = "THANOS_ENABLED"
+	ThanosQueryUrlEnvVar = "THANOS_QUERY_URL"
+	ThanosOffsetEnvVar   = "THANOS_QUERY_OFFSET"
+
+	LogCollectionEnabledEnvVar    = "LOG_COLLECTION_ENABLED"
+	ProductAnalyticsEnabledEnvVar = "PRODUCT_ANALYTICS_ENABLED"
+	ErrorReportingEnabledEnvVar   = "ERROR_REPORTING_ENABLED"
+	ValuesReportingEnabledEnvVar  = "VALUES_REPORTING_ENABLED"
+)
+
+// GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
+// the AWS access key for authentication
+func GetAWSAccessKeyID() string {
+	return Get(AWSAccessKeyIDEnvVar, "")
+}
+
+// GetAWSAccessKeySecret returns the environment variable value for AWSAccessKeySecretEnvVar which represents
+// the AWS access key secret for authentication
+func GetAWSAccessKeySecret() string {
+	return Get(AWSAccessKeySecretEnvVar, "")
+}
+
+// GetAWSClusterID returns the environment variable value for AWSClusterIDEnvVar which represents
+// an AWS specific cluster identifier.
+func GetAWSClusterID() string {
+	return Get(AWSClusterIDEnvVar, "")
+}
+
+// GetKubecostNamespace returns the environment variable value for KubecostNamespaceEnvVar which
+// represents the namespace the cost model exists in.
+func GetKubecostNamespace() string {
+	return Get(KubecostNamespaceEnvVar, "kubecost")
+}
+
+// GetClusterID returns the environment variable value for ClusterIDEnvVar which represents the
+// configurable identifier used for multi-cluster metric emission.
+func GetClusterID() string {
+	return Get(ClusterIDEnvVar, "")
+}
+
+// GetPrometheusServerEndpoint returns the environment variable value for PrometheusServerEndpointEnvVar which
+// represents the prometheus server endpoint used to execute prometheus queries.
+func GetPrometheusServerEndpoint() string {
+	return Get(PrometheusServerEndpointEnvVar, "")
+}
+
+// IsRemoteEnabled returns the environment variable value for RemoteEnabledEnvVar which represents whether
+// or not remote write is enabled for prometheus for use with SQL backed persistent storage.
+func IsRemoteEnabled() bool {
+	return GetBool(RemoteEnabledEnvVar, false)
+}
+
+// GetRemotePW returns the environment variable value for RemotePWEnvVar which represents the remote
+// persistent storage password.
+func GetRemotePW() string {
+	return Get(RemotePWEnvVar, "")
+}
+
+// GetSQLAddress returns the environment variable value for SQLAddressEnvVar which represents the SQL
+// database address used with remote persistent storage.
+func GetSQLAddress() string {
+	return Get(SQLAddressEnvVar, "")
+}
+
+// IsUseCSVProvider returns the environment variable value for UseCSVProviderEnvVar which represents
+// whether or not the use of a CSV cost provider is enabled.
+func IsUseCSVProvider() bool {
+	return GetBool(UseCSVProviderEnvVar, false)
+}
+
+// GetCSVRegion returns the environment variable value for CSVRegionEnvVar which represents the
+// region configured for a CSV provider.
+func GetCSVRegion() string {
+	return Get(CSVRegionEnvVar, "")
+}
+
+// GetCSVPath returns the environment variable value for CSVPathEnvVar which represents the key path
+// configured for a CSV provider.
+func GetCSVPath() string {
+	return Get(CSVPathEnvVar, "")
+}
+
+// GetConfigPath returns the environment variable value for ConfigPathEnvVar which represents the cost
+// model configuration path
+func GetConfigPath() string {
+	return Get(ConfigPathEnvVar, "")
+}
+
+// GetConfigPath returns the environment variable value for ConfigPathEnvVar which represents the cost
+// model configuration path
+func GetConfigPathWithDefault(defaultValue string) string {
+	return Get(ConfigPathEnvVar, defaultValue)
+}
+
+// GetCloudProviderAPI returns the environment variable value for CloudProviderAPIEnvVar which represents
+// the API key provided for the cloud provider.
+func GetCloudProviderAPIKey() string {
+	return Get(CloudProviderAPIKeyEnvVar, "")
+}
+
+// IsThanosEnabled returns the environment variable value for ThanosEnabledEnvVar which represents whether
+// or not thanos is enabled.
+func IsThanosEnabled() bool {
+	return GetBool(ThanosEnabledEnvVar, false)
+}
+
+// GetThanosQueryUrl returns the environment variable value for ThanosQueryUrlEnvVar which represents the
+// target query endpoint for hitting thanos.
+func GetThanosQueryUrl() string {
+	return Get(ThanosQueryUrlEnvVar, "")
+}
+
+// GetThanosOffset returns the environment variable value for ThanosOffsetEnvVar which represents the total
+// amount of time to offset all queries made to thanos.
+func GetThanosOffset() string {
+	return Get(ThanosOffsetEnvVar, "3h")
+}
+
+// IsLogCollectionEnabled returns the environment variable value for LogCollectionEnabledEnvVar which represents
+// whether or not log collection has been enabled for kubecost deployments.
+func IsLogCollectionEnabled() bool {
+	return GetBool(LogCollectionEnabledEnvVar, true)
+}
+
+// IsProductAnalyticsEnabled returns the environment variable value for ProductAnalyticsEnabledEnvVar
+func IsProductAnalyticsEnabled() bool {
+	return GetBool(ProductAnalyticsEnabledEnvVar, true)
+}
+
+// IsErrorReportingEnabled returns the environment variable value for ErrorReportingEnabledEnvVar
+func IsErrorReportingEnabled() bool {
+	return GetBool(ErrorReportingEnabledEnvVar, true)
+}
+
+// IsValuesReportingEnabled returns the environment variable value for ValuesReportingEnabledEnvVar
+func IsValuesReportingEnabled() bool {
+	return GetBool(ValuesReportingEnabledEnvVar, true)
+}
+
+// GetMaxQueryConcurrency returns the environment variable value for MaxQueryConcurrencyEnvVar
+func GetMaxQueryConcurrency() int {
+	return GetInt(MaxQueryConcurrencyEnvVar, 5)
+}

+ 233 - 0
pkg/env/env.go

@@ -0,0 +1,233 @@
+package env
+
+import (
+	"os"
+	"strconv"
+)
+
+// Get parses an string from the environment variable key parameter. If the environment
+// variable is empty, the defaultValue parameter is returned.
+func Get(key string, defaultValue string) string {
+	r := os.Getenv(key)
+	if r == "" {
+		return defaultValue
+	}
+
+	return r
+}
+
+// GetInt parses an int from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt(key string, defaultValue int) int {
+	r := os.Getenv(key)
+	i, err := strconv.Atoi(r)
+	if err != nil {
+		return defaultValue
+	}
+
+	return i
+}
+
+// GetInt8 parses an int8 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt8(key string, defaultValue int8) int8 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseInt(r, 10, 8)
+	if err != nil {
+		return defaultValue
+	}
+
+	return int8(i)
+}
+
+// GetInt16 parses an int16 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt16(key string, defaultValue int16) int16 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseInt(r, 10, 16)
+	if err != nil {
+		return defaultValue
+	}
+
+	return int16(i)
+}
+
+// GetInt32 parses an int32 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt32(key string, defaultValue int32) int32 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseInt(r, 10, 32)
+	if err != nil {
+		return defaultValue
+	}
+
+	return int32(i)
+}
+
+// GetInt64 parses an int64 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetInt64(key string, defaultValue int64) int64 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseInt(r, 10, 64)
+	if err != nil {
+		return defaultValue
+	}
+
+	return i
+}
+
+// GetUInt parses a uint from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt(key string, defaultValue uint) uint {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 32)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint(i)
+}
+
+// GetUInt8 parses a uint8 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt8(key string, defaultValue uint8) uint8 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 8)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint8(i)
+}
+
+// GetUInt16 parses a uint16 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt16(key string, defaultValue uint16) uint16 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 16)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint16(i)
+}
+
+// GetUInt32 parses a uint32 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt32(key string, defaultValue uint32) uint32 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 32)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint32(i)
+}
+
+// GetUInt64 parses a uint64 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetUInt64(key string, defaultValue uint64) uint64 {
+	r := os.Getenv(key)
+	i, err := strconv.ParseUint(r, 10, 64)
+	if err != nil {
+		return defaultValue
+	}
+
+	return uint64(i)
+}
+
+// GetFloat32 parses a float32 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetFloat32(key string, defaultValue float32) float32 {
+	r := os.Getenv(key)
+	f, err := strconv.ParseFloat(r, 32)
+	if err != nil {
+		return defaultValue
+	}
+
+	return float32(f)
+}
+
+// GetFloat64 parses a float64 from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetFloat64(key string, defaultValue float64) float64 {
+	r := os.Getenv(key)
+	f, err := strconv.ParseFloat(r, 64)
+	if err != nil {
+		return defaultValue
+	}
+
+	return f
+}
+
+// GetBool parses a bool from the environment variable key parameter. If the environment
+// variable is empty or fails to parse, the defaultValue parameter is returned.
+func GetBool(key string, defaultValue bool) bool {
+	r := os.Getenv(key)
+	b, err := strconv.ParseBool(r)
+	if err != nil {
+		return defaultValue
+	}
+
+	return b
+}
+
+// Set sets the environment variable for the key provided using the value provided.
+func Set(key string, value string) error {
+	return os.Setenv(key, value)
+}
+
+// SetInt sets the environment variable to a string formatted int value
+func SetInt(key string, value int) error {
+	return os.Setenv(key, strconv.Itoa(value))
+}
+
+// SetInt8 sets the environment variable to a string formatted int8 value.
+func SetInt8(key string, value int8) error {
+	return os.Setenv(key, strconv.FormatInt(int64(value), 10))
+}
+
+// SetInt16 sets the environment variable to a string formatted int16 value.
+func SetInt16(key string, value int16) error {
+	return os.Setenv(key, strconv.FormatInt(int64(value), 10))
+}
+
+// SetInt32 sets the environment variable to a string formatted int32 value.
+func SetInt32(key string, value int32) error {
+	return os.Setenv(key, strconv.FormatInt(int64(value), 10))
+}
+
+// SetInt64 sets the environment variable to a string formatted int64 value.
+func SetInt64(key string, value int64) error {
+	return os.Setenv(key, strconv.FormatInt(value, 10))
+}
+
+// SetUInt sets the environment variable to a string formatted uint value
+func SetUInt(key string, value uint) error {
+	return os.Setenv(key, strconv.FormatUint(uint64(value), 10))
+}
+
+// SetUInt8 sets the environment variable to a string formatted uint8 value
+func SetUInt8(key string, value uint8) error {
+	return os.Setenv(key, strconv.FormatUint(uint64(value), 10))
+}
+
+// SetUInt16 sets the environment variable to a string formatted uint16 value
+func SetUInt16(key string, value uint16) error {
+	return os.Setenv(key, strconv.FormatUint(uint64(value), 10))
+}
+
+// SetUInt32 sets the environment variable to a string formatted uint32 value
+func SetUInt32(key string, value uint32) error {
+	return os.Setenv(key, strconv.FormatUint(uint64(value), 10))
+}
+
+// SetUInt64 sets the environment variable to a string formatted uint64 value
+func SetUInt64(key string, value uint64) error {
+	return os.Setenv(key, strconv.FormatUint(value, 10))
+}
+
+// SetBool sets the environment variable to a string formatted bool value.
+func SetBool(key string, value bool) error {
+	return os.Setenv(key, strconv.FormatBool(value))
+}

+ 50 - 0
pkg/log/log.go

@@ -7,18 +7,68 @@ import (
 	"k8s.io/klog"
 )
 
+var seen = make(map[string]int)
+
 func Errorf(format string, a ...interface{}) {
 	klog.Errorf(fmt.Sprintf("[Error] %s", format), a...)
 }
 
+func DedupedErrorf(logTypeLimit int, format string, a ...interface{}) {
+	timesLogged, ok := seen[format]
+	if !ok {
+		seen[format] = 1
+	} else if timesLogged == logTypeLimit {
+		seen[format]++
+		f := fmt.Sprintf("[Error] %s", format)
+		klog.Errorf("%s seen %d times, suppressing future logs", f, logTypeLimit)
+	} else if timesLogged > logTypeLimit {
+		seen[format]++
+	} else {
+		seen[format]++
+		klog.Errorf(fmt.Sprintf("[Error] %s", format), a...)
+	}
+}
+
 func Warningf(format string, a ...interface{}) {
 	klog.V(2).Infof(fmt.Sprintf("[Warning] %s", format), a...)
 }
 
+func DedupedWarningf(logTypeLimit int, format string, a ...interface{}) {
+	timesLogged, ok := seen[format]
+	if !ok {
+		seen[format] = 1
+	} else if timesLogged == logTypeLimit {
+		seen[format]++
+		f := fmt.Sprintf("[Warning] %s", format)
+		klog.Errorf("%s seen %d times, suppressing future logs", f, logTypeLimit)
+	} else if timesLogged > logTypeLimit {
+		seen[format]++
+	} else {
+		seen[format]++
+		klog.V(2).Infof(fmt.Sprintf("[Warning] %s", format), a...)
+	}
+}
+
 func Infof(format string, a ...interface{}) {
 	klog.V(3).Infof(fmt.Sprintf("[Info] %s", format), a...)
 }
 
+func DedupedInfof(logTypeLimit int, format string, a ...interface{}) {
+	timesLogged, ok := seen[format]
+	if !ok {
+		seen[format] = 1
+	} else if timesLogged == logTypeLimit {
+		seen[format]++
+		f := fmt.Sprintf("[Info] %s", format)
+		klog.Errorf("%s seen %d times, suppressing future logs", f, logTypeLimit)
+	} else if timesLogged > logTypeLimit {
+		seen[format]++
+	} else {
+		seen[format]++
+		klog.V(3).Infof(fmt.Sprintf("[Info] %s", format), a...)
+	}
+}
+
 func Profilef(format string, a ...interface{}) {
 	klog.V(3).Infof(fmt.Sprintf("[Profiler] %s", format), a...)
 }

+ 12 - 3
pkg/prom/query.go

@@ -7,6 +7,7 @@ import (
 	"net/http"
 
 	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/util"
 	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 )
@@ -92,15 +93,23 @@ func (ctx *Context) query(query string) (interface{}, error) {
 	}
 	if err != nil {
 		if resp == nil {
-			return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+			return nil, fmt.Errorf("Error: %s, Body: %s Query: %s", err.Error(), body, query)
 		}
 
-		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", resp.StatusCode, http.StatusText(resp.StatusCode), util.HeaderString(resp.Header), body, err.Error(), query)
 	}
+
+	// Unsuccessful Status Code, log body and status
+	statusCode := resp.StatusCode
+	statusText := http.StatusText(statusCode)
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, fmt.Errorf("%d (%s) Headers: %s, Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), body, query)
+	}
+
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+		return nil, fmt.Errorf("%d (%s) Headers: %s Error: %s Body: %s Query: %s", statusCode, statusText, util.HeaderString(resp.Header), err.Error(), body, query)
 	}
 	return toReturn, nil
 }

+ 55 - 0
pkg/thanos/thanos.go

@@ -0,0 +1,55 @@
+package thanos
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/env"
+)
+
+var (
+	lock           = new(sync.Mutex)
+	enabled        = env.IsThanosEnabled()
+	queryUrl       = env.GetThanosQueryUrl()
+	offset         = env.GetThanosOffset()
+	offsetDuration *time.Duration
+	queryOffset    = fmt.Sprintf(" offset %s", offset)
+)
+
+// IsEnabled returns true if Thanos is enabled.
+func IsEnabled() bool {
+	return enabled
+}
+
+// QueryURL returns true if Thanos is enabled.
+func QueryURL() string {
+	return queryUrl
+}
+
+// Offset returns the duration string for the query offset that should be applied to thanos
+func Offset() string {
+	return offset
+}
+
+// OffsetDuration returns the Offset as a parsed duration
+func OffsetDuration() time.Duration {
+	lock.Lock()
+	defer lock.Unlock()
+
+	if offsetDuration == nil {
+		d, err := time.ParseDuration(offset)
+		if err != nil {
+			d = 0
+		}
+
+		offsetDuration = &d
+	}
+
+	return *offsetDuration
+}
+
+// QueryOffset returns a string in the format: " offset %s" substituting in the Offset() string.
+func QueryOffset() string {
+	return queryOffset
+}

+ 33 - 0
pkg/util/http.go

@@ -0,0 +1,33 @@
+package util
+
+import (
+	"fmt"
+	"net/http"
+	"strings"
+)
+
+// HeaderString writes the request/response http.Header to a string.
+func HeaderString(h http.Header) string {
+	var sb strings.Builder
+	var first bool = true
+	sb.WriteString("{ ")
+
+	for k, vs := range h {
+		if first {
+			first = false
+		} else {
+			sb.WriteString(", ")
+		}
+		fmt.Fprintf(&sb, "%s: [ ", k)
+		for idx, v := range vs {
+			sb.WriteString(v)
+			if idx != len(vs)-1 {
+				sb.WriteString(", ")
+			}
+		}
+		sb.WriteString(" ]")
+	}
+	sb.WriteString(" }")
+
+	return sb.String()
+}

+ 48 - 0
test/util_test.go

@@ -0,0 +1,48 @@
+package test
+
+import (
+	"net/http"
+	"testing"
+
+	"github.com/kubecost/cost-model/pkg/util"
+)
+
+func TestHeaderString(t *testing.T) {
+	h := make(http.Header)
+	h.Add("foo", "abc")
+	h.Add("foo", "123")
+	h.Add("bar", "foo")
+	h.Add("Content-Type", "application/octet-stream")
+
+	s := util.HeaderString(h)
+	if len(s) == 0 {
+		t.Errorf("Header String failed to produce a valid output")
+		return
+	}
+
+	t.Logf("Result: %s\n", s)
+}
+
+func TestEmptyHeader(t *testing.T) {
+	h := make(http.Header)
+
+	s := util.HeaderString(h)
+	if len(s) == 0 {
+		t.Errorf("Header String failed to produce a valid output")
+		return
+	}
+
+	t.Logf("Result: %s\n", s)
+}
+
+func TestNilHeader(t *testing.T) {
+	var h http.Header
+
+	s := util.HeaderString(h)
+	if len(s) == 0 {
+		t.Errorf("Header String failed to produce a valid output")
+		return
+	}
+
+	t.Logf("Result: %s\n", s)
+}