فهرست منبع

Migrate Agg API WIP: create CostModelAggregator interface for ability to substitute ETL adapter

Niko Kovacevic 5 سال پیش
والد
کامیت
5162c78fee
2فایلهای تغییر یافته به همراه25 افزوده شده و 17 حذف شده
  1. 13 17
      pkg/costmodel/aggregation.go
  2. 12 0
      pkg/costmodel/router.go

+ 13 - 17
pkg/costmodel/aggregation.go

@@ -994,9 +994,7 @@ func compressVectorSeries(vs []*util.Vector, resolutionHours float64) []*util.Ve
 
 
 // ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
 // ComputeAggregateCostModel computes cost data for the given window, then aggregates it by the given fields.
 // Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
 // Data is cached on two levels: the aggregation is cached as well as the underlying cost data.
-func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, duration, offset, field string, subfields []string, rate string, filters map[string]string,
-	sri *SharedResourceInfo, shared string, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, disableSharedOverhead bool) (map[string]*Aggregation, string, error) {
-
+func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client, duration, offset, field string, subfields []string, rate string, filters map[string]string, sri *SharedResourceInfo, shared string, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, disableSharedOverhead, useETLAdapter bool) (map[string]*Aggregation, string, error) {
 	profileBaseName := fmt.Sprintf("ComputeAggregateCostModel(duration=%s, offet=%s, field=%s)", duration, offset, field)
 	profileBaseName := fmt.Sprintf("ComputeAggregateCostModel(duration=%s, offet=%s, field=%s)", duration, offset, field)
 	defer measureTime(time.Now(), profileThreshold, profileBaseName)
 	defer measureTime(time.Now(), profileThreshold, profileBaseName)
 
 
@@ -1251,8 +1249,7 @@ func (a *Accesses) ComputeAggregateCostModel(promClient prometheusClient.Client,
 		if !ok {
 		if !ok {
 			// disable cache and recompute if type cast fails
 			// disable cache and recompute if type cast fails
 			klog.Errorf("caching error: failed to cast aggregate data to struct: %s", aggKey)
 			klog.Errorf("caching error: failed to cast aggregate data to struct: %s", aggKey)
-			return a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters,
-				sri, shared, allocateIdle, includeTimeSeries, includeEfficiency, true, false, noExpireCache, noCache, remoteEnabled, disableSharedOverhead)
+			return a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters, sri, shared, allocateIdle, includeTimeSeries, includeEfficiency, true, false, noExpireCache, noCache, remoteEnabled, disableSharedOverhead, useETLAdapter)
 		}
 		}
 		return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
 		return result, fmt.Sprintf("aggregate cache hit: %s", aggKey), nil
 	}
 	}
@@ -1587,6 +1584,13 @@ func GenerateAggKey(ps aggKeyParams) string {
 		ps.sri, ps.shareType, ps.idle, ps.timeSeries, ps.efficiency)
 		ps.sri, ps.shareType, ps.idle, ps.timeSeries, ps.efficiency)
 }
 }
 
 
+type CostModelAggregator interface {
+	ComputeAggregateCostModel(promClient prometheusClient.Client, duration, offset, field string,
+		subfields []string, rate string, filters map[string]string, sri *SharedResourceInfo, shared string,
+		allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache,
+		remoteEnabled, disableSharedOverhead, useETLAdapter bool) (map[string]*Aggregation, string, error)
+}
+
 // AggregateCostModelHandler handles requests to the aggregated cost model API. See
 // AggregateCostModelHandler handles requests to the aggregated cost model API. See
 // ComputeAggregateCostModel for details.
 // ComputeAggregateCostModel for details.
 func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
@@ -1770,22 +1774,14 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 
 
 	promClient := a.GetPrometheusClient(remote)
 	promClient := a.GetPrometheusClient(remote)
 
 
+	useETLAdapter := r.URL.Query().Get("etl") == "true"
+
 	var data map[string]*Aggregation
 	var data map[string]*Aggregation
 	var message string
 	var message string
-
-	// etlEnabled := env.IsETLEnabled()
-	// useETLAdapter := r.URL.Query().Get("etl") == "true"
-	// if etlEnabled && useETLAdapter {
-	// 	data, message, err = a.AdaptETLAggregateCostModel(window, field, subfields, rate, filters, sr, shared, allocateIdle, includeTimeSeries)
-	// } else {
-	// 	data, message, err = a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters,
-	// 		sr, shared, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, false)
-	// }
-	data, message, err = a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters,
-		sr, shared, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, false)
+	data, message, err = a.AggAPI.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters, sr, shared, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, false, useETLAdapter)
 
 
 	// Find any warnings in http request context
 	// Find any warnings in http request context
-	warning, _ := product.GetWarning(r)
+	warning, _ := GetWarning(r)
 
 
 	if err != nil {
 	if err != nil {
 		if emptyErr, ok := err.(*EmptyDataError); ok {
 		if emptyErr, ok := err.(*EmptyDataError); ok {

+ 12 - 0
pkg/costmodel/router.go

@@ -85,6 +85,7 @@ type Accesses struct {
 	CostDataCache                 *cache.Cache
 	CostDataCache                 *cache.Cache
 	ClusterCostsCache             *cache.Cache
 	ClusterCostsCache             *cache.Cache
 	CacheExpiration               map[string]time.Duration
 	CacheExpiration               map[string]time.Duration
+	AggAPI                        CostModelAggregator
 }
 }
 
 
 // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
 // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
@@ -400,6 +401,16 @@ func WrapDataWithMessageAndWarning(data interface{}, err error, message, warning
 	return resp
 	return resp
 }
 }
 
 
+const (
+	ContextWarning string = "Warning"
+)
+
+// GetWarning Extracts a warning message from the request context if it exists
+func GetWarning(r *http.Request) (warning string, ok bool) {
+	warning, ok = r.Context().Value(ContextWarning).(string)
+	return
+}
+
 // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
 // RefreshPricingData needs to be called when a new node joins the fleet, since we cache the relevant subsets of pricing data to avoid storing the whole thing.
 func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 func (a *Accesses) RefreshPricingData(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
@@ -1215,6 +1226,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		ClusterCostsCache:             clusterCostsCache,
 		ClusterCostsCache:             clusterCostsCache,
 		CacheExpiration:               cacheExpiration,
 		CacheExpiration:               cacheExpiration,
 	}
 	}
+	a.AggAPI = &a
 
 
 	err = a.CloudProvider.DownloadPricingData()
 	err = a.CloudProvider.DownloadPricingData()
 	if err != nil {
 	if err != nil {