Просмотр исходного кода

support multicloud, multilabel

AjayTripathy 6 лет назад
Родитель
Сommit
0ebe474ec0
8 измененных файлов с 182 добавлено и 49 удалено
  1. 46 13
      cloud/awsprovider.go
  2. 1 1
      cloud/azureprovider.go
  3. 1 1
      cloud/customprovider.go
  4. 84 18
      cloud/gcpprovider.go
  5. 15 1
      cloud/provider.go
  6. 2 2
      costmodel/metrics.go
  7. 8 1
      costmodel/promparsers.go
  8. 25 12
      costmodel/router.go

+ 46 - 13
cloud/awsprovider.go

@@ -303,7 +303,7 @@ func (aws *AWS) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			c.AthenaTable = a.AthenaTable
 			c.AthenaTable = a.AthenaTable
 			c.ServiceKeyName = a.ServiceKeyName
 			c.ServiceKeyName = a.ServiceKeyName
 			c.ServiceKeySecret = a.ServiceKeySecret
 			c.ServiceKeySecret = a.ServiceKeySecret
-			c.ProjectID = a.AccountID
+			c.AthenaProjectID = a.AccountID
 		} else {
 		} else {
 			a := make(map[string]interface{})
 			a := make(map[string]interface{})
 			err := json.NewDecoder(r).Decode(&a)
 			err := json.NewDecoder(r).Decode(&a)
@@ -1056,16 +1056,29 @@ func ConvertToGlueColumnFormat(column_name string) string {
 	return final
 	return final
 }
 }
 
 
+func generateAWSGroupBy(lastIdx int) string {
+	sequence := []string{}
+	for i := 1; i < lastIdx+1; i++ {
+		sequence = append(sequence, strconv.Itoa(i))
+	}
+	return strings.Join(sequence, ",")
+}
+
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
-func (a *AWS) ExternalAllocations(start string, end string, aggregator string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
+func (a *AWS) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
 	customPricing, err := a.GetConfig()
 	customPricing, err := a.GetConfig()
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	aggregator_column_name := "resource_tags_user_" + aggregator
-	aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
+	formattedAggregators := []string{}
+	for _, agg := range aggregators {
+		aggregator_column_name := "resource_tags_user_" + agg
+		aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
+		formattedAggregators = append(formattedAggregators, aggregator_column_name)
+	}
+	aggregatorNames := strings.Join(formattedAggregators, ",")
 
 
 	filter_column_name := "resource_tags_user_" + filterType
 	filter_column_name := "resource_tags_user_" + filterType
 	filter_column_name = ConvertToGlueColumnFormat(filter_column_name)
 	filter_column_name = ConvertToGlueColumnFormat(filter_column_name)
@@ -1073,6 +1086,8 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregator string, f
 	var query string
 	var query string
 	var lastIdx int
 	var lastIdx int
 	if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
 	if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
+		lastIdx = len(formattedAggregators) + 3
+		groupby := generateAWSGroupBy(lastIdx)
 		query = fmt.Sprintf(`SELECT   
 		query = fmt.Sprintf(`SELECT   
 			CAST(line_item_usage_start_date AS DATE) as start_date,
 			CAST(line_item_usage_start_date AS DATE) as start_date,
 			%s,
 			%s,
@@ -1081,10 +1096,10 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregator string, f
 			SUM(line_item_blended_cost) as blended_cost
 			SUM(line_item_blended_cost) as blended_cost
 		FROM %s as cost_data
 		FROM %s as cost_data
 		WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s'
 		WHERE (%s='%s') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s'
-		GROUP BY 1,2,3,4`, aggregator_column_name, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end)
-		lastIdx = 4
+		GROUP BY %s`, aggregatorNames, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, start, end, groupby)
 	} else {
 	} else {
-		lastIdx = 3
+		lastIdx = len(formattedAggregators) + 2
+		groupby := generateAWSGroupBy(lastIdx)
 		query = fmt.Sprintf(`SELECT   
 		query = fmt.Sprintf(`SELECT   
 			CAST(line_item_usage_start_date AS DATE) as start_date,
 			CAST(line_item_usage_start_date AS DATE) as start_date,
 			%s,
 			%s,
@@ -1092,7 +1107,7 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregator string, f
 			SUM(line_item_blended_cost) as blended_cost
 			SUM(line_item_blended_cost) as blended_cost
 		FROM %s as cost_data
 		FROM %s as cost_data
 		WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
 		WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
-		GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
+		GROUP BY %s`, aggregatorNames, customPricing.AthenaTable, start, end, groupby)
 	}
 	}
 
 
 	klog.V(3).Infof("Running Query: %s", query)
 	klog.V(3).Infof("Running Query: %s", query)
@@ -1110,7 +1125,6 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregator string, f
 	region := aws.String(customPricing.AthenaRegion)
 	region := aws.String(customPricing.AthenaRegion)
 	resultsBucket := customPricing.AthenaBucketName
 	resultsBucket := customPricing.AthenaBucketName
 	database := customPricing.AthenaDatabase
 	database := customPricing.AthenaDatabase
-
 	c := &aws.Config{
 	c := &aws.Config{
 		Region: region,
 		Region: region,
 	}
 	}
@@ -1169,17 +1183,36 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregator string, f
 				if err != nil {
 				if err != nil {
 					return nil, err
 					return nil, err
 				}
 				}
+				environment := ""
+				for _, d := range r.Data[1 : len(formattedAggregators)+1] {
+					if *d.VarCharValue != "" {
+						environment = *d.VarCharValue // just set to the first nonempty match
+					}
+					break
+				}
 				ooc := &OutOfClusterAllocation{
 				ooc := &OutOfClusterAllocation{
-					Aggregator:  aggregator,
-					Environment: *r.Data[1].VarCharValue,
-					Service:     *r.Data[2].VarCharValue,
+					Aggregator:  strings.Join(aggregators, ","),
+					Environment: environment,
+					Service:     *r.Data[len(formattedAggregators)+1].VarCharValue,
 					Cost:        cost,
 					Cost:        cost,
 				}
 				}
 				oocAllocs = append(oocAllocs, ooc)
 				oocAllocs = append(oocAllocs, ooc)
 			}
 			}
 		} else {
 		} else {
-			klog.V(1).Infof("No results available for %s at database %s between %s and %s", aggregator_column_name, customPricing.AthenaTable, start, end)
+			klog.V(1).Infof("No results available for %s at database %s between %s and %s", strings.Join(formattedAggregators, ","), customPricing.AthenaTable, start, end)
+		}
+	}
+
+	if customPricing.BillingDataDataset != "" { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation
+		gcp, err := NewCrossClusterProvider("aws", "aws.json", a.Clientset)
+		if err != nil {
+			klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
+		}
+		gcpOOC, err := gcp.ExternalAllocations(start, end, aggregators, filterType, filterValue)
+		if err != nil {
+			klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
 		}
 		}
+		oocAllocs = append(oocAllocs, gcpOOC...)
 	}
 	}
 
 
 	return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
 	return oocAllocs, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct

+ 1 - 1
cloud/azureprovider.go

@@ -608,7 +608,7 @@ func (az *Azure) GetConfig() (*CustomPricing, error) {
 	return c, nil
 	return c, nil
 }
 }
 
 
-func (az *Azure) ExternalAllocations(string, string, string, string, string) ([]*OutOfClusterAllocation, error) {
+func (az *Azure) ExternalAllocations(string, string, []string, string, string) ([]*OutOfClusterAllocation, error) {
 	return nil, nil
 	return nil, nil
 }
 }
 
 

+ 1 - 1
cloud/customprovider.go

@@ -191,7 +191,7 @@ func (cp *CustomProvider) GetKey(labels map[string]string) Key {
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
-func (*CustomProvider) ExternalAllocations(start string, end string, aggregator string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
+func (*CustomProvider) ExternalAllocations(start string, end string, aggregator []string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
 	return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
 	return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
 }
 }
 
 

+ 84 - 18
cloud/gcpprovider.go

@@ -68,23 +68,31 @@ type multiKeyGCPAllocation struct {
 	Cost    float64
 	Cost    float64
 }
 }
 
 
-func multiKeyGCPAllocationToOutOfClusterAllocation(gcpAlloc multiKeyGCPAllocation, aggregatorName string) *OutOfClusterAllocation {
+func multiKeyGCPAllocationToOutOfClusterAllocation(gcpAlloc multiKeyGCPAllocation, aggregatorNames []string) *OutOfClusterAllocation {
 	var keys []map[string]string
 	var keys []map[string]string
 	var environment string
 	var environment string
+	var usedAggregatorName string
 	if gcpAlloc.Keys.Valid {
 	if gcpAlloc.Keys.Valid {
+		klog.Infof("VALID!!!!!!!!")
 		err := json.Unmarshal([]byte(gcpAlloc.Keys.StringVal), &keys)
 		err := json.Unmarshal([]byte(gcpAlloc.Keys.StringVal), &keys)
 		if err != nil {
 		if err != nil {
 			klog.Infof("Invalid unmarshaling response from BigQuery filtered query: %s", err.Error())
 			klog.Infof("Invalid unmarshaling response from BigQuery filtered query: %s", err.Error())
 		}
 		}
+	keyloop:
 		for _, label := range keys {
 		for _, label := range keys {
-			if label["key"] == aggregatorName {
-				environment = label["value"]
+			for _, aggregatorName := range aggregatorNames {
+				if label["key"] == aggregatorName {
+					environment = label["value"]
+					usedAggregatorName = label["key"]
+					klog.Infof("ENVIRONMENT: %s", environment)
+					klog.Infof("AGGREGATOR NAME: %s", usedAggregatorName)
+					break keyloop
+				}
 			}
 			}
-			break
 		}
 		}
 	}
 	}
 	return &OutOfClusterAllocation{
 	return &OutOfClusterAllocation{
-		Aggregator:  aggregatorName,
+		Aggregator:  usedAggregatorName,
 		Environment: environment,
 		Environment: environment,
 		Service:     gcpAlloc.Service,
 		Service:     gcpAlloc.Service,
 		Cost:        gcpAlloc.Cost,
 		Cost:        gcpAlloc.Cost,
@@ -200,6 +208,19 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
+		} else if updateType == AthenaInfoUpdateType {
+			a := AwsAthenaInfo{}
+			err := json.NewDecoder(r).Decode(&a)
+			if err != nil {
+				return err
+			}
+			c.AthenaBucketName = a.AthenaBucketName
+			c.AthenaRegion = a.AthenaRegion
+			c.AthenaDatabase = a.AthenaDatabase
+			c.AthenaTable = a.AthenaTable
+			c.ServiceKeyName = a.ServiceKeyName
+			c.ServiceKeySecret = a.ServiceKeySecret
+			c.AthenaProjectID = a.AccountID
 		} else {
 		} else {
 			a := make(map[string]interface{})
 			a := make(map[string]interface{})
 			err := json.NewDecoder(r).Decode(&a)
 			err := json.NewDecoder(r).Decode(&a)
@@ -240,42 +261,87 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
-func (gcp *GCP) ExternalAllocations(start string, end string, aggregator string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
+func (gcp *GCP) ExternalAllocations(start string, end string, aggregators []string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
 	c, err := gcp.Config.GetCustomPricingData()
 	c, err := gcp.Config.GetCustomPricingData()
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
-	if filterType == "" {
+	var s []*OutOfClusterAllocation
+	if c.ServiceKeyName != "" && c.ServiceKeySecret != "" {
+		aws, err := NewCrossClusterProvider("aws", "gcp.json", gcp.Clientset)
+		if err != nil {
+			klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
+		}
+		awsOOC, err := aws.ExternalAllocations(start, end, aggregators, filterType, filterValue)
+		if err != nil {
+			klog.Infof("Could not fetch cross-cluster costs %s", err.Error())
+		}
+		s = append(s, awsOOC...)
+	}
+
+	formattedAggregators := []string{}
+	for _, a := range aggregators {
+		formattedAggregators = append(formattedAggregators, strconv.Quote(a))
+	}
+
+	aggregator := strings.Join(formattedAggregators, ",")
+
+	var qerr error
+	if filterType == "kubernetes_" {
 		// start, end formatted like: "2019-04-20 00:00:00"
 		// start, end formatted like: "2019-04-20 00:00:00"
+		/* OLD METHOD: supported getting all data, including unaggregated.
 		queryString := fmt.Sprintf(`SELECT
 		queryString := fmt.Sprintf(`SELECT
 						service,
 						service,
 						labels.key as aggregator,
 						labels.key as aggregator,
 						labels.value as environment,
 						labels.value as environment,
 						SUM(cost) as cost
 						SUM(cost) as cost
-						FROM  (SELECT 
+						FROM  (SELECT
 								service.description as service,
 								service.description as service,
 								labels,
 								labels,
-								cost 
+								cost
 							FROM %s
 							FROM %s
 							WHERE usage_start_time >= "%s" AND usage_start_time < "%s")
 							WHERE usage_start_time >= "%s" AND usage_start_time < "%s")
 							LEFT JOIN UNNEST(labels) as labels
 							LEFT JOIN UNNEST(labels) as labels
 							ON labels.key = "%s"
 							ON labels.key = "%s"
 					GROUP BY aggregator, environment, service;`, c.BillingDataDataset, start, end, aggregator) // For example, "billing_data.gcp_billing_export_v1_01AC9F_74CF1D_5565A2"
 					GROUP BY aggregator, environment, service;`, c.BillingDataDataset, start, end, aggregator) // For example, "billing_data.gcp_billing_export_v1_01AC9F_74CF1D_5565A2"
-		klog.V(4).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
-		return gcp.QuerySQL(queryString)
+		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
+		gcpOOC, err := gcp.QuerySQL(queryString)
+		s = append(s, gcpOOC...)
+		qerr = err
+		*/
+		queryString := fmt.Sprintf(`(SELECT
+			service.description as service,
+			TO_JSON_STRING(labels) as keys,
+			SUM(cost) as cost
+		  	FROM  %s
+		 	WHERE
+				EXISTS(SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
+				AND usage_start_time >= "%s" AND usage_start_time < "%s"
+			GROUP BY  service,keys)`, c.BillingDataDataset, aggregator, start, end)
+		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
+		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
+		s = append(s, gcpOOC...)
+		qerr = err
 	} else {
 	} else {
 		queryString := fmt.Sprintf(`(SELECT
 		queryString := fmt.Sprintf(`(SELECT
 			service.description as service,
 			service.description as service,
-			TO_JSON_STRING(labels) as gl,
-			SUM(cost)
+			TO_JSON_STRING(labels) as keys,
+			SUM(cost) as cost
 		  	FROM  %s
 		  	FROM  %s
 		 	WHERE
 		 	WHERE
 				EXISTS (SELECT * FROM UNNEST(labels) AS l WHERE l.key = "%s" AND l.value = "%s")
 				EXISTS (SELECT * FROM UNNEST(labels) AS l WHERE l.key = "%s" AND l.value = "%s")
-				AND EXISTS(SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key = "%s")
+				AND EXISTS(SELECT * FROM UNNEST(labels) AS l2 WHERE l2.key IN (%s))
 				AND usage_start_time >= "%s" AND usage_start_time < "%s"
 				AND usage_start_time >= "%s" AND usage_start_time < "%s"
-			GROUP BY  service,gl)`, c.BillingDataDataset, filterType, filterValue, aggregator, start, end)
-		return gcp.multiLabelQuery(queryString, aggregator)
+			GROUP BY  service,keys)`, c.BillingDataDataset, filterType, filterValue, aggregator, start, end)
+		klog.V(3).Infof("Querying \"%s\" with : %s", c.ProjectID, queryString)
+		gcpOOC, err := gcp.multiLabelQuery(queryString, aggregators)
+		s = append(s, gcpOOC...)
+		qerr = err
+	}
+	if qerr != nil {
+		klog.Infof("Error querying gcp: %s", qerr)
 	}
 	}
+	return s, qerr
 }
 }
 
 
 /*
 /*
@@ -291,7 +357,7 @@ func (gcp *GCP) ExternalAllocations(start string, end string, aggregator string,
 	GROUP BY  service,gl)
 	GROUP BY  service,gl)
 */
 */
 
 
-func (gcp *GCP) multiLabelQuery(query, aggregator string) ([]*OutOfClusterAllocation, error) {
+func (gcp *GCP) multiLabelQuery(query string, aggregators []string) ([]*OutOfClusterAllocation, error) {
 	c, err := gcp.Config.GetCustomPricingData()
 	c, err := gcp.Config.GetCustomPricingData()
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
@@ -317,7 +383,7 @@ func (gcp *GCP) multiLabelQuery(query, aggregator string) ([]*OutOfClusterAlloca
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		allocations = append(allocations, multiKeyGCPAllocationToOutOfClusterAllocation(a, aggregator))
+		allocations = append(allocations, multiKeyGCPAllocationToOutOfClusterAllocation(a, aggregators))
 	}
 	}
 	return allocations, nil
 	return allocations, nil
 }
 }

+ 15 - 1
cloud/provider.go

@@ -129,6 +129,7 @@ type CustomPricing struct {
 	SpotDataBucket        string            `json:"awsSpotDataBucket,omitempty"`
 	SpotDataBucket        string            `json:"awsSpotDataBucket,omitempty"`
 	SpotDataPrefix        string            `json:"awsSpotDataPrefix,omitempty"`
 	SpotDataPrefix        string            `json:"awsSpotDataPrefix,omitempty"`
 	ProjectID             string            `json:"projectID,omitempty"`
 	ProjectID             string            `json:"projectID,omitempty"`
+	AthenaProjectID       string            `json:"athenaProjectID,omitempty"`
 	AthenaBucketName      string            `json:"athenaBucketName"`
 	AthenaBucketName      string            `json:"athenaBucketName"`
 	AthenaRegion          string            `json:"athenaRegion"`
 	AthenaRegion          string            `json:"athenaRegion"`
 	AthenaDatabase        string            `json:"athenaDatabase"`
 	AthenaDatabase        string            `json:"athenaDatabase"`
@@ -169,7 +170,7 @@ type Provider interface {
 	GetConfig() (*CustomPricing, error)
 	GetConfig() (*CustomPricing, error)
 	GetManagementPlatform() (string, error)
 	GetManagementPlatform() (string, error)
 	GetLocalStorageQuery(string, string, bool) string
 	GetLocalStorageQuery(string, string, bool) string
-	ExternalAllocations(string, string, string, string, string) ([]*OutOfClusterAllocation, error)
+	ExternalAllocations(string, string, []string, string, string) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 	ApplyReservedInstancePricing(map[string]*Node)
 }
 }
 
 
@@ -203,6 +204,19 @@ func CustomPricesEnabled(p Provider) bool {
 	return config.CustomPricesEnabled == "true"
 	return config.CustomPricesEnabled == "true"
 }
 }
 
 
+func NewCrossClusterProvider(ctype string, overrideConfigPath string, cache clustercache.ClusterCache) (Provider, error) {
+	if ctype == "aws" {
+		return &AWS{
+			Clientset: cache,
+			Config:    NewProviderConfig(overrideConfigPath),
+		}, nil
+	}
+	return &CustomProvider{
+		Clientset: cache,
+		Config:    NewProviderConfig(overrideConfigPath),
+	}, nil
+}
+
 // NewProvider looks at the nodespec or provider metadata server to decide which provider to instantiate.
 // NewProvider looks at the nodespec or provider metadata server to decide which provider to instantiate.
 func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, error) {
 func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, error) {
 	if metadata.OnGCE() {
 	if metadata.OnGCE() {

+ 2 - 2
costmodel/metrics.go

@@ -23,13 +23,13 @@ func kubeLabelsToPrometheusLabels(labels map[string]string) ([]string, []string)
 
 
 	labelValues := make([]string, 0, len(labels))
 	labelValues := make([]string, 0, len(labels))
 	for i, k := range labelKeys {
 	for i, k := range labelKeys {
-		labelKeys[i] = "label_" + sanitizeLabelName(k)
+		labelKeys[i] = "label_" + SanitizeLabelName(k)
 		labelValues = append(labelValues, labels[k])
 		labelValues = append(labelValues, labels[k])
 	}
 	}
 	return labelKeys, labelValues
 	return labelKeys, labelValues
 }
 }
 
 
-func sanitizeLabelName(s string) string {
+func SanitizeLabelName(s string) string {
 	return invalidLabelCharRE.ReplaceAllString(s, "_")
 	return invalidLabelCharRE.ReplaceAllString(s, "_")
 }
 }
 
 

+ 8 - 1
costmodel/promparsers.go

@@ -343,7 +343,14 @@ func GetPodLabelsMetrics(queryResult interface{}, defaultClusterID string) (map[
 		}
 		}
 
 
 		nsKey := ns + "," + pod + "," + clusterID
 		nsKey := ns + "," + pod + "," + clusterID
-		toReturn[nsKey] = val.GetLabels()
+		if labels, ok := toReturn[nsKey]; ok {
+			newlabels := val.GetLabels()
+			for k, v := range newlabels {
+				labels[k] = v
+			}
+		} else {
+			toReturn[nsKey] = val.GetLabels()
+		}
 	}
 	}
 
 
 	return toReturn, nil
 	return toReturn, nil

+ 25 - 12
costmodel/router.go

@@ -422,6 +422,26 @@ func (a *Accesses) CostDataModelRangeLarge(w http.ResponseWriter, r *http.Reques
 	w.Write(WrapData(data, err))
 	w.Write(WrapData(data, err))
 }
 }
 
 
+func parseAggregations(customAggregation, aggregator, filterType string) (string, []string, string) {
+	var key string
+	var filter string
+	var val []string
+	if customAggregation != "" {
+		key = customAggregation
+		filter = filterType
+		val = strings.Split(customAggregation, ",")
+	} else {
+		aggregations := strings.Split(aggregator, ",")
+		for i, agg := range aggregations {
+			aggregations[i] = "kubernetes_" + agg
+		}
+		key = strings.Join(aggregations, ",")
+		filter = "kubernetes_" + filterType
+		val = aggregations
+	}
+	return key, val, filter
+}
+
 func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -434,11 +454,8 @@ func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps
 	filterValue := r.URL.Query().Get("filterValue")
 	filterValue := r.URL.Query().Get("filterValue")
 	var data []*costAnalyzerCloud.OutOfClusterAllocation
 	var data []*costAnalyzerCloud.OutOfClusterAllocation
 	var err error
 	var err error
-	if customAggregation != "" {
-		data, err = a.Cloud.ExternalAllocations(start, end, customAggregation, filterType, filterValue)
-	} else {
-		data, err = a.Cloud.ExternalAllocations(start, end, "kubernetes_"+aggregator, "kubernetes_"+filterType, filterValue)
-	}
+	_, aggregations, filter := parseAggregations(customAggregation, aggregator, filterType)
+	data, err = a.Cloud.ExternalAllocations(start, end, aggregations, filter, filterValue)
 	w.Write(WrapData(data, err))
 	w.Write(WrapData(data, err))
 }
 }
 
 
@@ -464,11 +481,7 @@ func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Req
 	filterType := r.URL.Query().Get("filterType")
 	filterType := r.URL.Query().Get("filterType")
 	filterValue := r.URL.Query().Get("filterValue")
 	filterValue := r.URL.Query().Get("filterValue")
 
 
-	aggregation := "kubernetes_" + kubernetesAggregation
-	filterType = "kubernetes_" + filterType
-	if customAggregation != "" {
-		aggregation = customAggregation
-	}
+	aggregationkey, aggregation, filter := parseAggregations(customAggregation, kubernetesAggregation, filterType)
 
 
 	// clear cache prior to checking the cache so that a clearCache=true
 	// clear cache prior to checking the cache so that a clearCache=true
 	// request always returns a freshly computed value
 	// request always returns a freshly computed value
@@ -477,7 +490,7 @@ func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Req
 	}
 	}
 
 
 	// attempt to retrieve cost data from cache
 	// attempt to retrieve cost data from cache
-	key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregation, filterType, filterValue)
+	key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregationkey, filter, filterValue)
 	if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
 	if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
 		if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
 		if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
 			w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
 			w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluster cache hit: %s", key)))
@@ -486,7 +499,7 @@ func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Req
 		klog.Errorf("caching error: failed to type cast data: %s", key)
 		klog.Errorf("caching error: failed to type cast data: %s", key)
 	}
 	}
 
 
-	data, err := a.Cloud.ExternalAllocations(start, end, aggregation, filterType, filterValue)
+	data, err := a.Cloud.ExternalAllocations(start, end, aggregation, filter, filterValue)
 	if err == nil {
 	if err == nil {
 		a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
 		a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
 	}
 	}