Просмотр исходного кода

Merge pull request #320 from kubecost/develop

Develop
Ajay Tripathy 6 лет назад
Родитель
Сommit
a24bfe2983
11 измененных файлов с 897 добавлено и 507 удалено
  1. 66 88
      cloud/awsprovider.go
  2. 36 59
      cloud/azureprovider.go
  3. 34 48
      cloud/customprovider.go
  4. 125 105
      cloud/gcpprovider.go
  5. 5 151
      cloud/provider.go
  6. 216 0
      cloud/providerconfig.go
  7. 278 33
      costmodel/cluster.go
  8. 9 22
      costmodel/costmodel.go
  9. 1 1
      costmodel/router.go
  10. 39 0
      util/errors.go
  11. 88 0
      util/time.go

+ 66 - 88
cloud/awsprovider.go

@@ -61,6 +61,7 @@ type AWS struct {
 	ProjectID               string
 	DownloadPricingDataLock sync.RWMutex
 	ReservedInstances       []*AWSReservedInstance
+	Config                  *ProviderConfig
 	*CustomProvider
 }
 
@@ -206,8 +207,8 @@ var regionToBillingRegionCode = map[string]string{
 	"us-gov-west-1":  "UGW1",
 }
 
-func (aws *AWS) GetLocalStorageQuery(offset string) (string, error) {
-	return "", nil
+func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool) string {
+	return ""
 }
 
 // KubeAttrConversion maps the k8s labels for region to an aws region
@@ -256,7 +257,7 @@ func (aws *AWS) GetManagementPlatform() (string, error) {
 }
 
 func (aws *AWS) GetConfig() (*CustomPricing, error) {
-	c, err := GetCustomPricingData("aws.json")
+	c, err := aws.Config.GetCustomPricingData()
 	if c.Discount == "" {
 		c.Discount = "0%"
 	}
@@ -269,98 +270,75 @@ func (aws *AWS) GetConfig() (*CustomPricing, error) {
 	return c, nil
 }
 func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
-	c, err := GetCustomPricingData("aws.json")
-	if err != nil {
-		return nil, err
-	}
-
-	return configmapUpdate(c, configPathFor("aws.json"), a)
+	return aws.Config.UpdateFromMap(a)
 }
 
 func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
-	c, err := GetCustomPricingData("aws.json")
-	if err != nil {
-		return nil, err
-	}
-	if updateType == SpotInfoUpdateType {
-		a := AwsSpotFeedInfo{}
-		err := json.NewDecoder(r).Decode(&a)
-		if err != nil {
-			return nil, err
-		}
+	return aws.Config.Update(func(c *CustomPricing) error {
+		if updateType == SpotInfoUpdateType {
+			a := AwsSpotFeedInfo{}
+			err := json.NewDecoder(r).Decode(&a)
+			if err != nil {
+				return err
+			}
 
-		if err != nil {
-			return nil, err
-		}
-		c.ServiceKeyName = a.ServiceKeyName
-		c.ServiceKeySecret = a.ServiceKeySecret
-		c.SpotDataPrefix = a.Prefix
-		c.SpotDataBucket = a.BucketName
-		c.ProjectID = a.AccountID
-		c.SpotDataRegion = a.Region
-		c.SpotLabel = a.SpotLabel
-		c.SpotLabelValue = a.SpotLabelValue
-
-	} else if updateType == AthenaInfoUpdateType {
-		a := AwsAthenaInfo{}
-		err := json.NewDecoder(r).Decode(&a)
-		if err != nil {
-			return nil, err
-		}
-		c.AthenaBucketName = a.AthenaBucketName
-		c.AthenaRegion = a.AthenaRegion
-		c.AthenaDatabase = a.AthenaDatabase
-		c.AthenaTable = a.AthenaTable
-		c.ServiceKeyName = a.ServiceKeyName
-		c.ServiceKeySecret = a.ServiceKeySecret
-		c.ProjectID = a.AccountID
-	} else {
-		a := make(map[string]interface{})
-		err = json.NewDecoder(r).Decode(&a)
-		if err != nil {
-			return nil, err
-		}
-		for k, v := range a {
-			kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
-			vstr, ok := v.(string)
-			if ok {
-				err := SetCustomPricingField(c, kUpper, vstr)
-				if err != nil {
-					return nil, err
-				}
-			} else {
-				sci := v.(map[string]interface{})
-				sc := make(map[string]string)
-				for k, val := range sci {
-					sc[k] = val.(string)
+			c.ServiceKeyName = a.ServiceKeyName
+			c.ServiceKeySecret = a.ServiceKeySecret
+			c.SpotDataPrefix = a.Prefix
+			c.SpotDataBucket = a.BucketName
+			c.ProjectID = a.AccountID
+			c.SpotDataRegion = a.Region
+			c.SpotLabel = a.SpotLabel
+			c.SpotLabelValue = a.SpotLabelValue
+
+		} else if updateType == AthenaInfoUpdateType {
+			a := AwsAthenaInfo{}
+			err := json.NewDecoder(r).Decode(&a)
+			if err != nil {
+				return err
+			}
+			c.AthenaBucketName = a.AthenaBucketName
+			c.AthenaRegion = a.AthenaRegion
+			c.AthenaDatabase = a.AthenaDatabase
+			c.AthenaTable = a.AthenaTable
+			c.ServiceKeyName = a.ServiceKeyName
+			c.ServiceKeySecret = a.ServiceKeySecret
+			c.ProjectID = a.AccountID
+		} else {
+			a := make(map[string]interface{})
+			err := json.NewDecoder(r).Decode(&a)
+			if err != nil {
+				return err
+			}
+			for k, v := range a {
+				kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
+				vstr, ok := v.(string)
+				if ok {
+					err := SetCustomPricingField(c, kUpper, vstr)
+					if err != nil {
+						return err
+					}
+				} else {
+					sci := v.(map[string]interface{})
+					sc := make(map[string]string)
+					for k, val := range sci {
+						sc[k] = val.(string)
+					}
+					c.SharedCosts = sc //todo: support reflection/multiple map fields
 				}
-				c.SharedCosts = sc //todo: support reflection/multiple map fields
 			}
 		}
-	}
-	cj, err := json.Marshal(c)
-	if err != nil {
-		return nil, err
-	}
 
-	path := configPathFor("aws.json")
-
-	remoteEnabled := os.Getenv(remoteEnabled)
-	if remoteEnabled == "true" {
-		err = UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
-		if err != nil {
-			return nil, err
+		remoteEnabled := os.Getenv(remoteEnabled)
+		if remoteEnabled == "true" {
+			err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
+			if err != nil {
+				return err
+			}
 		}
-	}
 
-	configLock.Lock()
-	err = ioutil.WriteFile(path, cj, 0644)
-	configLock.Unlock()
-
-	if err != nil {
-		return nil, err
-	}
-	return c, nil
+		return nil
+	})
 }
 
 type awsKey struct {
@@ -477,7 +455,7 @@ func (aws *AWS) isPreemptible(key string) bool {
 func (aws *AWS) DownloadPricingData() error {
 	aws.DownloadPricingDataLock.Lock()
 	defer aws.DownloadPricingDataLock.Unlock()
-	c, err := GetCustomPricingData("aws.json")
+	c, err := aws.Config.GetCustomPricingData()
 	if err != nil {
 		klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
 	}
@@ -700,8 +678,8 @@ func (aws *AWS) DownloadPricingData() error {
 }
 
 // Stubbed NetworkPricing for AWS. Pull directly from aws.json for now
-func (c *AWS) NetworkPricing() (*Network, error) {
-	cpricing, err := GetCustomPricingData("aws.json")
+func (aws *AWS) NetworkPricing() (*Network, error) {
+	cpricing, err := aws.Config.GetCustomPricingData()
 	if err != nil {
 		return nil, err
 	}

+ 36 - 59
cloud/azureprovider.go

@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"net/url"
 	"os"
 	"regexp"
@@ -166,6 +165,7 @@ type Azure struct {
 	allPrices               map[string]*Node
 	DownloadPricingDataLock sync.RWMutex
 	Clientset               clustercache.ClusterCache
+	Config                  *ProviderConfig
 }
 
 type azureKey struct {
@@ -462,8 +462,8 @@ func (az *Azure) NodePricing(key Key) (*Node, error) {
 }
 
 // Stubbed NetworkPricing for Azure. Pull directly from azure.json for now
-func (c *Azure) NetworkPricing() (*Network, error) {
-	cpricing, err := GetCustomPricingData("azure.json")
+func (az *Azure) NetworkPricing() (*Network, error) {
+	cpricing, err := az.Config.GetCustomPricingData()
 	if err != nil {
 		return nil, err
 	}
@@ -547,72 +547,49 @@ func (az *Azure) AddServiceKey(url url.Values) error {
 }
 
 func (az *Azure) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
-	c, err := GetCustomPricingData("azure.json")
-	if err != nil {
-		return nil, err
-	}
-
-	return configmapUpdate(c, configPathFor("azure.json"), a)
+	return az.Config.UpdateFromMap(a)
 }
 
 func (az *Azure) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	defer az.DownloadPricingData()
 
-	c, err := GetCustomPricingData("azure.json")
-	if err != nil {
-		return nil, err
-	}
-
-	a := make(map[string]interface{})
-	err = json.NewDecoder(r).Decode(&a)
-	if err != nil {
-		return nil, err
-	}
-	for k, v := range a {
-		kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
-		vstr, ok := v.(string)
-		if ok {
-			err := SetCustomPricingField(c, kUpper, vstr)
-			if err != nil {
-				return nil, err
-			}
-		} else {
-			sci := v.(map[string]interface{})
-			sc := make(map[string]string)
-			for k, val := range sci {
-				sc[k] = val.(string)
+	return az.Config.Update(func(c *CustomPricing) error {
+		a := make(map[string]interface{})
+		err := json.NewDecoder(r).Decode(&a)
+		if err != nil {
+			return err
+		}
+		for k, v := range a {
+			kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
+			vstr, ok := v.(string)
+			if ok {
+				err := SetCustomPricingField(c, kUpper, vstr)
+				if err != nil {
+					return err
+				}
+			} else {
+				sci := v.(map[string]interface{})
+				sc := make(map[string]string)
+				for k, val := range sci {
+					sc[k] = val.(string)
+				}
+				c.SharedCosts = sc //todo: support reflection/multiple map fields
 			}
-			c.SharedCosts = sc //todo: support reflection/multiple map fields
 		}
-	}
-
-	cj, err := json.Marshal(c)
-	if err != nil {
-		return nil, err
-	}
 
-	remoteEnabled := os.Getenv(remoteEnabled)
-	if remoteEnabled == "true" {
-		err = UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
-		if err != nil {
-			return nil, err
+		remoteEnabled := os.Getenv(remoteEnabled)
+		if remoteEnabled == "true" {
+			err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
+			if err != nil {
+				return err
+			}
 		}
-	}
-
-	configPath := configPathFor("azure.json")
-
-	configLock.Lock()
-	err = ioutil.WriteFile(configPath, cj, 0644)
-	configLock.Unlock()
-
-	if err != nil {
-		return nil, err
-	}
 
-	return c, nil
+		return nil
+	})
 }
 func (az *Azure) GetConfig() (*CustomPricing, error) {
-	c, err := GetCustomPricingData("azure.json")
+	c, err := az.Config.GetCustomPricingData()
 	if c.Discount == "" {
 		c.Discount = "0%"
 	}
@@ -643,6 +620,6 @@ func (az *Azure) PVPricing(PVKey) (*PV, error) {
 	return nil, nil
 }
 
-func (az *Azure) GetLocalStorageQuery(offset string) (string, error) {
-	return "", nil
+func (az *Azure) GetLocalStorageQuery(window, offset string, rate bool) string {
+	return ""
 }

+ 34 - 48
cloud/customprovider.go

@@ -3,7 +3,6 @@ package cloud
 import (
 	"encoding/json"
 	"io"
-	"io/ioutil"
 	"net/url"
 	"strconv"
 	"strings"
@@ -27,6 +26,7 @@ type CustomProvider struct {
 	GPULabel                string
 	GPULabelValue           string
 	DownloadPricingDataLock sync.RWMutex
+	Config                  *ProviderConfig
 }
 
 type customProviderKey struct {
@@ -37,12 +37,12 @@ type customProviderKey struct {
 	Labels         map[string]string
 }
 
-func (*CustomProvider) GetLocalStorageQuery(offset string) (string, error) {
-	return "", nil
+func (*CustomProvider) GetLocalStorageQuery(window, offset string, rate bool) string {
+	return ""
 }
 
-func (*CustomProvider) GetConfig() (*CustomPricing, error) {
-	return GetCustomPricingData("default.json")
+func (cp *CustomProvider) GetConfig() (*CustomPricing, error) {
+	return cp.Config.GetCustomPricingData()
 }
 
 func (*CustomProvider) GetManagementPlatform() (string, error) {
@@ -54,60 +54,46 @@ func (*CustomProvider) ApplyReservedInstancePricing(nodes map[string]*Node) {
 }
 
 func (cp *CustomProvider) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
-	c, err := GetCustomPricingData("default.json")
-	if err != nil {
-		return nil, err
-	}
-
-	return configmapUpdate(c, configPathFor("default.json"), a)
+	return cp.Config.UpdateFromMap(a)
 }
 
 func (cp *CustomProvider) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
-	c, err := GetCustomPricingData("default.json")
-	if err != nil {
-		return nil, err
-	}
-
+	// Parse config updates from reader
 	a := make(map[string]interface{})
-	err = json.NewDecoder(r).Decode(&a)
+	err := json.NewDecoder(r).Decode(&a)
 	if err != nil {
 		return nil, err
 	}
-	for k, v := range a {
-		kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
-		vstr, ok := v.(string)
-		if ok {
-			err := SetCustomPricingField(c, kUpper, vstr)
-			if err != nil {
-				return nil, err
-			}
-		} else {
-			sci := v.(map[string]interface{})
-			sc := make(map[string]string)
-			for k, val := range sci {
-				sc[k] = val.(string)
+
+	// Update Config
+	c, err := cp.Config.Update(func(c *CustomPricing) error {
+		for k, v := range a {
+			kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
+			vstr, ok := v.(string)
+			if ok {
+				err := SetCustomPricingField(c, kUpper, vstr)
+				if err != nil {
+					return err
+				}
+			} else {
+				sci := v.(map[string]interface{})
+				sc := make(map[string]string)
+				for k, val := range sci {
+					sc[k] = val.(string)
+				}
+				c.SharedCosts = sc //todo: support reflection/multiple map fields
 			}
-			c.SharedCosts = sc //todo: support reflection/multiple map fields
 		}
-	}
-
-	cj, err := json.Marshal(c)
-	if err != nil {
-		return nil, err
-	}
-
-	configPath := configPathFor("default.json")
 
-	configLock.Lock()
-	err = ioutil.WriteFile(configPath, cj, 0644)
-	configLock.Unlock()
+		return nil
+	})
 
 	if err != nil {
 		return nil, err
 	}
+
 	defer cp.DownloadPricingData()
 	return c, nil
-
 }
 
 func (cp *CustomProvider) ClusterInfo() (map[string]string, error) {
@@ -168,7 +154,7 @@ func (cp *CustomProvider) DownloadPricingData() error {
 		m := make(map[string]*NodePrice)
 		cp.Pricing = m
 	}
-	p, err := GetCustomPricingData("default.json")
+	p, err := cp.Config.GetCustomPricingData()
 	if err != nil {
 		return err
 	}
@@ -213,8 +199,8 @@ func (*CustomProvider) QuerySQL(query string) ([]byte, error) {
 	return nil, nil
 }
 
-func (*CustomProvider) PVPricing(pvk PVKey) (*PV, error) {
-	cpricing, err := GetCustomPricingData("default.json")
+func (cp *CustomProvider) PVPricing(pvk PVKey) (*PV, error) {
+	cpricing, err := cp.Config.GetCustomPricingData()
 	if err != nil {
 		return nil, err
 	}
@@ -223,8 +209,8 @@ func (*CustomProvider) PVPricing(pvk PVKey) (*PV, error) {
 	}, nil
 }
 
-func (*CustomProvider) NetworkPricing() (*Network, error) {
-	cpricing, err := GetCustomPricingData("default.json")
+func (cp *CustomProvider) NetworkPricing() (*Network, error) {
+	cpricing, err := cp.Config.GetCustomPricingData()
 	if err != nil {
 		return nil, err
 	}

+ 125 - 105
cloud/gcpprovider.go

@@ -51,6 +51,7 @@ type GCP struct {
 	BillingDataDataset      string
 	DownloadPricingDataLock sync.RWMutex
 	ReservedInstances       []*GCPReservedInstance
+	Config                  *ProviderConfig
 	*CustomProvider
 }
 
@@ -80,13 +81,34 @@ func gcpAllocationToOutOfClusterAllocation(gcpAlloc gcpAllocation) *OutOfCluster
 	}
 }
 
-func (gcp *GCP) GetLocalStorageQuery(offset string) (string, error) {
-	localStorageCost := 0.04 // TODO: Set to the price for the appropriate storage class. It's not trivial to determine the local storage disk type
-	return fmt.Sprintf(`sum(sum(container_fs_limit_bytes{device!="tmpfs", id="/"} %s) by (instance, cluster_id)) by (cluster_id) / 1024 / 1024 / 1024 * %f`, offset, localStorageCost), nil
+func (gcp *GCP) GetLocalStorageQuery(window, offset string, rate bool) string {
+	// TODO Set to the price for the appropriate storage class. It's not trivial to determine the local storage disk type
+	// See https://cloud.google.com/compute/disks-image-pricing#persistentdisk
+	localStorageCost := 0.04
+
+	fmtOffset := ""
+	if offset != "" {
+		fmtOffset = fmt.Sprintf("offset %s", offset)
+	}
+
+	fmtCumulativeQuery := `sum(
+		sum_over_time(container_fs_limit_bytes{device!="tmpfs", id="/"}[%s:1m]%s)
+	) by (cluster_id) / 60 / 730 / 1024 / 1024 / 1024 * %f`
+
+	fmtMonthlyQuery := `sum(
+		avg_over_time(container_fs_limit_bytes{device!="tmpfs", id="/"}[%s:1m]%s)
+	) by (cluster_id) / 1024 / 1024 / 1024 * %f`
+
+	fmtQuery := fmtCumulativeQuery
+	if rate {
+		fmtQuery = fmtMonthlyQuery
+	}
+
+	return fmt.Sprintf(fmtQuery, window, fmtOffset, localStorageCost)
 }
 
 func (gcp *GCP) GetConfig() (*CustomPricing, error) {
-	c, err := GetCustomPricingData("gcp.json")
+	c, err := gcp.Config.GetCustomPricingData()
 	if err != nil {
 		return nil, err
 	}
@@ -119,97 +141,78 @@ func (gcp *GCP) GetManagementPlatform() (string, error) {
 }
 
 func (gcp *GCP) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
-	c, err := GetCustomPricingData("gcp.json")
-	if err != nil {
-		return nil, err
-	}
-
-	return configmapUpdate(c, configPathFor("gcp.json"), a)
+	return gcp.Config.UpdateFromMap(a)
 }
 
 func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
-	c, err := GetCustomPricingData("gcp.json")
-	if err != nil {
-		return nil, err
-	}
-	path := os.Getenv("CONFIG_PATH")
-	if path == "" {
-		path = "/models/"
-	}
-	if updateType == BigqueryUpdateType {
-		a := BigQueryConfig{}
-		err = json.NewDecoder(r).Decode(&a)
-		if err != nil {
-			return nil, err
-		}
+	return gcp.Config.Update(func(c *CustomPricing) error {
+		if updateType == BigqueryUpdateType {
+			a := BigQueryConfig{}
+			err := json.NewDecoder(r).Decode(&a)
+			if err != nil {
+				return err
+			}
 
-		c.ProjectID = a.ProjectID
-		c.BillingDataDataset = a.BillingDataDataset
+			c.ProjectID = a.ProjectID
+			c.BillingDataDataset = a.BillingDataDataset
 
-		j, err := json.Marshal(a.Key)
-		if err != nil {
-			return nil, err
-		}
+			j, err := json.Marshal(a.Key)
+			if err != nil {
+				return err
+			}
 
-		keyPath := path + "key.json"
-		err = ioutil.WriteFile(keyPath, j, 0644)
-		if err != nil {
-			return nil, err
-		}
-	} else {
-		a := make(map[string]interface{})
-		err = json.NewDecoder(r).Decode(&a)
-		if err != nil {
-			return nil, err
-		}
-		for k, v := range a {
-			kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
-			vstr, ok := v.(string)
-			if ok {
-				err := SetCustomPricingField(c, kUpper, vstr)
-				if err != nil {
-					return nil, err
-				}
-			} else {
-				sci := v.(map[string]interface{})
-				sc := make(map[string]string)
-				for k, val := range sci {
-					sc[k] = val.(string)
+			path := os.Getenv("CONFIG_PATH")
+			if path == "" {
+				path = "/models/"
+			}
+
+			keyPath := path + "key.json"
+			err = ioutil.WriteFile(keyPath, j, 0644)
+			if err != nil {
+				return err
+			}
+		} else {
+			a := make(map[string]interface{})
+			err := json.NewDecoder(r).Decode(&a)
+			if err != nil {
+				return err
+			}
+			for k, v := range a {
+				kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
+				vstr, ok := v.(string)
+				if ok {
+					err := SetCustomPricingField(c, kUpper, vstr)
+					if err != nil {
+						return err
+					}
+				} else {
+					sci := v.(map[string]interface{})
+					sc := make(map[string]string)
+					for k, val := range sci {
+						sc[k] = val.(string)
+					}
+					c.SharedCosts = sc //todo: support reflection/multiple map fields
 				}
-				c.SharedCosts = sc //todo: support reflection/multiple map fields
 			}
 		}
-	}
 
-	cj, err := json.Marshal(c)
-	if err != nil {
-		return nil, err
-	}
-	remoteEnabled := os.Getenv(remoteEnabled)
-	if remoteEnabled == "true" {
-		err = UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
-		if err != nil {
-			return nil, err
+		remoteEnabled := os.Getenv(remoteEnabled)
+		if remoteEnabled == "true" {
+			err := UpdateClusterMeta(os.Getenv(clusterIDKey), c.ClusterName)
+			if err != nil {
+				return err
+			}
 		}
-	}
 
-	configPath := path + "gcp.json"
-
-	configLock.Lock()
-	err = ioutil.WriteFile(configPath, cj, 0644)
-	configLock.Unlock()
-	if err != nil {
-		return nil, err
-	}
-
-	return c, nil
+		return nil
+	})
 }
 
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
 func (gcp *GCP) ExternalAllocations(start string, end string, aggregator string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
-	c, err := GetCustomPricingData("gcp.json")
+	c, err := gcp.Config.GetCustomPricingData()
 	if err != nil {
 		return nil, err
 	}
@@ -234,7 +237,7 @@ func (gcp *GCP) ExternalAllocations(start string, end string, aggregator string,
 
 // QuerySQL should query BigQuery for billing data for out of cluster costs.
 func (gcp *GCP) QuerySQL(query string) ([]*OutOfClusterAllocation, error) {
-	c, err := GetCustomPricingData("gcp.json")
+	c, err := gcp.Config.GetCustomPricingData()
 	if err != nil {
 		return nil, err
 	}
@@ -463,6 +466,13 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 					instanceType = "n2standard"
 				}
 
+				if (instanceType == "ram" || instanceType == "cpu") && strings.Contains(strings.ToUpper(product.Description), "E2 INSTANCE") {
+					instanceType = "e2"
+				}
+				partialCPUMap := make(map[string]float64)
+				partialCPUMap["e2micro"] = 0.25
+				partialCPUMap["e2small"] = 0.5
+				partialCPUMap["e2medium"] = 1
 				/*
 					var partialCPU float64
 					if strings.ToLower(instanceType) == "f1micro" {
@@ -480,11 +490,23 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 					}
 				}
 
-				for _, sr := range product.ServiceRegions {
-					region := sr
-					candidateKey := region + "," + instanceType + "," + usageType
-					candidateKeyGPU := candidateKey + ",gpu"
+				candidateKeys := []string{}
+				for _, region := range product.ServiceRegions {
+					if instanceType == "e2" { // this needs to be done to handle a partial cpu mapping
+						candidateKeys = append(candidateKeys, region+","+"e2micro"+","+usageType)
+						candidateKeys = append(candidateKeys, region+","+"e2small"+","+usageType)
+						candidateKeys = append(candidateKeys, region+","+"e2medium"+","+usageType)
+						candidateKeys = append(candidateKeys, region+","+"e2standard"+","+usageType)
+					} else {
+						candidateKey := region + "," + instanceType + "," + usageType
+						candidateKeys = append(candidateKeys, candidateKey)
+					}
+				}
 
+				for _, candidateKey := range candidateKeys {
+					instanceType = strings.Split(candidateKey, ",")[1] // we may have overriden this while generating candidate keys
+					region := strings.Split(candidateKey, ",")[0]
+					candidateKeyGPU := candidateKey + ",gpu"
 					if gpuType != "" {
 						lastRateIndex := len(product.PricingInfo[0].PricingExpression.TieredRates) - 1
 						var nanos float64
@@ -543,11 +565,10 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 									product.Node = &Node{
 										RAMCost: strconv.FormatFloat(hourlyPrice, 'f', -1, 64),
 									}
-									/*
-										if partialCPU != 0 {
-											product.Node.VCPU = fmt.Sprintf("%f", partialCPU)
-										}
-									*/
+									partialCPU, pcok := partialCPUMap[instanceType]
+									if pcok {
+										product.Node.VCPU = fmt.Sprintf("%f", partialCPU)
+									}
 									product.Node.UsageType = usageType
 									gcpPricingList[candidateKey] = product
 								}
@@ -560,11 +581,10 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 									product.Node = &Node{
 										RAMCost: strconv.FormatFloat(hourlyPrice, 'f', -1, 64),
 									}
-									/*
-										if partialCPU != 0 {
-											product.Node.VCPU = fmt.Sprintf("%f", partialCPU)
-										}
-									*/
+									partialCPU, pcok := partialCPUMap[instanceType]
+									if pcok {
+										product.Node.VCPU = fmt.Sprintf("%f", partialCPU)
+									}
 									product.Node.UsageType = usageType
 									gcpPricingList[candidateKeyGPU] = product
 								}
@@ -577,11 +597,10 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 									product.Node = &Node{
 										VCPUCost: strconv.FormatFloat(hourlyPrice, 'f', -1, 64),
 									}
-									/*
-										if partialCPU != 0 {
-											product.Node.VCPU = fmt.Sprintf("%f", partialCPU)
-										}
-									*/
+									partialCPU, pcok := partialCPUMap[instanceType]
+									if pcok {
+										product.Node.VCPU = fmt.Sprintf("%f", partialCPU)
+									}
 									product.Node.UsageType = usageType
 									gcpPricingList[candidateKey] = product
 								}
@@ -592,11 +611,10 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]Key, pvKeys map[stri
 									product.Node = &Node{
 										VCPUCost: strconv.FormatFloat(hourlyPrice, 'f', -1, 64),
 									}
-									/*
-										if partialCPU != 0 {
-											product.Node.VCPU = fmt.Sprintf("%f", partialCPU)
-										}
-									*/
+									partialCPU, pcok := partialCPUMap[instanceType]
+									if pcok {
+										product.Node.VCPU = fmt.Sprintf("%f", partialCPU)
+									}
 									product.Node.UsageType = usageType
 									gcpPricingList[candidateKeyGPU] = product
 								}
@@ -687,7 +705,7 @@ func (gcp *GCP) parsePages(inputKeys map[string]Key, pvKeys map[string]PVKey) (m
 func (gcp *GCP) DownloadPricingData() error {
 	gcp.DownloadPricingDataLock.Lock()
 	defer gcp.DownloadPricingDataLock.Unlock()
-	c, err := GetCustomPricingData("gcp.json")
+	c, err := gcp.Config.GetCustomPricingData()
 	if err != nil {
 		klog.V(2).Infof("Error downloading default pricing data: %s", err.Error())
 		return err
@@ -760,8 +778,8 @@ func (gcp *GCP) PVPricing(pvk PVKey) (*PV, error) {
 }
 
 // Stubbed NetworkPricing for GCP. Pull directly from gcp.json for now
-func (c *GCP) NetworkPricing() (*Network, error) {
-	cpricing, err := GetCustomPricingData("gcp.json")
+func (gcp *GCP) NetworkPricing() (*Network, error) {
+	cpricing, err := gcp.Config.GetCustomPricingData()
 	if err != nil {
 		return nil, err
 	}
@@ -1070,6 +1088,8 @@ func (gcp *gcpKey) Features() string {
 	instanceType := strings.ToLower(strings.Join(strings.Split(gcp.Labels[v1.LabelInstanceType], "-")[:2], ""))
 	if instanceType == "n1highmem" || instanceType == "n1highcpu" {
 		instanceType = "n1standard" // These are priced the same. TODO: support n1ultrahighmem
+	} else if instanceType == "e2highmem" || instanceType == "e2highcpu" {
+		instanceType = "e2standard"
 	} else if strings.HasPrefix(instanceType, "custom") {
 		instanceType = "custom" // The suffix of custom does not matter
 	}

+ 5 - 151
cloud/provider.go

@@ -2,16 +2,12 @@ package cloud
 
 import (
 	"database/sql"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"net/url"
 	"os"
-	"reflect"
 	"strings"
-	"sync"
 
 	"k8s.io/klog"
 
@@ -34,9 +30,6 @@ var createTableStatements = []string{
 	);`,
 }
 
-// This Mutex is used to control read/writes to our default config file
-var configLock sync.Mutex
-
 // ReservedInstanceData keeps record of resources on a node should be
 // priced at reserved rates
 type ReservedInstanceData struct {
@@ -175,7 +168,7 @@ type Provider interface {
 	UpdateConfigFromConfigMap(map[string]string) (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetManagementPlatform() (string, error)
-	GetLocalStorageQuery(offset string) (string, error)
+	GetLocalStorageQuery(string, string, bool) string
 	ExternalAllocations(string, string, string, string, string) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 }
@@ -210,120 +203,6 @@ func CustomPricesEnabled(p Provider) bool {
 	return config.CustomPricesEnabled == "true"
 }
 
-// DefaultPricing should be returned so we can do computation even if no file is supplied.
-func DefaultPricing() *CustomPricing {
-	return &CustomPricing{
-		Provider:              "base",
-		Description:           "Default prices based on GCP us-central1",
-		CPU:                   "0.031611",
-		SpotCPU:               "0.006655",
-		RAM:                   "0.004237",
-		SpotRAM:               "0.000892",
-		GPU:                   "0.95",
-		Storage:               "0.00005479452",
-		ZoneNetworkEgress:     "0.01",
-		RegionNetworkEgress:   "0.01",
-		InternetNetworkEgress: "0.12",
-		CustomPricesEnabled:   "false",
-	}
-}
-
-// GetDefaultPricingData will search for a json file representing pricing data in /models/ and use it for base pricing info.
-func GetCustomPricingData(fname string) (*CustomPricing, error) {
-	configLock.Lock()
-	defer configLock.Unlock()
-
-	path := configPathFor(fname)
-
-	exists, err := fileExists(path)
-	// File Error other than NotExists
-	if err != nil {
-		klog.Infof("Custom Pricing file at path '%s' read error: '%s'", path, err.Error())
-		return DefaultPricing(), err
-	}
-
-	// File Doesn't Exist
-	if !exists {
-		klog.Infof("Could not find Custom Pricing file at path '%s'", path)
-		c := DefaultPricing()
-		cj, err := json.Marshal(c)
-		if err != nil {
-			return c, err
-		}
-
-		err = ioutil.WriteFile(path, cj, 0644)
-		if err != nil {
-			klog.Infof("Could not write Custom Pricing file to path '%s'", path)
-			return c, err
-		}
-
-		return c, nil
-	}
-
-	// File Exists - Read all contents of file, unmarshal json
-	byteValue, err := ioutil.ReadFile(path)
-	if err != nil {
-		klog.Infof("Could not read Custom Pricing file at path %s", path)
-		return DefaultPricing(), err
-	}
-
-	var customPricing CustomPricing
-	err = json.Unmarshal(byteValue, &customPricing)
-	if err != nil {
-		klog.Infof("Could not decode Custom Pricing file at path %s", path)
-		return DefaultPricing(), err
-	}
-
-	return &customPricing, nil
-}
-
-func configmapUpdate(c *CustomPricing, path string, a map[string]string) (*CustomPricing, error) {
-	for k, v := range a {
-		kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
-		err := SetCustomPricingField(c, kUpper, v)
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	cj, err := json.Marshal(c)
-	if err != nil {
-		return c, err
-	}
-
-	configLock.Lock()
-	err = ioutil.WriteFile(path, cj, 0644)
-	configLock.Unlock()
-
-	if err != nil {
-		return c, err
-	}
-
-	return c, nil
-}
-
-func SetCustomPricingField(obj *CustomPricing, name string, value string) error {
-	structValue := reflect.ValueOf(obj).Elem()
-	structFieldValue := structValue.FieldByName(name)
-
-	if !structFieldValue.IsValid() {
-		return fmt.Errorf("No such field: %s in obj", name)
-	}
-
-	if !structFieldValue.CanSet() {
-		return fmt.Errorf("Cannot set %s field value", name)
-	}
-
-	structFieldType := structFieldValue.Type()
-	val := reflect.ValueOf(value)
-	if structFieldType != val.Type() {
-		return fmt.Errorf("Provided value type didn't match custom pricing field type")
-	}
-
-	structFieldValue.Set(val)
-	return nil
-}
-
 // NewProvider looks at the nodespec or provider metadata server to decide which provider to instantiate.
 func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, error) {
 	if metadata.OnGCE() {
@@ -334,6 +213,7 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 		return &GCP{
 			Clientset: cache,
 			APIKey:    apiKey,
+			Config:    NewProviderConfig("gcp.json"),
 		}, nil
 	}
 
@@ -347,16 +227,19 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 		klog.V(2).Info("Found ProviderID starting with \"aws\", using AWS Provider")
 		return &AWS{
 			Clientset: cache,
+			Config:    NewProviderConfig("aws.json"),
 		}, nil
 	} else if strings.HasPrefix(provider, "azure") {
 		klog.V(2).Info("Found ProviderID starting with \"azure\", using Azure Provider")
 		return &Azure{
 			Clientset: cache,
+			Config:    NewProviderConfig("azure.json"),
 		}, nil
 	} else {
 		klog.V(2).Info("Unsupported provider, falling back to default")
 		return &CustomProvider{
 			Clientset: cache,
+			Config:    NewProviderConfig("default.json"),
 		}, nil
 	}
 }
@@ -446,32 +329,3 @@ func GetOrCreateClusterMeta(cluster_id, cluster_name string) (string, string, er
 
 	return id, name, nil
 }
-
-// File exists has three different return cases that should be handled:
-//   1. File exists and is not a directory (true, nil)
-//   2. File does not exist (false, nil)
-//   3. File may or may not exist. Error occurred during stat (false, error)
-// The third case represents the scenario where the stat returns an error,
-// but the error isn't relevant to the path. This can happen when the current
-// user doesn't have permission to access the file.
-func fileExists(filename string) (bool, error) {
-	info, err := os.Stat(filename)
-	if err != nil {
-		if os.IsNotExist(err) {
-			return false, nil
-		}
-
-		return false, err
-	}
-
-	return !info.IsDir(), nil
-}
-
-// Returns the configuration directory concatenated with a specific config file name
-func configPathFor(filename string) string {
-	path := os.Getenv("CONFIG_PATH")
-	if path == "" {
-		path = "/models/"
-	}
-	return path + filename
-}

+ 216 - 0
cloud/providerconfig.go

@@ -0,0 +1,216 @@
+package cloud
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"reflect"
+	"strings"
+	"sync"
+
+	"k8s.io/klog"
+)
+
+// ProviderConfig is a utility class that provides a thread-safe configuration
+// storage/cache for all Provider implementations
+type ProviderConfig struct {
+	lock          *sync.Mutex
+	fileName      string
+	configPath    string
+	customPricing *CustomPricing
+}
+
+// Creates a new ProviderConfig instance
+func NewProviderConfig(file string) *ProviderConfig {
+	return &ProviderConfig{
+		lock:          new(sync.Mutex),
+		fileName:      file,
+		configPath:    configPathFor(file),
+		customPricing: nil,
+	}
+}
+
+// Non-ThreadSafe logic to load the config file if a cache does not exist. Flag to write
+// the default config if the config file doesn't exist.
+func (pc *ProviderConfig) loadConfig(writeIfNotExists bool) (*CustomPricing, error) {
+	if pc.customPricing != nil {
+		return pc.customPricing, nil
+	}
+
+	exists, err := fileExists(pc.configPath)
+	// File Error other than NotExists
+	if err != nil {
+		klog.Infof("Custom Pricing file at path '%s' read error: '%s'", pc.configPath, err.Error())
+		return DefaultPricing(), err
+	}
+
+	// File Doesn't Exist
+	if !exists {
+		klog.Infof("Could not find Custom Pricing file at path '%s'", pc.configPath)
+		pc.customPricing = DefaultPricing()
+
+		// Only write the file if flag enabled
+		if writeIfNotExists {
+			cj, err := json.Marshal(pc.customPricing)
+			if err != nil {
+				return pc.customPricing, err
+			}
+
+			err = ioutil.WriteFile(pc.configPath, cj, 0644)
+			if err != nil {
+				klog.Infof("Could not write Custom Pricing file to path '%s'", pc.configPath)
+				return pc.customPricing, err
+			}
+		}
+
+		return pc.customPricing, nil
+	}
+
+	// File Exists - Read all contents of file, unmarshal json
+	byteValue, err := ioutil.ReadFile(pc.configPath)
+	if err != nil {
+		klog.Infof("Could not read Custom Pricing file at path %s", pc.configPath)
+		// If read fails, we don't want to cache default, assuming that the file is valid
+		return DefaultPricing(), err
+	}
+
+	var customPricing CustomPricing
+	err = json.Unmarshal(byteValue, &customPricing)
+	if err != nil {
+		klog.Infof("Could not decode Custom Pricing file at path %s", pc.configPath)
+		return DefaultPricing(), err
+	}
+
+	pc.customPricing = &customPricing
+
+	return pc.customPricing, nil
+}
+
+// ThreadSafe method for retrieving the custom pricing config.
+func (pc *ProviderConfig) GetCustomPricingData() (*CustomPricing, error) {
+	pc.lock.Lock()
+	defer pc.lock.Unlock()
+
+	return pc.loadConfig(true)
+}
+
+// Allows a call to manually update the configuration while maintaining proper thread-safety
+// for read/write methods.
+func (pc *ProviderConfig) Update(updateFunc func(*CustomPricing) error) (*CustomPricing, error) {
+	pc.lock.Lock()
+	defer pc.lock.Unlock()
+
+	// Load Config, set flag to _not_ write if failure to find file.
+	// We're about to write the updated values, so we don't want to double write.
+	c, _ := pc.loadConfig(false)
+
+	// Execute Update - On error, return the in-memory config but don't update cache
+	// explicitly
+	err := updateFunc(c)
+	if err != nil {
+		return c, err
+	}
+
+	// Cache Update (possible the ptr already references the cached value)
+	pc.customPricing = c
+
+	cj, err := json.Marshal(c)
+	if err != nil {
+		return c, err
+	}
+
+	err = ioutil.WriteFile(pc.configPath, cj, 0644)
+
+	if err != nil {
+		return c, err
+	}
+
+	return c, nil
+}
+
+// ThreadSafe update of the config using a string map
+func (pc *ProviderConfig) UpdateFromMap(a map[string]string) (*CustomPricing, error) {
+	// Run our Update() method using SetCustomPricingField logic
+	return pc.Update(func(c *CustomPricing) error {
+		for k, v := range a {
+			// Just so we consistently supply / receive the same values, uppercase the first letter.
+			kUpper := strings.Title(k)
+			err := SetCustomPricingField(c, kUpper, v)
+			if err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+}
+
+// DefaultPricing should be returned so we can do computation even if no file is supplied.
+func DefaultPricing() *CustomPricing {
+	return &CustomPricing{
+		Provider:              "base",
+		Description:           "Default prices based on GCP us-central1",
+		CPU:                   "0.031611",
+		SpotCPU:               "0.006655",
+		RAM:                   "0.004237",
+		SpotRAM:               "0.000892",
+		GPU:                   "0.95",
+		Storage:               "0.00005479452",
+		ZoneNetworkEgress:     "0.01",
+		RegionNetworkEgress:   "0.01",
+		InternetNetworkEgress: "0.12",
+		CustomPricesEnabled:   "false",
+	}
+}
+
+func SetCustomPricingField(obj *CustomPricing, name string, value string) error {
+	structValue := reflect.ValueOf(obj).Elem()
+	structFieldValue := structValue.FieldByName(name)
+
+	if !structFieldValue.IsValid() {
+		return fmt.Errorf("No such field: %s in obj", name)
+	}
+
+	if !structFieldValue.CanSet() {
+		return fmt.Errorf("Cannot set %s field value", name)
+	}
+
+	structFieldType := structFieldValue.Type()
+	val := reflect.ValueOf(value)
+	if structFieldType != val.Type() {
+		return fmt.Errorf("Provided value type didn't match custom pricing field type")
+	}
+
+	structFieldValue.Set(val)
+	return nil
+}
+
+// File exists has three different return cases that should be handled:
+//   1. File exists and is not a directory (true, nil)
+//   2. File does not exist (false, nil)
+//   3. File may or may not exist. Error occurred during stat (false, error)
+// The third case represents the scenario where the stat returns an error,
+// but the error isn't relevant to the path. This can happen when the current
+// user doesn't have permission to access the file.
+func fileExists(filename string) (bool, error) {
+	info, err := os.Stat(filename)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return false, nil
+		}
+
+		return false, err
+	}
+
+	return !info.IsDir(), nil
+}
+
+// Returns the configuration directory concatenated with a specific config file name
+func configPathFor(filename string) string {
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	return path + filename
+}

+ 278 - 33
costmodel/cluster.go

@@ -3,11 +3,12 @@ package costmodel
 import (
 	"fmt"
 	"os"
+	"sync"
 	"time"
 
-	costAnalyzerCloud "github.com/kubecost/cost-model/cloud"
-	prometheusClient "github.com/prometheus/client_golang/api"
-
+	"github.com/kubecost/cost-model/cloud"
+	"github.com/kubecost/cost-model/util"
+	prometheus "github.com/prometheus/client_golang/api"
 	"k8s.io/klog"
 )
 
@@ -33,6 +34,257 @@ const (
 	  ) by (cluster_id) %s`
 )
 
+// TODO move this to a package-accessible helper
+type PromQueryContext struct {
+	client prometheus.Client
+	ec     *util.ErrorCollector
+	wg     *sync.WaitGroup
+}
+
+// TODO move this to a package-accessible helper function once dependencies are able to
+// be extricated from costmodel package (PromQueryResult -> Vector). Otherwise, circular deps.
+func AsyncPromQuery(query string, resultCh chan []*PromQueryResult, ctx PromQueryContext) {
+	if ctx.wg != nil {
+		defer ctx.wg.Done()
+	}
+
+	raw, promErr := Query(ctx.client, query)
+	ctx.ec.Report(promErr)
+
+	results, parseErr := NewQueryResults(raw)
+	ctx.ec.Report(parseErr)
+
+	resultCh <- results
+}
+
+// Costs represents cumulative and monthly cluster costs over a given duration. Costs
+// are broken down by cores, memory, and storage.
+type ClusterCosts struct {
+	Start             *time.Time `json:"startTime"`
+	End               *time.Time `json:"endTime"`
+	CPUCumulative     float64    `json:"cpuCumulativeCost"`
+	CPUMonthly        float64    `json:"cpuMonthlyCost"`
+	GPUCumulative     float64    `json:"gpuCumulativeCost"`
+	GPUMonthly        float64    `json:"gpuMonthlyCost"`
+	RAMCumulative     float64    `json:"ramCumulativeCost"`
+	RAMMonthly        float64    `json:"ramMonthlyCost"`
+	StorageCumulative float64    `json:"storageCumulativeCost"`
+	StorageMonthly    float64    `json:"storageMonthlyCost"`
+	TotalCumulative   float64    `json:"totalCumulativeCost"`
+	TotalMonthly      float64    `json:"totalMonthlyCost"`
+}
+
+// NewClusterCostsFromCumulative takes cumulative cost data over a given time range, computes
+// the associated monthly rate data, and returns the Costs.
+func NewClusterCostsFromCumulative(cpu, gpu, ram, storage float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
+	start, end, err := util.ParseTimeRange(window, offset)
+	if err != nil {
+		return nil, err
+	}
+
+	// If the number of hours is not given (i.e. is zero) compute one from the window and offset
+	if dataHours == 0 {
+		dataHours = end.Sub(*start).Hours()
+	}
+
+	// Do not allow zero-length windows to prevent divide-by-zero issues
+	if dataHours == 0 {
+		return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
+	}
+
+	cc := &ClusterCosts{
+		Start:             start,
+		End:               end,
+		CPUCumulative:     cpu,
+		GPUCumulative:     gpu,
+		RAMCumulative:     ram,
+		StorageCumulative: storage,
+		TotalCumulative:   cpu + gpu + ram + storage,
+		CPUMonthly:        cpu / dataHours * (util.HoursPerMonth),
+		GPUMonthly:        gpu / dataHours * (util.HoursPerMonth),
+		RAMMonthly:        ram / dataHours * (util.HoursPerMonth),
+		StorageMonthly:    storage / dataHours * (util.HoursPerMonth),
+	}
+	cc.TotalMonthly = cc.CPUMonthly + cc.GPUMonthly + cc.RAMMonthly + cc.StorageMonthly
+
+	return cc, nil
+}
+
+// NewClusterCostsFromMonthly takes monthly-rate cost data over a given time range, computes
+// the associated cumulative cost data, and returns the Costs.
+func NewClusterCostsFromMonthly(cpuMonthly, gpuMonthly, ramMonthly, storageMonthly float64, window, offset string, dataHours float64) (*ClusterCosts, error) {
+	start, end, err := util.ParseTimeRange(window, offset)
+	if err != nil {
+		return nil, err
+	}
+
+	// If the number of hours is not given (i.e. is zero) compute one from the window and offset
+	if dataHours == 0 {
+		dataHours = end.Sub(*start).Hours()
+	}
+
+	// Do not allow zero-length windows to prevent divide-by-zero issues
+	if dataHours == 0 {
+		return nil, fmt.Errorf("illegal time range: window %s, offset %s", window, offset)
+	}
+
+	cc := &ClusterCosts{
+		Start:             start,
+		End:               end,
+		CPUMonthly:        cpuMonthly,
+		GPUMonthly:        gpuMonthly,
+		RAMMonthly:        ramMonthly,
+		StorageMonthly:    storageMonthly,
+		TotalMonthly:      cpuMonthly + gpuMonthly + ramMonthly + storageMonthly,
+		CPUCumulative:     cpuMonthly / util.HoursPerMonth * dataHours,
+		GPUCumulative:     gpuMonthly / util.HoursPerMonth * dataHours,
+		RAMCumulative:     ramMonthly / util.HoursPerMonth * dataHours,
+		StorageCumulative: storageMonthly / util.HoursPerMonth * dataHours,
+	}
+	cc.TotalCumulative = cc.CPUCumulative + cc.GPUCumulative + cc.RAMCumulative + cc.StorageCumulative
+
+	return cc, nil
+}
+
+// ComputeClusterCosts gives the cumulative and monthly-rate cluster costs over a window of time for all clusters.
+func ComputeClusterCosts(client prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*ClusterCosts, error) {
+	// Compute number of minutes in the full interval, for use interpolating missed scrapes or scaling missing data
+	start, end, err := util.ParseTimeRange(window, offset)
+	if err != nil {
+		return nil, err
+	}
+	mins := end.Sub(*start).Minutes()
+
+	const fmtQueryDataCount = `max(count_over_time(kube_node_status_capacity_cpu_cores[%s:1m]%s))`
+
+	const fmtQueryTotalGPU = `sum(
+		sum_over_time(node_gpu_hourly_cost[%s:1m]%s) / 60
+	) by (cluster_id)`
+
+	const fmtQueryTotalCPU = `sum(
+		sum(sum_over_time(kube_node_status_capacity_cpu_cores[%s:1m]%s)) by (node, cluster_id) *
+		avg(avg_over_time(node_cpu_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
+	) by (cluster_id)`
+
+	const fmtQueryTotalRAM = `sum(
+		sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:1m]%s) / 1024 / 1024 / 1024) by (node, cluster_id) *
+		avg(avg_over_time(node_ram_hourly_cost[%s:1m]%s)) by (node, cluster_id) / 60
+	) by (cluster_id)`
+
+	const fmtQueryTotalStorage = `sum(
+		sum(sum_over_time(kube_persistentvolume_capacity_bytes[%s:1m]%s)) by (persistentvolume, cluster_id) / 1024 / 1024 / 1024 *
+		avg(avg_over_time(pv_hourly_cost[%s:1m]%s)) by (persistentvolume, cluster_id) / 60
+	) by (cluster_id) %s`
+
+	queryTotalLocalStorage := provider.GetLocalStorageQuery(window, offset, false)
+	if queryTotalLocalStorage != "" {
+		queryTotalLocalStorage = fmt.Sprintf(" + %s", queryTotalLocalStorage)
+	}
+
+	fmtOffset := ""
+	if offset != "" {
+		fmtOffset = fmt.Sprintf("offset %s", offset)
+	}
+
+	queryDataCount := fmt.Sprintf(fmtQueryDataCount, window, fmtOffset)
+	queryTotalGPU := fmt.Sprintf(fmtQueryTotalGPU, window, fmtOffset)
+	queryTotalCPU := fmt.Sprintf(fmtQueryTotalCPU, window, fmtOffset, window, fmtOffset)
+	queryTotalRAM := fmt.Sprintf(fmtQueryTotalRAM, window, fmtOffset, window, fmtOffset)
+	queryTotalStorage := fmt.Sprintf(fmtQueryTotalStorage, window, fmtOffset, window, fmtOffset, queryTotalLocalStorage)
+	numQueries := 5
+
+	klog.V(4).Infof("[Debug] queryDataCount: %s", queryDataCount)
+	klog.V(4).Infof("[Debug] queryTotalGPU: %s", queryTotalGPU)
+	klog.V(4).Infof("[Debug] queryTotalCPU: %s", queryTotalCPU)
+	klog.V(4).Infof("[Debug] queryTotalRAM: %s", queryTotalRAM)
+	klog.V(4).Infof("[Debug] queryTotalStorage: %s", queryTotalStorage)
+
+	// Submit queries to Prometheus asynchronously
+	var ec util.ErrorCollector
+	var wg sync.WaitGroup
+	ctx := PromQueryContext{client, &ec, &wg}
+	ctx.wg.Add(numQueries)
+
+	chDataCount := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryDataCount, chDataCount, ctx)
+
+	chTotalGPU := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalGPU, chTotalGPU, ctx)
+
+	chTotalCPU := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalCPU, chTotalCPU, ctx)
+
+	chTotalRAM := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalRAM, chTotalRAM, ctx)
+
+	chTotalStorage := make(chan []*PromQueryResult, 1)
+	go AsyncPromQuery(queryTotalStorage, chTotalStorage, ctx)
+
+	// After queries complete, retrieve results
+	wg.Wait()
+
+	resultsDataCount := <-chDataCount
+	close(chDataCount)
+
+	resultsTotalGPU := <-chTotalGPU
+	close(chTotalGPU)
+
+	resultsTotalCPU := <-chTotalCPU
+	close(chTotalCPU)
+
+	resultsTotalRAM := <-chTotalRAM
+	close(chTotalRAM)
+
+	resultsTotalStorage := <-chTotalStorage
+	close(chTotalStorage)
+
+	dataMins := mins
+	if len(resultsDataCount) > 0 && len(resultsDataCount[0].Values) > 0 {
+		dataMins = resultsDataCount[0].Values[0].Value
+	} else {
+		klog.V(3).Infof("[Warning] cluster cost data count returned no results")
+	}
+
+	// Intermediate structure storing mapping of [clusterID][type ∈ {cpu, ram, storage, total}]=cost
+	costData := make(map[string]map[string]float64)
+	defaultClusterID := os.Getenv(clusterIDKey)
+
+	// Helper function to iterate over Prom query results, parsing the raw values into
+	// the intermediate costData structure.
+	setCostsFromResults := func(costData map[string]map[string]float64, results []*PromQueryResult, name string) {
+		for _, result := range results {
+			clusterID, _ := result.GetString("cluster_id")
+			if clusterID == "" {
+				clusterID = defaultClusterID
+			}
+			if _, ok := costData[clusterID]; !ok {
+				costData[clusterID] = map[string]float64{}
+			}
+			if len(result.Values) > 0 {
+				costData[clusterID][name] += result.Values[0].Value
+				costData[clusterID]["total"] += result.Values[0].Value
+			}
+		}
+	}
+	setCostsFromResults(costData, resultsTotalGPU, "gpu")
+	setCostsFromResults(costData, resultsTotalCPU, "cpu")
+	setCostsFromResults(costData, resultsTotalRAM, "ram")
+	setCostsFromResults(costData, resultsTotalStorage, "storage")
+
+	// Convert intermediate structure to Costs instances
+	costsByCluster := map[string]*ClusterCosts{}
+	for id, cd := range costData {
+		costs, err := NewClusterCostsFromCumulative(cd["cpu"], cd["gpu"], cd["ram"], cd["storage"], window, offset, dataMins/util.MinsPerHour)
+		if err != nil {
+			klog.V(3).Infof("[Warning] Failed to parse cluster costs on %s (%s) from cumulative data: %+v", window, offset, cd)
+			return nil, err
+		}
+		costsByCluster[id] = costs
+	}
+
+	return costsByCluster, nil
+}
+
 type Totals struct {
 	TotalCost   [][]string `json:"totalcost"`
 	CPUCost     [][]string `json:"cpucost"`
@@ -103,34 +355,33 @@ func resultToTotal(qr interface{}) (map[string][][]string, error) {
 }
 
 // ClusterCostsForAllClusters gives the cluster costs averaged over a window of time for all clusters.
-func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (map[string]*Totals, error) {
-
-	if offset != "" {
-		offset = fmt.Sprintf("offset %s", offset)
-	}
-
-	localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
-	if err != nil {
-		return nil, err
-	}
+func ClusterCostsForAllClusters(cli prometheus.Client, provider cloud.Provider, window, offset string) (map[string]*Totals, error) {
+	localStorageQuery := provider.GetLocalStorageQuery(window, offset, true)
 	if localStorageQuery != "" {
 		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
-	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
+	fmtOffset := ""
+	if offset != "" {
+		fmtOffset = fmt.Sprintf("offset %s", offset)
+	}
+
+	qCores := fmt.Sprintf(queryClusterCores, window, fmtOffset, window, fmtOffset, window, fmtOffset)
+	qRAM := fmt.Sprintf(queryClusterRAM, window, fmtOffset, window, fmtOffset)
+	qStorage := fmt.Sprintf(queryStorage, window, fmtOffset, window, fmtOffset, localStorageQuery)
 
 	klog.V(4).Infof("Running query %s", qCores)
 	resultClusterCores, err := Query(cli, qCores)
 	if err != nil {
 		return nil, fmt.Errorf("Error for query %s: %s", qCores, err.Error())
 	}
+
 	klog.V(4).Infof("Running query %s", qRAM)
 	resultClusterRAM, err := Query(cli, qRAM)
 	if err != nil {
 		return nil, fmt.Errorf("Error for query %s: %s", qRAM, err.Error())
 	}
+
 	klog.V(4).Infof("Running query %s", qRAM)
 	resultStorage, err := Query(cli, qStorage)
 	if err != nil {
@@ -175,25 +426,23 @@ func ClusterCostsForAllClusters(cli prometheusClient.Client, cloud costAnalyzerC
 	return toReturn, nil
 }
 
-// ClusterCosts gives the current full cluster costs averaged over a window of time.
-func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, windowString, offset string) (*Totals, error) {
-
+// AverageClusterTotals gives the current full cluster costs averaged over a window of time.
+// Used to be ClutserCosts, but has been deprecated for that use.
+func AverageClusterTotals(cli prometheus.Client, provider cloud.Provider, windowString, offset string) (*Totals, error) {
 	// turn offsets of the format "[0-9+]h" into the format "offset [0-9+]h" for use in query templatess
+	fmtOffset := ""
 	if offset != "" {
-		offset = fmt.Sprintf("offset %s", offset)
+		fmtOffset = fmt.Sprintf("offset %s", offset)
 	}
 
-	localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
-	if err != nil {
-		return nil, err
-	}
+	localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true)
 	if localStorageQuery != "" {
 		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 	}
 
-	qCores := fmt.Sprintf(queryClusterCores, windowString, offset, windowString, offset, windowString, offset)
-	qRAM := fmt.Sprintf(queryClusterRAM, windowString, offset, windowString, offset)
-	qStorage := fmt.Sprintf(queryStorage, windowString, offset, windowString, offset, localStorageQuery)
+	qCores := fmt.Sprintf(queryClusterCores, windowString, fmtOffset, windowString, fmtOffset, windowString, fmtOffset)
+	qRAM := fmt.Sprintf(queryClusterRAM, windowString, fmtOffset, windowString, fmtOffset)
+	qStorage := fmt.Sprintf(queryStorage, windowString, fmtOffset, windowString, fmtOffset, localStorageQuery)
 	qTotal := fmt.Sprintf(queryTotal, localStorageQuery)
 
 	resultClusterCores, err := Query(cli, qCores)
@@ -246,12 +495,8 @@ func ClusterCosts(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider,
 }
 
 // ClusterCostsOverTime gives the full cluster costs over time
-func ClusterCostsOverTime(cli prometheusClient.Client, cloud costAnalyzerCloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
-
-	localStorageQuery, err := cloud.GetLocalStorageQuery(offset)
-	if err != nil {
-		return nil, err
-	}
+func ClusterCostsOverTime(cli prometheus.Client, provider cloud.Provider, startString, endString, windowString, offset string) (*Totals, error) {
+	localStorageQuery := provider.GetLocalStorageQuery(windowString, offset, true)
 	if localStorageQuery != "" {
 		localStorageQuery = fmt.Sprintf("+ %s", localStorageQuery)
 	}

+ 9 - 22
costmodel/costmodel.go

@@ -1906,16 +1906,6 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	profileStart = time.Now()
 
-	nodes, err := cm.GetNodeCost(cp)
-	if err != nil {
-		klog.V(1).Infof("[Warning] no cost model available: " + err.Error())
-		return nil, err
-	}
-
-	measureTime(profileStart, profileThreshold, fmt.Sprintf("costDataRange(%fh): GetNodeCost", durHrs))
-
-	profileStart = time.Now()
-
 	pvClaimMapping, err := GetPVInfo(resultPVRequests, clusterID)
 	if err != nil {
 		// Just log for compatibility with KSM less than 1.6
@@ -2129,15 +2119,12 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 			GPUReqV = []*Vector{}
 		}
 
-		node, ok := nodes[c.NodeName]
-		if !ok {
-			klog.V(4).Infof("Node \"%s\" has been deleted from Kubernetes. Query historical data to get it.", c.NodeName)
-			if n, ok := missingNodes[c.NodeName]; ok {
-				node = n
-			} else {
-				node = &costAnalyzerCloud.Node{}
-				missingNodes[c.NodeName] = node
-			}
+		var node *costAnalyzerCloud.Node
+		if n, ok := missingNodes[c.NodeName]; ok {
+			node = n
+		} else {
+			node = &costAnalyzerCloud.Node{}
+			missingNodes[c.NodeName] = node
 		}
 
 		nsKey := c.Namespace + "," + c.ClusterID
@@ -2420,15 +2407,15 @@ func QueryRange(cli prometheusClient.Client, query string, start, end time.Time,
 
 	resp, body, warnings, err := cli.Do(context.Background(), req)
 	for _, w := range warnings {
-		klog.V(3).Infof("Warning '%s' fetching query '%s'", w, query)
+		klog.V(3).Infof("[Warning] '%s' fetching query '%s'", w, query)
 	}
 	if err != nil {
-		return nil, fmt.Errorf("Error %s fetching query %s", err.Error(), query)
+		return nil, fmt.Errorf("[Error] %s fetching query %s", err.Error(), query)
 	}
 	var toReturn interface{}
 	err = json.Unmarshal(body, &toReturn)
 	if err != nil {
-		return nil, fmt.Errorf("%d Error %s fetching query %s", resp.StatusCode, err.Error(), query)
+		return nil, fmt.Errorf("[Error] %d %s fetching query %s", resp.StatusCode, err.Error(), query)
 	}
 	return toReturn, err
 }

+ 1 - 1
costmodel/router.go

@@ -317,7 +317,7 @@ func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httpr
 	window := r.URL.Query().Get("window")
 	offset := r.URL.Query().Get("offset")
 
-	data, err := ClusterCosts(a.PrometheusClient, a.Cloud, window, offset)
+	data, err := ComputeClusterCosts(a.PrometheusClient, a.Cloud, window, offset)
 	w.Write(WrapData(data, err))
 }
 

+ 39 - 0
util/errors.go

@@ -0,0 +1,39 @@
+package util
+
+import "sync"
+
+// Error collection helper
+type ErrorCollector struct {
+	m      sync.Mutex
+	errors []error
+}
+
+// Reports an error to the collector. Ignores if the error is nil.
+func (ec *ErrorCollector) Report(e error) {
+	if e == nil {
+		return
+	}
+
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	ec.errors = append(ec.errors, e)
+}
+
+// Whether or not the collector caught errors
+func (ec *ErrorCollector) IsError() bool {
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	return len(ec.errors) > 0
+}
+
+// Errors caught by the collector
+func (ec *ErrorCollector) Errors() []error {
+	ec.m.Lock()
+	defer ec.m.Unlock()
+
+	errs := make([]error, len(ec.errors))
+	copy(errs, ec.errors)
+	return errs
+}

+ 88 - 0
util/time.go

@@ -0,0 +1,88 @@
+package util
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+)
+
+const (
+	MinsPerHour   = 60.0
+	HoursPerDay   = 24.0
+	HoursPerMonth = 730.0
+	DaysPerMonth  = 30.42
+)
+
+// ParseDuration converts a Prometheus-style duration string into a Duration
+func ParseDuration(duration string) (*time.Duration, error) {
+	unitStr := duration[len(duration)-1:]
+	var unit time.Duration
+	switch unitStr {
+	case "s":
+		unit = time.Second
+	case "m":
+		unit = time.Minute
+	case "h":
+		unit = time.Hour
+	case "d":
+		unit = 24.0 * time.Hour
+	default:
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	}
+
+	amountStr := duration[:len(duration)-1]
+	amount, err := strconv.ParseInt(amountStr, 10, 64)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing duration: %s did not match expected format [0-9+](s|m|d|h)", duration)
+	}
+
+	dur := time.Duration(amount) * unit
+	return &dur, nil
+}
+
+// ParseTimeRange returns a start and end time, respectively, which are converted from
+// a duration and offset, defined as strings with Prometheus-style syntax.
+func ParseTimeRange(duration, offset string) (*time.Time, *time.Time, error) {
+	// endTime defaults to the current time, unless an offset is explicity declared,
+	// in which case it shifts endTime back by given duration
+	endTime := time.Now()
+	if offset != "" {
+		o, err := ParseDuration(offset)
+		if err != nil {
+			return nil, nil, fmt.Errorf("error parsing offset (%s): %s", offset, err)
+		}
+		endTime = endTime.Add(-1 * *o)
+	}
+
+	// if duration is defined in terms of days, convert to hours
+	// e.g. convert "2d" to "48h"
+	durationNorm, err := normalizeTimeParam(duration)
+	if err != nil {
+		return nil, nil, fmt.Errorf("error parsing duration (%s): %s", duration, err)
+	}
+
+	// convert time duration into start and end times, formatted
+	// as ISO datetime strings
+	dur, err := time.ParseDuration(durationNorm)
+	if err != nil {
+		return nil, nil, fmt.Errorf("errorf parsing duration (%s): %s", durationNorm, err)
+	}
+	startTime := endTime.Add(-1 * dur)
+
+	return &startTime, &endTime, nil
+}
+
+func normalizeTimeParam(param string) (string, error) {
+	// convert days to hours
+	if param[len(param)-1:] == "d" {
+		count := param[:len(param)-1]
+		val, err := strconv.ParseInt(count, 10, 64)
+		if err != nil {
+			return "", err
+		}
+		val = val * 24
+		param = fmt.Sprintf("%dh", val)
+	}
+
+	return param, nil
+}