Jelajahi Sumber

Migrate Agg API WIP: pub/sub for settings changes

Niko Kovacevic 5 tahun lalu
induk
melakukan
e6ccc315e7
3 mengubah file dengan 182 tambahan dan 11 penghapusan
  1. 6 10
      pkg/costmodel/aggregation.go
  2. 19 1
      pkg/costmodel/router.go
  3. 157 0
      pkg/costmodel/settings.go

+ 6 - 10
pkg/costmodel/aggregation.go

@@ -1587,11 +1587,12 @@ func GenerateAggKey(ps aggKeyParams) string {
 		ps.sri, ps.shareType, ps.idle, ps.timeSeries, ps.efficiency)
 }
 
-type CostModelAggregator interface {
-	ComputeAggregateCostModel(promClient prometheusClient.Client, duration, offset, field string,
-		subfields []string, rate string, filters map[string]string, sri *SharedResourceInfo, shared string,
-		allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache,
-		remoteEnabled, disableSharedOverhead, useETLAdapter bool) (map[string]*Aggregation, string, error)
+// Aggregator is capable of computing the aggregated cost model. This is
+// a brutal interface, which should be cleaned up, but it's necessary for
+// being able to swap in an ETL-backed implementation.
+// TODO clean up, simplify
+type Aggregator interface {
+	ComputeAggregateCostModel(promClient prometheusClient.Client, duration, offset, field string, subfields []string, rate string, filters map[string]string, sri *SharedResourceInfo, shared string, allocateIdle, includeTimeSeries, includeEfficiency, disableCache, clearCache, noCache, noExpireCache, remoteEnabled, disableSharedOverhead, useETLAdapter bool) (map[string]*Aggregation, string, error)
 }
 
 // AggregateCostModelHandler handles requests to the aggregated cost model API. See
@@ -1770,11 +1771,6 @@ func (a *Accesses) AggregateCostModelHandler(w http.ResponseWriter, r *http.Requ
 	// enable remote if it is available and not disabled
 	remoteEnabled := remote && env.IsRemoteEnabled()
 
-	// if custom pricing has changed, then clear the cache and recompute data
-	if a.CustomPricingHasChanged() {
-		clearCache = true
-	}
-
 	promClient := a.GetPrometheusClient(remote)
 
 	useETLAdapter := r.URL.Query().Get("etl") == "true"

+ 19 - 1
pkg/costmodel/router.go

@@ -9,6 +9,7 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"k8s.io/klog"
@@ -44,6 +45,8 @@ const (
 	maxCacheMinutes2d           = 17
 	maxCacheMinutes7d           = 37
 	maxCacheMinutes30d          = 137
+	CustomPricingSetting        = "CustomPricing"
+	DiscountSetting             = "Discount"
 )
 
 var (
@@ -68,7 +71,13 @@ type Accesses struct {
 	CostDataCache     *cache.Cache
 	ClusterCostsCache *cache.Cache
 	CacheExpiration   map[string]time.Duration
-	AggAPI            CostModelAggregator
+	AggAPI            Aggregator
+	// SettingsCache stores current state of app settings
+	SettingsCache *cache.Cache
+	// settingsSubscribers tracks channels through which changes to different
+	// settings will be published in a pub/sub model
+	settingsSubscribers map[string][]chan string
+	settingsMutex       sync.Mutex
 }
 
 // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
@@ -1056,6 +1065,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	aggregateCache := cache.New(time.Minute*10, time.Minute*20)
 	costDataCache := cache.New(time.Minute*10, time.Minute*20)
 	clusterCostsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
+	settingsCache := cache.New(cache.NoExpiration, cache.NoExpiration)
 
 	// query durations that should be cached longer should be registered here
 	// use relatively prime numbers to minimize likelihood of synchronized
@@ -1091,10 +1101,18 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		AggregateCache:    aggregateCache,
 		CostDataCache:     costDataCache,
 		ClusterCostsCache: clusterCostsCache,
+		SettingsCache:     settingsCache,
 		CacheExpiration:   cacheExpiration,
 	}
+	// Use the Accesses instance, itself, as the Aggregator. This is
+	// confusing and unconventional, but necessary so that we can swap it
+	// out for the ETL-adapted version elsewhere.
+	// TODO clean this up once ETL is open-sourced.
 	a.AggAPI = &a
 
+	// Initialize mechanism for subscribing to settings changes
+	a.InitializeSettingsPubSub()
+
 	err = a.CloudProvider.DownloadPricingData()
 	if err != nil {
 		klog.V(1).Info("Failed to download pricing data: " + err.Error())

+ 157 - 0
pkg/costmodel/settings.go

@@ -0,0 +1,157 @@
+package costmodel
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/patrickmn/go-cache"
+	"k8s.io/klog"
+)
+
+// InitializeSettingsPubSub sets up the pub/sub mechanism for settings changes
+// and kicks off two background goroutines, neither of which ever returns:
+//
+//   - a publisher, which checks every 500ms whether the custom pricing or
+//     discount settings have changed and, if so, sends the newly cached
+//     encoding of that setting to every channel subscribed to it;
+//   - a subscriber, which flushes AggregateCache and CostDataCache whenever
+//     either setting changes.
+func (a *Accesses) InitializeSettingsPubSub() {
+	a.settingsSubscribers = map[string][]chan string{}
+
+	// Publish settings changes
+	go func() {
+		for {
+			// Publish changes to custom pricing
+			if a.customPricingHasChanged() {
+				a.publishSetting(CustomPricingSetting)
+			}
+
+			// Publish changes to discount
+			if a.discountHasChanged() {
+				a.publishSetting(DiscountSetting)
+			}
+
+			time.Sleep(500 * time.Millisecond)
+		}
+	}()
+
+	// Clear caches when custom pricing or discount changes
+	go func() {
+		costDataCacheCh := make(chan string)
+		a.SubscribeToCustomPricingChanges(costDataCacheCh)
+		a.SubscribeToDiscountChanges(costDataCacheCh)
+		for {
+			msg := <-costDataCacheCh
+			log.Infof("Flushing cost data caches: %s", msg)
+			a.AggregateCache.Flush()
+			a.CostDataCache.Flush()
+		}
+	}()
+}
+
+// publishSetting sends the value currently cached in SettingsCache under the
+// given setting key to every channel subscribed to that setting. It does
+// nothing if the key is missing from the cache or the cached value is not a
+// string.
+func (a *Accesses) publishSetting(setting string) {
+	data, ok := a.SettingsCache.Get(setting)
+	if !ok {
+		return
+	}
+	value, ok := data.(string)
+	if !ok {
+		return
+	}
+
+	// Snapshot the subscriber list while holding the mutex: the Subscribe*
+	// methods append to this map from other goroutines, and an unguarded
+	// concurrent map read and write is a data race. The sends happen after
+	// the lock is released so a slow receiver cannot block new subscriptions.
+	a.settingsMutex.Lock()
+	subscribers := append([]chan string(nil), a.settingsSubscribers[setting]...)
+	a.settingsMutex.Unlock()
+
+	// Sends on unbuffered channels block until the subscriber receives, so
+	// every subscriber must keep draining its channel.
+	for _, ch := range subscribers {
+		ch <- value
+	}
+}
+
+// SubscribeToCustomPricingChanges registers ch as a subscriber to custom
+// pricing changes. The registration is guarded by settingsMutex.
+func (a *Accesses) SubscribeToCustomPricingChanges(ch chan string) {
+	a.settingsMutex.Lock()
+	defer a.settingsMutex.Unlock()
+
+	subscribers := a.settingsSubscribers[CustomPricingSetting]
+	a.settingsSubscribers[CustomPricingSetting] = append(subscribers, ch)
+}
+
+// SubscribeToDiscountChanges registers ch as a subscriber to discount
+// changes. The registration is guarded by settingsMutex.
+func (a *Accesses) SubscribeToDiscountChanges(ch chan string) {
+	a.settingsMutex.Lock()
+	defer a.settingsMutex.Unlock()
+
+	subscribers := a.settingsSubscribers[DiscountSetting]
+	a.settingsSubscribers[DiscountSetting] = append(subscribers, ch)
+}
+
+// customPricingHasChanged returns true if custom pricing settings have changed
+// since the last time this function was called. It compares an encoding of
+// the current cloud provider configuration against the encoding stored in
+// SettingsCache, and stores the new encoding whenever the two differ. The
+// first call only seeds the cache and returns false. It also returns false
+// when the configuration cannot be read.
+func (a *Accesses) customPricingHasChanged() bool {
+	customPricing, err := a.CloudProvider.GetConfig()
+	if err != nil {
+		klog.Errorf("error accessing cloud provider configuration: %s", err)
+		return false
+	}
+	if customPricing == nil {
+		// Logged separately so that a nil error is never formatted with %s.
+		klog.Errorf("error accessing cloud provider configuration: configuration is nil")
+		return false
+	}
+
+	// describe parameters by which we determine whether or not custom
+	// pricing settings have changed
+	encodeCustomPricing := func(cp *cloud.CustomPricing) string {
+		return fmt.Sprintf("%s:%s:%s:%s:%s:%s:%s:%s:%+v", cp.CustomPricesEnabled, cp.CPU, cp.SpotCPU,
+			cp.RAM, cp.SpotRAM, cp.GPU, cp.Storage, cp.CurrencyCode, cp.SharedCosts)
+	}
+
+	// compare cached custom pricing parameters with current values
+	cpStr := encodeCustomPricing(customPricing)
+	val, found := a.SettingsCache.Get(CustomPricingSetting)
+	if !found {
+		// if no settings are found (e.g. upon first call) cache custom pricing settings but
+		// return false, as nothing has "changed" per se
+		a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.NoExpiration)
+		return false
+	}
+
+	// A cached value of the wrong type is logged and then treated as a
+	// change, so that the bad entry is overwritten below.
+	cpStrCached, ok := val.(string)
+	if !ok {
+		klog.Errorf("caching error: failed to cast custom pricing to string")
+	}
+	if ok && cpStr == cpStrCached {
+		return false
+	}
+
+	// cache new custom pricing settings; never let the entry expire, because
+	// an expired entry would be re-seeded above and the change that happened
+	// in the meantime would go unreported
+	a.SettingsCache.Set(CustomPricingSetting, cpStr, cache.NoExpiration)
+
+	return true
+}
+
+// discountHasChanged returns true if discount settings have changed
+// since the last time this function was called. It compares an encoding of
+// the current Discount and NegotiatedDiscount values against the encoding
+// stored in SettingsCache, and stores the new encoding whenever the two
+// differ. The first call only seeds the cache and returns false. It also
+// returns false when the configuration cannot be read.
+func (a *Accesses) discountHasChanged() bool {
+	customPricing, err := a.CloudProvider.GetConfig()
+	if err != nil {
+		klog.Errorf("error accessing cloud provider configuration: %s", err)
+		return false
+	}
+	if customPricing == nil {
+		// Logged separately so that a nil error is never formatted with %s.
+		klog.Errorf("error accessing cloud provider configuration: configuration is nil")
+		return false
+	}
+
+	// describe parameters by which we determine whether or not discount
+	// settings have changed
+	encodeDiscount := func(cp *cloud.CustomPricing) string {
+		return fmt.Sprintf("%s:%s", cp.Discount, cp.NegotiatedDiscount)
+	}
+
+	// compare cached discount parameters with current values
+	discStr := encodeDiscount(customPricing)
+	val, found := a.SettingsCache.Get(DiscountSetting)
+	if !found {
+		// if no settings are found (e.g. upon first call) cache discount settings but
+		// return false, as nothing has "changed" per se
+		a.SettingsCache.Set(DiscountSetting, discStr, cache.NoExpiration)
+		return false
+	}
+
+	// A cached value of the wrong type is logged and then treated as a
+	// change, so that the bad entry is overwritten below.
+	discStrCached, ok := val.(string)
+	if !ok {
+		klog.Errorf("caching error: failed to cast discount to string")
+	}
+	if ok && discStr == discStrCached {
+		return false
+	}
+
+	// cache new discount settings; never let the entry expire, because an
+	// expired entry would be re-seeded above and the change that happened in
+	// the meantime would go unreported
+	a.SettingsCache.Set(DiscountSetting, discStr, cache.NoExpiration)
+
+	return true
+}