|
|
@@ -62,6 +62,43 @@ type gcpAllocation struct {
|
|
|
Cost float64
|
|
|
}
|
|
|
|
|
|
+type multiKeyGCPAllocation struct {
|
|
|
+ Keys bigquery.NullString
|
|
|
+ Service string
|
|
|
+ Cost float64
|
|
|
+}
|
|
|
+
|
|
|
+func multiKeyGCPAllocationToOutOfClusterAllocation(gcpAlloc multiKeyGCPAllocation, aggregatorNames []string) *OutOfClusterAllocation {
|
|
|
+ var keys []map[string]string
|
|
|
+ var environment string
|
|
|
+ var usedAggregatorName string
|
|
|
+ if gcpAlloc.Keys.Valid {
|
|
|
+ klog.Infof("VALID!!!!!!!!")
|
|
|
+ err := json.Unmarshal([]byte(gcpAlloc.Keys.StringVal), &keys)
|
|
|
+ if err != nil {
|
|
|
+ klog.Infof("Invalid unmarshaling response from BigQuery filtered query: %s", err.Error())
|
|
|
+ }
|
|
|
+ keyloop:
|
|
|
+ for _, label := range keys {
|
|
|
+ for _, aggregatorName := range aggregatorNames {
|
|
|
+ if label["key"] == aggregatorName {
|
|
|
+ environment = label["value"]
|
|
|
+ usedAggregatorName = label["key"]
|
|
|
+ klog.Infof("ENVIRONMENT: %s", environment)
|
|
|
+ klog.Infof("AGGREGATOR NAME: %s", usedAggregatorName)
|
|
|
+ break keyloop
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return &OutOfClusterAllocation{
|
|
|
+ Aggregator: usedAggregatorName,
|
|
|
+ Environment: environment,
|
|
|
+ Service: gcpAlloc.Service,
|
|
|
+ Cost: gcpAlloc.Cost,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func gcpAllocationToOutOfClusterAllocation(gcpAlloc gcpAllocation) *OutOfClusterAllocation {
|
|
|
var aggregator string
|
|
|
if gcpAlloc.Aggregator.Valid {
|
|
|
@@ -171,6 +208,19 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ } else if updateType == AthenaInfoUpdateType {
|
|
|
+ a := AwsAthenaInfo{}
|
|
|
+ err := json.NewDecoder(r).Decode(&a)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ c.AthenaBucketName = a.AthenaBucketName
|
|
|
+ c.AthenaRegion = a.AthenaRegion
|
|
|
+ c.AthenaDatabase = a.AthenaDatabase
|
|
|
+ c.AthenaTable = a.AthenaTable
|
|
|
+ c.ServiceKeyName = a.ServiceKeyName
|
|
|
+ c.ServiceKeySecret = a.ServiceKeySecret
|
|
|
+ c.AthenaProjectID = a.AccountID
|
|
|
} else {
|
|
|
a := make(map[string]interface{})
|
|
|
err := json.NewDecoder(r).Decode(&a)
|
|
|
@@ -211,28 +261,131 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
|
|
|
// ExternalAllocations represents tagged assets outside the scope of kubernetes.
|
|
|
// "start" and "end" are dates of the format YYYY-MM-DD
|
|
|
// "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
|
|
|
-func (gcp *GCP) ExternalAllocations(start string, end string, aggregator string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
|
|
|
+func (gcp *GCP) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
|
|
|
c, err := gcp.Config.GetCustomPricingData()
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- // start, end formatted like: "2019-04-20 00:00:00"
|
|
|
- queryString := fmt.Sprintf(`SELECT
|
|
|
- service,
|
|
|
- labels.key as aggregator,
|
|
|
- labels.value as environment,
|
|
|
- SUM(cost) as cost
|
|
|
- FROM (SELECT
|
|
|
- service.description as service,
|
|
|
- labels,
|
|
|
- cost
|
|
|
- FROM %s
|
|
|
- WHERE usage_start_time >= "%s" AND usage_start_time < "%s")
|
|
|
- LEFT JOIN UNNEST(labels) as labels
|
|
|
- ON labels.key = "%s"
|
|
|
- GROUP BY aggregator, environment, service;`, c.BillingDataDataset, start, end, aggregator) // For example, "billing_data.gcp_billing_export_v1_01AC9F_74CF1D_5565A2"
|
|
|
- klog.V(4).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
|
|
|
- return gcp.QuerySQL(queryString)
|
|
|
+ var s []*OutOfClusterAllocation
|
|
|
+ if c.ServiceKeyName != "" && c.ServiceKeySecret != "" {
|
|
|
+ aws, err := NewCrossClusterProvider("aws", "gcp.json", gcp.Clientset)
|
|
|
+ if err != nil {
|
|
|
+ klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
|
|
|
+ }
|
|
|
+ awsOOC, err := aws.ExternalAllocations(start, end, aggregators, filterType, filterValue)
|
|
|
+ if err != nil {
|
|
|
+ klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
|
|
|
+ }
|
|
|
+ s = append(s, awsOOC...)
|
|
|
+ }
|
|
|
+
|
|
|
+ formattedAggregators := []string{}
|
|
|
+ for _, a := range aggregators {
|
|
|
+ formattedAggregators = append(formattedAggregators, strconv.Quote(a))
|
|
|
+ }
|
|
|
+
|
|
|
+ aggregator := strings.Join(formattedAggregators, ",")
|
|
|
+
|
|
|
+ var qerr error
|
|
|
+ if filterType == "kubernetes_" {
|
|
|
+ // start, end formatted like: "2019-04-20 00:00:00"
|
|
|
+ /* OLD METHOD: supported getting all data, including unaggregated.
|
|
|
+ queryString := fmt.Sprintf(`SELECT
|
|
|
+ service,
|
|
|
+ labels.key as aggregator,
|
|
|
+ labels.value as environment,
|
|
|
+ SUM(cost) as cost
|
|
|
+ FROM (SELECT
|
|
|
+ service.description as service,
|
|
|
+ labels,
|
|
|
+ cost
|
|
|
+ FROM %s
|
|
|
+ WHERE usage_start_time >= "%s" AND usage_start_time < "%s")
|
|
|
+ LEFT JOIN UNNEST(labels) as labels
|
|
|
+ ON labels.key = "%s"
|
|
|
+ GROUP BY aggregator, environment, service;`, c.BillingDataDataset, start, end, aggregator) // For example, "billing_data.gcp_billing_export_v1_01AC9F_74CF1D_5565A2"
|
|
|
+ klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
|
|
|
+ gcpOOC, err := gcp.QuerySQL(queryString)
|
|
|
+ s = append(s, gcpOOC...)
|
|
|
+ qerr = err
|
|
|
+ */
|
|
|
+ queryString := fmt.Sprintf(`(SELECT
|
|
|
+ service.description as service,
|
|
|
+ TO_JSON_STRING(labels) as keys,
|
|
|
+ SUM(cost) as cost
|
|
|
+ FROM %s
|
|
|
+ WHERE
|
|
|
+ EXISTS(SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
|
|
|
+ AND usage_start_time >= "%s" AND usage_start_time < "%s"
|
|
|
+ GROUP BY service,keys)`, c.BillingDataDataset, aggregator, start, end)
|
|
|
+ klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
|
|
|
+ gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
|
|
|
+ s = append(s, gcpOOC...)
|
|
|
+ qerr = err
|
|
|
+ } else {
|
|
|
+ queryString := fmt.Sprintf(`(SELECT
|
|
|
+ service.description as service,
|
|
|
+ TO_JSON_STRING(labels) as keys,
|
|
|
+ SUM(cost) as cost
|
|
|
+ FROM %s
|
|
|
+ WHERE
|
|
|
+ EXISTS (SELECT * FROM UNNEST(labels) AS l WHERE l.key = "%s" AND l.value = "%s")
|
|
|
+ AND EXISTS(SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
|
|
|
+ AND usage_start_time >= "%s" AND usage_start_time < "%s"
|
|
|
+ GROUP BY service,keys)`, c.BillingDataDataset, filterType, filterValue, aggregator, start, end)
|
|
|
+ klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
|
|
|
+ gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
|
|
|
+ s = append(s, gcpOOC...)
|
|
|
+ qerr = err
|
|
|
+ }
|
|
|
+ if qerr != nil {
|
|
|
+ klog.Infof("Error querying gcp: %s", qerr)
|
|
|
+ }
|
|
|
+ return s, qerr
|
|
|
+}
|
|
|
+
|
|
|
+/*
|
|
|
+ (SELECT
|
|
|
+ service.description as service,
|
|
|
+ TO_JSON_STRING(labels) as gl,
|
|
|
+ SUM(cost)
|
|
|
+ FROM `guestbook-227502.billing_data.gcp_billing_export_v1_01AC9F_74CF1D_5565A2`
|
|
|
+ WHERE
|
|
|
+ EXISTS (SELECT * FROM UNNEST(labels) AS l WHERE l.key = "kubernetes_namespace" AND l.value = "kubecost")
|
|
|
+ AND EXISTS(SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key = "kubernetes_label_app")
|
|
|
+ AND usage_start_time >= "2020-02-10" AND usage_start_time < "2020-02-13"
|
|
|
+ GROUP BY service,gl)
|
|
|
+*/
|
|
|
+
|
|
|
+func (gcp *GCP) multiLabelQuery(query string, aggregators []string) ([]*OutOfClusterAllocation, error) {
|
|
|
+ c, err := gcp.Config.GetCustomPricingData()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ ctx := context.Background()
|
|
|
+ client, err := bigquery.NewClient(ctx, c.ProjectID) // For example, "guestbook-227502"
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ q := client.Query(query)
|
|
|
+ it, err := q.Read(ctx)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ var allocations []*OutOfClusterAllocation
|
|
|
+ for {
|
|
|
+ var a multiKeyGCPAllocation
|
|
|
+ err := it.Next(&a)
|
|
|
+ if err == iterator.Done {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ allocations = append(allocations, multiKeyGCPAllocationToOutOfClusterAllocation(a, aggregators))
|
|
|
+ }
|
|
|
+ return allocations, nil
|
|
|
}
|
|
|
|
|
|
// QuerySQL should query BigQuery for billing data for out of cluster costs.
|