AjayTripathy 7 лет назад
Родитель
Сommit
773f403f1d
1 измененных файлов с 86 добавлено и 45 удалено
  1. 86 45
      cloud/awsprovider.go

+ 86 - 45
cloud/awsprovider.go

@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"net/http"
 	"net/url"
 	"os"
@@ -20,7 +19,6 @@ import (
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
-	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/athena"
 	"github.com/aws/aws-sdk-go/service/ec2"
@@ -33,23 +31,26 @@ import (
 	"k8s.io/client-go/kubernetes"
 )
 
+const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
+const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
+
 // AWS represents an Amazon Provider
 type AWS struct {
-	Pricing          map[string]*AWSProductTerms
-	SpotPricing      map[string]*spotInfo
-	ValidPricingKeys map[string]bool
-	Clientset        *kubernetes.Clientset
-	BaseCPUPrice     string
-	BaseSpotCPUPrice string
-	BaseSpotRAMPrice string
-	SpotLabelName    string
-	SpotLabelValue   string
-	ServiceKeyName   string
-	ServiceKeySecret string
-	SpotDataRegion   string
-	SpotDataBucket   string
-	SpotDataPrefix   string
-	ProjectID        string
+	Pricing                 map[string]*AWSProductTerms
+	SpotPricingByInstanceID map[string]*spotInfo
+	ValidPricingKeys        map[string]bool
+	Clientset               *kubernetes.Clientset
+	BaseCPUPrice            string
+	BaseSpotCPUPrice        string
+	BaseSpotRAMPrice        string
+	SpotLabelName           string
+	SpotLabelValue          string
+	ServiceKeyName          string
+	ServiceKeySecret        string
+	SpotDataRegion          string
+	SpotDataBucket          string
+	SpotDataPrefix          string
+	ProjectID               string
 }
 
 // AWSPricing maps a k8s node to an AWS Pricing "product"
@@ -167,6 +168,7 @@ func (k *awsKey) ID() string {
 			return group
 		}
 	}
+	klog.V(3).Info("Could not find instance ID in " + k.ProviderID)
 	return ""
 }
 
@@ -179,13 +181,15 @@ func (k *awsKey) Features() string {
 		operatingSystem = k.Labels["beta.kubernetes.io/os"]
 	}
 	region := k.Labels[v1.LabelZoneRegion]
+
+	key := region + "," + instanceType + "," + operatingSystem
+	usageType := "preemptible"
+	spotKey := key + "," + usageType
 	if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
-		usageType := "preemptible"
-		return region + "," + instanceType + "," + operatingSystem + "," + usageType
+		return spotKey
 	}
 	if l, ok := k.Labels[k.SpotLabelName]; ok && l == k.SpotLabelValue {
-		usageType := "preemptible"
-		return region + "," + instanceType + "," + operatingSystem + "," + usageType
+		return spotKey
 	}
 	return region + "," + instanceType + "," + operatingSystem
 }
@@ -324,7 +328,7 @@ func (aws *AWS) DownloadPricingData() error {
 	if err != nil {
 		klog.V(1).Infof("Error downloading spot data %s", err.Error())
 	} else {
-		aws.SpotPricing = sp
+		aws.SpotPricingByInstanceID = sp
 	}
 
 	return nil
@@ -338,13 +342,13 @@ func (aws *AWS) AllNodePricing() (interface{}, error) {
 func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k Key) (*Node, error) {
 	key := k.Features()
 	if aws.isPreemptible(key) {
-		if spotInfo, ok := aws.SpotPricing[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
+		if spotInfo, ok := aws.SpotPricingByInstanceID[k.ID()]; ok { // try and match directly to an ID for pricing. We'll still need the features
 			var spotcost string
 			arr := strings.Split(spotInfo.Charge, " ")
 			if len(arr) == 2 {
 				spotcost = arr[0]
 			} else {
-				klog.V(2).Infof("Spot node %s not found", k.ID())
+				klog.V(2).Infof("Spot data for node %s is missing", k.ID())
 			}
 			return &Node{
 				Cost:         spotcost,
@@ -497,9 +501,18 @@ func (*AWS) GetDisks() ([]byte, error) {
 	if err == nil {
 		byteValue, _ := ioutil.ReadAll(jsonFile)
 		var result map[string]string
-		json.Unmarshal([]byte(byteValue), &result)
-		os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
-		os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
+		err := json.Unmarshal([]byte(byteValue), &result)
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
+		if err != nil {
+			return nil, err
+		}
 	} else if os.IsNotExist(err) {
 		klog.V(2).Infof("Using Default Credentials")
 	} else {
@@ -511,13 +524,18 @@ func (*AWS) GetDisks() ([]byte, error) {
 		return nil, err
 	}
 	defer clusterConfig.Close()
-	b, _ := ioutil.ReadAll(clusterConfig)
+	b, err := ioutil.ReadAll(clusterConfig)
+	if err != nil {
+		return nil, err
+	}
 	var clusterConf map[string]string
-	json.Unmarshal([]byte(b), &clusterConf)
+	err = json.Unmarshal([]byte(b), &clusterConf)
+	if err != nil {
+		return nil, err
+	}
 	region := aws.String(clusterConf["region"])
 	c := &aws.Config{
-		Region:      region,
-		Credentials: credentials.NewEnvCredentials(),
+		Region: region,
 	}
 	s := session.Must(session.NewSession(c))
 
@@ -546,8 +564,14 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
 		byteValue, _ := ioutil.ReadAll(jsonFile)
 		var result map[string]string
 		json.Unmarshal([]byte(byteValue), &result)
-		os.Setenv("AWS_ACCESS_KEY_ID", result["access_key_ID"])
-		os.Setenv("AWS_SECRET_ACCESS_KEY", result["secret_access_key"])
+		err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
+		if err != nil {
+			return nil, err
+		}
 	} else if os.IsNotExist(err) {
 		klog.V(2).Infof("Using Default Credentials")
 	} else {
@@ -559,7 +583,10 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
 		return nil, err
 	}
 	defer athenaConfigs.Close()
-	b, _ := ioutil.ReadAll(athenaConfigs)
+	b, err := ioutil.ReadAll(athenaConfigs)
+	if err != nil {
+		return nil, err
+	}
 	var athenaConf map[string]string
 	json.Unmarshal([]byte(b), &athenaConf)
 	region := aws.String(athenaConf["region"])
@@ -567,8 +594,7 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
 	database := athenaConf["database"]
 
 	c := &aws.Config{
-		Region:      region,
-		Credentials: credentials.NewEnvCredentials(),
+		Region: region,
 	}
 	s := session.Must(session.NewSession(c))
 	svc := athena.New(s)
@@ -654,16 +680,30 @@ func (f fnames) Less(i, j int) bool {
 	key1 := strings.Split(*f[i], ".")
 	key2 := strings.Split(*f[j], ".")
 
-	t1, _ := time.Parse("2006-01-02-15", key1[1])
-	t2, _ := time.Parse("2006-01-02-15", key2[1])
+	t1, err := time.Parse("2006-01-02-15", key1[1])
+	if err != nil {
+		klog.V(1).Info("Unable to parse timestamp" + key1[1])
+		return false
+	}
+	t2, err := time.Parse("2006-01-02-15", key2[1])
+	if err != nil {
+		klog.V(1).Info("Unable to parse timestamp" + key2[1])
+		return false
+	}
 	return t1.Before(t2)
 }
 
 func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
 
 	if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
-		os.Setenv("AWS_ACCESS_KEY_ID", accessKeyID)
-		os.Setenv("AWS_SECRET_ACCESS_KEY", accessKeySecret)
+		err := os.Setenv(awsAccessKeyIDEnvVar, accessKeyID)
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeySecretEnvVar, accessKeySecret)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	c := aws.NewConfig().WithRegion(region)
@@ -672,25 +712,26 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 	s3Svc := s3.New(s)
 	downloader := s3manager.NewDownloaderWithClient(s3Svc)
 
-	tOneDayAgo := time.Now().Add(time.Duration(-60) * time.Minute) // Also get files from one day ago to avoid boundary conditions
+	tNow := time.Now()
+	tOneDayAgo := tNow.Add(time.Duration(-24) * time.Hour) // Also get files from one day ago to avoid boundary conditions
 	ls := &s3.ListObjectsInput{
 		Bucket: aws.String(bucket),
 		Prefix: aws.String(prefix + "/" + projectID + "." + tOneDayAgo.Format("2006-01-02")),
 	}
 	ls2 := &s3.ListObjectsInput{
 		Bucket: aws.String(bucket),
-		Prefix: aws.String(prefix + "/" + projectID + "." + time.Now().Format("2006-01-02")),
+		Prefix: aws.String(prefix + "/" + projectID + "." + tNow.Format("2006-01-02")),
 	}
 	lso, err := s3Svc.ListObjects(ls)
 	if err != nil {
 		return nil, err
 	}
-	klog.V(2).Info("Found " + string(len(lso.Contents)) + " files from yesterday")
+	klog.V(2).Infof("Found %s files from yesterday", string(len(lso.Contents)))
 	lso2, err := s3Svc.ListObjects(ls2)
 	if err != nil {
 		return nil, err
 	}
-	klog.V(2).Info("Found " + string(len(lso.Contents)) + " files from today")
+	klog.V(2).Infof("Found %s files from today", string(len(lso.Contents)))
 
 	var keys []*string
 	for _, obj := range lso.Contents {
@@ -724,7 +765,7 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
 		csvReader.Comma = '\t'
 		header, err := csvutil.Header(spotInfo{}, "csv")
 		if err != nil {
-			log.Fatal(err)
+			return nil, err
 		}
 		dec, err := csvutil.NewDecoder(csvReader, header...)
 		if err != nil {