|
|
@@ -28,6 +28,7 @@ import (
|
|
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
|
|
"github.com/jszwec/csvutil"
|
|
|
|
|
|
+ v1 "k8s.io/api/core/v1"
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
"k8s.io/client-go/kubernetes"
|
|
|
)
|
|
|
@@ -160,18 +161,24 @@ type awsKey struct {
|
|
|
}
|
|
|
|
|
|
func (k *awsKey) ID() string {
|
|
|
- parsedProviderID := strings.Split(k.ProviderID, "/")
|
|
|
- if len(parsedProviderID) == 5 { // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
|
|
|
- return parsedProviderID[4]
|
|
|
+ provIdRx := regexp.MustCompile("aws:///([^/]+)/([^/]+)") // It's of the form aws:///us-east-2a/i-0fea4fd46592d050b and we want i-0fea4fd46592d050b, if it exists
|
|
|
+ for matchNum, group := range provIdRx.FindStringSubmatch(k.ProviderID) {
|
|
|
+ if matchNum == 2 {
|
|
|
+ return group
|
|
|
+ }
|
|
|
}
|
|
|
return ""
|
|
|
}
|
|
|
|
|
|
-// GetKey maps node labels to information needed to retrieve pricing data
|
|
|
func (k *awsKey) Features() string {
|
|
|
- instanceType := k.Labels["beta.kubernetes.io/instance-type"]
|
|
|
- operatingSystem := k.Labels["beta.kubernetes.io/os"]
|
|
|
- region := k.Labels["failure-domain.beta.kubernetes.io/region"]
|
|
|
+
|
|
|
+ instanceType := k.Labels[v1.LabelInstanceType]
|
|
|
+ var operatingSystem string
|
|
|
+ operatingSystem, ok := k.Labels[v1.LabelOSStable]
|
|
|
+ if !ok {
|
|
|
+ operatingSystem = k.Labels["beta.kubernetes.io/os"]
|
|
|
+ }
|
|
|
+ region := k.Labels[v1.LabelZoneRegion]
|
|
|
if l, ok := k.Labels["lifecycle"]; ok && l == "EC2Spot" {
|
|
|
usageType := "preemptible"
|
|
|
return region + "," + instanceType + "," + operatingSystem + "," + usageType
|
|
|
@@ -183,6 +190,7 @@ func (k *awsKey) Features() string {
|
|
|
return region + "," + instanceType + "," + operatingSystem
|
|
|
}
|
|
|
|
|
|
+// GetKey maps node labels to information needed to retrieve pricing data
|
|
|
func (aws *AWS) GetKey(labels map[string]string) Key {
|
|
|
return &awsKey{
|
|
|
SpotLabelName: aws.SpotLabelName,
|
|
|
@@ -314,7 +322,7 @@ func (aws *AWS) DownloadPricingData() error {
|
|
|
|
|
|
sp, err := parseSpotData(aws.SpotDataBucket, aws.SpotDataPrefix, aws.ProjectID, aws.SpotDataRegion, aws.ServiceKeyName, aws.ServiceKeySecret)
|
|
|
if err != nil {
|
|
|
- klog.V(1).Infof("ERROR DOWNLOADING SPOT DATA")
|
|
|
+ klog.V(1).Infof("Error downloading spot data %s", err.Error())
|
|
|
} else {
|
|
|
aws.SpotPricing = sp
|
|
|
}
|
|
|
@@ -503,9 +511,9 @@ func (*AWS) GetDisks() ([]byte, error) {
|
|
|
return nil, err
|
|
|
}
|
|
|
defer clusterConfig.Close()
|
|
|
- bytes, _ := ioutil.ReadAll(clusterConfig)
|
|
|
+ b, _ := ioutil.ReadAll(clusterConfig)
|
|
|
var clusterConf map[string]string
|
|
|
- json.Unmarshal([]byte(bytes), &clusterConf)
|
|
|
+ json.Unmarshal([]byte(b), &clusterConf)
|
|
|
region := aws.String(clusterConf["region"])
|
|
|
c := &aws.Config{
|
|
|
Region: region,
|
|
|
@@ -551,9 +559,9 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
|
|
|
return nil, err
|
|
|
}
|
|
|
defer athenaConfigs.Close()
|
|
|
- bytes, _ := ioutil.ReadAll(athenaConfigs)
|
|
|
+ b, _ := ioutil.ReadAll(athenaConfigs)
|
|
|
var athenaConf map[string]string
|
|
|
- json.Unmarshal([]byte(bytes), &athenaConf)
|
|
|
+ json.Unmarshal([]byte(b), &athenaConf)
|
|
|
region := aws.String(athenaConf["region"])
|
|
|
resultsBucket := athenaConf["output"]
|
|
|
database := athenaConf["database"]
|
|
|
@@ -609,12 +617,12 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- bytes, err := json.Marshal(op.ResultSet)
|
|
|
+ b, err := json.Marshal(op.ResultSet)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- return bytes, nil
|
|
|
+ return b, nil
|
|
|
}
|
|
|
return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
|
|
|
|
|
|
@@ -623,8 +631,8 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
|
|
|
type spotInfo struct {
|
|
|
Timestamp string `csv:"Timestamp"`
|
|
|
UsageType string `csv:"UsageType"`
|
|
|
- Operation string `csv:"Operation`
|
|
|
- InstanceID string `csv:"InstanceID`
|
|
|
+ Operation string `csv:"Operation"`
|
|
|
+ InstanceID string `csv:"InstanceID"`
|
|
|
MyBidID string `csv:"MyBidID"`
|
|
|
MyMaxPrice string `csv:"MyMaxPrice"`
|
|
|
MarketPrice string `csv:"MarketPrice"`
|
|
|
@@ -652,21 +660,14 @@ func (f fnames) Less(i, j int) bool {
|
|
|
}
|
|
|
|
|
|
func parseSpotData(bucket string, prefix string, projectID string, region string, accessKeyID string, accessKeySecret string) (map[string]*spotInfo, error) {
|
|
|
- log.Print("bucket " + bucket)
|
|
|
- log.Print("prefix " + prefix)
|
|
|
- log.Print("pID " + projectID)
|
|
|
- log.Print("region " + region)
|
|
|
- log.Print("accessKeyName " + accessKeyID)
|
|
|
- log.Print("secret " + accessKeySecret)
|
|
|
|
|
|
if accessKeyID != "" && accessKeySecret != "" { // credentials may exist on the actual AWS node-- if so, use those. If not, override with the service key
|
|
|
os.Setenv("AWS_ACCESS_KEY_ID", accessKeyID)
|
|
|
os.Setenv("AWS_SECRET_ACCESS_KEY", accessKeySecret)
|
|
|
}
|
|
|
- c := &aws.Config{
|
|
|
- Region: aws.String(region),
|
|
|
- Credentials: credentials.NewEnvCredentials(),
|
|
|
- }
|
|
|
+
|
|
|
+ c := aws.NewConfig().WithRegion(region)
|
|
|
+
|
|
|
s := session.Must(session.NewSession(c))
|
|
|
s3Svc := s3.New(s)
|
|
|
downloader := s3manager.NewDownloaderWithClient(s3Svc)
|
|
|
@@ -709,17 +710,15 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
|
|
|
buf := aws.NewWriteAtBuffer([]byte{})
|
|
|
_, err := downloader.Download(buf, getObj)
|
|
|
if err != nil {
|
|
|
- log.Print(err.Error())
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
r := bytes.NewReader(buf.Bytes())
|
|
|
|
|
|
gr, err := gzip.NewReader(r)
|
|
|
if err != nil {
|
|
|
- log.Print(err.Error())
|
|
|
return nil, err
|
|
|
}
|
|
|
- defer gr.Close()
|
|
|
|
|
|
csvReader := csv.NewReader(gr)
|
|
|
csvReader.Comma = '\t'
|
|
|
@@ -753,6 +752,7 @@ func parseSpotData(bucket string, prefix string, projectID string, region string
|
|
|
spots[spot.InstanceID] = spot
|
|
|
}
|
|
|
}
|
|
|
+ gr.Close()
|
|
|
}
|
|
|
return spots, nil
|
|
|
}
|