Просмотр исходного кода

first pass of spot feed integration

AjayTripathy 7 лет назад
Родитель
Сommit
2c5c5e3915
7 измененных файлов с 287 добавлено и 65 удалено
  1. 7 1
      cloud/aws.json
  2. 207 40
      cloud/awsprovider.go
  3. 21 7
      cloud/gcpprovider.go
  4. 48 17
      cloud/provider.go
  5. 1 0
      costmodel/costmodel.go
  6. 1 0
      go.mod
  7. 2 0
      go.sum

+ 7 - 1
cloud/aws.json

@@ -6,5 +6,11 @@
     "RAM": "0.004237",
     "spotRAM": "0.000892",
     "spotLabel": "kops.k8s.io/instancegroup",
-    "spotLabelValue": "spotinstance-nodes"
+    "spotLabelValue": "spotinstance-nodes",
+    "awsServiceKeyName": "AKIAXW6UVLRRTBCUKQFP",
+    "awsServiceKeySecret": "",
+    "awsSpotDataRegion":"us-east-2",
+    "awsSpotDataBucket": "kc-test-spot",
+    "awsSpotDataPrefix": "spotdata",
+    "awsProjectID": "530337586275"
 }

+ 207 - 40
cloud/awsprovider.go

@@ -1,10 +1,14 @@
 package cloud
 
 import (
+	"bytes"
+	"compress/gzip"
+	"encoding/csv"
 	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"net/url"
 	"os"
@@ -20,6 +24,10 @@ import (
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/athena"
 	"github.com/aws/aws-sdk-go/service/ec2"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/jszwec/csvutil"
+
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 )
@@ -27,6 +35,7 @@ import (
 // AWS represents an Amazon Provider
 type AWS struct {
 	Pricing          map[string]*AWSProductTerms
+	SpotPricing      map[string]*spotInfo
 	ValidPricingKeys map[string]bool
 	Clientset        *kubernetes.Clientset
 	BaseCPUPrice     string
@@ -34,6 +43,12 @@ type AWS struct {
 	BaseSpotRAMPrice string
 	SpotLabelName    string
 	SpotLabelValue   string
+	ServiceKeyName   string
+	ServiceKeySecret string
+	SpotDataRegion   string
+	SpotDataBucket   string
+	SpotDataPrefix   string
+	ProjectID        string
 }
 
 // AWSPricing maps a k8s node to an AWS Pricing "product"
@@ -137,22 +152,46 @@ func (aws *AWS) KubeAttrConversion(location, instanceType, operatingSystem strin
 	return region + "," + instanceType + "," + operatingSystem
 }
 
+type awsKey struct {
+	SpotLabelName  string
+	SpotLabelValue string
+	Labels         map[string]string
+	ProviderID     string
+}
+
+func (k *awsKey) ID() string {
+	parsedProviderID := strings.Split(k.ProviderID, "/")
+	if len(parsedProviderID) == 5 { // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
+		return parsedProviderID[4]
+	}
+	return ""
+}
+
 // GetKey maps node labels to information needed to retrieve pricing data
-func (aws *AWS) GetKey(labels map[string]string) string {
-	instanceType := labels["beta.kubernetes.io/instance-type"]
-	operatingSystem := labels["beta.kubernetes.io/os"]
-	region := labels["failure-domain.beta.kubernetes.io/region"]
-	if l, ok := labels["lifecycle"]; ok && l == "EC2Spot" {
+func (k *awsKey) Features() string {
+	instanceType := k.Labels["beta.kubernetes.io/instance-type"]
+	operatingSystem := k.Labels["beta.kubernetes.io/os"]
+	region := k.Labels["failure-domain.beta.kubernetes.io/region"]
+	if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
 		usageType := "preemptible"
 		return region + "," + instanceType + "," + operatingSystem + "," + usageType
 	}
-	if l, ok := labels[aws.SpotLabelName]; ok && l == aws.SpotLabelValue {
+	if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
 		usageType := "preemptible"
 		return region + "," + instanceType + "," + operatingSystem + "," + usageType
 	}
 	return region + "," + instanceType + "," + operatingSystem
 }
 
+func (aws *AWS) GetKey(labels map[string]string) Key {
+	return &awsKey{
+		SpotLabelName:  aws.SpotLabelName,
+		SpotLabelValue: aws.SpotLabelValue,
+		Labels:         labels,
+		ProviderID:     labels["providerID"],
+	}
+}
+
 func (aws *AWS) isPreemptible(key string) bool {
 	s := strings.Split(key, ",")
 	if len(s) == 4 && s[3] == "preemptible" {
@@ -172,7 +211,7 @@ func (aws *AWS) DownloadPricingData() error {
 	for _, n := range nodeList.Items {
 		labels := n.GetObjectMeta().GetLabels()
 		key := aws.GetKey(labels)
-		inputkeys[key] = true
+		inputkeys[key.Features()] = true
 	}
 
 	aws.Pricing = make(map[string]*AWSProductTerms)
@@ -266,6 +305,20 @@ func (aws *AWS) DownloadPricingData() error {
 	aws.BaseSpotRAMPrice = c.SpotRAM
 	aws.SpotLabelName = c.SpotLabel
 	aws.SpotLabelValue = c.SpotLabelValue
+	aws.SpotDataBucket = c.SpotDataBucket
+	aws.SpotDataPrefix = c.SpotDataPrefix
+	aws.ProjectID = c.ProjectID
+	aws.SpotDataRegion = c.SpotDataRegion
+	aws.ServiceKeyName = c.ServiceKeyName
+	aws.ServiceKeySecret = c.ServiceKeySecret
+
+	sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
+	if err != nil {
+		klog.V(1).Infof("ERROR DOWNLOADING SPOT DATA")
+	} else {
+		aws.SpotPricing = sp
+	}
+
 	return nil
 }
 
@@ -274,35 +327,59 @@ func (aws *AWS) AllNodePricing() (interface{}, error) {
 	return aws.Pricing, nil
 }
 
-// NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
-func (aws *AWS) NodePricing(key string) (*Node, error) {
-	//return json.Marshal(aws.Pricing[key])
-	usageType := "ondemand"
+func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
+	key := k.Features()
 	if aws.isPreemptible(key) {
-		usageType = "preemptible"
-	}
-	terms, ok := aws.Pricing[key]
-	if ok {
-		if aws.isPreemptible(key) {
+		if spotInfo, ok := aws.SpotPricing[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
+			var spotcost string
+			arr := strings.Split(spotInfo.Charge, " ")
+			if len(arr) == 2 {
+				spotcost = arr[0]
+			} else {
+				klog.V(2).Infof("Spot node %s not found", k.ID())
+			}
 			return &Node{
+				Cost:         spotcost,
 				VCPU:         terms.VCpu,
-				VCPUCost:     aws.BaseSpotCPUPrice,
 				RAM:          terms.Memory,
-				RAMCost:      aws.BaseSpotRAMPrice,
 				Storage:      terms.Storage,
 				BaseCPUPrice: aws.BaseCPUPrice,
 				UsageType:    usageType,
 			}, nil
 		}
-		cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
 		return &Node{
-			Cost:         cost,
 			VCPU:         terms.VCpu,
+			VCPUCost:     aws.BaseSpotCPUPrice,
 			RAM:          terms.Memory,
+			RAMCost:      aws.BaseSpotRAMPrice,
 			Storage:      terms.Storage,
 			BaseCPUPrice: aws.BaseCPUPrice,
 			UsageType:    usageType,
 		}, nil
+	}
+	cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
+	return &Node{
+		Cost:         cost,
+		VCPU:         terms.VCpu,
+		RAM:          terms.Memory,
+		Storage:      terms.Storage,
+		BaseCPUPrice: aws.BaseCPUPrice,
+		UsageType:    usageType,
+	}, nil
+}
+
+// NodePricing takes in a key from GetKey and returns a Node object for use in building the cost model.
+func (aws *AWS) NodePricing(k Key) (*Node, error) {
+	//return json.Marshal(aws.Pricing[key])
+	key := k.Features()
+	usageType := "ondemand"
+	if aws.isPreemptible(key) {
+		usageType = "preemptible"
+	}
+
+	terms, ok := aws.Pricing[key]
+	if ok {
+		return aws.createNode(terms, usageType, k)
 	} else if _, ok := aws.ValidPricingKeys[key]; ok {
 		err := aws.DownloadPricingData()
 		if err != nil {
@@ -312,26 +389,7 @@ func (aws *AWS) NodePricing(key string) (*Node, error) {
 		if !termsOk {
 			return nil, fmt.Errorf("Unable to find any Pricing data for \"%s\"", key)
 		}
-		if aws.isPreemptible(key) {
-			return &Node{
-				VCPU:         terms.VCpu,
-				VCPUCost:     aws.BaseSpotCPUPrice,
-				RAM:          terms.Memory,
-				RAMCost:      aws.BaseSpotRAMPrice,
-				Storage:      terms.Storage,
-				BaseCPUPrice: aws.BaseCPUPrice,
-				UsageType:    usageType,
-			}, nil
-		}
-		cost := terms.OnDemand.PriceDimensions[terms.Sku+OnDemandRateCode+HourlyRateCode].PricePerUnit.USD
-		return &Node{
-			Cost:         cost,
-			VCPU:         terms.VCpu,
-			RAM:          terms.Memory,
-			Storage:      terms.Storage,
-			BaseCPUPrice: aws.BaseCPUPrice,
-			UsageType:    usageType,
-		}, nil
+		return aws.createNode(terms, usageType, k)
 	} else {
 		return nil, fmt.Errorf("Invalid Pricing Key \"%s\"", key)
 	}
@@ -561,3 +619,112 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
 	return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
 
 }
+
+type spotInfo struct {
+	Timestamp   string `csv:"Timestamp"`
+	UsageType   string `csv:"UsageType"`
+	Operation   string `csv:"Operation`
+	InstanceID  string `csv:"InstanceID`
+	MyBidID     string `csv:"MyBidID"`
+	MyMaxPrice  string `csv:"MyMaxPrice"`
+	MarketPrice string `csv:"MarketPrice"`
+	Charge      string `csv:"Charge"`
+	Version     string `csv:"Version"`
+}
+
+func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
+	log.Print("bucket " + bucket)
+	log.Print("prefix " + prefix)
+	log.Print("pID " + projectID)
+	log.Print("region " + region)
+	log.Print("accessKeyName " + accessKeyID)
+	log.Print("secret " + accessKeySecret)
+
+	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
+		os.Setenv("AWS_ACCESS_KEY_ID", accessKeyID)
+		os.Setenv("AWS_SECRET_ACCESS_KEY", accessKeySecret)
+	}
+	c := &aws.Config{
+		Region:      aws.String(region),
+		Credentials: credentials.NewEnvCredentials(),
+	}
+	s := session.Must(session.NewSession(c))
+	s3Svc := s3.New(s)
+	downloader := s3manager.NewDownloaderWithClient(s3Svc)
+
+	t := time.Now().Add(time.Duration(-60) * time.Minute)
+	ls := &s3.ListObjectsInput{
+		Bucket: aws.String(bucket),
+		Prefix: aws.String(prefix + "/" + projectID + "." + t.Format("2006-01-02-15")),
+	}
+	lso, err := s3Svc.ListObjects(ls)
+	if err != nil {
+		log.Printf(err.Error())
+	}
+	var keys []*string
+	log.Printf("GETTING LSO CONTNETS")
+	for _, obj := range lso.Contents {
+		log.Printf("KEY: %s", *obj.Key)
+		keys = append(keys, obj.Key)
+	}
+	spots := make(map[string]*spotInfo)
+	for _, key := range keys {
+		log.Printf("KEY: %s", *key)
+		getObj := &s3.GetObjectInput{
+			Bucket: aws.String(bucket),
+			Key:    key,
+		}
+
+		buf := aws.NewWriteAtBuffer([]byte{})
+		_, err := downloader.Download(buf, getObj)
+		if err != nil {
+			log.Print(err.Error())
+		}
+
+		r := bytes.NewReader(buf.Bytes())
+
+		gr, err := gzip.NewReader(r)
+		if err != nil {
+			log.Print(err.Error())
+			return nil, err
+		}
+		defer gr.Close()
+
+		csvReader := csv.NewReader(gr)
+		csvReader.Comma = '\t'
+		header, err := csvutil.Header(spotInfo{}, "csv")
+		if err != nil {
+			log.Fatal(err)
+		}
+		dec, err := csvutil.NewDecoder(csvReader, header...)
+		if err != nil {
+			return nil, err
+		}
+
+		//header := dec.Header()
+		for {
+			log.Printf("TEST")
+			//log.Printf(strings.Join(dec.Record(), ","))
+			if err := dec.Decode(&spotInfo{}); err == io.EOF {
+				break
+			}
+			st := dec.Record()
+			if len(st) == 9 { // it's tab separated but not quite a tsv
+				log.Printf("SPOT INFO %+v", st)
+				spot := &spotInfo{
+					Timestamp:   st[0],
+					UsageType:   st[1],
+					Operation:   st[2],
+					InstanceID:  st[3],
+					MyBidID:     st[4],
+					MyMaxPrice:  st[5],
+					MarketPrice: st[6],
+					Charge:      st[7],
+					Version:     st[8],
+				}
+				spots[spot.InstanceID] = spot
+			}
+		}
+	}
+	return spots, nil
+}

+ 21 - 7
cloud/gcpprovider.go

@@ -303,7 +303,7 @@ func (gcp *GCP) DownloadPricingData() error {
 	for _, n := range nodeList.Items {
 		labels := n.GetObjectMeta().GetLabels()
 		key := gcp.GetKey(labels)
-		inputkeys[key] = true
+		inputkeys[key.Features()] = true
 	}
 
 	pages, err := gcp.parsePages(inputkeys)
@@ -321,18 +321,32 @@ func (gcp *GCP) DownloadPricingData() error {
 	return nil
 }
 
+type gcpKey struct {
+	Labels map[string]string
+}
+
+func (gcp *GCP) GetKey(labels map[string]string) Key {
+	return &gcpKey{
+		Labels: labels,
+	}
+}
+
+func (gcp *gcpKey) ID() string {
+	return ""
+}
+
 // GetKey maps node labels to information needed to retrieve pricing data
-func (gcp *GCP) GetKey(labels map[string]string) string {
+func (gcp *gcpKey) Features() string {
 
-	instanceType := strings.ToLower(strings.Join(strings.Split(labels["beta.kubernetes.io/instance-type"], "-")[:2], ""))
+	instanceType := strings.ToLower(strings.Join(strings.Split(gcp.Labels["beta.kubernetes.io/instance-type"], "-")[:2], ""))
 	if instanceType == "n1highmem" || instanceType == "n1highcpu" {
 		instanceType = "n1standard" // These are priced the same. TODO: support n1ultrahighmem
 	} else if strings.HasPrefix(instanceType, "custom") {
 		instanceType = "custom" // The suffix of custom does not matter
 	}
-	region := strings.ToLower(labels["failure-domain.beta.kubernetes.io/region"])
+	region := strings.ToLower(gcp.Labels["failure-domain.beta.kubernetes.io/region"])
 	var usageType string
-	if t, ok := labels["cloud.google.com/gke-preemptible"]; ok && t == "true" {
+	if t, ok := gcp.Labels["cloud.google.com/gke-preemptible"]; ok && t == "true" {
 		usageType = "preemptible"
 	} else {
 		usageType = "ondemand"
@@ -346,8 +360,8 @@ func (gcp *GCP) AllNodePricing() (interface{}, error) {
 }
 
 // NodePricing returns GCP pricing data for a single node
-func (gcp *GCP) NodePricing(key string) (*Node, error) {
-	if n, ok := gcp.Pricing[key]; ok {
+func (gcp *GCP) NodePricing(key Key) (*Node, error) {
+	if n, ok := gcp.Pricing[key.Features()]; ok {
 		klog.V(2).Infof("Returning pricing for node %s: %+v from SKU %s", key, n.Node, n.Name)
 		n.Node.BaseCPUPrice = gcp.BaseCPUPrice
 		return n.Node, nil

+ 48 - 17
cloud/provider.go

@@ -32,15 +32,21 @@ type Node struct {
 	UsageType        string `json:"usageType"`
 }
 
+// Key represents a way for nodes to match between the k8s API and a pricing API
+type Key interface {
+	ID() string       // ID represents an exact match
+	Features() string // Features are a comma separated string of node metadata that could match pricing
+}
+
 // Provider represents a k8s provider.
 type Provider interface {
 	ClusterName() ([]byte, error)
 	AddServiceKey(url.Values) error
 	GetDisks() ([]byte, error)
-	NodePricing(string) (*Node, error)
+	NodePricing(Key) (*Node, error)
 	AllNodePricing() (interface{}, error)
 	DownloadPricingData() error
-	GetKey(map[string]string) string
+	GetKey(map[string]string) Key
 
 	QuerySQL(string) ([]byte, error)
 }
@@ -65,14 +71,20 @@ func GetDefaultPricingData(fname string) (*CustomPricing, error) {
 }
 
 type CustomPricing struct {
-	Provider       string `json:"provider"`
-	Description    string `json:"description"`
-	CPU            string `json:"CPU"`
-	SpotCPU        string `json:"spotCPU"`
-	RAM            string `json:"RAM"`
-	SpotRAM        string `json:"spotRAM"`
-	SpotLabel      string `json:"spotLabel,omitempty"`
-	SpotLabelValue string `json:"spotLabelValue,omitempty"`
+	Provider         string `json:"provider"`
+	Description      string `json:"description"`
+	CPU              string `json:"CPU"`
+	SpotCPU          string `json:"spotCPU"`
+	RAM              string `json:"RAM"`
+	SpotRAM          string `json:"spotRAM"`
+	SpotLabel        string `json:"spotLabel,omitempty"`
+	SpotLabelValue   string `json:"spotLabelValue,omitempty"`
+	ServiceKeyName   string `json:"awsServiceKeyName,omitempty"`
+	ServiceKeySecret string `json:"awsServiceKeySecret,omitempty"`
+	SpotDataRegion   string `json:"awsSpotDataRegion,omitempty"`
+	SpotDataBucket   string `json:"awsSpotDataBucket,omitempty"`
+	SpotDataPrefix   string `json:"awsSpotDataPrefix,omitempty"`
+	ProjectID        string `json:"awsProjectID,omitempty"`
 }
 
 type NodePrice struct {
@@ -103,13 +115,14 @@ func (c *CustomProvider) AllNodePricing() (interface{}, error) {
 	return c.Pricing, nil
 }
 
-func (c *CustomProvider) NodePricing(key string) (*Node, error) {
-	if _, ok := c.Pricing[key]; !ok {
-		key = "default"
+func (c *CustomProvider) NodePricing(key Key) (*Node, error) {
+	k := key.Features()
+	if _, ok := c.Pricing[k]; !ok {
+		k = "default"
 	}
 	return &Node{
-		VCPUCost: c.Pricing[key].CPU,
-		RAMCost:  c.Pricing[key].RAM,
+		VCPUCost: c.Pricing[k].CPU,
+		RAMCost:  c.Pricing[k].RAM,
 	}, nil
 }
 
@@ -134,13 +147,31 @@ func (c *CustomProvider) DownloadPricingData() error {
 	return nil
 }
 
-func (c *CustomProvider) GetKey(labels map[string]string) string {
-	if labels[c.SpotLabel] != "" && labels[c.SpotLabel] == c.SpotLabelValue {
+type customProviderKey struct {
+	SpotLabel      string
+	SpotLabelValue string
+	Labels         map[string]string
+}
+
+func (c *customProviderKey) ID() string {
+	return ""
+}
+
+func (c *customProviderKey) Features() string {
+	if c.Labels[c.SpotLabel] != "" && c.Labels[c.SpotLabel] == c.SpotLabelValue {
 		return "default,spot"
 	}
 	return "default" // TODO: multiple custom pricing support.
 }
 
+func (c *CustomProvider) GetKey(labels map[string]string) Key {
+	return &customProviderKey{
+		SpotLabel:      c.SpotLabel,
+		SpotLabelValue: c.SpotLabelValue,
+		Labels:         labels,
+	}
+}
+
 func (*CustomProvider) QuerySQL(query string) ([]byte, error) {
 	return nil, nil
 }

+ 1 - 0
costmodel/costmodel.go

@@ -397,6 +397,7 @@ func getNodeCost(clientset kubernetes.Interface, cloud costAnalyzerCloud.Provide
 	for _, n := range nodeList.Items {
 		name := n.GetObjectMeta().GetName()
 		nodeLabels := n.GetObjectMeta().GetLabels()
+		nodeLabels["providerID"] = n.Spec.ProviderID
 		cnode, err := cloud.NodePricing(cloud.GetKey(nodeLabels))
 		if err != nil {
 			klog.V(1).Infof("Error getting node. Error: " + err.Error())

+ 1 - 0
go.mod

@@ -8,6 +8,7 @@ require (
 	cloud.google.com/go v0.34.0
 	github.com/aws/aws-sdk-go v1.19.10
 	github.com/golang/mock v1.2.0
+	github.com/jszwec/csvutil v1.2.1
 	github.com/julienschmidt/httprouter v1.2.0
 	github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
 	golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a

+ 2 - 0
go.sum

@@ -66,6 +66,8 @@ github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5i
 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
 github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be h1:AHimNtVIpiBjPUhEF5KNCkrUyqTSA5zWUl8sQ2bfGBE=
 github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/jszwec/csvutil v1.2.1 h1:9+vmGqMdYxIbeDmVbTrVryibx2izwHAfKdPwl4GPNHM=
+github.com/jszwec/csvutil v1.2.1/go.mod h1:8YHz6C3KVdIeCxLMvwbbIVDCTA/Wi2df93AZlQNaE2U=
 github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=