Explorar el Código

add out of cluster filtering on AWS

AjayTripathy hace 6 años
padre
commit
8d1bf719b2
Se han modificado 6 ficheros con 46 adiciones y 18 borrados
  1. 32 10
      cloud/awsprovider.go
  2. 1 1
      cloud/azureprovider.go
  3. 1 1
      cloud/customprovider.go
  4. 1 1
      cloud/gcpprovider.go
  5. 1 1
      cloud/provider.go
  6. 10 4
      costmodel/router.go

+ 32 - 10
cloud/awsprovider.go

@@ -1068,21 +1068,43 @@ func ConvertToGlueColumnFormat(column_name string) string {
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
-func (a *AWS) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
+func (a *AWS) ExternalAllocations(start string, end string, aggregator string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
 	customPricing, err := a.GetConfig()
 	if err != nil {
 		return nil, err
 	}
 	aggregator_column_name := "resource_tags_user_" + aggregator
 	aggregator_column_name = ConvertToGlueColumnFormat(aggregator_column_name)
-	query := fmt.Sprintf(`SELECT   
-		CAST(line_item_usage_start_date AS DATE) as start_date,
-		%s,
-		line_item_product_code,
-		SUM(line_item_blended_cost) as blended_cost
-	FROM %s as cost_data
-	WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
-	GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
+
+	filter_column_name := "resource_tags_user_" + filterType
+	filter_column_name = ConvertToGlueColumnFormat(filter_column_name)
+
+	var query string
+	var lastIdx int
+	if filterType != "kubernetes_" { // This gets appended upstream and is equivalent to no filter.
+		query = fmt.Sprintf(`SELECT   
+			CAST(line_item_usage_start_date AS DATE) as start_date,
+			%s,
+			line_item_product_code,
+			%s,
+			SUM(line_item_blended_cost) as blended_cost
+		FROM %s as cost_data
+		WHERE (%s='%s' OR %s='') AND line_item_usage_start_date BETWEEN date '%s' AND date '%s'
+		GROUP BY 1,2,3,4`, aggregator_column_name, filter_column_name, customPricing.AthenaTable, filter_column_name, filterValue, filter_column_name, start, end)
+		lastIdx = 4
+	} else {
+		lastIdx = 3
+		query = fmt.Sprintf(`SELECT   
+			CAST(line_item_usage_start_date AS DATE) as start_date,
+			%s,
+			line_item_product_code,
+			SUM(line_item_blended_cost) as blended_cost
+		FROM %s as cost_data
+		WHERE line_item_usage_start_date BETWEEN date '%s' AND date '%s'
+		GROUP BY 1,2,3`, aggregator_column_name, customPricing.AthenaTable, start, end)
+	}
+
+	klog.V(3).Infof("Running Query: %s", query)
 
 	if customPricing.ServiceKeyName != "" {
 		err = os.Setenv(awsAccessKeyIDEnvVar, customPricing.ServiceKeyName)
@@ -1152,7 +1174,7 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregator string) (
 		if len(op.ResultSet.Rows) > 1 {
 			for _, r := range op.ResultSet.Rows[1:(len(op.ResultSet.Rows) - 1)] {
 
-				cost, err := strconv.ParseFloat(*r.Data[3].VarCharValue, 64)
+				cost, err := strconv.ParseFloat(*r.Data[lastIdx].VarCharValue, 64)
 				if err != nil {
 					return nil, err
 				}

+ 1 - 1
cloud/azureprovider.go

@@ -585,7 +585,7 @@ func (az *Azure) GetConfig() (*CustomPricing, error) {
 	return c, nil
 }
 
-func (az *Azure) ExternalAllocations(string, string, string) ([]*OutOfClusterAllocation, error) {
+func (az *Azure) ExternalAllocations(string, string, string, string, string) ([]*OutOfClusterAllocation, error) {
 	return nil, nil
 }
 

+ 1 - 1
cloud/customprovider.go

@@ -196,7 +196,7 @@ func (cp *CustomProvider) GetKey(labels map[string]string) Key {
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
-func (*CustomProvider) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
+func (*CustomProvider) ExternalAllocations(start string, end string, aggregator string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
 	return nil, nil // TODO: transform the QuerySQL lines into the new OutOfClusterAllocation Struct
 }
 

+ 1 - 1
cloud/gcpprovider.go

@@ -196,7 +196,7 @@ func (gcp *GCP) UpdateConfig(r io.Reader, updateType string) (*CustomPricing, er
 // ExternalAllocations represents tagged assets outside the scope of kubernetes.
 // "start" and "end" are dates of the format YYYY-MM-DD
 // "aggregator" is the tag used to determine how to allocate those assets, ie namespace, pod, etc.
-func (gcp *GCP) ExternalAllocations(start string, end string, aggregator string) ([]*OutOfClusterAllocation, error) {
+func (gcp *GCP) ExternalAllocations(start string, end string, aggregator string, filterType string, filterValue string) ([]*OutOfClusterAllocation, error) {
 	c, err := GetDefaultPricingData("gcp.json")
 	if err != nil {
 		return nil, err

+ 1 - 1
cloud/provider.go

@@ -166,7 +166,7 @@ type Provider interface {
 	GetConfig() (*CustomPricing, error)
 	GetManagementPlatform() (string, error)
 	GetLocalStorageQuery(offset string) (string, error)
-	ExternalAllocations(string, string, string) ([]*OutOfClusterAllocation, error)
+	ExternalAllocations(string, string, string, string, string) ([]*OutOfClusterAllocation, error)
 	ApplyReservedInstancePricing(map[string]*Node)
 }
 

+ 10 - 4
costmodel/router.go

@@ -415,12 +415,14 @@ func (a *Accesses) OutofClusterCosts(w http.ResponseWriter, r *http.Request, ps
 	end := r.URL.Query().Get("end")
 	aggregator := r.URL.Query().Get("aggregator")
 	customAggregation := r.URL.Query().Get("customAggregation")
+	filterType := r.URL.Query().Get("filterType")
+	filterValue := r.URL.Query().Get("filterValue")
 	var data []*costAnalyzerCloud.OutOfClusterAllocation
 	var err error
 	if customAggregation != "" {
-		data, err = a.Cloud.ExternalAllocations(start, end, customAggregation)
+		data, err = a.Cloud.ExternalAllocations(start, end, customAggregation, filterType, filterValue)
 	} else {
-		data, err = a.Cloud.ExternalAllocations(start, end, "kubernetes_"+aggregator)
+		data, err = a.Cloud.ExternalAllocations(start, end, "kubernetes_"+aggregator, "kubernetes_"+filterType, filterValue)
 	}
 	w.Write(WrapData(data, err))
 }
@@ -444,7 +446,11 @@ func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Req
 	// then recompute and cache the requested data
 	clearCache := r.URL.Query().Get("clearCache") == "true"
 
+	filterType := r.URL.Query().Get("filterType")
+	filterValue := r.URL.Query().Get("filterValue")
+
 	aggregation := "kubernetes_" + kubernetesAggregation
+	filterType = "kubernetes_" + filterType
 	if customAggregation != "" {
 		aggregation = customAggregation
 	}
@@ -456,7 +462,7 @@ func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Req
 	}
 
 	// attempt to retrieve cost data from cache
-	key := fmt.Sprintf(`%s:%s:%s`, start, end, aggregation)
+	key := fmt.Sprintf(`%s:%s:%s:%s:%s`, start, end, aggregation, filterType, filterValue)
 	if value, found := a.OutOfClusterCache.Get(key); found && !disableCache {
 		if data, ok := value.([]*costAnalyzerCloud.OutOfClusterAllocation); ok {
 			w.Write(WrapDataWithMessage(data, nil, fmt.Sprintf("out of cluser cache hit: %s", key)))
@@ -465,7 +471,7 @@ func (a *Accesses) OutOfClusterCostsWithCache(w http.ResponseWriter, r *http.Req
 		klog.Errorf("caching error: failed to type cast data: %s", key)
 	}
 
-	data, err := a.Cloud.ExternalAllocations(start, end, aggregation)
+	data, err := a.Cloud.ExternalAllocations(start, end, aggregation, filterType, filterValue)
 	if err == nil {
 		a.OutOfClusterCache.Set(key, data, cache.DefaultExpiration)
 	}