|
@@ -8,6 +8,7 @@ import (
|
|
|
"fmt"
|
|
"fmt"
|
|
|
"io"
|
|
"io"
|
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
|
|
|
+ "log"
|
|
|
"net/http"
|
|
"net/http"
|
|
|
"os"
|
|
"os"
|
|
|
"regexp"
|
|
"regexp"
|
|
@@ -23,6 +24,7 @@ import (
|
|
|
|
|
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
|
|
|
+ "github.com/aws/aws-sdk-go/aws/credentials"
|
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
|
|
"github.com/aws/aws-sdk-go/service/athena"
|
|
"github.com/aws/aws-sdk-go/service/athena"
|
|
|
"github.com/aws/aws-sdk-go/service/ec2"
|
|
"github.com/aws/aws-sdk-go/service/ec2"
|
|
@@ -40,6 +42,34 @@ const supportedSpotFeedVersion = "1"
|
|
|
const SpotInfoUpdateType = "spotinfo"
|
|
const SpotInfoUpdateType = "spotinfo"
|
|
|
const AthenaInfoUpdateType = "athenainfo"
|
|
const AthenaInfoUpdateType = "athenainfo"
|
|
|
|
|
|
|
|
|
|
+const defaultConfigPath = "/var/configs/"
|
|
|
|
|
+
|
|
|
|
|
+var awsRegions = []string{
|
|
|
|
|
+ "us-east-2",
|
|
|
|
|
+ "us-east-1",
|
|
|
|
|
+ "us-west-1",
|
|
|
|
|
+ "us-west-2",
|
|
|
|
|
+ "ap-east-1",
|
|
|
|
|
+ "ap-south-1",
|
|
|
|
|
+ "ap-northeast-3",
|
|
|
|
|
+ "ap-northeast-2",
|
|
|
|
|
+ "ap-southeast-1",
|
|
|
|
|
+ "ap-southeast-2",
|
|
|
|
|
+ "ap-northeast-1",
|
|
|
|
|
+ "ca-central-1",
|
|
|
|
|
+ "cn-north-1",
|
|
|
|
|
+ "cn-northwest-1",
|
|
|
|
|
+ "eu-central-1",
|
|
|
|
|
+ "eu-west-1",
|
|
|
|
|
+ "eu-west-2",
|
|
|
|
|
+ "eu-west-3",
|
|
|
|
|
+ "eu-north-1",
|
|
|
|
|
+ "me-south-1",
|
|
|
|
|
+ "sa-east-1",
|
|
|
|
|
+ "us-gov-east-1",
|
|
|
|
|
+ "us-gov-west-1",
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// AWS represents an Amazon Provider
|
|
// AWS represents an Amazon Provider
|
|
|
type AWS struct {
|
|
type AWS struct {
|
|
|
Pricing map[string]*AWSProductTerms
|
|
Pricing map[string]*AWSProductTerms
|
|
@@ -1031,38 +1061,224 @@ func getClusterConfig(ccFile string) (map[string]string, error) {
|
|
|
return clusterConf, nil
|
|
return clusterConf, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
|
|
|
|
|
-func (a *AWS) GetDisks() ([]byte, error) {
|
|
|
|
|
- err := a.configureAWSAuth()
|
|
|
|
|
|
|
+// SetKeyEnv ensures that the two environment variables necessary to configure
|
|
|
|
|
+// a new AWS Session are set.
|
|
|
|
|
+func (a *AWS) SetKeyEnv() error {
|
|
|
|
|
+ // TODO add this to the helm chart, mirroring the cost-model
|
|
|
|
|
+ // configPath := os.Getenv("CONFIG_PATH")
|
|
|
|
|
+ configPath := defaultConfigPath
|
|
|
|
|
+ path := configPath + "aws.json"
|
|
|
|
|
+
|
|
|
|
|
+ if _, err := os.Stat(path); err != nil {
|
|
|
|
|
+ if os.IsNotExist(err) {
|
|
|
|
|
+ log.Printf("error: file %s does not exist", path)
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.Printf("error: %s", err)
|
|
|
|
|
+ }
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ jsonFile, err := os.Open(path)
|
|
|
|
|
+ defer jsonFile.Close()
|
|
|
|
|
+
|
|
|
|
|
+ configMap := map[string]string{}
|
|
|
|
|
+ configBytes, err := ioutil.ReadAll(jsonFile)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
|
|
+ json.Unmarshal([]byte(configBytes), &configMap)
|
|
|
|
|
+
|
|
|
|
|
+ keyName := configMap["awsServiceKeyName"]
|
|
|
|
|
+ keySecret := configMap["awsServiceKeySecret"]
|
|
|
|
|
+
|
|
|
|
|
+ // These are required before calling NewEnvCredentials below
|
|
|
|
|
+ os.Setenv("AWS_ACCESS_KEY_ID", keyName)
|
|
|
|
|
+ os.Setenv("AWS_SECRET_ACCESS_KEY", keySecret)
|
|
|
|
|
+
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- clusterConfig, err := getClusterConfig("/var/configs/cluster.json")
|
|
|
|
|
|
|
+func (a *AWS) getAddressesForRegion(region string) (*ec2.DescribeAddressesOutput, error) {
|
|
|
|
|
+ sess, err := session.NewSession(&aws.Config{
|
|
|
|
|
+ Region: aws.String(region),
|
|
|
|
|
+ Credentials: credentials.NewEnvCredentials(),
|
|
|
|
|
+ })
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- region := aws.String(clusterConfig["region"])
|
|
|
|
|
- c := &aws.Config{
|
|
|
|
|
- Region: region,
|
|
|
|
|
|
|
+ ec2Svc := ec2.New(sess)
|
|
|
|
|
+ return ec2Svc.DescribeAddresses(&ec2.DescribeAddressesInput{})
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (a *AWS) GetAddresses() ([]byte, error) {
|
|
|
|
|
+ if err := a.SetKeyEnv(); err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
}
|
|
}
|
|
|
- s := session.Must(session.NewSession(c))
|
|
|
|
|
|
|
|
|
|
- ec2Svc := ec2.New(s)
|
|
|
|
|
- input := &ec2.DescribeVolumesInput{}
|
|
|
|
|
- volumeResult, err := ec2Svc.DescribeVolumes(input)
|
|
|
|
|
|
|
+ addressCh := make(chan *ec2.DescribeAddressesOutput, len(awsRegions))
|
|
|
|
|
+ errorCh := make(chan error, len(awsRegions))
|
|
|
|
|
+
|
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
|
+ wg.Add(len(awsRegions))
|
|
|
|
|
+
|
|
|
|
|
+ // Get volumes from each AWS region
|
|
|
|
|
+ for _, r := range awsRegions {
|
|
|
|
|
+ // Fetch IP address response and send results and errors to their
|
|
|
|
|
+ // respective channels
|
|
|
|
|
+ go func(region string) {
|
|
|
|
|
+ defer wg.Done()
|
|
|
|
|
+
|
|
|
|
|
+ // Query for first page of volume results
|
|
|
|
|
+ resp, err := a.getAddressesForRegion(region)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ if aerr, ok := err.(awserr.Error); ok {
|
|
|
|
|
+ switch aerr.Code() {
|
|
|
|
|
+ default:
|
|
|
|
|
+ errorCh <- aerr
|
|
|
|
|
+ }
|
|
|
|
|
+ return
|
|
|
|
|
+ } else {
|
|
|
|
|
+ errorCh <- err
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ addressCh <- resp
|
|
|
|
|
+ }(r)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Close the result channels after everything has been sent
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ wg.Wait()
|
|
|
|
|
+ close(errorCh)
|
|
|
|
|
+ close(addressCh)
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ addresses := []*ec2.Address{}
|
|
|
|
|
+ for adds := range addressCh {
|
|
|
|
|
+ addresses = append(addresses, adds.Addresses...)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ errors := []error{}
|
|
|
|
|
+ for err := range errorCh {
|
|
|
|
|
+ log.Printf("error getting addresses: %s", err)
|
|
|
|
|
+ errors = append(errors, err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Return error if no addresses are returned
|
|
|
|
|
+ if len(errors) > 0 && len(addresses) == 0 {
|
|
|
|
|
+ return nil, fmt.Errorf("%d error(s) retrieving addresses: %v", len(errors), errors)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Format the response this way to match the JSON-encoded formatting of a single response
|
|
|
|
|
+ // from DescribeAddresss, so that consumers can always expect AWS disk responses to have
|
|
|
|
|
+ // a "Addresss" key at the top level.
|
|
|
|
|
+ return json.Marshal(map[string][]*ec2.Address{
|
|
|
|
|
+ "Addresses": addresses,
|
|
|
|
|
+ })
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (a *AWS) getDisksForRegion(region string, maxResults int64, nextToken *string) (*ec2.DescribeVolumesOutput, error) {
|
|
|
|
|
+ sess, err := session.NewSession(&aws.Config{
|
|
|
|
|
+ Region: aws.String(region),
|
|
|
|
|
+ Credentials: credentials.NewEnvCredentials(),
|
|
|
|
|
+ })
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- if aerr, ok := err.(awserr.Error); ok {
|
|
|
|
|
- switch aerr.Code() {
|
|
|
|
|
- default:
|
|
|
|
|
- return nil, aerr
|
|
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ec2Svc := ec2.New(sess)
|
|
|
|
|
+ return ec2Svc.DescribeVolumes(&ec2.DescribeVolumesInput{
|
|
|
|
|
+ MaxResults: &maxResults,
|
|
|
|
|
+ NextToken: nextToken,
|
|
|
|
|
+ })
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+// GetDisks returns the AWS disks backing PVs. Useful because sometimes k8s will not clean up PVs correctly. Requires a json config in /var/configs with key region.
|
|
|
|
|
+func (a *AWS) GetDisks() ([]byte, error) {
|
|
|
|
|
+ if err := a.SetKeyEnv(); err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ volumeCh := make(chan *ec2.DescribeVolumesOutput, len(awsRegions))
|
|
|
|
|
+ errorCh := make(chan error, len(awsRegions))
|
|
|
|
|
+
|
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
|
+ wg.Add(len(awsRegions))
|
|
|
|
|
+
|
|
|
|
|
+ // Get volumes from each AWS region
|
|
|
|
|
+ for _, r := range awsRegions {
|
|
|
|
|
+ // Fetch volume response and send results and errors to their
|
|
|
|
|
+ // respective channels
|
|
|
|
|
+ go func(region string) {
|
|
|
|
|
+ defer wg.Done()
|
|
|
|
|
+
|
|
|
|
|
+ // Query for first page of volume results
|
|
|
|
|
+ resp, err := a.getDisksForRegion(region, 1000, nil)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ if aerr, ok := err.(awserr.Error); ok {
|
|
|
|
|
+ switch aerr.Code() {
|
|
|
|
|
+ default:
|
|
|
|
|
+ errorCh <- aerr
|
|
|
|
|
+ }
|
|
|
|
|
+ return
|
|
|
|
|
+ } else {
|
|
|
|
|
+ errorCh <- err
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- } else {
|
|
|
|
|
- return nil, err
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ volumeCh <- resp
|
|
|
|
|
+
|
|
|
|
|
+ // A NextToken indicates more pages of results. Keep querying
|
|
|
|
|
+ // until all pages are retrieved.
|
|
|
|
|
+ for resp.NextToken != nil {
|
|
|
|
|
+ resp, err = a.getDisksForRegion(region, 100, resp.NextToken)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ if aerr, ok := err.(awserr.Error); ok {
|
|
|
|
|
+ switch aerr.Code() {
|
|
|
|
|
+ default:
|
|
|
|
|
+ errorCh <- aerr
|
|
|
|
|
+ }
|
|
|
|
|
+ return
|
|
|
|
|
+ } else {
|
|
|
|
|
+ errorCh <- err
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ volumeCh <- resp
|
|
|
|
|
+ }
|
|
|
|
|
+ }(r)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Close the result channels after everything has been sent
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ wg.Wait()
|
|
|
|
|
+ close(errorCh)
|
|
|
|
|
+ close(volumeCh)
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ volumes := []*ec2.Volume{}
|
|
|
|
|
+ for vols := range volumeCh {
|
|
|
|
|
+ volumes = append(volumes, vols.Volumes...)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ errors := []error{}
|
|
|
|
|
+ for err := range errorCh {
|
|
|
|
|
+ log.Printf("error getting disks: %s", err)
|
|
|
|
|
+ errors = append(errors, err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Return error if no volumes are returned
|
|
|
|
|
+ if len(errors) > 0 && len(volumes) == 0 {
|
|
|
|
|
+ return nil, fmt.Errorf("%d error(s) retrieving volumes: %v", len(errors), errors)
|
|
|
}
|
|
}
|
|
|
- return json.Marshal(volumeResult)
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // Format the response this way to match the JSON-encoded formatting of a single response
|
|
|
|
|
+ // from DescribeVolumes, so that consumers can always expect AWS disk responses to have
|
|
|
|
|
+ // a "Volumes" key at the top level.
|
|
|
|
|
+ return json.Marshal(map[string][]*ec2.Volume{
|
|
|
|
|
+ "Volumes": volumes,
|
|
|
|
|
+ })
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// ConvertToGlueColumnFormat takes a string and runs through various regex
|
|
// ConvertToGlueColumnFormat takes a string and runs through various regex
|