Просмотр исходного кода

Merge pull request #380 from kubecost/develop

merge develop into master
Ajay Tripathy 6 лет назад
Родитель
Сommit
5ba2201749

+ 56 - 13
pkg/cloud/awsprovider.go

@@ -9,7 +9,6 @@ import (
 	"io"
 	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
-	"net/url"
 	"os"
 	"os"
 	"regexp"
 	"regexp"
 	"strconv"
 	"strconv"
@@ -20,6 +19,7 @@ import (
 	"k8s.io/klog"
 	"k8s.io/klog"
 
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/util"
 
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
 	"github.com/aws/aws-sdk-go/aws/awserr"
@@ -67,6 +67,11 @@ type AWS struct {
 	*CustomProvider
 	*CustomProvider
 }
 }
 
 
+type AWSAccessKey struct {
+	AccessKeyID     string `json:"aws_access_key_id"`
+	SecretAccessKey string `json:"aws_secret_access_key"`
+}
+
 // AWSPricing maps a k8s node to an AWS Pricing "product"
 // AWSPricing maps a k8s node to an AWS Pricing "product"
 type AWSPricing struct {
 type AWSPricing struct {
 	Products map[string]*AWSProduct `json:"products"`
 	Products map[string]*AWSProduct `json:"products"`
@@ -209,6 +214,9 @@ var regionToBillingRegionCode = map[string]string{
 	"us-gov-west-1":  "UGW1",
 	"us-gov-west-1":  "UGW1",
 }
 }
 
 
+var loadedAWSSecret bool = false
+var awsSecret *AWSAccessKey = nil
+
 func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
 func (aws *AWS) GetLocalStorageQuery(window, offset string, rate bool, used bool) string {
 	return ""
 	return ""
 }
 }
@@ -269,6 +277,7 @@ func (aws *AWS) GetConfig() (*CustomPricing, error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+
 	return c, nil
 	return c, nil
 }
 }
 func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
 func (aws *AWS) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
@@ -474,8 +483,10 @@ func (aws *AWS) DownloadPricingData() error {
 	aws.SpotDataPrefix = c.SpotDataPrefix
 	aws.SpotDataPrefix = c.SpotDataPrefix
 	aws.ProjectID = c.ProjectID
 	aws.ProjectID = c.ProjectID
 	aws.SpotDataRegion = c.SpotDataRegion
 	aws.SpotDataRegion = c.SpotDataRegion
-	aws.ServiceKeyName = c.ServiceKeyName
-	aws.ServiceKeySecret = c.ServiceKeySecret
+
+	skn, sks := aws.getAWSAuth(false, c)
+	aws.ServiceKeyName = skn
+	aws.ServiceKeySecret = sks
 
 
 	if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
 	if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
 		klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
 		klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
@@ -939,18 +950,50 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 	return makeStructure(defaultClusterName)
 	return makeStructure(defaultClusterName)
 }
 }
 
 
-// AddServiceKey adds an AWS service key, useful for pulling down out-of-cluster costs. Optional-- the container this runs in can be directly authorized.
-func (*AWS) AddServiceKey(formValues url.Values) error {
-	keyID := formValues.Get("access_key_ID")
-	key := formValues.Get("secret_access_key")
-	m := make(map[string]string)
-	m["access_key_ID"] = keyID
-	m["secret_access_key"] = key
-	result, err := json.Marshal(m)
+// Gets the aws key id and secret
+func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
+	// 1. Check config values first (set from frontend UI)
+	if cp.ServiceKeyName != "" && cp.ServiceKeySecret != "" {
+		return cp.ServiceKeyName, cp.ServiceKeySecret
+	}
+
+	// 2. Check for secret
+	s, _ := aws.loadAWSAuthSecret(forceReload)
+	if s != nil && s.AccessKeyID != "" && s.SecretAccessKey != "" {
+		return s.AccessKeyID, s.SecretAccessKey
+	}
+
+	// 3. Fall back to env vars
+	return os.Getenv(awsAccessKeyIDEnvVar), os.Getenv(awsAccessKeySecretEnvVar)
+}
+
+// Load once and cache the result (even on failure). This is an install time secret, so
+// we don't expect the secret to change. If it does, however, we can force reload using
+// the input parameter.
+func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
+	if !force && loadedAWSSecret {
+		return awsSecret, nil
+	}
+	loadedAWSSecret = true
+
+	exists, err := util.FileExists(authSecretPath)
+	if !exists || err != nil {
+		return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
+	}
+
+	result, err := ioutil.ReadFile(authSecretPath)
 	if err != nil {
 	if err != nil {
-		return err
+		return nil, err
 	}
 	}
-	return ioutil.WriteFile("/var/configs/key.json", result, 0644)
+
+	var ak AWSAccessKey
+	err = json.Unmarshal(result, &ak)
+	if err != nil {
+		return nil, err
+	}
+
+	awsSecret = &ak
+	return awsSecret, nil
 }
 }
 
 
 func (aws *AWS) configureAWSAuth() error {
 func (aws *AWS) configureAWSAuth() error {

+ 90 - 6
pkg/cloud/azureprovider.go

@@ -5,7 +5,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
-	"net/url"
+	"io/ioutil"
 	"os"
 	"os"
 	"regexp"
 	"regexp"
 	"strconv"
 	"strconv"
@@ -13,6 +13,7 @@ import (
 	"sync"
 	"sync"
 
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/util"
 
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-09-01/skus"
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2017-09-01/skus"
 	"github.com/Azure/azure-sdk-for-go/services/containerservice/mgmt/2018-03-31/containerservice"
 	"github.com/Azure/azure-sdk-for-go/services/containerservice/mgmt/2018-03-31/containerservice"
@@ -59,6 +60,9 @@ var (
 	mtStandardN, _ = regexp.Compile(`^Standard_N[C|D|V]\d+r?[_v\d]*[_Promo]*$`)
 	mtStandardN, _ = regexp.Compile(`^Standard_N[C|D|V]\d+r?[_v\d]*[_Promo]*$`)
 )
 )
 
 
+var loadedAzureSecret bool = false
+var azureSecret *AzureServiceKey = nil
+
 type regionParts []string
 type regionParts []string
 
 
 func (r regionParts) String() string {
 func (r regionParts) String() string {
@@ -197,13 +201,90 @@ func (k *azureKey) GPUType() string {
 		return t
 		return t
 	}
 	}
 	return ""
 	return ""
-
 }
 }
 
 
 func (k *azureKey) ID() string {
 func (k *azureKey) ID() string {
 	return ""
 	return ""
 }
 }
 
 
+// Represents an azure app key
+type AzureAppKey struct {
+	AppID       string `json:"appId"`
+	DisplayName string `json:"displayName"`
+	Name        string `json:"name"`
+	Password    string `json:"password"`
+	Tenant      string `json:"tenant"`
+}
+
+// Azure service key for a specific subscription
+type AzureServiceKey struct {
+	SubscriptionID string       `json:"subscriptionId"`
+	ServiceKey     *AzureAppKey `json:"serviceKey"`
+}
+
+// Validity check on service key
+func (ask *AzureServiceKey) IsValid() bool {
+	return ask.SubscriptionID != "" &&
+		ask.ServiceKey != nil &&
+		ask.ServiceKey.AppID != "" &&
+		ask.ServiceKey.Password != "" &&
+		ask.ServiceKey.Tenant != ""
+}
+
+// Loads the azure authentication via configuration or a secret set at install time.
+func (az *Azure) getAzureAuth(forceReload bool, cp *CustomPricing) (subscriptionID, clientID, clientSecret, tenantID string) {
+	// 1. Check config values first (set from frontend UI)
+	if cp.AzureSubscriptionID != "" && cp.AzureClientID != "" && cp.AzureClientSecret != "" && cp.AzureTenantID != "" {
+		subscriptionID = cp.AzureSubscriptionID
+		clientID = cp.AzureClientID
+		clientSecret = cp.AzureClientSecret
+		tenantID = cp.AzureTenantID
+		return
+	}
+
+	// 2. Check for secret
+	s, _ := az.loadAzureAuthSecret(forceReload)
+	if s != nil && s.IsValid() {
+		subscriptionID = s.SubscriptionID
+		clientID = s.ServiceKey.AppID
+		clientSecret = s.ServiceKey.Password
+		tenantID = s.ServiceKey.Tenant
+		return
+	}
+
+	// 3. Empty values
+	return "", "", "", ""
+}
+
+// Load once and cache the result (even on failure). This is an install time secret, so
+// we don't expect the secret to change. If it does, however, we can force reload using
+// the input parameter.
+func (az *Azure) loadAzureAuthSecret(force bool) (*AzureServiceKey, error) {
+	if !force && loadedAzureSecret {
+		return azureSecret, nil
+	}
+	loadedAzureSecret = true
+
+	exists, err := util.FileExists(authSecretPath)
+	if !exists || err != nil {
+		return nil, fmt.Errorf("Failed to locate service account file: %s", authSecretPath)
+	}
+
+	result, err := ioutil.ReadFile(authSecretPath)
+	if err != nil {
+		return nil, err
+	}
+
+	var ask AzureServiceKey
+	err = json.Unmarshal(result, &ask)
+	if err != nil {
+		return nil, err
+	}
+
+	azureSecret = &ask
+	return azureSecret, nil
+}
+
 func (az *Azure) GetKey(labels map[string]string) Key {
 func (az *Azure) GetKey(labels map[string]string) Key {
 	cfg, err := az.GetConfig()
 	cfg, err := az.GetConfig()
 	if err != nil {
 	if err != nil {
@@ -319,6 +400,13 @@ func (az *Azure) DownloadPricingData() error {
 		return err
 		return err
 	}
 	}
 
 
+	// Load the service provider keys
+	subscriptionID, clientID, clientSecret, tenantID := az.getAzureAuth(false, config)
+	config.AzureSubscriptionID = subscriptionID
+	config.AzureClientID = clientID
+	config.AzureClientSecret = clientSecret
+	config.AzureTenantID = tenantID
+
 	var authorizer autorest.Authorizer
 	var authorizer autorest.Authorizer
 
 
 	if config.AzureClientID != "" && config.AzureClientSecret != "" && config.AzureTenantID != "" {
 	if config.AzureClientID != "" && config.AzureClientSecret != "" && config.AzureTenantID != "" {
@@ -602,10 +690,6 @@ func (az *Azure) ClusterInfo() (map[string]string, error) {
 
 
 }
 }
 
 
-func (az *Azure) AddServiceKey(url url.Values) error {
-	return nil
-}
-
 func (az *Azure) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
 func (az *Azure) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
 	return az.Config.UpdateFromMap(a)
 	return az.Config.UpdateFromMap(a)
 }
 }

+ 0 - 5
pkg/cloud/customprovider.go

@@ -3,7 +3,6 @@ package cloud
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"io"
 	"io"
-	"net/url"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
@@ -109,10 +108,6 @@ func (cp *CustomProvider) ClusterInfo() (map[string]string, error) {
 	return m, nil
 	return m, nil
 }
 }
 
 
-func (*CustomProvider) AddServiceKey(url.Values) error {
-	return nil
-}
-
 func (*CustomProvider) GetDisks() ([]byte, error) {
 func (*CustomProvider) GetDisks() ([]byte, error) {
 	return nil, nil
 	return nil, nil
 }
 }

+ 87 - 38
pkg/cloud/gcpprovider.go

@@ -8,7 +8,6 @@ import (
 	"io/ioutil"
 	"io/ioutil"
 	"math"
 	"math"
 	"net/http"
 	"net/http"
-	"net/url"
 	"os"
 	"os"
 	"regexp"
 	"regexp"
 	"strconv"
 	"strconv"
@@ -21,6 +20,7 @@ import (
 	"cloud.google.com/go/bigquery"
 	"cloud.google.com/go/bigquery"
 	"cloud.google.com/go/compute/metadata"
 	"cloud.google.com/go/compute/metadata"
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/util"
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2/google"
 	"golang.org/x/oauth2/google"
 	compute "google.golang.org/api/compute/v1"
 	compute "google.golang.org/api/compute/v1"
@@ -181,6 +181,43 @@ func (gcp *GCP) GetManagementPlatform() (string, error) {
 	return "", nil
 	return "", nil
 }
 }
 
 
+// Attempts to load a GCP auth secret and copy the contents to the key file.
+func (*GCP) loadGCPAuthSecret() {
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+
+	keyPath := path + "key.json"
+	keyExists, _ := util.FileExists(keyPath)
+	if keyExists {
+		klog.V(1).Infof("GCP Auth Key already exists, no need to load from secret")
+		return
+	}
+
+	exists, err := util.FileExists(authSecretPath)
+	if !exists || err != nil {
+		errMessage := "Secret does not exist"
+		if err != nil {
+			errMessage = err.Error()
+		}
+
+		klog.V(4).Infof("[Warning] Failed to load auth secret, or was not mounted: %s", errMessage)
+		return
+	}
+
+	result, err := ioutil.ReadFile(authSecretPath)
+	if err != nil {
+		klog.V(4).Infof("[Warning] Failed to load auth secret, or was not mounted: %s", err.Error())
+		return
+	}
+
+	err = ioutil.WriteFile(keyPath, result, 0644)
+	if err != nil {
+		klog.V(4).Infof("[Warning] Failed to copy auth secret to %s: %s", keyPath, err.Error())
+	}
+}
+
 func (gcp *GCP) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
 func (gcp *GCP) UpdateConfigFromConfigMap(a map[string]string) (*CustomPricing, error) {
 	return gcp.Config.UpdateFromMap(a)
 	return gcp.Config.UpdateFromMap(a)
 }
 }
@@ -197,20 +234,22 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			c.ProjectID = a.ProjectID
 			c.ProjectID = a.ProjectID
 			c.BillingDataDataset = a.BillingDataDataset
 			c.BillingDataDataset = a.BillingDataDataset
 
 
-			j, err := json.Marshal(a.Key)
-			if err != nil {
-				return err
-			}
+			if len(a.Key) > 0 {
+				j, err := json.Marshal(a.Key)
+				if err != nil {
+					return err
+				}
 
 
-			path := os.Getenv("CONFIG_PATH")
-			if path == "" {
-				path = "/models/"
-			}
+				path := os.Getenv("CONFIG_PATH")
+				if path == "" {
+					path = "/models/"
+				}
 
 
-			keyPath := path + "key.json"
-			err = ioutil.WriteFile(keyPath, j, 0644)
-			if err != nil {
-				return err
+				keyPath := path + "key.json"
+				err = ioutil.WriteFile(keyPath, j, 0644)
+				if err != nil {
+					return err
+				}
 			}
 			}
 		} else if updateType == AthenaInfoUpdateType {
 		} else if updateType == AthenaInfoUpdateType {
 			a := AwsAthenaInfo{}
 			a := AwsAthenaInfo{}
@@ -270,6 +309,7 @@ func (gcp *GCP) ExternalAllocations(start string, end string, aggregators []stri
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+
 	var s []*OutOfClusterAllocation
 	var s []*OutOfClusterAllocation
 	if c.ServiceKeyName != "" && c.ServiceKeySecret != "" && !crossCluster {
 	if c.ServiceKeyName != "" && c.ServiceKeySecret != "" && !crossCluster {
 		aws, err := NewCrossClusterProvider("aws", "gcp.json", gcp.Clientset)
 		aws, err := NewCrossClusterProvider("aws", "gcp.json", gcp.Clientset)
@@ -313,30 +353,42 @@ func (gcp *GCP) ExternalAllocations(start string, end string, aggregators []stri
 		s = append(s, gcpOOC...)
 		s = append(s, gcpOOC...)
 		qerr = err
 		qerr = err
 		*/
 		*/
-		queryString := fmt.Sprintf(`(SELECT
-			service.description as service,
-			TO_JSON_STRING(labels) as keys,
-			SUM(cost) as cost
-		  	FROM  %s
-		 	WHERE
-				EXISTS(SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
-				AND usage_start_time >= "%s" AND usage_start_time < "%s"
-			GROUP BY  service,keys)`, c.BillingDataDataset, aggregator, start, end)
+		queryString := fmt.Sprintf(`(
+			SELECT
+				service.description as service,
+				TO_JSON_STRING(labels) as keys,
+				SUM(cost) as cost
+			FROM  %s
+			WHERE EXISTS (SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
+			AND usage_start_time >= "%s" AND usage_start_time < "%s"
+			GROUP BY service, keys
+		)`, c.BillingDataDataset, aggregator, start, end)
 		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
 		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
 		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
 		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
 		s = append(s, gcpOOC...)
 		s = append(s, gcpOOC...)
 		qerr = err
 		qerr = err
 	} else {
 	} else {
-		queryString := fmt.Sprintf(`(SELECT
-			service.description as service,
-			TO_JSON_STRING(labels) as keys,
-			SUM(cost) as cost
+		if filterType == "kubernetes_labels" {
+			fvs := strings.Split(filterValue, "=")
+			if len(fvs) == 2 {
+				filterType = fvs[0]
+				filterValue = fvs[1]
+			} else {
+				klog.V(2).Infof("[Warning] illegal kubernetes_labels filterValue: %s", filterValue)
+			}
+		}
+
+		queryString := fmt.Sprintf(`(
+			SELECT
+				service.description as service,
+				TO_JSON_STRING(labels) as keys,
+				SUM(cost) as cost
 		  	FROM  %s
 		  	FROM  %s
-		 	WHERE
-				EXISTS (SELECT * FROM UNNEST(labels) AS l WHERE l.key = "%s" AND l.value = "%s")
-				AND EXISTS(SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
-				AND usage_start_time >= "%s" AND usage_start_time < "%s"
-			GROUP BY  service,keys)`, c.BillingDataDataset, filterType, filterValue, aggregator, start, end)
+		 	WHERE EXISTS (SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
+			AND EXISTS (SELECT * FROM UNNEST(labels) AS l WHERE l.key = "%s" AND l.value = "%s")
+			AND usage_start_time >= "%s" AND usage_start_time < "%s"
+			GROUP BY service, keys
+		)`, c.BillingDataDataset, aggregator, filterType, filterValue, start, end)
 		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
 		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
 		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
 		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
 		s = append(s, gcpOOC...)
 		s = append(s, gcpOOC...)
@@ -444,13 +496,6 @@ func (gcp *GCP) ClusterInfo() (map[string]string, error) {
 	return m, nil
 	return m, nil
 }
 }
 
 
-// AddServiceKey adds the service key as required for GetDisks
-func (*GCP) AddServiceKey(formValues url.Values) error {
-	key := formValues.Get("key")
-	k := []byte(key)
-	return ioutil.WriteFile("/var/configs/key.json", k, 0644)
-}
-
 // GetDisks returns the GCP disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
 // GetDisks returns the GCP disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
 func (*GCP) GetDisks() ([]byte, error) {
 func (*GCP) GetDisks() ([]byte, error) {
 	// metadata API setup
 	// metadata API setup
@@ -858,6 +903,8 @@ func (gcp *GCP) DownloadPricingData() error {
 		klog.V(2).Infof("Error downloading default pricing data: %s", err.Error())
 		klog.V(2).Infof("Error downloading default pricing data: %s", err.Error())
 		return err
 		return err
 	}
 	}
+	gcp.loadGCPAuthSecret()
+
 	gcp.BaseCPUPrice = c.CPU
 	gcp.BaseCPUPrice = c.CPU
 	gcp.ProjectID = c.ProjectID
 	gcp.ProjectID = c.ProjectID
 	gcp.BillingDataDataset = c.BillingDataDataset
 	gcp.BillingDataDataset = c.BillingDataDataset
@@ -1238,6 +1285,8 @@ func (gcp *gcpKey) Features() string {
 	instanceType := strings.ToLower(strings.Join(strings.Split(gcp.Labels[v1.LabelInstanceType], "-")[:2], ""))
 	instanceType := strings.ToLower(strings.Join(strings.Split(gcp.Labels[v1.LabelInstanceType], "-")[:2], ""))
 	if instanceType == "n1highmem" || instanceType == "n1highcpu" {
 	if instanceType == "n1highmem" || instanceType == "n1highcpu" {
 		instanceType = "n1standard" // These are priced the same. TODO: support n1ultrahighmem
 		instanceType = "n1standard" // These are priced the same. TODO: support n1ultrahighmem
+	} else if instanceType == "n2highmem" || instanceType == "n2highcpu" {
+		instanceType = "n2standard"
 	} else if instanceType == "e2highmem" || instanceType == "e2highcpu" {
 	} else if instanceType == "e2highmem" || instanceType == "e2highcpu" {
 		instanceType = "e2standard"
 		instanceType = "e2standard"
 	} else if strings.HasPrefix(instanceType, "custom") {
 	} else if strings.HasPrefix(instanceType, "custom") {

+ 1 - 2
pkg/cloud/provider.go

@@ -5,7 +5,6 @@ import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
-	"net/url"
 	"os"
 	"os"
 	"strings"
 	"strings"
 
 
@@ -21,6 +20,7 @@ const clusterIDKey = "CLUSTER_ID"
 const remoteEnabled = "REMOTE_WRITE_ENABLED"
 const remoteEnabled = "REMOTE_WRITE_ENABLED"
 const remotePW = "REMOTE_WRITE_PASSWORD"
 const remotePW = "REMOTE_WRITE_PASSWORD"
 const sqlAddress = "SQL_ADDRESS"
 const sqlAddress = "SQL_ADDRESS"
+const authSecretPath = "/var/secrets/service-key.json"
 
 
 var createTableStatements = []string{
 var createTableStatements = []string{
 	`CREATE TABLE IF NOT EXISTS names (
 	`CREATE TABLE IF NOT EXISTS names (
@@ -162,7 +162,6 @@ type CustomPricing struct {
 // Provider represents a k8s provider.
 // Provider represents a k8s provider.
 type Provider interface {
 type Provider interface {
 	ClusterInfo() (map[string]string, error)
 	ClusterInfo() (map[string]string, error)
-	AddServiceKey(url.Values) error
 	GetDisks() ([]byte, error)
 	GetDisks() ([]byte, error)
 	NodePricing(Key) (*Node, error)
 	NodePricing(Key) (*Node, error)
 	PVPricing(PVKey) (*PV, error)
 	PVPricing(PVKey) (*PV, error)

+ 15 - 4
pkg/costmodel/costmodel.go

@@ -215,8 +215,17 @@ const (
 				scalar(avg(avg_over_time(prometheus_target_interval_length_seconds[%s])))*%f)
 				scalar(avg(avg_over_time(prometheus_target_interval_length_seconds[%s])))*%f)
 		) by (namespace,container,pod,node,cluster_id)
 		) by (namespace,container,pod,node,cluster_id)
 	, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
-	queryPVCAllocation        = `avg_over_time(pod_pvc_allocation[%s])`
-	queryPVHourlyCost         = `avg_over_time(pv_hourly_cost[%s])`
+	// queryPVCAllocationFmt yields the total byte-hour CPU allocation over the given window.
+	//  sum(all VCPU measurements within given window) = [byte*min] by metric
+	//  (") / 60 = [byte*hour] by metric, assuming no missed scrapes
+	//  (") * (normalization factor) = [byte*hour] by metric, normalized for missed scrapes
+	//  sum(") by unique pvc = [VCPU*hour] by (cluster, namespace, pod, pv, pvc)
+	// Note: normalization factor is 1.0 if no scrapes are missed and has an upper bound determined by minExpectedScrapeRate
+	// so that coarse resolutions don't push normalization factors too high; e.g. 24h resolution with 1h of data would make
+	// for a normalization factor of 24. With a minimumExpectedScrapeRate of 0.95, that caps the norm factor at
+	queryPVCAllocationFmt = `sum(sum_over_time(pod_pvc_allocation[%s:1m])) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim) / 60
+		* 60 / clamp_min(count_over_time(sum(pod_pvc_allocation) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim)[%s:1m])/%f, 60 * %f)`
+	queryPVHourlyCostFmt      = `avg_over_time(pv_hourly_cost[%s])`
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
 	queryDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
 	queryDeploymentLabels     = `avg_over_time(deployment_match_labels[%s])`
@@ -1694,6 +1703,8 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
 	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
 	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "", resolutionHours, windowString, "")
 	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "", resolutionHours, windowString, "")
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
+	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString, windowString, resolutionHours, minimumExpectedScrapeRate)
+	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostFmt, windowString)
 	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
 	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
 	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, windowString, "")
 	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, windowString, "")
 	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, windowString, "")
 	queryNetInternetRequests := fmt.Sprintf(queryInternetNetworkUsage, windowString, "")
@@ -1853,7 +1864,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		defer measureTimeAsync(time.Now(), profileThreshold, "PVPodAllocation", queryProfileCh)
 		defer measureTimeAsync(time.Now(), profileThreshold, "PVPodAllocation", queryProfileCh)
 
 
 		var promErr error
 		var promErr error
-		pvPodAllocationResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVCAllocation, windowString), start, end, window)
+		pvPodAllocationResults, promErr = QueryRange(cli, queryPVCAllocation, start, end, window)
 
 
 		ec.Report(promErr)
 		ec.Report(promErr)
 	}()
 	}()
@@ -1863,7 +1874,7 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 		defer measureTimeAsync(time.Now(), profileThreshold, "PVCost", queryProfileCh)
 		defer measureTimeAsync(time.Now(), profileThreshold, "PVCost", queryProfileCh)
 
 
 		var promErr error
 		var promErr error
-		pvCostResults, promErr = QueryRange(cli, fmt.Sprintf(queryPVHourlyCost, windowString), start, end, window)
+		pvCostResults, promErr = QueryRange(cli, queryPVHourlyCost, start, end, window)
 
 
 		ec.Report(promErr)
 		ec.Report(promErr)
 	}()
 	}()

+ 20 - 13
pkg/costmodel/router.go

@@ -25,7 +25,6 @@ import (
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 
-	bolt "github.com/etcd-io/bbolt"
 	"github.com/patrickmn/go-cache"
 	"github.com/patrickmn/go-cache"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus"
 
 
@@ -843,20 +842,28 @@ func (a *Accesses) recordPrices() {
 func newClusterManager() *cm.ClusterManager {
 func newClusterManager() *cm.ClusterManager {
 	clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
 	clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
 
 
-	path := os.Getenv("CONFIG_PATH")
-	db, err := bolt.Open(path+"costmodel.db", 0600, nil)
-	if err != nil {
-		klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
-		return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
-	}
+	// Return a memory-backed cluster manager populated by configmap
+	return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
 
 
-	store, err := cm.NewBoltDBClusterStorage("clusters", db)
-	if err != nil {
-		klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
-		return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
-	}
+	// NOTE: The following should be used with a persistent disk store. Since the
+	// NOTE: configmap approach is currently the "persistent" source (entries are read-only
+	// NOTE: on the backend), we don't currently need to store on disk.
+	/*
+		path := os.Getenv("CONFIG_PATH")
+		db, err := bolt.Open(path+"costmodel.db", 0600, nil)
+		if err != nil {
+			klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
+			return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
+		}
+
+		store, err := cm.NewBoltDBClusterStorage("clusters", db)
+		if err != nil {
+			klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
+			return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
+		}
 
 
-	return cm.NewConfiguredClusterManager(store, clustersConfigFile)
+		return cm.NewConfiguredClusterManager(store, clustersConfigFile)
+	*/
 }
 }
 
 
 type ConfigWatchers struct {
 type ConfigWatchers struct {