|
@@ -12,6 +12,7 @@ import (
|
|
|
"net/url"
|
|
"net/url"
|
|
|
"os"
|
|
"os"
|
|
|
"regexp"
|
|
"regexp"
|
|
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
"strings"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
@@ -34,6 +35,8 @@ import (
|
|
|
const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
|
|
const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
|
|
|
const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
|
|
const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
|
|
|
const supportedSpotFeedVersion = "1"
|
|
const supportedSpotFeedVersion = "1"
|
|
|
|
|
+const SpotInfoUpdateType = "spotinfo"
|
|
|
|
|
+const AthenaInfoUpdateType = "athenainfo"
|
|
|
|
|
|
|
|
// AWS represents an Amazon Provider
|
|
// AWS represents an Amazon Provider
|
|
|
type AWS struct {
|
|
type AWS struct {
|
|
@@ -52,6 +55,7 @@ type AWS struct {
|
|
|
SpotDataBucket string
|
|
SpotDataBucket string
|
|
|
SpotDataPrefix string
|
|
SpotDataPrefix string
|
|
|
ProjectID string
|
|
ProjectID string
|
|
|
|
|
+ *CustomProvider
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// AWSPricing maps a k8s node to an AWS Pricing "product"
|
|
// AWSPricing maps a k8s node to an AWS Pricing "product"
|
|
@@ -162,13 +166,23 @@ type AwsSpotFeedInfo struct {
|
|
|
BucketName string `json:"bucketName"`
|
|
BucketName string `json:"bucketName"`
|
|
|
Prefix string `json:"prefix"`
|
|
Prefix string `json:"prefix"`
|
|
|
Region string `json:"region"`
|
|
Region string `json:"region"`
|
|
|
- AccountID string `json:"accountId"`
|
|
|
|
|
|
|
+ AccountID string `json:"projectID"`
|
|
|
ServiceKeyName string `json:"serviceKeyName"`
|
|
ServiceKeyName string `json:"serviceKeyName"`
|
|
|
ServiceKeySecret string `json:"serviceKeySecret"`
|
|
ServiceKeySecret string `json:"serviceKeySecret"`
|
|
|
SpotLabel string `json:"spotLabel"`
|
|
SpotLabel string `json:"spotLabel"`
|
|
|
SpotLabelValue string `json:"spotLabelValue"`
|
|
SpotLabelValue string `json:"spotLabelValue"`
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+type AwsAthenaInfo struct {
|
|
|
|
|
+ AthenaBucketName string `json:"athenaBucketName"`
|
|
|
|
|
+ AthenaRegion string `json:"athenaRegion"`
|
|
|
|
|
+ AthenaDatabase string `json:"athenaDatabase"`
|
|
|
|
|
+ AthenaTable string `json:"athenaTable"`
|
|
|
|
|
+ ServiceKeyName string `json:"serviceKeyName"`
|
|
|
|
|
+ ServiceKeySecret string `json:"serviceKeySecret"`
|
|
|
|
|
+ AccountID string `json:"projectID"`
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (aws *AWS) GetConfig() (*CustomPricing, error) {
|
|
func (aws *AWS) GetConfig() (*CustomPricing, error) {
|
|
|
c, err := GetDefaultPricingData("aws.json")
|
|
c, err := GetDefaultPricingData("aws.json")
|
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -177,26 +191,57 @@ func (aws *AWS) GetConfig() (*CustomPricing, error) {
|
|
|
return c, nil
|
|
return c, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (aws *AWS) UpdateConfig(r io.Reader) (*CustomPricing, error) {
|
|
|
|
|
- a := AwsSpotFeedInfo{}
|
|
|
|
|
- err := json.NewDecoder(r).Decode(&a)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, err
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
|
|
+func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
|
|
|
c, err := GetDefaultPricingData("aws.json")
|
|
c, err := GetDefaultPricingData("aws.json")
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
- c.ServiceKeyName = a.ServiceKeyName
|
|
|
|
|
- c.ServiceKeySecret = a.ServiceKeySecret
|
|
|
|
|
- c.SpotDataPrefix = a.Prefix
|
|
|
|
|
- c.SpotDataBucket = a.BucketName
|
|
|
|
|
- c.ProjectID = a.AccountID
|
|
|
|
|
- c.SpotDataRegion = a.Region
|
|
|
|
|
- c.SpotLabel = a.SpotLabel
|
|
|
|
|
- c.SpotLabelValue = a.SpotLabelValue
|
|
|
|
|
|
|
+ if updateType == SpotInfoUpdateType {
|
|
|
|
|
+ a := AwsSpotFeedInfo{}
|
|
|
|
|
+ err := json.NewDecoder(r).Decode(&a)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ c.ServiceKeyName = a.ServiceKeyName
|
|
|
|
|
+ c.ServiceKeySecret = a.ServiceKeySecret
|
|
|
|
|
+ c.SpotDataPrefix = a.Prefix
|
|
|
|
|
+ c.SpotDataBucket = a.BucketName
|
|
|
|
|
+ c.ProjectID = a.AccountID
|
|
|
|
|
+ c.SpotDataRegion = a.Region
|
|
|
|
|
+ c.SpotLabel = a.SpotLabel
|
|
|
|
|
+ c.SpotLabelValue = a.SpotLabelValue
|
|
|
|
|
+
|
|
|
|
|
+ } else if updateType == AthenaInfoUpdateType {
|
|
|
|
|
+ a := AwsAthenaInfo{}
|
|
|
|
|
+ err := json.NewDecoder(r).Decode(&a)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ c.AthenaBucketName = a.AthenaBucketName
|
|
|
|
|
+ c.AthenaRegion = a.AthenaRegion
|
|
|
|
|
+ c.AthenaDatabase = a.AthenaDatabase
|
|
|
|
|
+ c.AthenaTable = a.AthenaTable
|
|
|
|
|
+ c.ServiceKeyName = a.ServiceKeyName
|
|
|
|
|
+ c.ServiceKeySecret = a.ServiceKeySecret
|
|
|
|
|
+ c.ProjectID = a.AccountID
|
|
|
|
|
+ } else {
|
|
|
|
|
+ a := make(map[string]string)
|
|
|
|
|
+ err = json.NewDecoder(r).Decode(&a)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ for k, v := range a {
|
|
|
|
|
+ kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
|
|
|
|
|
+ err := SetCustomPricingField(c, kUpper, v)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
cj, err := json.Marshal(c)
|
|
cj, err := json.Marshal(c)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
@@ -211,7 +256,6 @@ func (aws *AWS) UpdateConfig(r io.Reader) (*CustomPricing, error) {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
return c, nil
|
|
return c, nil
|
|
|
-
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type awsKey struct {
|
|
type awsKey struct {
|
|
@@ -665,33 +709,123 @@ func (*AWS) GetDisks() ([]byte, error) {
|
|
|
return json.Marshal(volumeResult)
|
|
return json.Marshal(volumeResult)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (*AWS) ExternalAllocations(start string, end string) ([]*OutOfClusterAllocation, error) {
|
|
|
|
|
- return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
|
|
|
|
|
|
|
+func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
|
|
|
|
|
+ customPricing, err := a.GetConfig()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ query := fmt.Sprintf(`SELECT
|
|
|
|
|
+ CAST(line_item_usage_start_date AS DATE) as start_date,
|
|
|
|
|
+ resource_tags_user_kubernetes_%s,
|
|
|
|
|
+ line_item_product_code,
|
|
|
|
|
+ SUM(line_item_blended_cost) as blended_cost
|
|
|
|
|
+ FROM %s as cost_data
|
|
|
|
|
+ WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
|
|
|
|
|
+ GROUP BY 1,2,3`, aggregator, customPricing.AthenaTable, start, end)
|
|
|
|
|
+
|
|
|
|
|
+ if customPricing.ServiceKeyName != "" {
|
|
|
|
|
+ err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ region := aws.String(customPricing.AthenaRegion)
|
|
|
|
|
+ resultsBucket := customPricing.AthenaBucketName
|
|
|
|
|
+ database := customPricing.AthenaDatabase
|
|
|
|
|
+
|
|
|
|
|
+ c := &aws.Config{
|
|
|
|
|
+ Region: region,
|
|
|
|
|
+ }
|
|
|
|
|
+ s := session.Must(session.NewSession(c))
|
|
|
|
|
+ svc := athena.New(s)
|
|
|
|
|
+
|
|
|
|
|
+ var e athena.StartQueryExecutionInput
|
|
|
|
|
+
|
|
|
|
|
+ var r athena.ResultConfiguration
|
|
|
|
|
+ r.SetOutputLocation(resultsBucket)
|
|
|
|
|
+ e.SetResultConfiguration(&r)
|
|
|
|
|
+
|
|
|
|
|
+ e.SetQueryString(query)
|
|
|
|
|
+ var q athena.QueryExecutionContext
|
|
|
|
|
+ q.SetDatabase(database)
|
|
|
|
|
+ e.SetQueryExecutionContext(&q)
|
|
|
|
|
+
|
|
|
|
|
+ res, err := svc.StartQueryExecution(&e)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ klog.V(2).Infof("StartQueryExecution result:")
|
|
|
|
|
+ klog.V(2).Infof(res.GoString())
|
|
|
|
|
+
|
|
|
|
|
+ var qri athena.GetQueryExecutionInput
|
|
|
|
|
+ qri.SetQueryExecutionId(*res.QueryExecutionId)
|
|
|
|
|
+
|
|
|
|
|
+ var qrop *athena.GetQueryExecutionOutput
|
|
|
|
|
+ duration := time.Duration(2) * time.Second // Pause for 2 seconds
|
|
|
|
|
+
|
|
|
|
|
+ for {
|
|
|
|
|
+ qrop, err = svc.GetQueryExecution(&qri)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ if *qrop.QueryExecution.Status.State != "RUNNING" {
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ time.Sleep(duration)
|
|
|
|
|
+ }
|
|
|
|
|
+ var oocAllocs []*OutOfClusterAllocation
|
|
|
|
|
+ if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
|
|
|
|
|
+
|
|
|
|
|
+ var ip athena.GetQueryResultsInput
|
|
|
|
|
+ ip.SetQueryExecutionId(*res.QueryExecutionId)
|
|
|
|
|
+
|
|
|
|
|
+ op, err := svc.GetQueryResults(&ip)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
|
|
|
|
|
+
|
|
|
|
|
+ cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ ooc := &OutOfClusterAllocation{
|
|
|
|
|
+ Aggregator: aggregator,
|
|
|
|
|
+ Environment: *r.Data[1].VarCharValue,
|
|
|
|
|
+ Service: *r.Data[2].VarCharValue,
|
|
|
|
|
+ Cost: cost,
|
|
|
|
|
+ }
|
|
|
|
|
+ oocAllocs = append(oocAllocs, ooc)
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// QuerySQL can query a properly configured Athena database.
|
|
// QuerySQL can query a properly configured Athena database.
|
|
|
// Used to fetch billing data.
|
|
// Used to fetch billing data.
|
|
|
// Requires a json config in /var/configs with key region, output, and database.
|
|
// Requires a json config in /var/configs with key region, output, and database.
|
|
|
-func (*AWS) QuerySQL(query string) ([]byte, error) {
|
|
|
|
|
- jsonFile, err := os.Open("/var/configs/key.json")
|
|
|
|
|
- if err == nil {
|
|
|
|
|
- byteValue, _ := ioutil.ReadAll(jsonFile)
|
|
|
|
|
- var result map[string]string
|
|
|
|
|
- json.Unmarshal([]byte(byteValue), &result)
|
|
|
|
|
- err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
|
|
|
|
|
|
|
+func (a *AWS) QuerySQL(query string) ([]byte, error) {
|
|
|
|
|
+ customPricing, err := a.GetConfig()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return nil, err
|
|
|
|
|
+ }
|
|
|
|
|
+ if customPricing.ServiceKeyName != "" {
|
|
|
|
|
+ err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
- err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
|
|
|
|
|
|
|
+ err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
- } else if os.IsNotExist(err) {
|
|
|
|
|
- klog.V(2).Infof("Using Default Credentials")
|
|
|
|
|
- } else {
|
|
|
|
|
- return nil, err
|
|
|
|
|
}
|
|
}
|
|
|
- defer jsonFile.Close()
|
|
|
|
|
athenaConfigs, err := os.Open("/var/configs/athena.json")
|
|
athenaConfigs, err := os.Open("/var/configs/athena.json")
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
@@ -703,9 +837,9 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
|
|
|
}
|
|
}
|
|
|
var athenaConf map[string]string
|
|
var athenaConf map[string]string
|
|
|
json.Unmarshal([]byte(b), &athenaConf)
|
|
json.Unmarshal([]byte(b), &athenaConf)
|
|
|
- region := aws.String(athenaConf["region"])
|
|
|
|
|
- resultsBucket := athenaConf["output"]
|
|
|
|
|
- database := athenaConf["database"]
|
|
|
|
|
|
|
+ region := aws.String(customPricing.AthenaRegion)
|
|
|
|
|
+ resultsBucket := customPricing.AthenaBucketName
|
|
|
|
|
+ database := customPricing.AthenaDatabase
|
|
|
|
|
|
|
|
c := &aws.Config{
|
|
c := &aws.Config{
|
|
|
Region: region,
|
|
Region: region,
|
|
@@ -765,7 +899,6 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
|
|
|
return b, nil
|
|
return b, nil
|
|
|
}
|
|
}
|
|
|
return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
|
|
return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
|
|
|
-
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type spotInfo struct {
|
|
type spotInfo struct {
|