Browse Source

Merge pull request #43 from kubecost/AjayTripathy-spot-feed-integration

first pass of spot feed integration
Ajay Tripathy 7 năm trước cách đây
mục cha
commit
6cf1f50a84
7 tập tin đã thay đổi với 392 bổ sung98 xóa
  1. 7 1
      cloud/aws.json
  2. 310 70
      cloud/awsprovider.go
  3. 23 10
      cloud/gcpprovider.go
  4. 48 17
      cloud/provider.go
  5. 1 0
      costmodel/costmodel.go
  6. 1 0
      go.mod
  7. 2 0
      go.sum

+ 7 - 1
cloud/aws.json

@@ -6,5 +6,11 @@
     "RAM": "0.004237",
     "spotRAM": "0.000892",
     "spotLabel": "kops.k8s.io/instancegroup",
-    "spotLabelValue": "spotinstance-nodes"
+    "spotLabelValue": "spotinstance-nodes",
+    "awsServiceKeyName": "AKIAXW6UVLRRTBCUKQFP",
+    "awsServiceKeySecret": "",
+    "awsSpotDataRegion":"us-east-2",
+    "awsSpotDataBucket": "kc-test-spot",
+    "awsSpotDataPrefix": "spotdata",
+    "awsProjectID": "530337586275"
 }

+ 310 - 70
cloud/awsprovider.go

@@ -1,6 +1,9 @@
 package cloud
 
 import (
+	"bytes"
+	"compress/gzip"
+	"encoding/csv"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -16,24 +19,39 @@ import (
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
-	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/athena"
 	"github.com/aws/aws-sdk-go/service/ec2"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/jszwec/csvutil"
+
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 )
 
+const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
+const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
+const supportedSpotFeedVersion = "1"
+
 // AWS represents an Amazon Provider
 type AWS struct {
-	Pricing          map[string]*AWSProductTerms
-	ValidPricingKeys map[string]bool
-	Clientset        *kubernetes.Clientset
-	BaseCPUPrice     string
-	BaseSpotCPUPrice string
-	BaseSpotRAMPrice string
-	SpotLabelName    string
-	SpotLabelValue   string
+	Pricing                 map[string]*AWSProductTerms
+	SpotPricingByInstanceID map[string]*spotInfo
+	ValidPricingKeys        map[string]bool
+	Clientset               *kubernetes.Clientset
+	BaseCPUPrice            string
+	BaseSpotCPUPrice        string
+	BaseSpotRAMPrice        string
+	SpotLabelName           string
+	SpotLabelValue          string
+	ServiceKeyName          string
+	ServiceKeySecret        string
+	SpotDataRegion          string
+	SpotDataBucket          string
+	SpotDataPrefix          string
+	ProjectID               string
 }
 
 // AWSPricing maps a k8s node to an AWS Pricing "product"
@@ -137,22 +155,56 @@ func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem strin
 	return region + "," + instanceType + "," + operatingSystem
 }
 
-// GetKey maps node labels to information needed to retrieve pricing data
-func (aws *AWS) GetKey(labels map[string]string) string {
-	instanceType := labels["beta.kubernetes.io/instance-type"]
-	operatingSystem := labels["beta.kubernetes.io/os"]
-	region := labels["failure-domain.beta.kubernetes.io/region"]
-	if l, ok := labels["lifecycle"]; ok && l == "EC2Spot" {
-		usageType := "preemptible"
-		return region + "," + instanceType + "," + operatingSystem + "," + usageType
-	}
-	if l, ok := labels[aws.SpotLabelName]; ok && l == aws.SpotLabelValue {
-		usageType := "preemptible"
-		return region + "," + instanceType + "," + operatingSystem + "," + usageType
+type awsKey struct {
+	SpotLabelName  string
+	SpotLabelValue string
+	Labels         map[string]string
+	ProviderID     string
+}
+
+func (k *awsKey) ID() string {
+	provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
+	for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
+		if matchNum == 2 {
+			return group
+		}
+	}
+	klog.V(3).Info("Could not find instance ID in " + k.ProviderID)
+	return ""
+}
+
+func (k *awsKey) Features() string {
+
+	instanceType := k.Labels[v1.LabelInstanceType]
+	var operatingSystem string
+	operatingSystem, ok := k.Labels[v1.LabelOSStable]
+	if !ok {
+		operatingSystem = k.Labels["beta.kubernetes.io/os"]
+	}
+	region := k.Labels[v1.LabelZoneRegion]
+
+	key := region + "," + instanceType + "," + operatingSystem
+	usageType := "preemptible"
+	spotKey := key + "," + usageType
+	if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
+		return spotKey
+	}
+	if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
+		return spotKey
 	}
 	return region + "," + instanceType + "," + operatingSystem
 }
 
+// GetKey maps node labels to information needed to retrieve pricing data
+func (aws *AWS) GetKey(labels map[string]string) Key {
+	return &awsKey{
+		SpotLabelName:  aws.SpotLabelName,
+		SpotLabelValue: aws.SpotLabelValue,
+		Labels:         labels,
+		ProviderID:     labels["providerID"],
+	}
+}
+
 func (aws *AWS) isPreemptible(key string) bool {
 	s := strings.Split(key, ",")
 	if len(s) == 4 && s[3] == "preemptible" {
@@ -172,7 +224,7 @@ func (aws *AWS) DownloadPricingData() error {
 	for _, n := range nodeList.Items {
 		labels := n.GetObjectMeta().GetLabels()
 		key := aws.GetKey(labels)
-		inputkeys[key] = true
+		inputkeys[key.Features()] = true
 	}
 
 	aws.Pricing = make(map[string]*AWSProductTerms)
@@ -266,6 +318,20 @@ func (aws *AWS) DownloadPricingData() error {
 	aws.BaseSpotRAMPrice = c.SpotRAM
 	aws.SpotLabelName = c.SpotLabel
 	aws.SpotLabelValue = c.SpotLabelValue
+	aws.SpotDataBucket = c.SpotDataBucket
+	aws.SpotDataPrefix = c.SpotDataPrefix
+	aws.ProjectID = c.ProjectID
+	aws.SpotDataRegion = c.SpotDataRegion
+	aws.ServiceKeyName = c.ServiceKeyName
+	aws.ServiceKeySecret = c.ServiceKeySecret
+
+	sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
+	if err != nil {
+		klog.V(1).Infof("Error downloading spot data %s", err.Error())
+	} else {
+		aws.SpotPricingByInstanceID = sp
+	}
+
 	return nil
 }
 
@@ -274,35 +340,59 @@ func (aws *AWS) AllNodePricing() (interface{}, error) {
 	return aws.Pricing, nil
 }
 
-// NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
-func (aws *AWS) NodePricing(key string) (*Node, error) {
-	//return json.Marshal(aws.Pricing[key])
-	usageType := "ondemand"
+func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
+	key := k.Features()
 	if aws.isPreemptible(key) {
-		usageType = "preemptible"
-	}
-	terms, ok := aws.Pricing[key]
-	if ok {
-		if aws.isPreemptible(key) {
+		if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
+			var spotcost string
+			arr := strings.Split(spotInfo.Charge, " ")
+			if len(arr) == 2 {
+				spotcost = arr[0]
+			} else {
+				klog.V(2).Infof("Spot data for node %s is missing", k.ID())
+			}
 			return &Node{
+				Cost:         spotcost,
 				VCPU:         terms.VCpu,
-				VCPUCost:     aws.BaseSpotCPUPrice,
 				RAM:          terms.Memory,
-				RAMCost:      aws.BaseSpotRAMPrice,
 				Storage:      terms.Storage,
 				BaseCPUPrice: aws.BaseCPUPrice,
 				UsageType:    usageType,
 			}, nil
 		}
-		cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
 		return &Node{
-			Cost:         cost,
 			VCPU:         terms.VCpu,
+			VCPUCost:     aws.BaseSpotCPUPrice,
 			RAM:          terms.Memory,
+			RAMCost:      aws.BaseSpotRAMPrice,
 			Storage:      terms.Storage,
 			BaseCPUPrice: aws.BaseCPUPrice,
 			UsageType:    usageType,
 		}, nil
+	}
+	cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
+	return &Node{
+		Cost:         cost,
+		VCPU:         terms.VCpu,
+		RAM:          terms.Memory,
+		Storage:      terms.Storage,
+		BaseCPUPrice: aws.BaseCPUPrice,
+		UsageType:    usageType,
+	}, nil
+}
+
+// NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
+func (aws *AWS) NodePricing(k Key) (*Node, error) {
+	//return json.Marshal(aws.Pricing[key])
+	key := k.Features()
+	usageType := "ondemand"
+	if aws.isPreemptible(key) {
+		usageType = "preemptible"
+	}
+
+	terms, ok := aws.Pricing[key]
+	if ok {
+		return aws.createNode(terms, usageType, k)
 	} else if _, ok := aws.ValidPricingKeys[key]; ok {
 		err := aws.DownloadPricingData()
 		if err != nil {
@@ -312,26 +402,7 @@ func (aws *AWS) NodePricing(key string) (*Node, error) {
 		if !termsOk {
 			return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
 		}
-		if aws.isPreemptible(key) {
-			return &Node{
-				VCPU:         terms.VCpu,
-				VCPUCost:     aws.BaseSpotCPUPrice,
-				RAM:          terms.Memory,
-				RAMCost:      aws.BaseSpotRAMPrice,
-				Storage:      terms.Storage,
-				BaseCPUPrice: aws.BaseCPUPrice,
-				UsageType:    usageType,
-			}, nil
-		}
-		cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
-		return &Node{
-			Cost:         cost,
-			VCPU:         terms.VCpu,
-			RAM:          terms.Memory,
-			Storage:      terms.Storage,
-			BaseCPUPrice: aws.BaseCPUPrice,
-			UsageType:    usageType,
-		}, nil
+		return aws.createNode(terms, usageType, k)
 	} else {
 		return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
 	}
@@ -431,9 +502,18 @@ func (*AWS) GetDisks() ([]byte, error) {
 	if err == nil {
 		byteValue, _ := ioutil.ReadAll(jsonFile)
 		var result map[string]string
-		json.Unmarshal([]byte(byteValue), &result)
-		os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
-		os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
+		err := json.Unmarshal([]byte(byteValue), &result)
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
+		if err != nil {
+			return nil, err
+		}
 	} else if os.IsNotExist(err) {
 		klog.V(2).Infof("Using Default Credentials")
 	} else {
@@ -445,13 +525,18 @@ func (*AWS) GetDisks() ([]byte, error) {
 		return nil, err
 	}
 	defer clusterConfig.Close()
-	bytes, _ := ioutil.ReadAll(clusterConfig)
+	b, err := ioutil.ReadAll(clusterConfig)
+	if err != nil {
+		return nil, err
+	}
 	var clusterConf map[string]string
-	json.Unmarshal([]byte(bytes), &clusterConf)
+	err = json.Unmarshal([]byte(b), &clusterConf)
+	if err != nil {
+		return nil, err
+	}
 	region := aws.String(clusterConf["region"])
 	c := &aws.Config{
-		Region:      region,
-		Credentials: credentials.NewEnvCredentials(),
+		Region: region,
 	}
 	s := session.Must(session.NewSession(c))
 
@@ -480,8 +565,14 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
 		byteValue, _ := ioutil.ReadAll(jsonFile)
 		var result map[string]string
 		json.Unmarshal([]byte(byteValue), &result)
-		os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
-		os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
+		err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
+		if err != nil {
+			return nil, err
+		}
 	} else if os.IsNotExist(err) {
 		klog.V(2).Infof("Using Default Credentials")
 	} else {
@@ -493,16 +584,18 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
 		return nil, err
 	}
 	defer athenaConfigs.Close()
-	bytes, _ := ioutil.ReadAll(athenaConfigs)
+	b, err := ioutil.ReadAll(athenaConfigs)
+	if err != nil {
+		return nil, err
+	}
 	var athenaConf map[string]string
-	json.Unmarshal([]byte(bytes), &athenaConf)
+	json.Unmarshal([]byte(b), &athenaConf)
 	region := aws.String(athenaConf["region"])
 	resultsBucket := athenaConf["output"]
 	database := athenaConf["database"]
 
 	c := &aws.Config{
-		Region:      region,
-		Credentials: credentials.NewEnvCredentials(),
+		Region: region,
 	}
 	s := session.Must(session.NewSession(c))
 	svc := athena.New(s)
@@ -551,13 +644,160 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
 		if err != nil {
 			return nil, err
 		}
-		bytes, err := json.Marshal(op.ResultSet)
+		b, err := json.Marshal(op.ResultSet)
 		if err != nil {
 			return nil, err
 		}
 
-		return bytes, nil
+		return b, nil
 	}
 	return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
 
 }
+
+type spotInfo struct {
+	Timestamp   string `csv:"Timestamp"`
+	UsageType   string `csv:"UsageType"`
+	Operation   string `csv:"Operation"`
+	InstanceID  string `csv:"InstanceID"`
+	MyBidID     string `csv:"MyBidID"`
+	MyMaxPrice  string `csv:"MyMaxPrice"`
+	MarketPrice string `csv:"MarketPrice"`
+	Charge      string `csv:"Charge"`
+	Version     string `csv:"Version"`
+}
+
+type fnames []*string
+
+func (f fnames) Len() int {
+	return len(f)
+}
+
+func (f fnames) Swap(i, j int) {
+	f[i], f[j] = f[j], f[i]
+}
+
+func (f fnames) Less(i, j int) bool {
+	key1 := strings.Split(*f[i], ".")
+	key2 := strings.Split(*f[j], ".")
+
+	t1, err := time.Parse("2006-01-02-15", key1[1])
+	if err != nil {
+		klog.V(1).Info("Unable to parse timestamp" + key1[1])
+		return false
+	}
+	t2, err := time.Parse("2006-01-02-15", key2[1])
+	if err != nil {
+		klog.V(1).Info("Unable to parse timestamp" + key2[1])
+		return false
+	}
+	return t1.Before(t2)
+}
+
+func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
+
+	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
+		err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	c := aws.NewConfig().WithRegion(region)
+
+	s := session.Must(session.NewSession(c))
+	s3Svc := s3.New(s)
+	downloader := s3manager.NewDownloaderWithClient(s3Svc)
+
+	tNow := time.Now()
+	tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
+	ls := &s3.ListObjectsInput{
+		Bucket: aws.String(bucket),
+		Prefix: aws.String(prefix + "/" + projectID + "." + tOneDayAgo.Format("2006-01-02")),
+	}
+	ls2 := &s3.ListObjectsInput{
+		Bucket: aws.String(bucket),
+		Prefix: aws.String(prefix + "/" + projectID + "." + tNow.Format("2006-01-02")),
+	}
+	lso, err := s3Svc.ListObjects(ls)
+	if err != nil {
+		return nil, err
+	}
+	klog.V(2).Infof("Found %s files from yesterday", string(len(lso.Contents)))
+	lso2, err := s3Svc.ListObjects(ls2)
+	if err != nil {
+		return nil, err
+	}
+	klog.V(2).Infof("Found %s files from today", string(len(lso.Contents)))
+
+	var keys []*string
+	for _, obj := range lso.Contents {
+		keys = append(keys, obj.Key)
+	}
+	for _, obj := range lso2.Contents {
+		keys = append(keys, obj.Key)
+	}
+
+	spots := make(map[string]*spotInfo)
+	for _, key := range keys {
+		getObj := &s3.GetObjectInput{
+			Bucket: aws.String(bucket),
+			Key:    key,
+		}
+
+		buf := aws.NewWriteAtBuffer([]byte{})
+		_, err := downloader.Download(buf, getObj)
+		if err != nil {
+			return nil, err
+		}
+
+		r := bytes.NewReader(buf.Bytes())
+
+		gr, err := gzip.NewReader(r)
+		if err != nil {
+			return nil, err
+		}
+
+		csvReader := csv.NewReader(gr)
+		csvReader.Comma = '\t'
+		header, err := csvutil.Header(spotInfo{}, "csv")
+		if err != nil {
+			return nil, err
+		}
+		dec, err := csvutil.NewDecoder(csvReader, header...)
+		if err != nil {
+			return nil, err
+		}
+
+		for {
+			if err := dec.Decode(&spotInfo{}); err == io.EOF {
+				break
+			}
+			st := dec.Record()
+			if len(st) == 9 { // it's tab separated but not quite a tsv
+				klog.V(3).Infof("Found spot info %+v", st)
+				spot := &spotInfo{
+					Timestamp:   st[0],
+					UsageType:   st[1],
+					Operation:   st[2],
+					InstanceID:  st[3],
+					MyBidID:     st[4],
+					MyMaxPrice:  st[5],
+					MarketPrice: st[6],
+					Charge:      st[7],
+					Version:     st[8],
+				}
+				if spot.Version != supportedSpotFeedVersion {
+					klog.V(1).Infof("Possibly unsupported spot data feed version " + spot.Version)
+				}
+				spots[spot.InstanceID] = spot
+			}
+		}
+		gr.Close()
+	}
+	return spots, nil
+}

+ 23 - 10
cloud/gcpprovider.go

@@ -17,6 +17,7 @@ import (
 	"golang.org/x/oauth2"
 	"golang.org/x/oauth2/google"
 	compute "google.golang.org/api/compute/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 )
@@ -159,7 +160,6 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]bool) (map[string]*G
 		if err == io.EOF {
 			break
 		}
-		//fmt.Printf("%v  \n", t)
 		if t == "skus" {
 			dec.Token() // [
 			for dec.More() {
@@ -176,7 +176,6 @@ func (gcp *GCP) parsePage(r io.Reader, inputKeys map[string]bool) (map[string]*G
 				if (instanceType == "ram" || instanceType == "cpu") && strings.Contains(strings.ToUpper(product.Description), "CUSTOM") {
 					instanceType = "custom"
 				}
-				// instance.toLowerCase() === “f1micro”
 				var partialCPU float64
 				if strings.ToLower(instanceType) == "f1micro" {
 					partialCPU = 0.2
@@ -303,7 +302,7 @@ func (gcp *GCP) DownloadPricingData() error {
 	for _, n := range nodeList.Items {
 		labels := n.GetObjectMeta().GetLabels()
 		key := gcp.GetKey(labels)
-		inputkeys[key] = true
+		inputkeys[key.Features()] = true
 	}
 
 	pages, err := gcp.parsePages(inputkeys)
@@ -321,18 +320,32 @@ func (gcp *GCP) DownloadPricingData() error {
 	return nil
 }
 
-// GetKey maps node labels to information needed to retrieve pricing data
-func (gcp *GCP) GetKey(labels map[string]string) string {
+type gcpKey struct {
+	Labels map[string]string
+}
+
+func (gcp *GCP) GetKey(labels map[string]string) Key {
+	return &gcpKey{
+		Labels: labels,
+	}
+}
 
-	instanceType := strings.ToLower(strings.Join(strings.Split(labels["beta.kubernetes.io/instance-type"], "-")[:2], ""))
+func (gcp *gcpKey) ID() string {
+	return ""
+}
+
+// GetKey maps node labels to information needed to retrieve pricing data
+func (gcp *gcpKey) Features() string {
+	instanceType := strings.ToLower(strings.Join(strings.Split(gcp.Labels[v1.LabelInstanceType], "-")[:2], ""))
 	if instanceType == "n1highmem" || instanceType == "n1highcpu" {
 		instanceType = "n1standard" // These are priced the same. TODO: support n1ultrahighmem
 	} else if strings.HasPrefix(instanceType, "custom") {
 		instanceType = "custom" // The suffix of custom does not matter
 	}
-	region := strings.ToLower(labels["failure-domain.beta.kubernetes.io/region"])
+	region := strings.ToLower(gcp.Labels[v1.LabelZoneRegion])
 	var usageType string
-	if t, ok := labels["cloud.google.com/gke-preemptible"]; ok && t == "true" {
+
+	if t, ok := gcp.Labels["cloud.google.com/gke-preemptible"]; ok && t == "true" {
 		usageType = "preemptible"
 	} else {
 		usageType = "ondemand"
@@ -346,8 +359,8 @@ func (gcp *GCP) AllNodePricing() (interface{}, error) {
 }
 
 // NodePricing returns GCP pricing data for a single node
-func (gcp *GCP) NodePricing(key string) (*Node, error) {
-	if n, ok := gcp.Pricing[key]; ok {
+func (gcp *GCP) NodePricing(key Key) (*Node, error) {
+	if n, ok := gcp.Pricing[key.Features()]; ok {
 		klog.V(2).Infof("Returning pricing for node %s: %+v from SKU %s", key, n.Node, n.Name)
 		n.Node.BaseCPUPrice = gcp.BaseCPUPrice
 		return n.Node, nil

+ 48 - 17
cloud/provider.go

@@ -32,15 +32,21 @@ type Node struct {
 	UsageType        string `json:"usageType"`
 }
 
+// Key represents a way for nodes to match between the k8s API and a pricing API
+type Key interface {
+	ID() string       // ID represents an exact match
+	Features() string // Features are a comma separated string of node metadata that could match pricing
+}
+
 // Provider represents a k8s provider.
 type Provider interface {
 	ClusterName() ([]byte, error)
 	AddServiceKey(url.Values) error
 	GetDisks() ([]byte, error)
-	NodePricing(string) (*Node, error)
+	NodePricing(Key) (*Node, error)
 	AllNodePricing() (interface{}, error)
 	DownloadPricingData() error
-	GetKey(map[string]string) string
+	GetKey(map[string]string) Key
 
 	QuerySQL(string) ([]byte, error)
 }
@@ -65,14 +71,20 @@ func GetDefaultPricingData(fname string) (*CustomPricing, error) {
 }
 
 type CustomPricing struct {
-	Provider       string `json:"provider"`
-	Description    string `json:"description"`
-	CPU            string `json:"CPU"`
-	SpotCPU        string `json:"spotCPU"`
-	RAM            string `json:"RAM"`
-	SpotRAM        string `json:"spotRAM"`
-	SpotLabel      string `json:"spotLabel,omitempty"`
-	SpotLabelValue string `json:"spotLabelValue,omitempty"`
+	Provider         string `json:"provider"`
+	Description      string `json:"description"`
+	CPU              string `json:"CPU"`
+	SpotCPU          string `json:"spotCPU"`
+	RAM              string `json:"RAM"`
+	SpotRAM          string `json:"spotRAM"`
+	SpotLabel        string `json:"spotLabel,omitempty"`
+	SpotLabelValue   string `json:"spotLabelValue,omitempty"`
+	ServiceKeyName   string `json:"awsServiceKeyName,omitempty"`
+	ServiceKeySecret string `json:"awsServiceKeySecret,omitempty"`
+	SpotDataRegion   string `json:"awsSpotDataRegion,omitempty"`
+	SpotDataBucket   string `json:"awsSpotDataBucket,omitempty"`
+	SpotDataPrefix   string `json:"awsSpotDataPrefix,omitempty"`
+	ProjectID        string `json:"awsProjectID,omitempty"`
 }
 
 type NodePrice struct {
@@ -103,13 +115,14 @@ func (c *CustomProvider) AllNodePricing() (interface{}, error) {
 	return c.Pricing, nil
 }
 
-func (c *CustomProvider) NodePricing(key string) (*Node, error) {
-	if _, ok := c.Pricing[key]; !ok {
-		key = "default"
+func (c *CustomProvider) NodePricing(key Key) (*Node, error) {
+	k := key.Features()
+	if _, ok := c.Pricing[k]; !ok {
+		k = "default"
 	}
 	return &Node{
-		VCPUCost: c.Pricing[key].CPU,
-		RAMCost:  c.Pricing[key].RAM,
+		VCPUCost: c.Pricing[k].CPU,
+		RAMCost:  c.Pricing[k].RAM,
 	}, nil
 }
 
@@ -134,13 +147,31 @@ func (c *CustomProvider) DownloadPricingData() error {
 	return nil
 }
 
-func (c *CustomProvider) GetKey(labels map[string]string) string {
-	if labels[c.SpotLabel] != "" && labels[c.SpotLabel] == c.SpotLabelValue {
+type customProviderKey struct {
+	SpotLabel      string
+	SpotLabelValue string
+	Labels         map[string]string
+}
+
+func (c *customProviderKey) ID() string {
+	return ""
+}
+
+func (c *customProviderKey) Features() string {
+	if c.Labels[c.SpotLabel] != "" && c.Labels[c.SpotLabel] == c.SpotLabelValue {
 		return "default,spot"
 	}
 	return "default" // TODO: multiple custom pricing support.
 }
 
+func (c *CustomProvider) GetKey(labels map[string]string) Key {
+	return &customProviderKey{
+		SpotLabel:      c.SpotLabel,
+		SpotLabelValue: c.SpotLabelValue,
+		Labels:         labels,
+	}
+}
+
 func (*CustomProvider) QuerySQL(query string) ([]byte, error) {
 	return nil, nil
 }

+ 1 - 0
costmodel/costmodel.go

@@ -406,6 +406,7 @@ func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provide
 	for _, n := range nodeList.Items {
 		name := n.GetObjectMeta().GetName()
 		nodeLabels := n.GetObjectMeta().GetLabels()
+		nodeLabels["providerID"] = n.Spec.ProviderID
 		cnode, err := cloud.NodePricing(cloud.GetKey(nodeLabels))
 		if err != nil {
 			klog.V(1).Infof("Error getting node. Error: " + err.Error())

+ 1 - 0
go.mod

@@ -8,6 +8,7 @@ require (
 	cloud.google.com/go v0.34.0
 	github.com/aws/aws-sdk-go v1.19.10
 	github.com/golang/mock v1.2.0
+	github.com/jszwec/csvutil v1.2.1
 	github.com/julienschmidt/httprouter v1.2.0
 	github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
 	golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a

+ 2 - 0
go.sum

@@ -66,6 +66,8 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5i
 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be h1:AHimNtVIpiBjPUhEF5KNCkrUyqTSA5zWUl8sQ2bfGBE=
 github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/jszwec/csvutil v1.2.1 h1:9+vmGqMdYxIbeDmVbTrVryibx2izwHAfKdPwl4GPNHM=
+github.com/jszwec/csvutil v1.2.1/go.mod h1:8YHz6C3KVdIeCxLMvwbbIVDCTA/Wi2df93AZlQNaE2U=
 github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=