
Migrate Agg API: move env to open source; re-implement agg API cache warming

Niko Kovacevic 5 years ago
parent
commit
c78409b8b7
4 changed files with 243 additions and 169 deletions
  1. pkg/cloud/provider.go  +60 −0
  2. pkg/costmodel/aggregation.go  +163 −0
  3. pkg/costmodel/router.go  +9 −169
  4. pkg/env/costmodelenv.go  +11 −0

+ 60 - 0
pkg/cloud/provider.go

@@ -253,6 +253,66 @@ func CustomPricesEnabled(p Provider) bool {
 	return config.CustomPricesEnabled == "true"
 }
 
+// AllocateIdleByDefault returns true if the application settings specify to allocate idle by default
+func AllocateIdleByDefault(p Provider) bool {
+	config, err := p.GetConfig()
+	if err != nil {
+		return false
+	}
+
+	return config.DefaultIdle == "true"
+}
+
+// SharedNamespaces returns a list of names of shared namespaces, as defined in the application settings
+func SharedNamespaces(p Provider) []string {
+	namespaces := []string{}
+
+	config, err := p.GetConfig()
+	if err != nil {
+		return namespaces
+	}
+	if config.SharedNamespaces == "" {
+		return namespaces
+	}
+	// trim spaces so that "kube-system, kubecost" is equivalent to "kube-system,kubecost"
+	for _, ns := range strings.Split(config.SharedNamespaces, ",") {
+		namespaces = append(namespaces, strings.Trim(ns, " "))
+	}
+
+	return namespaces
+}
+
+// SharedLabels returns the configured set of shared labels as a parallel tuple of keys to values; e.g.
+// for app:kubecost,type:staging this returns (["app", "type"], ["kubecost", "staging"]) in order to
+// match the signature of NewSharedResourceInfo
+func SharedLabels(p Provider) ([]string, []string) {
+	names := []string{}
+	values := []string{}
+
+	config, err := p.GetConfig()
+	if err != nil {
+		return names, values
+	}
+
+	if config.SharedLabelNames == "" || config.SharedLabelValues == "" {
+		return names, values
+	}
+
+	ks := strings.Split(config.SharedLabelNames, ",")
+	vs := strings.Split(config.SharedLabelValues, ",")
+	if len(ks) != len(vs) {
+		klog.V(2).Infof("[Warning] shared labels have mismatched lengths: %d names, %d values", len(ks), len(vs))
+		return names, values
+	}
+
+	for i := range ks {
+		names = append(names, strings.Trim(ks[i], " "))
+		values = append(values, strings.Trim(vs[i], " "))
+	}
+
+	return names, values
+}
+
 func NewCrossClusterProvider(ctype string, overrideConfigPath string, cache clustercache.ClusterCache) (Provider, error) {
 	if ctype == "aws" {
 		return &AWS{

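The two list-valued settings above deliberately tolerate spaces around commas. The following self-contained sketch exercises that parsing behavior; splitAndTrim is an illustrative stand-in for the loops in SharedNamespaces and SharedLabels, not part of the patch:

	package main

	import (
		"fmt"
		"strings"
	)

	// splitAndTrim splits a comma-separated setting and strips surrounding
	// spaces from each entry, mirroring the trimming done above.
	func splitAndTrim(s string) []string {
		out := []string{}
		if s == "" {
			return out
		}
		for _, part := range strings.Split(s, ",") {
			out = append(out, strings.Trim(part, " "))
		}
		return out
	}

	func main() {
		// "kube-system, kubecost" and "kube-system,kubecost" parse identically.
		fmt.Println(splitAndTrim("kube-system, kubecost")) // [kube-system kubecost]

		// SharedLabels returns parallel name/value slices, so the two settings
		// must line up index-by-index; mismatched lengths are rejected with a warning.
		names := splitAndTrim("app,type")
		values := splitAndTrim("kubecost, staging")
		for i := range names {
			fmt.Printf("%s=%s\n", names[i], values[i]) // app=kubecost, then type=staging
		}
	}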
+ 163 - 0
pkg/costmodel/aggregation.go

@@ -13,6 +13,7 @@ import (
 	"github.com/julienschmidt/httprouter"
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/kubecost"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
@@ -32,6 +33,8 @@ const (
 	// chosen Aggregator; e.g. during aggregation by some label, there may be
 	// cost data that do not have the given label.
 	UnallocatedSubfield = "__unallocated__"
+
+	clusterCostsCacheMinutes = 5.0
 )
 
 // Aggregation describes aggregated cost data, containing cumulative cost and
@@ -1595,6 +1598,166 @@ type Aggregator interface {
 	ComputeAggregateCostModel(promClient prometheusClient.Client, duration, offset, field string, subfields []string, rate string, filters map[string]string, sri *SharedResourceInfo, shared string, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, disableSharedOverhead, useETLAdapter bool) (map[string]*Aggregation, string, error)
 }
 
+// warmAggregateCostModelCache runs background goroutines that periodically recompute and cache the
+// aggregate cost model for the default query windows: 1d and 2d always, plus 7d and 30d when ETL is disabled.
+func (a *Accesses) warmAggregateCostModelCache() {
+	// Only allow one concurrent cache-warming operation
+	sem := util.NewSemaphore(1)
+
+	// Set default values, pulling them from application settings where applicable, and warm the cache
+	// for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
+	// if the default parameters change, the old cached defaults will eventually expire. Thus, the
+	// timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
+	warmFunc := func(duration, durationHrs, offset string, cacheEfficiencyData bool) (error, error) {
+		field := "namespace"
+		subfields := []string{}
+		rate := ""
+		filters := map[string]string{}
+		includeTimeSeries := false
+		includeEfficiency := true
+		disableCache := true
+		clearCache := false
+		noCache := false
+		noExpireCache := false
+		remote := true
+		shareSplit := "weighted"
+		remoteAvailable := env.IsRemoteEnabled()
+		remoteEnabled := remote && remoteAvailable
+		promClient := a.GetPrometheusClient(remote)
+		allocateIdle := cloud.AllocateIdleByDefault(a.CloudProvider)
+
+		sharedNamespaces := cloud.SharedNamespaces(a.CloudProvider)
+		sharedLabelNames, sharedLabelValues := cloud.SharedLabels(a.CloudProvider)
+
+		var sri *SharedResourceInfo
+		if len(sharedNamespaces) > 0 || len(sharedLabelNames) > 0 {
+			sri = NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
+		}
+
+		aggKey := GenerateAggKey(aggKeyParams{
+			duration:   duration,
+			offset:     offset,
+			filters:    filters,
+			field:      field,
+			subfields:  subfields,
+			rate:       rate,
+			sri:        sri,
+			shareType:  shareSplit,
+			idle:       allocateIdle,
+			timeSeries: includeTimeSeries,
+			efficiency: includeEfficiency,
+		})
+		log.Infof("aggregation: cache warming defaults: %s", aggKey)
+		key := fmt.Sprintf("%s:%s", durationHrs, offset)
+
+		_, _, aggErr := a.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters,
+			sri, shareSplit, allocateIdle, includeTimeSeries, includeEfficiency, disableCache,
+			clearCache, noCache, noExpireCache, remoteEnabled, false, false)
+		if aggErr != nil {
+			log.Infof("Error building cache %s: %s", key, aggErr)
+		}
+		if a.ThanosClient != nil {
+			offset = thanos.Offset()
+			log.Infof("Setting offset to %s", offset)
+		}
+		totals, err := a.ComputeClusterCosts(promClient, a.CloudProvider, durationHrs, offset, cacheEfficiencyData)
+		if err != nil {
+			log.Infof("Error building cluster costs cache %s", key)
+		}
+		maxMinutesWithData := 0.0
+		for _, cluster := range totals {
+			if cluster.DataMinutes > maxMinutesWithData {
+				maxMinutesWithData = cluster.DataMinutes
+			}
+		}
+		if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheMinutes {
+			a.ClusterCostsCache.Set(key, totals, a.GetCacheExpiration(duration))
+			log.Infof("caching %s cluster costs for %s", duration, a.GetCacheExpiration(duration))
+		} else {
+			log.Warningf("not caching %s cluster costs: no data or less than %f minutes of data", duration, clusterCostsCacheMinutes)
+		}
+		return aggErr, err
+	}
+
+	// 1 day
+	go func(sem *util.Semaphore) {
+		defer errors.HandlePanic()
+
+		for {
+			duration := "1d"
+			offset := "1m"
+			durHrs := "24h"
+
+			sem.Acquire()
+			warmFunc(duration, durHrs, offset, true)
+			sem.Return()
+
+			log.Infof("aggregation: warm cache: %s", duration)
+			time.Sleep(a.GetCacheRefresh(duration))
+		}
+	}(sem)
+
+	// 2 days
+	go func(sem *util.Semaphore) {
+		defer errors.HandlePanic()
+
+		for {
+			duration := "2d"
+			offset := "1m"
+			durHrs := "48h"
+
+			sem.Acquire()
+			warmFunc(duration, durHrs, offset, false)
+			sem.Return()
+
+			log.Infof("aggregation: warm cache: %s", duration)
+			time.Sleep(a.GetCacheRefresh(duration))
+		}
+	}(sem)
+
+	if !env.IsETLEnabled() {
+		// 7 days
+		go func(sem *util.Semaphore) {
+			defer errors.HandlePanic()
+
+			for {
+				duration := "7d"
+				offset := "1m"
+				durHrs := "168h"
+
+				sem.Acquire()
+				aggErr, err := warmFunc(duration, durHrs, offset, false)
+				sem.Return()
+
+				log.Infof("aggregation: warm cache: %s", duration)
+				if aggErr == nil && err == nil {
+					time.Sleep(a.GetCacheRefresh(duration))
+				} else {
+					time.Sleep(5 * time.Minute)
+				}
+			}
+		}(sem)
+
+		// 30 days
+		go func(sem *util.Semaphore) {
+			defer errors.HandlePanic()
+
+			for {
+				duration := "30d"
+				offset := "1m"
+				durHrs := "720h"
+
+				sem.Acquire()
+				aggErr, err := warmFunc(duration, durHrs, offset, false)
+				sem.Return()
+				if aggErr == nil && err == nil {
+					time.Sleep(a.GetCacheRefresh(duration))
+				} else {
+					time.Sleep(5 * time.Minute)
+				}
+			}
+		}(sem)
+	}
+}
+
 // AggregateCostModelHandler handles requests to the aggregated cost model API. See
 // ComputeAggregateCostModel for details.
 func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

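The util.Semaphore that serializes the warmers above is not shown in this diff. A minimal channel-backed sketch with the same NewSemaphore/Acquire/Return surface (an assumption about the shape of pkg/util, not a copy of it):

	package util

	// Semaphore is a counting semaphore backed by a buffered channel.
	type Semaphore struct {
		tokens chan struct{}
	}

	// NewSemaphore returns a semaphore that permits n concurrent holders.
	func NewSemaphore(n int) *Semaphore {
		return &Semaphore{tokens: make(chan struct{}, n)}
	}

	// Acquire blocks until a token slot is free.
	func (s *Semaphore) Acquire() {
		s.tokens <- struct{}{}
	}

	// Return releases a previously acquired token.
	func (s *Semaphore) Return() {
		<-s.tokens
	}

With NewSemaphore(1), the 1d/2d/7d/30d goroutines take turns: at most one expensive aggregation query runs at a time, while each loop still sleeps on its own refresh interval.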
+ 9 - 169
pkg/costmodel/router.go

@@ -24,6 +24,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/thanos"
 	prometheusClient "github.com/prometheus/client_golang/api"
@@ -640,167 +641,6 @@ func (a *Accesses) GetPrometheusMetadata(w http.ResponseWriter, _ *http.Request,
 	w.Write(WrapData(prom.Validate(a.PrometheusClient)))
 }
 
-// func warmAggregateCostModelCache() {
-// 	// Only allow one concurrent cache-warming operation
-// 	sem := util.NewSemaphore(1)
-
-// 	// Set default values, pulling them from application settings where applicable, and warm the cache
-// 	// for the given duration. Cache is intentionally set to expire (i.e. noExpireCache=false) so that
-// 	// if the default parameters change, the old cached defaults with eventually expire. Thus, the
-// 	// timing of the cache expiry/refresh is the only mechanism ensuring 100% cache warmth.
-// 	warmFunc := func(duration, durationHrs, offset string, cacheEfficiencyData bool) (error, error) {
-// 		field := "namespace"
-// 		subfields := []string{}
-// 		rate := ""
-// 		filters := map[string]string{}
-// 		includeTimeSeries := false
-// 		includeEfficiency := true
-// 		disableCache := true
-// 		clearCache := false
-// 		noCache := false
-// 		noExpireCache := false
-// 		remote := true
-// 		shareSplit := "weighted"
-// 		remoteAvailable := env.IsRemoteEnabled()
-// 		remoteEnabled := remote && remoteAvailable
-// 		promClient := a.OpenSource.GetPrometheusClient(remote)
-// 		allocateIdle := provider.AllocateIdleByDefault(a.CloudProvider)
-
-// 		sharedNamespaces := provider.SharedNamespaces(a.CloudProvider)
-// 		sharedLabelNames, sharedLabelValues := provider.SharedLabels(a.CloudProvider)
-
-// 		var sri *costmodel.SharedResourceInfo
-// 		if len(sharedNamespaces) > 0 || len(sharedLabelNames) > 0 {
-// 			sri = costmodel.NewSharedResourceInfo(true, sharedNamespaces, sharedLabelNames, sharedLabelValues)
-// 		}
-
-// 		aggKey := costmodel.GenerateAggKey(aggKeyParams{
-// 			duration:   duration,
-// 			offset:     offset,
-// 			filters:    filters,
-// 			field:      field,
-// 			subfields:  subfields,
-// 			rate:       rate,
-// 			sri:        sri,
-// 			shareType:  shareSplit,
-// 			idle:       allocateIdle,
-// 			timeSeries: includeTimeSeries,
-// 			efficiency: includeEfficiency,
-// 		})
-// 		klog.V(3).Infof("[Info] aggregation: cache warming defaults: %s", aggKey)
-// 		key := fmt.Sprintf("%s:%s", durationHrs, offset)
-
-// 		_, _, aggErr := costmodel.ComputeAggregateCostModel(promClient, duration, offset, field, subfields, rate, filters,
-// 			sri, shareSplit, allocateIdle, includeTimeSeries, includeEfficiency, disableCache,
-// 			clearCache, noCache, noExpireCache, remoteEnabled, false)
-// 		if aggErr != nil {
-// 			klog.Infof("Error building cache %s: %s", key, aggErr)
-// 		}
-// 		if costmodel.A.ThanosClient != nil {
-// 			offset = thanos.Offset()
-// 			klog.Infof("Setting offset to %s", offset)
-// 		}
-// 		totals, err := costmodel.ComputeClusterCosts(promClient, A.OpenSource.CloudProvider, durationHrs, offset, cacheEfficiencyData)
-// 		if err != nil {
-// 			klog.Infof("Error building cluster costs cache %s", key)
-// 		}
-// 		maxMinutesWithData := 0.0
-// 		for _, cluster := range totals {
-// 			if cluster.DataMinutes > maxMinutesWithData {
-// 				maxMinutesWithData = cluster.DataMinutes
-// 			}
-// 		}
-// 		if len(totals) > 0 && maxMinutesWithData > clusterCostsCacheTime {
-
-// 			A.ClusterCostsCache.Set(key, totals, A.GetCacheExpiration(duration))
-// 			klog.V(3).Infof("[Info] caching %s cluster costs for %s", duration, A.GetCacheExpiration(duration))
-// 		} else {
-// 			klog.V(2).Infof("[Warning] not caching %s cluster costs: no data or less than %f minutes data ", duration, clusterCostsCacheTime)
-// 		}
-// 		return aggErr, err
-// 	}
-
-// 	// 1 day
-// 	go func(sem *util.Semaphore) {
-// 		defer errors.HandlePanic()
-
-// 		for {
-// 			duration := "1d"
-// 			offset := "1m"
-// 			durHrs := "24h"
-
-// 			sem.Acquire()
-// 			warmFunc(duration, durHrs, offset, true)
-// 			sem.Return()
-
-// 			klog.V(3).Infof("aggregation: warm cache: %s", duration)
-// 			time.Sleep(A.GetCacheRefresh(duration))
-// 		}
-// 	}(sem)
-
-// 	// 2 day
-// 	go func(sem *util.Semaphore) {
-// 		defer errors.HandlePanic()
-
-// 		for {
-// 			duration := "2d"
-// 			offset := "1m"
-// 			durHrs := "48h"
-
-// 			sem.Acquire()
-// 			warmFunc(duration, durHrs, offset, false)
-// 			sem.Return()
-
-// 			klog.V(3).Infof("aggregation: warm cache: %s", duration)
-// 			time.Sleep(A.GetCacheRefresh(duration))
-// 		}
-// 	}(sem)
-
-// 	if !env.IsETLEnabled() {
-// 		// 7 day
-// 		go func(sem *util.Semaphore) {
-// 			defer errors.HandlePanic()
-
-// 			for {
-// 				duration := "7d"
-// 				offset := "1m"
-// 				durHrs := "168h"
-
-// 				sem.Acquire()
-// 				aggErr, err := warmFunc(duration, durHrs, offset, false)
-// 				sem.Return()
-
-// 				klog.V(3).Infof("aggregation: warm cache: %s", duration)
-// 				if aggErr == nil && err == nil {
-// 					time.Sleep(A.GetCacheRefresh(duration))
-// 				} else {
-// 					time.Sleep(5 * time.Minute)
-// 				}
-// 			}
-// 		}(sem)
-
-// 		// 30 day
-// 		go func(sem *util.Semaphore) {
-// 			defer errors.HandlePanic()
-
-// 			for {
-// 				duration := "30d"
-// 				offset := "1m"
-// 				durHrs := "720h"
-
-// 				sem.Acquire()
-// 				aggErr, err := warmFunc(duration, durHrs, offset, false)
-// 				sem.Return()
-// 				if aggErr == nil && err == nil {
-// 					time.Sleep(A.GetCacheRefresh(duration))
-// 				} else {
-// 					time.Sleep(5 * time.Minute)
-// 				}
-// 			}
-// 		}(sem)
-// 	}
-// }
-
 // Creates a new ClusterManager instance using a boltdb storage. If that fails,
 // then we fall back to a memory-only storage.
 func newClusterManager() *cm.ClusterManager {
@@ -1077,14 +917,6 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		"30d": maxCacheMinutes30d * time.Minute,
 	}
 
-	// warm the cache unless explicitly set to false
-	// if env.IsCacheWarmingEnabled() {
-	// 	log.Infof("Init: AggregateCostModel cache warming enabled")
-	// 	warmAggregateCostModelCache()
-	// } else {
-	// 	log.Infof("Init: AggregateCostModel cache warming disabled")
-	// }
-
 	costModel := NewCostModel(k8sCache, clusterMap, scrapeInterval)
 	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, costModel)
 
@@ -1113,6 +945,14 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	// Initialize mechanism for subscribing to settings changes
 	a.InitializeSettingsPubSub()
 
+	// Warm the aggregate cache unless explicitly set to false
+	if env.IsCacheWarmingEnabled() {
+		log.Infof("Init: AggregateCostModel cache warming enabled")
+		a.warmAggregateCostModelCache()
+	} else {
+		log.Infof("Init: AggregateCostModel cache warming disabled")
+	}
+
 	err = a.CloudProvider.DownloadPricingData()
 	if err != nil {
 		klog.V(1).Info("Failed to download pricing data: " + err.Error())

+ 11 - 0
pkg/env/costmodelenv.go

@@ -56,6 +56,9 @@ const (
 	KubeConfigPathEnvVar = "KUBECONFIG_PATH"
 
 	UTCOffsetEnvVar = "UTC_OFFSET"
+
+	CacheWarmingEnabledEnvVar = "CACHE_WARMING_ENABLED"
+	ETLEnabledEnvVar          = "ETL_ENABLED"
 )
 
 // GetAWSAccessKeyID returns the environment variable value for AWSAccessKeyIDEnvVar which represents
@@ -309,3 +312,11 @@ func GetParsedUTCOffset() time.Duration {
 
 	return offset
 }
+
+// IsCacheWarmingEnabled returns the environment variable value for CacheWarmingEnabledEnvVar,
+// which controls whether the aggregate cost model cache is warmed on startup. Defaults to true.
+func IsCacheWarmingEnabled() bool {
+	return GetBool(CacheWarmingEnabledEnvVar, true)
+}
+
+// IsETLEnabled returns the environment variable value for ETLEnabledEnvVar, which controls
+// whether ETL is enabled. Defaults to true.
+func IsETLEnabled() bool {
+	return GetBool(ETLEnabledEnvVar, true)
+}
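GetBool is defined elsewhere in pkg/env; the two accessors above rely on it falling back to the default when the variable is unset or unparsable. A sketch under that assumption:

	package env

	import (
		"os"
		"strconv"
	)

	// GetBool reads key from the environment, returning defaultValue when the
	// variable is unset or does not parse as a boolean. (Sketch only; the real
	// helper lives elsewhere in pkg/env.)
	func GetBool(key string, defaultValue bool) bool {
		raw, ok := os.LookupEnv(key)
		if !ok {
			return defaultValue
		}
		parsed, err := strconv.ParseBool(raw)
		if err != nil {
			return defaultValue
		}
		return parsed
	}

Because both defaults are true, cache warming and ETL remain enabled unless CACHE_WARMING_ENABLED=false or ETL_ENABLED=false is set explicitly.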