Quellcode durchsuchen

Merge pull request #601 from kubecost/develop

Merge develop into master
Ajay Tripathy vor 5 Jahren
Ursprung
Commit
72a51a9cfc

+ 2 - 2
CONTRIBUTING.md

@@ -12,8 +12,8 @@ you can start by asking a question on [Slack](https://join.slack.com/t/kubecost/
 Follow these steps to build from source and deploy:
 
 1. `docker build --rm -f "Dockerfile" -t <repo>/kubecost-cost-model:<tag> .`
-2. Edit the [pulled image](https://github.com/kubecost/cost-model/blob/master/kubernetes/deployment.yaml#L22) in the deployment.yaml to <repo>/kubecost-cost-model:<tag>
-3. Set [this environment variable](https://github.com/kubecost/cost-model/blob/master/kubernetes/deployment.yaml#L30) to the address of your prometheus server
+2. Edit the [pulled image](https://github.com/kubecost/cost-model/blob/master/kubernetes/deployment.yaml#L25) in the deployment.yaml to <repo>/kubecost-cost-model:<tag>
+3. Set [this environment variable](https://github.com/kubecost/cost-model/blob/master/kubernetes/deployment.yaml#L33) to the address of your prometheus server
 4. `kubectl create namespace cost-model`
 5. `kubectl apply -f kubernetes/ --namespace cost-model`
 6. `kubectl port-forward --namespace cost-model service/cost-model 9003`

+ 2 - 0
go.sum

@@ -295,6 +295,7 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf
 github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
 github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA=
+github.com/prometheus/client_golang v1.8.0 h1:zvJNkoCFAnYFNC24FV8nW4JdRJ3GIFcLbg65lL/JDcw=
 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
 github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f h1:BVwpUVJDADN2ufcGik7W992pyps0wZ888b/y9GXcLTU=
 github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
@@ -561,6 +562,7 @@ k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag=
 k8s.io/apimachinery v0.19.0 h1:gjKnAda/HZp5k4xQYjL0K/Yb66IvNqjthCb03QlKpaQ=
 k8s.io/apimachinery v0.19.1 h1:cwsxZazM/LA9aUsBaL4bRS5ygoM6bYp8dFk22DSYQa4=
 k8s.io/apimachinery v0.19.2 h1:5Gy9vQpAGTKHPVOh5c4plE274X8D/6cuEiTO2zve7tc=
+k8s.io/apimachinery v0.19.3 h1:bpIQXlKjB4cB/oNpnNnV+BybGPR7iP5oYpsOTEJ4hgc=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5 h1:BwY2C//EoWktJi74O6R2REBonrhsfhRI0qfVwOjOPp8=
 k8s.io/client-go v0.0.0-20190404172613-2e1a3ed22ac5/go.mod h1:bIEHXHbykaOlj+pgLllzLJ2RPGdzkjtqdk0Il07KPEM=
 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab h1:E8Fecph0qbNsAbijJJQryKu4Oi9QTp5cVpjTE+nqg6g=

+ 232 - 441
pkg/cloud/awsprovider.go

@@ -42,6 +42,44 @@ const awsReservedInstancePricePerHour = 0.0287
 const supportedSpotFeedVersion = "1"
 const SpotInfoUpdateType = "spotinfo"
 const AthenaInfoUpdateType = "athenainfo"
+const PreemptibleType = "preemptible"
+
+const APIPricingSource = "Public API"
+const SpotPricingSource = "Spot Data Feed"
+const ReservedInstancePricingSource = "Savings Plan, Reservied Instance, and Out-Of-Cluster"
+
+func (aws *AWS) PricingSourceStatus() map[string]*PricingSource {
+
+	sources := make(map[string]*PricingSource)
+
+	sps := &PricingSource{
+		Name: SpotPricingSource,
+	}
+	sps.Error = aws.SpotPricingStatus
+	if sps.Error != "" {
+		sps.Available = false
+	} else if len(aws.SpotPricingByInstanceID) > 0 {
+		sps.Available = true
+	} else {
+		sps.Error = "No spot instances detected"
+	}
+	sources[SpotPricingSource] = sps
+
+	rps := &PricingSource{
+		Name: ReservedInstancePricingSource,
+	}
+	rps.Error = aws.RIPricingStatus
+	if rps.Error != "" {
+		rps.Available = false
+	} else if len(aws.RIPricingByInstanceID) > 0 {
+		rps.Available = true
+	} else {
+		rps.Error = "No reserved instances detected"
+	}
+	sources[ReservedInstancePricingSource] = rps
+	return sources
+
+}
 
 // How often spot data is refreshed
 const SpotRefreshDuration = 15 * time.Minute
@@ -81,7 +119,9 @@ type AWS struct {
 	SpotPricingUpdatedAt        *time.Time
 	SpotRefreshRunning          bool
 	SpotPricingLock             sync.RWMutex
+	SpotPricingStatus           string
 	RIPricingByInstanceID       map[string]*RIData
+	RIPricingStatus             string
 	RIDataRunning               bool
 	RIDataLock                  sync.RWMutex
 	SavingsPlanDataByInstanceID map[string]*SavingsPlanData
@@ -96,8 +136,6 @@ type AWS struct {
 	BaseSpotRAMPrice            string
 	SpotLabelName               string
 	SpotLabelValue              string
-	ServiceKeyName              string
-	ServiceKeySecret            string
 	SpotDataRegion              string
 	SpotDataBucket              string
 	SpotDataPrefix              string
@@ -425,16 +463,12 @@ func (k *awsKey) ID() string {
 
 func (k *awsKey) Features() string {
 
-	instanceType := k.Labels[v1.LabelInstanceType]
-	var operatingSystem string
-	operatingSystem, ok := k.Labels[v1.LabelOSStable]
-	if !ok {
-		operatingSystem = k.Labels["beta.kubernetes.io/os"]
-	}
-	region := k.Labels[v1.LabelZoneRegion]
+	instanceType, _ := util.GetInstanceType(k.Labels)
+	operatingSystem, _ := util.GetOperatingSystem(k.Labels)
+	region, _ := util.GetRegion(k.Labels)
 
 	key := region + "," + instanceType + "," + operatingSystem
-	usageType := "preemptible"
+	usageType := PreemptibleType
 	spotKey := key + "," + usageType
 	if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
 		return spotKey
@@ -494,7 +528,7 @@ func (key *awsPVKey) Features() string {
 	// Storage class names are generally EBS volume types (gp2)
 	// Keys in Pricing are based on UsageTypes (EBS:VolumeType.gp2)
 	// Converts between the 2
-	region := key.Labels[v1.LabelZoneRegion]
+	region, _ := util.GetRegion(key.Labels)
 	//if region == "" {
 	//	region = "us-east-1"
 	//}
@@ -517,7 +551,7 @@ func (aws *AWS) GetKey(labels map[string]string, n *v1.Node) Key {
 
 func (aws *AWS) isPreemptible(key string) bool {
 	s := strings.Split(key, ",")
-	if len(s) == 4 && s[3] == "preemptible" {
+	if len(s) == 4 && s[3] == PreemptibleType {
 		return true
 	}
 	return false
@@ -527,6 +561,45 @@ func (aws *AWS) ClusterManagementPricing() (string, float64, error) {
 	return aws.clusterProvisioner, aws.clusterManagementPrice, nil
 }
 
+// Use the pricing data from the current region. Fall back to using all region data if needed.
+func (aws *AWS) getRegionPricing(nodeList []*v1.Node) (*http.Response, string, error) {
+
+	pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/"
+
+	region := ""
+	multiregion := false
+	for _, n := range nodeList {
+		labels := n.GetLabels()
+		currentNodeRegion := ""
+		if r, ok := util.GetRegion(labels); ok {
+			currentNodeRegion = r
+		} else {
+			multiregion = true // We weren't able to detect the node's region, so pull all data.
+			break
+		}
+		if region == "" { // We haven't set a region yet
+			region = currentNodeRegion
+		} else if region != "" && currentNodeRegion != region { // If two nodes have different regions here, we'll need to fetch all pricing data.
+			multiregion = true
+			break
+		}
+	}
+
+	if region != "" && !multiregion {
+		pricingURL += region + "/"
+	}
+
+	pricingURL += "index.json"
+
+	klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
+	resp, err := http.Get(pricingURL)
+	if err != nil {
+		klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
+		return nil, pricingURL, err
+	}
+	return resp, pricingURL, err
+}
+
 // DownloadPricingData fetches data from the AWS Pricing API
 func (aws *AWS) DownloadPricingData() error {
 	aws.DownloadPricingDataLock.Lock()
@@ -550,9 +623,7 @@ func (aws *AWS) DownloadPricingData() error {
 	aws.ProjectID = c.ProjectID
 	aws.SpotDataRegion = c.SpotDataRegion
 
-	skn, sks := aws.getAWSAuth(false, c)
-	aws.ServiceKeyName = skn
-	aws.ServiceKeySecret = sks
+	aws.ConfigureAuthWith(c) // load aws authentication from configuration or secret
 
 	if len(aws.SpotDataBucket) != 0 && len(aws.ProjectID) == 0 {
 		klog.V(1).Infof("using SpotDataBucket \"%s\" without ProjectID will not end well", aws.SpotDataBucket)
@@ -643,15 +714,10 @@ func (aws *AWS) DownloadPricingData() error {
 	aws.ValidPricingKeys = make(map[string]bool)
 	skusToKeys := make(map[string]string)
 
-	pricingURL := "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/index.json"
-	klog.V(2).Infof("starting download of \"%s\", which is quite large ...", pricingURL)
-	resp, err := http.Get(pricingURL)
+	resp, pricingURL, err := aws.getRegionPricing(nodeList)
 	if err != nil {
-		klog.V(2).Infof("Bogus fetch of \"%s\": %v", pricingURL, err)
 		return err
 	}
-	klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
-
 	dec := json.NewDecoder(resp.Body)
 	for {
 		t, err := dec.Token()
@@ -785,6 +851,7 @@ func (aws *AWS) DownloadPricingData() error {
 			}
 		}
 	}
+	klog.V(2).Infof("Finished downloading \"%s\"", pricingURL)
 
 	// Always run spot pricing refresh when performing download
 	aws.refreshSpotPricing(true)
@@ -821,11 +888,13 @@ func (aws *AWS) refreshSpotPricing(force bool) {
 		return
 	}
 
-	sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
+	sp, err := aws.parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion)
 	if err != nil {
 		klog.V(1).Infof("Skipping AWS spot data download: %s", err.Error())
+		aws.SpotPricingStatus = err.Error()
 		return
 	}
+	aws.SpotPricingStatus = ""
 
 	// update time last updated
 	aws.SpotPricingUpdatedAt = &now
@@ -929,10 +998,10 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseCPUPrice: aws.BaseCPUPrice,
 			BaseRAMPrice: aws.BaseRAMPrice,
 			BaseGPUPrice: aws.BaseGPUPrice,
-			UsageType:    usageType,
+			UsageType:    PreemptibleType,
 		}, nil
 	} else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
-		log.DedupedWarningf(5, "Node %s marked preemitible but we have no data in spot feed", k.ID())
+		log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
 		return &Node{
 			VCPU:         terms.VCpu,
 			VCPUCost:     aws.BaseSpotCPUPrice,
@@ -943,7 +1012,7 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*No
 			BaseCPUPrice: aws.BaseCPUPrice,
 			BaseRAMPrice: aws.BaseRAMPrice,
 			BaseGPUPrice: aws.BaseGPUPrice,
-			UsageType:    usageType,
+			UsageType:    PreemptibleType,
 		}, nil
 	} else if sp, ok := aws.savingsPlanPricing(k.ID()); ok {
 		strCost := fmt.Sprintf("%f", sp.EffectiveCost)
@@ -1000,7 +1069,7 @@ func (aws *AWS) NodePricing(k Key) (*Node, error) {
 	key := k.Features()
 	usageType := "ondemand"
 	if aws.isPreemptible(key) {
-		usageType = "preemptible"
+		usageType = PreemptibleType
 	}
 
 	terms, ok := aws.Pricing[key]
@@ -1130,6 +1199,31 @@ func (awsProvider *AWS) ClusterInfo() (map[string]string, error) {
 	return makeStructure(defaultClusterName)
 }
 
+// updates the authentication to the latest values (via config or secret)
+func (aws *AWS) ConfigureAuth() error {
+	c, err := aws.Config.GetCustomPricingData()
+	if err != nil {
+		klog.V(1).Infof("Error downloading default pricing data: %s", err.Error())
+	}
+	return aws.ConfigureAuthWith(c)
+}
+
+// updates the authentication to the latest values (via config or secret)
+func (aws *AWS) ConfigureAuthWith(config *CustomPricing) error {
+	accessKeyID, accessKeySecret := aws.getAWSAuth(false, config)
+	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
+		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
+		if err != nil {
+			return err
+		}
+		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // Gets the aws key id and secret
 func (aws *AWS) getAWSAuth(forceReload bool, cp *CustomPricing) (string, string) {
 	if aws.ServiceAccountChecks == nil { // safety in case checks don't exist
@@ -1199,22 +1293,6 @@ func (aws *AWS) loadAWSAuthSecret(force bool) (*AWSAccessKey, error) {
 	return awsSecret, nil
 }
 
-func (aws *AWS) configureAWSAuth() error {
-	accessKeyID := aws.ServiceKeyName
-	accessKeySecret := aws.ServiceKeySecret
-	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
-		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
-		if err != nil {
-			return err
-		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 func getClusterConfig(ccFile string) (map[string]string, error) {
 	clusterConfig, err := os.Open(ccFile)
 	if err != nil {
@@ -1234,43 +1312,6 @@ func getClusterConfig(ccFile string) (map[string]string, error) {
 	return clusterConf, nil
 }
 
-// SetKeyEnv ensures that the two environment variables necessary to configure
-// a new AWS Session are set.
-func (a *AWS) SetKeyEnv() error {
-	// TODO add this to the helm chart, mirroring the cost-model
-	// configPath := env.GetConfigPath()
-	configPath := defaultConfigPath
-	path := configPath + "aws.json"
-
-	if _, err := os.Stat(path); err != nil {
-		if os.IsNotExist(err) {
-			log.DedupedErrorf(5, "file %s does not exist", path)
-		} else {
-			log.DedupedErrorf(5, "other file open error: %s", err)
-		}
-		return err
-	}
-
-	jsonFile, err := os.Open(path)
-	defer jsonFile.Close()
-
-	configMap := map[string]string{}
-	configBytes, err := ioutil.ReadAll(jsonFile)
-	if err != nil {
-		return err
-	}
-	json.Unmarshal([]byte(configBytes), &configMap)
-
-	keyName := configMap["awsServiceKeyName"]
-	keySecret := configMap["awsServiceKeySecret"]
-
-	// These are required before calling NewEnvCredentials below
-	env.Set(env.AWSAccessKeyIDEnvVar, keyName)
-	env.Set(env.AWSAccessKeySecretEnvVar, keySecret)
-
-	return nil
-}
-
 func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
 	sess, err := session.NewSession(&aws.Config{
 		Region:      aws.String(region),
@@ -1285,9 +1326,7 @@ func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput
 }
 
 func (a *AWS) GetAddresses() ([]byte, error) {
-	if err := a.SetKeyEnv(); err != nil {
-		return nil, err
-	}
+	a.ConfigureAuth() // load authentication data into env vars
 
 	addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
 	errorCh := make(chan error, len(awsRegions))
@@ -1372,9 +1411,7 @@ func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *stri
 
 // GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
 func (a *AWS) GetDisks() ([]byte, error) {
-	if err := a.SetKeyEnv(); err != nil {
-		return nil, err
-	}
+	a.ConfigureAuth() // load authentication data into env vars
 
 	volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
 	errorCh := make(chan error, len(awsRegions))
@@ -1514,21 +1551,81 @@ func generateAWSGroupBy(lastIdx int) string {
 	return strings.Join(sequence, ",")
 }
 
-func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
+func (a *AWS) QueryAthenaPaginated(query string) (*athena.GetQueryResultsInput, *athena.Athena, error) {
 	customPricing, err := a.GetConfig()
 	if err != nil {
-		return nil, err
+		return nil, nil, err
+	}
+	a.ConfigureAuthWith(customPricing)
+	region := aws.String(customPricing.AthenaRegion)
+	resultsBucket := customPricing.AthenaBucketName
+	database := customPricing.AthenaDatabase
+	c := &aws.Config{
+		Region: region,
+	}
+	s := session.Must(session.NewSession(c))
+	svc := athena.New(s)
+	if customPricing.MasterPayerARN != "" {
+		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
+		svc = athena.New(s, &aws.Config{
+			Region:      region,
+			Credentials: creds,
+		})
+	}
+
+	var e athena.StartQueryExecutionInput
+
+	var r athena.ResultConfiguration
+	r.SetOutputLocation(resultsBucket)
+	e.SetResultConfiguration(&r)
+
+	e.SetQueryString(query)
+	var q athena.QueryExecutionContext
+	q.SetDatabase(database)
+	e.SetQueryExecutionContext(&q)
+
+	res, err := svc.StartQueryExecution(&e)
+	if err != nil {
+		return nil, svc, err
 	}
-	if customPricing.ServiceKeyName != "" {
-		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+
+	klog.V(2).Infof("StartQueryExecution result:")
+	klog.V(2).Infof(res.GoString())
+
+	var qri athena.GetQueryExecutionInput
+	qri.SetQueryExecutionId(*res.QueryExecutionId)
+
+	var qrop *athena.GetQueryExecutionOutput
+	duration := time.Duration(2) * time.Second // Pause for 2 seconds
+
+	for {
+		qrop, err = svc.GetQueryExecution(&qri)
 		if err != nil {
-			return nil, err
+			return nil, svc, err
 		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
-		if err != nil {
-			return nil, err
+		if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
+			break
 		}
+		time.Sleep(duration)
 	}
+	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
+
+		var ip athena.GetQueryResultsInput
+		ip.SetQueryExecutionId(*res.QueryExecutionId)
+		return &ip, svc, nil
+	} else {
+		return nil, svc, fmt.Errorf("No results available for %s", query)
+	}
+}
+
+func (a *AWS) QueryAthenaBillingData(query string) (*athena.GetQueryResultsOutput, error) {
+	customPricing, err := a.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
+
 	region := aws.String(customPricing.AthenaRegion)
 	resultsBucket := customPricing.AthenaBucketName
 	database := customPricing.AthenaDatabase
@@ -1695,8 +1792,10 @@ func (a *AWS) GetReservationDataFromAthena() error {
 	query := fmt.Sprintf(q, cfg.AthenaTable, start, end)
 	op, err := a.QueryAthenaBillingData(query)
 	if err != nil {
+		a.RIPricingStatus = err.Error()
 		return fmt.Errorf("Error fetching Reserved Instance Data: %s", err)
 	}
+	a.RIPricingStatus = ""
 	klog.Infof("Fetching RI data...")
 	if len(op.ResultSet.Rows) > 1 {
 		a.RIDataLock.Lock()
@@ -1778,104 +1877,44 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 		WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s' AND (%s)
 		GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, aggregatorOr, groupby)
 	}
-
-	klog.V(3).Infof("Running Query: %s", query)
-
-	if customPricing.ServiceKeyName != "" {
-		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
-		if err != nil {
-			return nil, err
-		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
-		if err != nil {
-			return nil, err
-		}
-	}
-	region := aws.String(customPricing.AthenaRegion)
-	resultsBucket := customPricing.AthenaBucketName
-	database := customPricing.AthenaDatabase
-	c := &aws.Config{
-		Region: region,
-	}
-	s := session.Must(session.NewSession(c))
-	svc := athena.New(s)
-	if customPricing.MasterPayerARN != "" {
-		creds := stscreds.NewCredentials(s, customPricing.MasterPayerARN)
-		svc = athena.New(s, &aws.Config{
-			Region:      region,
-			Credentials: creds,
-		})
-	}
-
-	var e athena.StartQueryExecutionInput
-
-	var r athena.ResultConfiguration
-	r.SetOutputLocation(resultsBucket)
-	e.SetResultConfiguration(&r)
-
-	e.SetQueryString(query)
-	var q athena.QueryExecutionContext
-	q.SetDatabase(database)
-	e.SetQueryExecutionContext(&q)
-
-	res, err := svc.StartQueryExecution(&e)
-	if err != nil {
-		return nil, err
-	}
-
-	klog.V(2).Infof("StartQueryExecution result:")
-	klog.V(2).Infof(res.GoString())
-
-	var qri athena.GetQueryExecutionInput
-	qri.SetQueryExecutionId(*res.QueryExecutionId)
-
-	var qrop *athena.GetQueryExecutionOutput
-	duration := time.Duration(2) * time.Second // Pause for 2 seconds
-
-	for {
-		qrop, err = svc.GetQueryExecution(&qri)
-		if err != nil {
-			return nil, err
-		}
-		if *qrop.QueryExecution.Status.State != "RUNNING" && *qrop.QueryExecution.Status.State != "QUEUED" {
-			break
-		}
-		time.Sleep(duration)
-	}
 	var oocAllocs []*OutOfClusterAllocation
-	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
-
-		var ip athena.GetQueryResultsInput
-		ip.SetQueryExecutionId(*res.QueryExecutionId)
-
-		op, err := svc.GetQueryResults(&ip)
-		if err != nil {
-			return nil, err
-		}
-		if len(op.ResultSet.Rows) > 1 {
-			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows))] {
-				cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
-				if err != nil {
-					return nil, err
-				}
-				environment := ""
-				for _, d := range r.Data[1 : len(formattedAggregators)+1] {
-					if *d.VarCharValue != "" {
-						environment = *d.VarCharValue // just set to the first nonempty match
-					}
-					break
-				}
-				ooc := &OutOfClusterAllocation{
-					Aggregator:  strings.Join(aggregators, ","),
-					Environment: environment,
-					Service:     *r.Data[len(formattedAggregators)+1].VarCharValue,
-					Cost:        cost,
+	page := 0
+	processResults := func(op *athena.GetQueryResultsOutput, lastpage bool) bool {
+		iter := op.ResultSet.Rows
+		if page == 0 && len(iter) > 0 {
+			iter = op.ResultSet.Rows[1:len(op.ResultSet.Rows)]
+		}
+		page++
+		for _, r := range iter {
+			cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
+			if err != nil {
+				klog.Infof("Error converting cost `%s` from float ", *r.Data[lastIdx].VarCharValue)
+			}
+			environment := ""
+			for _, d := range r.Data[1 : len(formattedAggregators)+1] {
+				if *d.VarCharValue != "" {
+					environment = *d.VarCharValue // just set to the first nonempty match
 				}
-				oocAllocs = append(oocAllocs, ooc)
+				break
 			}
-		} else {
-			klog.V(1).Infof("No results available for %s at database %s between %s and %s", strings.Join(formattedAggregators, ","), customPricing.AthenaTable, start, end)
+			ooc := &OutOfClusterAllocation{
+				Aggregator:  strings.Join(aggregators, ","),
+				Environment: environment,
+				Service:     *r.Data[len(formattedAggregators)+1].VarCharValue,
+				Cost:        cost,
+			}
+			oocAllocs = append(oocAllocs, ooc)
 		}
+		return true
+	}
+
+	klog.V(3).Infof("Running Query: %s", query)
+	ip, svc, err := a.QueryAthenaPaginated(query)
+
+	athenaErr := svc.GetQueryResultsPages(ip, processResults)
+	if athenaErr != nil {
+		klog.Infof("RETURNING ATHENA ERROR")
+		return nil, athenaErr
 	}
 
 	if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
@@ -1900,16 +1939,9 @@ func (a *AWS) QuerySQL(query string) ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
-	if customPricing.ServiceKeyName != "" {
-		err = env.Set(env.AWSAccessKeyIDEnvVar, customPricing.ServiceKeyName)
-		if err != nil {
-			return nil, err
-		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
-		if err != nil {
-			return nil, err
-		}
-	}
+
+	a.ConfigureAuthWith(customPricing) // load aws authentication from configuration or secret
+
 	athenaConfigs, err := os.Open("/var/configs/athena.json")
 	if err != nil {
 		return nil, err
@@ -2024,22 +2056,13 @@ func (f fnames) Less(i, j int) bool {
 	return t1.Before(t2)
 }
 
-func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
+func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, region string) (map[string]*spotInfo, error) {
 	if a.ServiceAccountChecks == nil { // Set up checks to store error/success states
 		a.ServiceAccountChecks = make(map[string]*ServiceAccountCheck)
 	}
 
-	// credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
-	if accessKeyID != "" && accessKeySecret != "" {
-		err := env.Set(env.AWSAccessKeyIDEnvVar, accessKeyID)
-		if err != nil {
-			return nil, err
-		}
-		err = env.Set(env.AWSAccessKeySecretEnvVar, accessKeySecret)
-		if err != nil {
-			return nil, err
-		}
-	}
+	a.ConfigureAuth() // configure aws api authentication by setting env vars
+
 	s3Prefix := projectID
 	if len(prefix) != 0 {
 		s3Prefix = prefix + "/" + s3Prefix
@@ -2194,239 +2217,7 @@ func (a *AWS) parseSpotData(bucket string, prefix string, projectID string, regi
 }
 
 func (a *AWS) ApplyReservedInstancePricing(nodes map[string]*Node) {
-	/*
-		numReserved := len(a.ReservedInstances)
-
-		// Early return if no reserved instance data loaded
-		if numReserved == 0 {
-			klog.V(4).Infof("[Reserved] No Reserved Instances")
-			return
-		}
-
-		cfg, err := a.GetConfig()
-		defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
-		if err != nil {
-			klog.V(3).Infof("Could not parse default cpu price")
-			defaultCPU = 0.031611
-		}
-
-		defaultRAM, err := strconv.ParseFloat(cfg.RAM, 64)
-		if err != nil {
-			klog.V(3).Infof("Could not parse default ram price")
-			defaultRAM = 0.004237
-		}
-
-		cpuToRAMRatio := defaultCPU / defaultRAM
-
-		now := time.Now()
-
-		instances := make(map[string][]*AWSReservedInstance)
-		for _, r := range a.ReservedInstances {
-			if now.Before(r.StartDate) || now.After(r.EndDate) {
-				klog.V(1).Infof("[Reserved] Skipped Reserved Instance due to dates")
-				continue
-			}
-
-			_, ok := instances[r.Region]
-			if !ok {
-				instances[r.Region] = []*AWSReservedInstance{r}
-			} else {
-				instances[r.Region] = append(instances[r.Region], r)
-			}
-		}
-
-		awsNodes := make(map[string]*v1.Node)
-		currentNodes := a.Clientset.GetAllNodes()
-
-		// Create a node name -> node map
-		for _, awsNode := range currentNodes {
-			awsNodes[awsNode.GetName()] = awsNode
-		}
-
-		// go through all provider nodes using k8s nodes for region
-		for nodeName, node := range nodes {
-			// Reset reserved allocation to prevent double allocation
-			node.Reserved = nil
-
-			kNode, ok := awsNodes[nodeName]
-			if !ok {
-				klog.V(1).Infof("[Reserved] Could not find K8s Node with name: %s", nodeName)
-				continue
-			}
-
-			nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
-			if !ok {
-				klog.V(1).Infof("[Reserved] Could not find node region")
-				continue
-			}
-
-			reservedInstances, ok := instances[nodeRegion]
-			if !ok {
-				klog.V(1).Infof("[Reserved] Could not find counters for region: %s", nodeRegion)
-				continue
-			}
-
-			// Determine the InstanceType of the node
-			instanceType, ok := kNode.Labels["beta.kubernetes.io/instance-type"]
-			if !ok {
-				continue
-			}
-
-			ramBytes, err := strconv.ParseFloat(node.RAMBytes, 64)
-			if err != nil {
-				continue
-			}
-			ramGB := ramBytes / 1024 / 1024 / 1024
-
-			cpu, err := strconv.ParseFloat(node.VCPU, 64)
-			if err != nil {
-				continue
-			}
-
-			ramMultiple := cpu*cpuToRAMRatio + ramGB
-
-			node.Reserved = &ReservedInstanceData{
-				ReservedCPU: 0,
-				ReservedRAM: 0,
-			}
-
-			for i, reservedInstance := range reservedInstances {
-				if reservedInstance.InstanceType == instanceType {
-					// Use < 0 to mark as ALL
-					node.Reserved.ReservedCPU = -1
-					node.Reserved.ReservedRAM = -1
-
-					// Set Costs based on CPU/RAM ratios
-					ramPrice := reservedInstance.PricePerHour / ramMultiple
-					node.Reserved.CPUCost = ramPrice * cpuToRAMRatio
-					node.Reserved.RAMCost = ramPrice
-
-					// Remove the reserve from the temporary slice to prevent
-					// being reallocated
-					instances[nodeRegion] = append(reservedInstances[:i], reservedInstances[i+1:]...)
-					break
-				}
-			}
-		}*/
-}
-
-type AWSReservedInstance struct {
-	Zone           string
-	Region         string
-	InstanceType   string
-	InstanceCount  int64
-	InstanceTenacy string
-	StartDate      time.Time
-	EndDate        time.Time
-	PricePerHour   float64
-}
-
-func (ari *AWSReservedInstance) String() string {
-	return fmt.Sprintf("[Zone: %s, Region: %s, Type: %s, Count: %d, Tenacy: %s, Start: %+v, End: %+v, Price: %f]", ari.Zone, ari.Region, ari.InstanceType, ari.InstanceCount, ari.InstanceTenacy, ari.StartDate, ari.EndDate, ari.PricePerHour)
-}
-
-func isReservedInstanceHourlyPrice(rc *ec2.RecurringCharge) bool {
-	return rc != nil && rc.Frequency != nil && *rc.Frequency == "Hourly"
-}
-
-func getReservedInstancePrice(ri *ec2.ReservedInstances) (float64, error) {
-	var pricePerHour float64
-	if len(ri.RecurringCharges) > 0 {
-		for _, rc := range ri.RecurringCharges {
-			if isReservedInstanceHourlyPrice(rc) {
-				pricePerHour = *rc.Amount
-				break
-			}
-		}
-	}
-
-	// If we're still unable to resolve hourly price, try fixed -> hourly
-	if pricePerHour == 0 {
-		if ri.Duration != nil && ri.FixedPrice != nil {
-			var durHours float64
-			durSeconds := float64(*ri.Duration)
-			fixedPrice := float64(*ri.FixedPrice)
-			if durSeconds != 0 && fixedPrice != 0 {
-				durHours = durSeconds / 60 / 60
-				pricePerHour = fixedPrice / durHours
-			}
-		}
-	}
-
-	if pricePerHour == 0 {
-		return 0, fmt.Errorf("Failed to resolve an hourly price from FixedPrice or Recurring Costs")
-	}
-
-	return pricePerHour, nil
-}
-
-func getRegionReservedInstances(region string) ([]*AWSReservedInstance, error) {
-	c := &aws.Config{
-		Region: aws.String(region),
-	}
-	s := session.Must(session.NewSession(c))
-	svc := ec2.New(s)
-
-	response, err := svc.DescribeReservedInstances(&ec2.DescribeReservedInstancesInput{})
-	if err != nil {
-		return nil, err
-	}
-
-	var reservedInstances []*AWSReservedInstance
-	for _, ri := range response.ReservedInstances {
-		var zone string
-		if ri.AvailabilityZone != nil {
-			zone = *ri.AvailabilityZone
-		}
-		pricePerHour, err := getReservedInstancePrice(ri)
-		if err != nil {
-			klog.V(1).Infof("Error Resolving Price: %s", err.Error())
-			continue
-		}
-		reservedInstances = append(reservedInstances, &AWSReservedInstance{
-			Zone:           zone,
-			Region:         region,
-			InstanceType:   *ri.InstanceType,
-			InstanceCount:  *ri.InstanceCount,
-			InstanceTenacy: *ri.InstanceTenancy,
-			StartDate:      *ri.Start,
-			EndDate:        *ri.End,
-			PricePerHour:   pricePerHour,
-		})
-	}
-
-	return reservedInstances, nil
-}
-
-func (a *AWS) getReservedInstances() ([]*AWSReservedInstance, error) {
-	err := a.configureAWSAuth()
-	if err != nil {
-		return nil, fmt.Errorf("Error Configuring aws auth: %s", err.Error())
-	}
-
-	var reservedInstances []*AWSReservedInstance
-
-	nodes := a.Clientset.GetAllNodes()
-	regionsSeen := make(map[string]bool)
-	for _, node := range nodes {
-		region, ok := node.Labels[v1.LabelZoneRegion]
-		if !ok {
-			continue
-		}
-		if regionsSeen[region] {
-			continue
-		}
-
-		ris, err := getRegionReservedInstances(region)
-		if err != nil {
-			klog.V(3).Infof("Error getting reserved instances: %s", err.Error())
-			continue
-		}
-		regionsSeen[region] = true
-		reservedInstances = append(reservedInstances, ris...)
-	}
 
-	return reservedInstances, nil
 }
 
 func (a *AWS) ServiceAccountStatus() *ServiceAccountStatus {

+ 9 - 3
pkg/cloud/azureprovider.go

@@ -193,8 +193,9 @@ type azureKey struct {
 }
 
 func (k *azureKey) Features() string {
-	region := strings.ToLower(k.Labels[v1.LabelZoneRegion])
-	instance := k.Labels[v1.LabelInstanceType]
+	r, _ := util.GetRegion(k.Labels)
+	region := strings.ToLower(r)
+	instance, _ := util.GetInstanceType(k.Labels)
 	usageType := "ondemand"
 	return fmt.Sprintf("%s,%s,%s", region, instance, usageType)
 }
@@ -711,7 +712,7 @@ func (key *azurePvKey) Features() string {
 			storageClass = AzureFileStandardStorageClass
 		}
 	}
-	if region, ok := key.Labels[v1.LabelZoneRegion]; ok {
+	if region, ok := util.GetRegion(key.Labels); ok {
 		return region + "," + storageClass
 	}
 
@@ -835,6 +836,11 @@ func (az *Azure) ServiceAccountStatus() *ServiceAccountStatus {
 		Checks: []*ServiceAccountCheck{},
 	}
 }
+
+func (az *Azure) PricingSourceStatus() map[string]*PricingSource {
+	return make(map[string]*PricingSource)
+}
+
 func (*Azure) ClusterManagementPricing() (string, float64, error) {
 	return "", 0.0, nil
 }

+ 4 - 0
pkg/cloud/customprovider.go

@@ -304,6 +304,10 @@ func (cp *CustomProvider) ServiceAccountStatus() *ServiceAccountStatus {
 	}
 }
 
+func (cp *CustomProvider) PricingSourceStatus() map[string]*PricingSource {
+	return make(map[string]*PricingSource)
+}
+
 func (cp *CustomProvider) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	return 1.0 - ((1.0 - defaultDiscount) * (1.0 - negotiatedDiscount))
 }

+ 11 - 4
pkg/cloud/gcpprovider.go

@@ -1168,7 +1168,7 @@ func (gcp *GCP) ApplyReservedInstancePricing(nodes map[string]*Node) {
 			continue
 		}
 
-		nodeRegion, ok := kNode.Labels[v1.LabelZoneRegion]
+		nodeRegion, ok := util.GetRegion(kNode.Labels)
 		if !ok {
 			klog.V(4).Infof("[Reserved] Could not find node region")
 			continue
@@ -1322,7 +1322,8 @@ func (key *pvKey) Features() string {
 	} else if storageClass == "pd-standard" {
 		storageClass = "pdstandard"
 	}
-	return key.Labels[v1.LabelZoneRegion] + "," + storageClass
+	region, _ := util.GetRegion(key.Labels)
+	return region + "," + storageClass
 }
 
 type gcpKey struct {
@@ -1355,7 +1356,8 @@ func (gcp *gcpKey) GPUType() string {
 
 // GetKey maps node labels to information needed to retrieve pricing data
 func (gcp *gcpKey) Features() string {
-	instanceType := strings.ToLower(strings.Join(strings.Split(gcp.Labels[v1.LabelInstanceType], "-")[:2], ""))
+	it, _ := util.GetInstanceType(gcp.Labels)
+	instanceType := strings.ToLower(strings.Join(strings.Split(it, "-")[:2], ""))
 	if instanceType == "n1highmem" || instanceType == "n1highcpu" {
 		instanceType = "n1standard" // These are priced the same. TODO: support n1ultrahighmem
 	} else if instanceType == "n2highmem" || instanceType == "n2highcpu" {
@@ -1365,7 +1367,8 @@ func (gcp *gcpKey) Features() string {
 	} else if strings.HasPrefix(instanceType, "custom") {
 		instanceType = "custom" // The suffix of custom does not matter
 	}
-	region := strings.ToLower(gcp.Labels[v1.LabelZoneRegion])
+	r, _ := util.GetRegion(gcp.Labels)
+	region := strings.ToLower(r)
 	var usageType string
 
 	if t, ok := gcp.Labels["cloud.google.com/gke-preemptible"]; ok && t == "true" {
@@ -1429,6 +1432,10 @@ func (gcp *GCP) ServiceAccountStatus() *ServiceAccountStatus {
 	}
 }
 
+func (gcp *GCP) PricingSourceStatus() map[string]*PricingSource {
+	return make(map[string]*PricingSource)
+}
+
 func (gcp *GCP) CombinedDiscountForNode(instanceType string, isPreemptible bool, defaultDiscount, negotiatedDiscount float64) float64 {
 	class := strings.Split(instanceType, "-")[0]
 	return 1.0 - ((1.0 - sustainedUseDiscount(class, defaultDiscount, isPreemptible)) * (1.0 - negotiatedDiscount))

+ 11 - 0
pkg/cloud/provider.go

@@ -185,6 +185,16 @@ type ServiceAccountCheck struct {
 	AdditionalInfo string `json:additionalInfo`
 }
 
+type PricingSources struct {
+	PricingSources map[string]*PricingSource
+}
+
+type PricingSource struct {
+	Name      string `json:"name"`
+	Available bool   `json:"available"`
+	Error     string `json:"error"`
+}
+
 // Provider represents a k8s provider.
 type Provider interface {
 	ClusterInfo() (map[string]string, error)
@@ -206,6 +216,7 @@ type Provider interface {
 	ExternalAllocations(string, string, []string, string, string, bool) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 	ServiceAccountStatus() *ServiceAccountStatus
+	PricingSourceStatus() map[string]*PricingSource
 	ClusterManagementPricing() (string, float64, error)
 	CombinedDiscountForNode(string, bool, float64, float64) float64
 	ParseID(string) string

+ 29 - 15
pkg/costmodel/cluster.go

@@ -417,7 +417,9 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	// [$/hr] * [min/res]*[hr/min] = [$/res]
 	hourlyToCumulative := float64(minsPerResolution) * (1.0 / 60.0)
 
-	ctx := prom.NewContext(client)
+	requiredCtx := prom.NewContext(client)
+	optionalCtx := prom.NewContext(client)
+
 	queryNodeCPUCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node) * on(node, cluster_id) group_right avg(node_cpu_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
 	queryNodeCPUCores := fmt.Sprintf(`avg_over_time(avg(kube_node_status_capacity_cpu_cores) by (cluster_id, node)[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeRAMCost := fmt.Sprintf(`sum_over_time((avg(kube_node_status_capacity_memory_bytes) by (cluster_id, node) * on(cluster_id, node) group_right avg(node_ram_hourly_cost) by (cluster_id, node, instance_type, provider_id))[%s:%dm]%s) / 1024 / 1024 / 1024 * %f`, durationStr, minsPerResolution, offsetStr, hourlyToCumulative)
@@ -425,20 +427,23 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	queryNodeGPUCost := fmt.Sprintf(`sum_over_time((avg(node_gpu_hourly_cost * %d.0 / 60.0) by (cluster_id, node, provider_id))[%s:%dm]%s)`, minsPerResolution, durationStr, minsPerResolution, offsetStr)
 	queryNodeLabels := fmt.Sprintf(`avg_over_time(kubecost_node_is_spot[%s:%dm]%s)`, durationStr, minsPerResolution, offsetStr)
 	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm]%s)) by (kubernetes_node, cluster_id, mode)`, durationStr, minsPerResolution, offsetStr)
-	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
-	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / sum(sum_over_time(label_replace(kube_node_status_capacity_memory_bytes, "instance", "$1", "node", "(.*)")[%s:%dm]%s)) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryNodeRAMSystemPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
+	queryNodeRAMUserPct := fmt.Sprintf(`sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace!="kube-system"}[%s:%dm]%s)) by (instance, cluster_id) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[%s:%dm]%s)) by (node, cluster_id), "instance", "$1", "node", "(.*)")) by (instance, cluster_id)`, durationStr, minsPerResolution, offsetStr, durationStr, minsPerResolution, offsetStr)
 	queryActiveMins := fmt.Sprintf(`node_total_hourly_cost[%s:%dm]%s`, durationStr, minsPerResolution, offsetStr)
 
-	resChNodeCPUCost := ctx.Query(queryNodeCPUCost)
-	resChNodeCPUCores := ctx.Query(queryNodeCPUCores)
-	resChNodeRAMCost := ctx.Query(queryNodeRAMCost)
-	resChNodeRAMBytes := ctx.Query(queryNodeRAMBytes)
-	resChNodeGPUCost := ctx.Query(queryNodeGPUCost)
-	resChNodeLabels := ctx.Query(queryNodeLabels)
-	resChNodeCPUModeTotal := ctx.Query(queryNodeCPUModeTotal)
-	resChNodeRAMSystemPct := ctx.Query(queryNodeRAMSystemPct)
-	resChNodeRAMUserPct := ctx.Query(queryNodeRAMUserPct)
-	resChActiveMins := ctx.Query(queryActiveMins)
+	// Return errors if these fail
+	resChNodeCPUCost := requiredCtx.Query(queryNodeCPUCost)
+	resChNodeCPUCores := requiredCtx.Query(queryNodeCPUCores)
+	resChNodeRAMCost := requiredCtx.Query(queryNodeRAMCost)
+	resChNodeRAMBytes := requiredCtx.Query(queryNodeRAMBytes)
+	resChNodeGPUCost := requiredCtx.Query(queryNodeGPUCost)
+	resChNodeLabels := requiredCtx.Query(queryNodeLabels)
+	resChActiveMins := requiredCtx.Query(queryActiveMins)
+
+	// Do not return errors if these fail, but log warnings
+	resChNodeCPUModeTotal := optionalCtx.Query(queryNodeCPUModeTotal)
+	resChNodeRAMSystemPct := optionalCtx.Query(queryNodeRAMSystemPct)
+	resChNodeRAMUserPct := optionalCtx.Query(queryNodeRAMUserPct)
 
 	resNodeCPUCost, _ := resChNodeCPUCost.Await()
 	resNodeCPUCores, _ := resChNodeCPUCores.Await()
@@ -450,8 +455,17 @@ func ClusterNodes(cp cloud.Provider, client prometheus.Client, duration, offset
 	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
 	resNodeRAMUserPct, _ := resChNodeRAMUserPct.Await()
 	resActiveMins, _ := resChActiveMins.Await()
-	if ctx.ErrorCollector.IsError() {
-		return nil, ctx.Errors()
+
+	if optionalCtx.ErrorCollector.IsError() {
+		for _, err := range optionalCtx.Errors() {
+			log.Warningf("ClusterNodes: %s", err)
+		}
+	}
+	if requiredCtx.ErrorCollector.IsError() {
+		for _, err := range requiredCtx.Errors() {
+			log.Errorf("ClusterNodes: %s", err)
+		}
+		return nil, requiredCtx.Errors()
 	}
 
 	nodeMap := map[string]*Node{}

+ 21 - 58
pkg/costmodel/costmodel.go

@@ -181,36 +181,31 @@ const (
 	sum(kube_persistentvolumeclaim_resource_requests_storage_bytes) by (persistentvolumeclaim, namespace, cluster_id, kubernetes_name)) by (persistentvolumeclaim, storageclass, namespace, volumename, cluster_id)`
 	// queryRAMAllocationByteHours yields the total byte-hour RAM allocation over the given
 	// window, aggregated by container.
-	//  [line 3]     sum_over_time(each byte*min in window) / (min/hr kubecost up) = [byte*hour] by metric, adjusted for kubecost downtime
-	//  [lines 2,4]  sum(") by unique container key = [byte*hour] by container
+	//  [line 3]  sum_over_time(each byte) = [byte*scrape] by metric
+	//  [line 4] (scalar(avg(prometheus_target_interval_length_seconds)) = [seconds/scrape] / 60 / 60 =  [hours/scrape] by container
+	//  [lines 2,4]  sum(") by unique container key and multiply [byte*scrape] * [hours/scrape] for byte*hours
 	//  [lines 1,5]  relabeling
 	queryRAMAllocationByteHours = `
 		label_replace(label_replace(
 			sum(
-				sum_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s:1m]) / %f 
-			) by (namespace,container,pod,node,cluster_id)
+				sum_over_time(container_memory_allocation_bytes{container!="",container!="POD", node!=""}[%s])
+			) by (namespace,container,pod,node,cluster_id) * (scalar(avg(prometheus_target_interval_length_seconds)) / 60 / 60)
 		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	// queryCPUAllocationVCPUHours yields the total VCPU-hour CPU allocation over the given
 	// window, aggregated by container.
-	//  [line 3]     sum_over_time(each VCPU*mins in window) / (min/hr kubecost up) = [VCPU*hour] by metric, adjusted for kubecost downtime
-	//  [lines 2,4]  sum(") by unique container key = [VCPU*hour] by container
+	//  [line 3] sum_over_time(each VCPU*mins in window) = [VCPU*scrape] by metric
+	//  [line 4] (scalar(avg(prometheus_target_interval_length_seconds)) = [seconds/scrape] / 60 / 60 =  [hours/scrape] by container
+	//  [lines 2,4]  sum(") by unique container key and multiply [VCPU*scrape] * [hours/scrape] for VCPU*hours
 	//  [lines 1,5]  relabeling
 	queryCPUAllocationVCPUHours = `
 		label_replace(label_replace(
 			sum(
-				sum_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s:1m]) / %f
-			) by (namespace,container,pod,node,cluster_id)
+				sum_over_time(container_cpu_allocation{container!="",container!="POD", node!=""}[%s])
+			) by (namespace,container,pod,node,cluster_id) * (scalar(avg(prometheus_target_interval_length_seconds)) / 60 / 60)
 		, "container_name","$1","container","(.+)"), "pod_name","$1","pod","(.+)")`
 	// queryPVCAllocationFmt yields the total byte-hour PVC allocation over the given window.
-	//  sum(all VCPU measurements within given window) = [byte*min] by metric
-	//  (") / 60 = [byte*hour] by metric, assuming no missed scrapes
-	//  (") * (normalization factor) = [byte*hour] by metric, normalized for missed scrapes
-	//  sum(") by unique pvc = [VCPU*hour] by (cluster, namespace, pod, pv, pvc)
-	// Note: normalization factor is 1.0 if no scrapes are missed and has an upper bound determined by minExpectedScrapeRate
-	// so that coarse resolutions don't push normalization factors too high; e.g. 24h resolution with 1h of data would make
-	// for a normalization factor of 24. With a minimumExpectedScrapeRate of 0.95, that caps the norm factor at
-	queryPVCAllocationFmt = `sum(sum_over_time(pod_pvc_allocation[%s:1m])) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim) / 60
-		* 60 / clamp_min(count_over_time(sum(pod_pvc_allocation) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim)[%s:1m])/%f, 60 * %f)`
+	// sum_over_time(each byte) = [byte*scrape] by metric *(scalar(avg(prometheus_target_interval_length_seconds)) = [seconds/scrape] / 60 / 60 =  [hours/scrape] by pod
+	queryPVCAllocationFmt     = `sum(sum_over_time(pod_pvc_allocation[%s])) by (cluster_id, namespace, pod, persistentvolume, persistentvolumeclaim) * scalar(avg(prometheus_target_interval_length_seconds)/60/60)`
 	queryPVHourlyCostFmt      = `avg_over_time(pv_hourly_cost[%s])`
 	queryNSLabels             = `avg_over_time(kube_namespace_labels[%s])`
 	queryPodLabels            = `avg_over_time(kube_pod_labels[%s])`
@@ -223,7 +218,6 @@ const (
 	queryRegionNetworkUsage   = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="false", sameZone="false", sameRegion="false"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	queryInternetNetworkUsage = `sum(increase(kubecost_pod_network_egress_bytes_total{internet="true"}[%s] %s)) by (namespace,pod_name,cluster_id) / 1024 / 1024 / 1024`
 	normalizationStr          = `max(count_over_time(kube_pod_container_resource_requests_memory_bytes{}[%s] %s))`
-	kubecostUpMinsPerHourStr  = `max(count_over_time(node_cpu_hourly_cost[%s:1m])) / %f`
 )
 
 func (cm *CostModel) ComputeCostData(cli prometheusClient.Client, clientset kubernetes.Interface, cp costAnalyzerCloud.Provider, window string, offset string, filterNamespace string) (map[string]*CostData, error) {
@@ -796,7 +790,7 @@ func addPVData(cache clustercache.ClusterCache, pvClaimMapping map[string]*Persi
 	var defaultRegion string
 	nodeList := cache.GetAllNodes()
 	if len(nodeList) > 0 {
-		defaultRegion = nodeList[0].Labels[v1.LabelZoneRegion]
+		defaultRegion, _ = util.GetRegion(nodeList[0].Labels)
 	}
 
 	storageClasses := cache.GetAllStorageClasses()
@@ -818,7 +812,7 @@ func addPVData(cache clustercache.ClusterCache, pvClaimMapping map[string]*Persi
 			klog.V(4).Infof("Unable to find parameters for storage class \"%s\". Does pv \"%s\" have a storageClassName?", pv.Spec.StorageClassName, pv.Name)
 		}
 		var region string
-		if r, ok := pv.Labels[v1.LabelZoneRegion]; ok {
+		if r, ok := util.GetRegion(pv.Labels); ok {
 			region = r
 		} else {
 			region = defaultRegion
@@ -899,10 +893,12 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 		}
 		newCnode := *cnode
 		if newCnode.InstanceType == "" {
-			newCnode.InstanceType = n.Labels[v1.LabelInstanceType]
+			it, _ := util.GetInstanceType(n.Labels)
+			newCnode.InstanceType = it
 		}
 		if newCnode.Region == "" {
-			newCnode.Region = n.Labels[v1.LabelZoneRegion]
+			region, _ := util.GetRegion(n.Labels)
+			newCnode.Region = region
 		}
 		newCnode.ProviderID = n.Spec.ProviderID
 
@@ -1503,48 +1499,15 @@ func (cm *CostModel) costDataRange(cli prometheusClient.Client, clientset kubern
 
 	ctx := prom.NewContext(cli)
 
-	// Query for the average number of minutes per hour that Kubecost was up
-	// in the given range by averaging the number of up minutes-per-hour for
-	// each window in the range. Use that number in the RAM and CPU allocation
-	// queries as the adjutsment factor, scaling only if Kubecost was down
-	// for fewer than 3 minutes (as a heuristic for a reasonable amount of
-	// time to interpolate). Otherwise, use 60 minutes per hour and assume
-	// that this period of time is during Kubecost start-up or a long-term
-	// downtime for which we don't want to interpolate.
-	queryKubecostUpMinsPerHour := fmt.Sprintf(kubecostUpMinsPerHourStr, windowString, window.Hours())
-	resKubecostUp, err := ctx.QueryRangeSync(queryKubecostUpMinsPerHour, start, end, window)
-	if err != nil {
-		log.Errorf("costDataRange: error querying Kubecost up: %s", err)
-		return nil, err
-	}
-
-	kubecostMinsPerHour := 0.0
-	num := 0
-	if len(resKubecostUp) > 0 {
-		for _, val := range resKubecostUp[0].Values {
-			kubecostMinsPerHour += val.Value
-			num++
-		}
-		kubecostMinsPerHour /= float64(num)
-	}
-	if kubecostMinsPerHour <= 57.0 {
-		kubecostMinsPerHour = 60.0
-	}
-
-	// TODO niko/queryfix rewrite PVCAllocation query too, and remove this
-	// Use a heuristic to tell the difference between missed scrapes and an incomplete window
-	// of data due to fresh install, etc.
-	minimumExpectedScrapeRate := 0.95
-
-	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, windowString, kubecostMinsPerHour)
-	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, windowString, kubecostMinsPerHour)
+	queryRAMAlloc := fmt.Sprintf(queryRAMAllocationByteHours, windowString)
+	queryCPUAlloc := fmt.Sprintf(queryCPUAllocationVCPUHours, windowString)
 	queryRAMRequests := fmt.Sprintf(queryRAMRequestsStr, windowString, "", windowString, "")
 	queryRAMUsage := fmt.Sprintf(queryRAMUsageStr, windowString, "", windowString, "")
 	queryCPURequests := fmt.Sprintf(queryCPURequestsStr, windowString, "", windowString, "")
 	queryCPUUsage := fmt.Sprintf(queryCPUUsageStr, windowString, "")
 	queryGPURequests := fmt.Sprintf(queryGPURequestsStr, windowString, "", windowString, "", resolutionHours, windowString, "")
 	queryPVRequests := fmt.Sprintf(queryPVRequestsStr)
-	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString, windowString, resolutionHours, minimumExpectedScrapeRate)
+	queryPVCAllocation := fmt.Sprintf(queryPVCAllocationFmt, windowString)
 	queryPVHourlyCost := fmt.Sprintf(queryPVHourlyCostFmt, windowString)
 	queryNetZoneRequests := fmt.Sprintf(queryZoneNetworkUsage, windowString, "")
 	queryNetRegionRequests := fmt.Sprintf(queryRegionNetworkUsage, windowString, "")

+ 8 - 0
pkg/costmodel/router.go

@@ -637,6 +637,13 @@ func (p *Accesses) GetServiceAccountStatus(w http.ResponseWriter, _ *http.Reques
 	w.Write(WrapData(A.Cloud.ServiceAccountStatus(), nil))
 }
 
+func (p *Accesses) GetPricingSourceStatus(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+
+	w.Write(WrapData(A.Cloud.PricingSourceStatus(), nil))
+}
+
 func (p *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -1020,6 +1027,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) {
 	Router.GET("/clusterInfo", A.ClusterInfo)
 	Router.GET("/clusterInfoMap", A.GetClusterInfoMap)
 	Router.GET("/serviceAccountStatus", A.GetServiceAccountStatus)
+	Router.GET("/pricingSourceStatus", A.GetPricingSourceStatus)
 
 	// cluster manager endpoints
 	Router.GET("/clusters", managerEndpoints.GetAllClusters)

+ 35 - 0
pkg/util/compat.go

@@ -0,0 +1,35 @@
+package util
+
+import (
+	v1 "k8s.io/api/core/v1"
+)
+
+func GetRegion(labels map[string]string) (string, bool) {
+	if _, ok := labels[v1.LabelZoneRegion]; ok {
+		return labels[v1.LabelZoneRegion], true
+	} else if _, ok := labels["topology.kubernetes.io/region"]; ok { // Label as of 1.17
+		return labels["topology.kubernetes.io/region"], true
+	} else {
+		return "", false
+	}
+}
+
+func GetInstanceType(labels map[string]string) (string, bool) {
+	if _, ok := labels[v1.LabelInstanceType]; ok {
+		return labels[v1.LabelInstanceType], true
+	} else if _, ok := labels["node.kubernetes.io/instance-type"]; ok {
+		return labels["node.kubernetes.io/instance-type"], true
+	} else {
+		return "", false
+	}
+}
+
+func GetOperatingSystem(labels map[string]string) (string, bool) {
+	if _, ok := labels[v1.LabelOSStable]; ok {
+		return labels[v1.LabelOSStable], true
+	} else if _, ok := labels["beta.kubernetes.io/os"]; ok {
+		return labels["beta.kubernetes.io/os"], true
+	} else {
+		return "", false
+	}
+}

+ 37 - 0
pkg/util/mapper/mapper.go

@@ -2,6 +2,7 @@ package mapper
 
 import (
 	"strconv"
+	"strings"
 )
 
 //--------------------------------------------------------------------------
@@ -83,6 +84,11 @@ type PrimitiveMapReader interface {
 	// GetBool parses a bool from the map key parameter. If the value
 	// is empty or fails to parse, the defaultValue parameter is returned.
 	GetBool(key string, defaultValue bool) bool
+
+	// GetList returns a string list which contains the value set by key split using the
+	// provided delimiter with each entry trimmed of space. If the value doesn't exist,
+	// nil is returned
+	GetList(key string, delimiter string) []string
 }
 
 // PrimitiveMapWriter is an implementation contract for an object capable
@@ -123,6 +129,10 @@ type PrimitiveMapWriter interface {
 
 	// SetBool sets the map to a string formatted bool value.
 	SetBool(key string, value bool) error
+
+	// SetList sets the map's value at key to a string consistent of each value in the list separated
+	// by the provided delimiter.
+	SetList(key string, values []string, delimiter string) error
 }
 
 // PrimitiveMap is capable of reading and writing primitive values
@@ -373,6 +383,27 @@ func (rom *readOnlyMapper) GetBool(key string, defaultValue bool) bool {
 	return b
 }
 
+// GetList returns a string list which contains the value set by key split using the
+// provided delimiter with each entry trimmed of space. If the value doesn't exist,
+// nil is returned
+func (rom *readOnlyMapper) GetList(key string, delimiter string) []string {
+	value := rom.Get(key, "")
+	if value == "" {
+		return nil
+	}
+
+	split := strings.Split(value, delimiter)
+
+	// reuse slice created for split
+	result := split[:0]
+	for _, v := range split {
+		if trimmed := strings.TrimSpace(v); trimmed != "" {
+			result = append(result, trimmed)
+		}
+	}
+	return result
+}
+
 // Set sets the map for the key provided using the value provided.
 func (wom *writeOnlyMapper) Set(key string, value string) error {
 	return wom.setter.Set(key, value)
@@ -432,3 +463,9 @@ func (wom *writeOnlyMapper) SetUInt64(key string, value uint64) error {
 func (wom *writeOnlyMapper) SetBool(key string, value bool) error {
 	return wom.setter.Set(key, strconv.FormatBool(value))
 }
+
+// SetList sets the map's value at key to a string consistent of each value in the list separated
+// by the provided delimiter.
+func (wom *writeOnlyMapper) SetList(key string, values []string, delimiter string) error {
+	return wom.setter.Set(key, strings.Join(values, delimiter))
+}