Преглед изворни кода

first pass of consistent out of cluster support

AjayTripathy пре 7 година
родитељ
комит
7ede74eb3d
5 измењених фајлова са 328 додато и 52 уклоњено
  1. 6 2
      cloud/aws.json
  2. 169 36
      cloud/awsprovider.go
  3. 77 6
      cloud/gcpprovider.go
  4. 34 4
      cloud/provider.go
  5. 42 4
      main.go

+ 6 - 2
cloud/aws.json

@@ -7,10 +7,14 @@
     "spotRAM": "0.000892",
     "spotLabel": "kops.k8s.io/instancegroup",
     "spotLabelValue": "spotinstance-nodes",
-    "awsServiceKeyName": "AKIAXW6UVLRRTBCUKQFP",
-    "awsServiceKeySecret": "",
+    "awsServiceKeyName": "AKIAXW6UVLRRY5RQGGUX",
+    "awsServiceKeySecret": "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",
     "awsSpotDataRegion":"us-east-2",
     "awsSpotDataBucket": "kc-test-spot",
     "awsSpotDataPrefix": "spotdata",
+    "athenaBucketName": "s3://aws-athena-query-results-530337586275-us-east-1",
+	"athenaRegion": "us-east-1",
+	"athenaDatabase": "athenacurcfn_athena_test",
+	"athenaTable": "athena_test",
     "projectID": "530337586275"
 }

+ 169 - 36
cloud/awsprovider.go

@@ -12,6 +12,7 @@ import (
 	"net/url"
 	"os"
 	"regexp"
+	"strconv"
 	"strings"
 	"time"
 
@@ -34,6 +35,8 @@ import (
 const awsAccessKeyIDEnvVar = "AWS_ACCESS_KEY_ID"
 const awsAccessKeySecretEnvVar = "AWS_SECRET_ACCESS_KEY"
 const supportedSpotFeedVersion = "1"
+const SpotInfoUpdateType = "spotinfo"
+const AthenaInfoUpdateType = "athenainfo"
 
 // AWS represents an Amazon Provider
 type AWS struct {
@@ -52,6 +55,7 @@ type AWS struct {
 	SpotDataBucket          string
 	SpotDataPrefix          string
 	ProjectID               string
+	*CustomProvider
 }
 
 // AWSPricing maps a k8s node to an AWS Pricing "product"
@@ -162,13 +166,23 @@ type AwsSpotFeedInfo struct {
 	BucketName       string `json:"bucketName"`
 	Prefix           string `json:"prefix"`
 	Region           string `json:"region"`
-	AccountID        string `json:"accountId"`
+	AccountID        string `json:"projectID"`
 	ServiceKeyName   string `json:"serviceKeyName"`
 	ServiceKeySecret string `json:"serviceKeySecret"`
 	SpotLabel        string `json:"spotLabel"`
 	SpotLabelValue   string `json:"spotLabelValue"`
 }
 
+type AwsAthenaInfo struct {
+	AthenaBucketName string `json:"athenaBucketName"`
+	AthenaRegion     string `json:"athenaRegion"`
+	AthenaDatabase   string `json:"athenaDatabase"`
+	AthenaTable      string `json:"athenaTable"`
+	ServiceKeyName   string `json:"serviceKeyName"`
+	ServiceKeySecret string `json:"serviceKeySecret"`
+	AccountID        string `json:"projectID"`
+}
+
 func (aws *AWS) GetConfig() (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("aws.json")
 	if err != nil {
@@ -177,26 +191,57 @@ func (aws *AWS) GetConfig() (*CustomPricing, error) {
 	return c, nil
 }
 
-func (aws *AWS) UpdateConfig(r io.Reader) (*CustomPricing, error) {
-	a := AwsSpotFeedInfo{}
-	err := json.NewDecoder(r).Decode(&a)
-	if err != nil {
-		return nil, err
-	}
-
+func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	c, err := GetDefaultPricingData("aws.json")
 	if err != nil {
 		return nil, err
 	}
-	c.ServiceKeyName = a.ServiceKeyName
-	c.ServiceKeySecret = a.ServiceKeySecret
-	c.SpotDataPrefix = a.Prefix
-	c.SpotDataBucket = a.BucketName
-	c.ProjectID = a.AccountID
-	c.SpotDataRegion = a.Region
-	c.SpotLabel = a.SpotLabel
-	c.SpotLabelValue = a.SpotLabelValue
+	if updateType == SpotInfoUpdateType {
+		a := AwsSpotFeedInfo{}
+		err := json.NewDecoder(r).Decode(&a)
+		if err != nil {
+			return nil, err
+		}
 
+		if err != nil {
+			return nil, err
+		}
+		c.ServiceKeyName = a.ServiceKeyName
+		c.ServiceKeySecret = a.ServiceKeySecret
+		c.SpotDataPrefix = a.Prefix
+		c.SpotDataBucket = a.BucketName
+		c.ProjectID = a.AccountID
+		c.SpotDataRegion = a.Region
+		c.SpotLabel = a.SpotLabel
+		c.SpotLabelValue = a.SpotLabelValue
+
+	} else if updateType == AthenaInfoUpdateType {
+		a := AwsAthenaInfo{}
+		err := json.NewDecoder(r).Decode(&a)
+		if err != nil {
+			return nil, err
+		}
+		c.AthenaBucketName = a.AthenaBucketName
+		c.AthenaRegion = a.AthenaRegion
+		c.AthenaDatabase = a.AthenaDatabase
+		c.AthenaTable = a.AthenaTable
+		c.ServiceKeyName = a.ServiceKeyName
+		c.ServiceKeySecret = a.ServiceKeySecret
+		c.ProjectID = a.AccountID
+	} else {
+		a := make(map[string]string)
+		err = json.NewDecoder(r).Decode(&a)
+		if err != nil {
+			return nil, err
+		}
+		for k, v := range a {
+			kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
+			err := SetCustomPricingField(c, kUpper, v)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
 	cj, err := json.Marshal(c)
 	if err != nil {
 		return nil, err
@@ -211,7 +256,6 @@ func (aws *AWS) UpdateConfig(r io.Reader) (*CustomPricing, error) {
 		return nil, err
 	}
 	return c, nil
-
 }
 
 type awsKey struct {
@@ -665,33 +709,123 @@ func (*AWS) GetDisks() ([]byte, error) {
 	return json.Marshal(volumeResult)
 }
 
-func (*AWS) ExternalAllocations(start string, end string) ([]*OutOfClusterAllocation, error) {
-	return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
+func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
+	customPricing, err := a.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	query := fmt.Sprintf(`SELECT   
+		CAST(line_item_usage_start_date AS DATE) as start_date,
+		resource_tags_user_kubernetes_%s,
+		line_item_product_code,
+		SUM(line_item_blended_cost) as blended_cost
+	FROM %s as cost_data
+	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
+	GROUP BY 1,2,3`, aggregator, customPricing.AthenaTable, start, end)
+
+	if customPricing.ServiceKeyName != "" {
+		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
+		if err != nil {
+			return nil, err
+		}
+		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
+		if err != nil {
+			return nil, err
+		}
+	}
+	region := aws.String(customPricing.AthenaRegion)
+	resultsBucket := customPricing.AthenaBucketName
+	database := customPricing.AthenaDatabase
+
+	c := &aws.Config{
+		Region: region,
+	}
+	s := session.Must(session.NewSession(c))
+	svc := athena.New(s)
+
+	var e athena.StartQueryExecutionInput
+
+	var r athena.ResultConfiguration
+	r.SetOutputLocation(resultsBucket)
+	e.SetResultConfiguration(&r)
+
+	e.SetQueryString(query)
+	var q athena.QueryExecutionContext
+	q.SetDatabase(database)
+	e.SetQueryExecutionContext(&q)
+
+	res, err := svc.StartQueryExecution(&e)
+	if err != nil {
+		return nil, err
+	}
+
+	klog.V(2).Infof("StartQueryExecution result:")
+	klog.V(2).Infof(res.GoString())
+
+	var qri athena.GetQueryExecutionInput
+	qri.SetQueryExecutionId(*res.QueryExecutionId)
+
+	var qrop *athena.GetQueryExecutionOutput
+	duration := time.Duration(2) * time.Second // Pause for 2 seconds
+
+	for {
+		qrop, err = svc.GetQueryExecution(&qri)
+		if err != nil {
+			return nil, err
+		}
+		if *qrop.QueryExecution.Status.State != "RUNNING" {
+			break
+		}
+		time.Sleep(duration)
+	}
+	var oocAllocs []*OutOfClusterAllocation
+	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
+
+		var ip athena.GetQueryResultsInput
+		ip.SetQueryExecutionId(*res.QueryExecutionId)
+
+		op, err := svc.GetQueryResults(&ip)
+		if err != nil {
+			return nil, err
+		}
+
+		for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
+
+			cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+			if err != nil {
+				return nil, err
+			}
+			ooc := &OutOfClusterAllocation{
+				Aggregator:  aggregator,
+				Environment: *r.Data[1].VarCharValue,
+				Service:     *r.Data[2].VarCharValue,
+				Cost:        cost,
+			}
+			oocAllocs = append(oocAllocs, ooc)
+		}
+	}
+
+	return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
 }
 
 // QuerySQL can query a properly configured Athena database.
 // Used to fetch billing data.
 // Requires a json config in /var/configs with key region, output, and database.
-func (*AWS) QuerySQL(query string) ([]byte, error) {
-	jsonFile, err := os.Open("/var/configs/key.json")
-	if err == nil {
-		byteValue, _ := ioutil.ReadAll(jsonFile)
-		var result map[string]string
-		json.Unmarshal([]byte(byteValue), &result)
-		err = os.Setenv(awsAccessKeyIDEnvVar, result["access_key_ID"])
+func (a *AWS) QuerySQL(query string) ([]byte, error) {
+	customPricing, err := a.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	if customPricing.ServiceKeyName != "" {
+		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
 		if err != nil {
 			return nil, err
 		}
-		err = os.Setenv(awsAccessKeySecretEnvVar, result["secret_access_key"])
+		err = os.Setenv(awsAccessKeySecretEnvVar, customPricing.ServiceKeySecret)
 		if err != nil {
 			return nil, err
 		}
-	} else if os.IsNotExist(err) {
-		klog.V(2).Infof("Using Default Credentials")
-	} else {
-		return nil, err
 	}
-	defer jsonFile.Close()
 	athenaConfigs, err := os.Open("/var/configs/athena.json")
 	if err != nil {
 		return nil, err
@@ -703,9 +837,9 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
 	}
 	var athenaConf map[string]string
 	json.Unmarshal([]byte(b), &athenaConf)
-	region := aws.String(athenaConf["region"])
-	resultsBucket := athenaConf["output"]
-	database := athenaConf["database"]
+	region := aws.String(customPricing.AthenaRegion)
+	resultsBucket := customPricing.AthenaBucketName
+	database := customPricing.AthenaDatabase
 
 	c := &aws.Config{
 		Region: region,
@@ -765,7 +899,6 @@ func (*AWS) QuerySQL(query string) ([]byte, error) {
 		return b, nil
 	}
 	return nil, fmt.Errorf("Error getting query results : %s", *qrop.QueryExecution.Status.State)
-
 }
 
 type spotInfo struct {

+ 77 - 6
cloud/gcpprovider.go

@@ -9,6 +9,7 @@ import (
 	"math"
 	"net/http"
 	"net/url"
+	"os"
 	"regexp"
 	"strconv"
 	"strings"
@@ -27,6 +28,7 @@ import (
 )
 
 const GKE_GPU_TAG = "cloud.google.com/gke-accelerator"
+const BigqueryUpdateType = "bigqueryupdate"
 
 type userAgentTransport struct {
 	userAgent string
@@ -46,6 +48,7 @@ type GCP struct {
 	BaseCPUPrice       string
 	ProjectID          string
 	BillingDataDataset string
+	*CustomProvider
 }
 
 type gcpAllocation struct {
@@ -75,14 +78,82 @@ func gcpAllocationToOutOfClusterAllocation(gcpAlloc gcpAllocation) *OutOfCluster
 }
 
 func (gcp *GCP) GetConfig() (*CustomPricing, error) {
-	return nil, nil
+	c, err := GetDefaultPricingData("gcp.json")
+	if err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+type BigQueryConfig struct {
+	ProjectID          string            `json:"projectID"`
+	BillingDataDataset string            `json:"billingDataDataset"`
+	Key                map[string]string `json:"key"`
 }
 
-func (gcp *GCP) UpdateConfig(r io.Reader) (*CustomPricing, error) {
-	return nil, nil
+func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
+	c, err := GetDefaultPricingData("gcp.json")
+	if err != nil {
+		return nil, err
+	}
+	path := os.Getenv("CONFIG_PATH")
+	if path == "" {
+		path = "/models/"
+	}
+	if updateType == BigqueryUpdateType {
+		a := BigQueryConfig{}
+		err = json.NewDecoder(r).Decode(&a)
+		if err != nil {
+			return nil, err
+		}
+
+		c.ProjectID = a.ProjectID
+		c.BillingDataDataset = a.BillingDataDataset
+
+		j, err := json.Marshal(a.Key)
+		if err != nil {
+			return nil, err
+		}
+
+		keyPath := path + "key.json"
+		err = ioutil.WriteFile(keyPath, j, 0644)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		a := make(map[string]string)
+		err = json.NewDecoder(r).Decode(&a)
+		if err != nil {
+			return nil, err
+		}
+		for k, v := range a {
+			kUpper := strings.Title(k) // Just so we consistently supply / receive the same values, uppercase the first letter.
+			err := SetCustomPricingField(c, kUpper, v)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+	cj, err := json.Marshal(c)
+	if err != nil {
+		return nil, err
+	}
+
+	configPath := path + "gcp.json"
+	err = ioutil.WriteFile(configPath, cj, 0644)
+	if err != nil {
+		return nil, err
+	}
+
+	return c, nil
+
 }
 
-func (gcp *GCP) ExternalAllocations(start string, end string) ([]*OutOfClusterAllocation, error) {
+func (gcp *GCP) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
+	c, err := GetDefaultPricingData("gcp.json")
+	if err != nil {
+		return nil, err
+	}
 	// start, end formatted like: "2019-04-20 00:00:00"
 	queryString := fmt.Sprintf(`SELECT
 					service,
@@ -97,8 +168,8 @@ func (gcp *GCP) ExternalAllocations(start string, end string) ([]*OutOfClusterAl
 						WHERE usage_start_time >= "%s" AND usage_start_time < "%s")
 						LEFT JOIN UNNEST(labels) as labels
 						ON labels.key = "kubernetes_namespace" OR labels.key = "kubernetes_container" OR labels.key = "kubernetes_deployment" OR labels.key = "kubernetes_pod" OR labels.key = "kubernetes_daemonset"
-				GROUP BY aggregator, environment, service;`, gcp.BillingDataDataset, start, end) // For example, "billing_data.gcp_billing_export_v1_01AC9F_74CF1D_5565A2"
-	klog.V(3).Infof("Querying \"%s\" with : %s", gcp.ProjectID, queryString)
+				GROUP BY aggregator, environment, service;`, c.BillingDataDataset, start, end) // For example, "billing_data.gcp_billing_export_v1_01AC9F_74CF1D_5565A2"
+	klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
 	return gcp.QuerySQL(queryString)
 }
 

+ 34 - 4
cloud/provider.go

@@ -3,10 +3,12 @@ package cloud
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"net/url"
 	"os"
+	"reflect"
 	"strings"
 
 	"k8s.io/klog"
@@ -61,10 +63,10 @@ type Provider interface {
 	AllNodePricing() (interface{}, error)
 	DownloadPricingData() error
 	GetKey(map[string]string) Key
-	UpdateConfig(r io.Reader) (*CustomPricing, error)
+	UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 
-	ExternalAllocations(string, string) ([]*OutOfClusterAllocation, error)
+	ExternalAllocations(string, string, string) ([]*OutOfClusterAllocation, error)
 }
 
 // GetDefaultPricingData will search for a json file representing pricing data in /models/ and use it for base pricing info.
@@ -114,6 +116,8 @@ func GetDefaultPricingData(fname string) (*CustomPricing, error) {
 	}
 }
 
+const KeyUpdateType = "athenainfo"
+
 type CustomPricing struct {
 	Provider           string `json:"provider"`
 	Description        string `json:"description"`
@@ -129,9 +133,35 @@ type CustomPricing struct {
 	SpotDataBucket     string `json:"awsSpotDataBucket,omitempty"`
 	SpotDataPrefix     string `json:"awsSpotDataPrefix,omitempty"`
 	ProjectID          string `json:"projectID,omitempty"`
+	AthenaBucketName   string `json:"athenaBucketName"`
+	AthenaRegion       string `json:"athenaRegion"`
+	AthenaDatabase     string `json:"athenaDatabase"`
+	AthenaTable        string `json:"athenaTable"`
 	BillingDataDataset string `json:"billingDataDataset,omitempty"`
 }
 
+func SetCustomPricingField(obj *CustomPricing, name string, value string) error {
+	structValue := reflect.ValueOf(obj).Elem()
+	structFieldValue := structValue.FieldByName(name)
+
+	if !structFieldValue.IsValid() {
+		return fmt.Errorf("No such field: %s in obj", name)
+	}
+
+	if !structFieldValue.CanSet() {
+		return fmt.Errorf("Cannot set %s field value", name)
+	}
+
+	structFieldType := structFieldValue.Type()
+	val := reflect.ValueOf(value)
+	if structFieldType != val.Type() {
+		return fmt.Errorf("Provided value type didn't match custom pricing field type")
+	}
+
+	structFieldValue.Set(val)
+	return nil
+}
+
 type NodePrice struct {
 	CPU string
 	RAM string
@@ -148,7 +178,7 @@ func (*CustomProvider) GetConfig() (*CustomPricing, error) {
 	return nil, nil
 }
 
-func (*CustomProvider) UpdateConfig(r io.Reader) (*CustomPricing, error) {
+func (*CustomProvider) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, error) {
 	return nil, nil
 }
 
@@ -229,7 +259,7 @@ func (c *CustomProvider) GetKey(labels map[string]string) Key {
 	}
 }
 
-func (*CustomProvider) ExternalAllocations(start string, end string) ([]*OutOfClusterAllocation, error) {
+func (*CustomProvider) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
 	return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
 }
 

+ 42 - 4
main.go

@@ -111,8 +111,9 @@ func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps
 
 	start := r.URL.Query().Get("start")
 	end := r.URL.Query().Get("end")
+	aggregator := r.URL.Query().Get("aggregator")
 
-	data, err := a.Cloud.ExternalAllocations(start, end)
+	data, err := a.Cloud.ExternalAllocations(start, end, aggregator)
 	w.Write(wrapData(data, err))
 }
 
@@ -131,10 +132,10 @@ func (p *Accesses) GetConfigs(w http.ResponseWriter, r *http.Request, ps httprou
 	w.Write(wrapData(data, err))
 }
 
-func (p *Accesses) UpdateConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+func (p *Accesses) UpdateSpotInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	data, err := p.Cloud.UpdateConfig(r.Body)
+	data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.SpotInfoUpdateType)
 	if err != nil {
 		w.Write(wrapData(data, err))
 		return
@@ -147,6 +148,40 @@ func (p *Accesses) UpdateConfigs(w http.ResponseWriter, r *http.Request, ps http
 	return
 }
 
+func (p *Accesses) UpdateAthenaInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.AthenaInfoUpdateType)
+	if err != nil {
+		w.Write(wrapData(data, err))
+		return
+	}
+	w.Write(wrapData(data, err))
+	return
+}
+
+func (p *Accesses) UpdateBigQueryInfoConfigs(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	data, err := p.Cloud.UpdateConfig(r.Body, costAnalyzerCloud.BigqueryUpdateType)
+	if err != nil {
+		w.Write(wrapData(data, err))
+		return
+	}
+	w.Write(wrapData(data, err))
+	return
+}
+
+func (p *Accesses) UpdateConfigByKey(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	data, err := p.Cloud.UpdateConfig(r.Body, "")
+	if err != nil {
+		w.Write(wrapData(data, err))
+		return
+	}
+	w.Write(wrapData(data, err))
+	return
+}
+
 func Healthz(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 	w.WriteHeader(200)
 	w.Header().Set("Content-Length", "0")
@@ -303,7 +338,10 @@ func main() {
 	router.GET("/healthz", Healthz)
 	router.GET("/getConfigs", a.GetConfigs)
 	router.POST("/refreshPricing", a.RefreshPricingData)
-	router.POST("/updateConfigs", a.UpdateConfigs)
+	router.POST("/updateSpotInfoConfigs", a.UpdateSpotInfoConfigs)
+	router.POST("/updateAthenaInfoConfigs", a.UpdateAthenaInfoConfigs)
+	router.POST("/updateBigQueryInfoConfigs", a.UpdateBigQueryInfoConfigs)
+	router.POST("/updateConfigByKey", a.UpdateConfigByKey)
 
 	rootMux := http.NewServeMux()
 	rootMux.Handle("/", router)