Bladeren bron

Cluster Cache Optimization Part 2 - Config Map Separation (#3011)

Matt Bolt 1 jaar geleden
bovenliggende
commit
dc268c389c

+ 0 - 74
core/pkg/util/watcher/configwatchers.go

@@ -1,74 +0,0 @@
-package watcher
-
-import (
-	"github.com/opencost/opencost/core/pkg/log"
-	v1 "k8s.io/api/core/v1"
-)
-
-// ConfigMapWatcher represents a single configmap watcher
-type ConfigMapWatcher struct {
-	ConfigMapName string
-	WatchFunc     func(string, map[string]string) error
-}
-
-type ConfigMapWatchers struct {
-	watchers map[string][]*ConfigMapWatcher
-}
-
-func NewConfigMapWatchers(watchers ...*ConfigMapWatcher) *ConfigMapWatchers {
-	cmw := &ConfigMapWatchers{
-		watchers: make(map[string][]*ConfigMapWatcher),
-	}
-
-	for _, w := range watchers {
-		cmw.AddWatcher(w)
-	}
-
-	return cmw
-}
-
-func (cmw *ConfigMapWatchers) AddWatcher(watcher *ConfigMapWatcher) {
-	if watcher == nil {
-		return
-	}
-
-	name := watcher.ConfigMapName
-	cmw.watchers[name] = append(cmw.watchers[name], watcher)
-}
-
-func (cmw *ConfigMapWatchers) Add(configMapName string, watchFunc func(string, map[string]string) error) {
-	cmw.AddWatcher(&ConfigMapWatcher{
-		ConfigMapName: configMapName,
-		WatchFunc:     watchFunc,
-	})
-}
-
-func (cmw *ConfigMapWatchers) GetWatchedConfigs() []string {
-	configNames := []string{}
-
-	for k := range cmw.watchers {
-		configNames = append(configNames, k)
-	}
-
-	return configNames
-}
-
-func (cmw *ConfigMapWatchers) ToWatchFunc() func(interface{}) {
-	return func(c interface{}) {
-		conf, ok := c.(*v1.ConfigMap)
-		if !ok {
-			return
-		}
-
-		name := conf.GetName()
-		data := conf.Data
-		if watchers, ok := cmw.watchers[name]; ok {
-			for _, cw := range watchers {
-				err := cw.WatchFunc(name, data)
-				if err != nil {
-					log.Infof("ERROR UPDATING %s CONFIG: %s", name, err.Error())
-				}
-			}
-		}
-	}
-}

+ 1 - 1
pkg/cloud/provider/provider.go

@@ -26,10 +26,10 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/util/httputil"
-	"github.com/opencost/opencost/core/pkg/util/watcher"
 	"github.com/opencost/opencost/pkg/clustercache"
 	"github.com/opencost/opencost/pkg/config"
 	"github.com/opencost/opencost/pkg/env"
+	"github.com/opencost/opencost/pkg/util/watcher"
 )
 
 // ClusterName returns the name defined in cluster info, defaulting to the

+ 2 - 16
pkg/clustercache/clustercache.go

@@ -410,9 +410,6 @@ type ClusterCache interface {
 
 	// GetAllReplicationControllers returns all cached replication controllers
 	GetAllReplicationControllers() []*ReplicationController
-
-	// SetConfigMapUpdateFunc sets the configmap update function
-	SetConfigMapUpdateFunc(func(interface{}))
 }
 
 // KubernetesClusterCache is the implementation of ClusterCache
@@ -422,7 +419,6 @@ type KubernetesClusterCache struct {
 	namespaceWatch             WatchController
 	nodeWatch                  WatchController
 	podWatch                   WatchController
-	kubecostConfigMapWatch     WatchController
 	serviceWatch               WatchController
 	daemonsetsWatch            WatchController
 	deploymentsWatch           WatchController
@@ -464,7 +460,6 @@ func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
 		namespaceWatch:             NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
 		nodeWatch:                  NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
 		podWatch:                   NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
-		kubecostConfigMapWatch:     NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
 		serviceWatch:               NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
 		daemonsetsWatch:            NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
 		deploymentsWatch:           NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
@@ -481,12 +476,8 @@ func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
 	// Wait for each caching watcher to initialize
 	cancel := make(chan struct{})
 	var wg sync.WaitGroup
-	if env.IsETLReadOnlyMode() {
-		wg.Add(1)
-		go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
-	} else {
-		wg.Add(15)
-		go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
+	if !env.IsETLReadOnlyMode() {
+		wg.Add(14)
 		go initializeCache(kcc.namespaceWatch, &wg, cancel)
 		go initializeCache(kcc.nodeWatch, &wg, cancel)
 		go initializeCache(kcc.podWatch, &wg, cancel)
@@ -520,7 +511,6 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.nodeWatch.Run(1, stopCh)
 	go kcc.podWatch.Run(1, stopCh)
 	go kcc.serviceWatch.Run(1, stopCh)
-	go kcc.kubecostConfigMapWatch.Run(1, stopCh)
 	go kcc.daemonsetsWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
 	go kcc.statefulsetWatch.Run(1, stopCh)
@@ -669,7 +659,3 @@ func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*Replication
 	}
 	return rcs
 }
-
-func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
-	kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
-}

+ 0 - 2
pkg/clustercache/clustercache2.go

@@ -54,8 +54,6 @@ func (kcc *KubernetesClusterCacheV2) Run() {
 func (kcc *KubernetesClusterCacheV2) Stop() {
 }
 
-func (kcc *KubernetesClusterCacheV2) SetConfigMapUpdateFunc(f func(interface{})) {}
-
 func (kcc *KubernetesClusterCacheV2) GetAllNamespaces() []*Namespace {
 	return kcc.namespaceStore.GetAll()
 }

+ 0 - 7
pkg/clustercache/clusterimporter.go

@@ -196,10 +196,3 @@ func (ci *ClusterImporter) GetAllReplicationControllers() []*ReplicationControll
 
 	return slices.Clone(ci.data.ReplicationControllers)
 }
-
-// SetConfigMapUpdateFunc sets the configmap update function
-func (ci *ClusterImporter) SetConfigMapUpdateFunc(_ func(interface{})) {
-	// TODO: (bolt) This function is still a bit strange to me for the ClusterCache interface.
-	// TODO: (bolt) no-op for now.
-	log.Warnf("SetConfigMapUpdateFunc is disabled for imported cluster data.")
-}

+ 4 - 21
pkg/cmd/agent/agent.go

@@ -9,7 +9,7 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/clusters"
 	"github.com/opencost/opencost/core/pkg/log"
-	"github.com/opencost/opencost/core/pkg/util/watcher"
+	"github.com/opencost/opencost/pkg/util/watcher"
 
 	"github.com/opencost/opencost/core/pkg/version"
 	"github.com/opencost/opencost/pkg/cloud/provider"
@@ -25,7 +25,6 @@ import (
 	prometheus "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/rs/cors"
 	"k8s.io/client-go/kubernetes"
@@ -129,9 +128,6 @@ func newPrometheusClient() (prometheus.Client, error) {
 
 func Execute(opts *AgentOpts) error {
 	log.Infof("Starting Kubecost Agent version %s", version.FriendlyVersion())
-
-	configWatchers := watcher.NewConfigMapWatchers()
-
 	scrapeInterval := env.GetKubecostScrapeInterval()
 	promCli, err := newPrometheusClient()
 	if err != nil {
@@ -168,23 +164,10 @@ func Execute(opts *AgentOpts) error {
 	}
 
 	// Append the pricing config watcher
-	configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
-	watchConfigFunc := configWatchers.ToWatchFunc()
-	watchedConfigs := configWatchers.GetWatchedConfigs()
-
 	kubecostNamespace := env.GetKubecostNamespace()
-
-	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
-	for _, cw := range watchedConfigs {
-		configs, err := k8sClient.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
-		if err != nil {
-			log.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
-		} else {
-			watchConfigFunc(configs)
-		}
-	}
-
-	clusterCache.SetConfigMapUpdateFunc(watchConfigFunc)
+	configWatchers := watcher.NewConfigMapWatchers(k8sClient, kubecostNamespace)
+	configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
+	configWatchers.Watch()
 
 	configPrefix := env.GetConfigPathWithDefault(env.DefaultConfigMountPath)
 

+ 5 - 20
pkg/costmodel/router.go

@@ -19,7 +19,6 @@ import (
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/util/httputil"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
-	"github.com/opencost/opencost/core/pkg/util/watcher"
 	"github.com/opencost/opencost/core/pkg/version"
 	"github.com/opencost/opencost/pkg/cloud/aws"
 	cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
@@ -32,6 +31,7 @@ import (
 	"github.com/opencost/opencost/pkg/kubeconfig"
 	"github.com/opencost/opencost/pkg/metrics"
 	"github.com/opencost/opencost/pkg/services"
+	"github.com/opencost/opencost/pkg/util/watcher"
 	"github.com/spf13/viper"
 	v1 "k8s.io/api/core/v1"
 
@@ -1451,8 +1451,6 @@ func handlePanic(p errors.Panic) bool {
 }
 
 func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
-	configWatchers := watcher.NewConfigMapWatchers(additionalConfigWatchers...)
-
 	var err error
 	if errorReportingEnabled {
 		err = sentry.Init(sentry.ClientOptions{Release: version.FriendlyVersion()})
@@ -1561,25 +1559,12 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 	}
 
 	// Append the pricing config watcher
-	configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
-	configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
-
-	watchConfigFunc := configWatchers.ToWatchFunc()
-	watchedConfigs := configWatchers.GetWatchedConfigs()
-
 	kubecostNamespace := env.GetKubecostNamespace()
-	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
-	for _, cw := range watchedConfigs {
-		configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
-		if err != nil {
-			log.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
-		} else {
-			log.Infof("Found configmap %s, watching...", configs.Name)
-			watchConfigFunc(configs)
-		}
-	}
 
-	k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
+	configWatchers := watcher.NewConfigMapWatchers(kubeClientset, kubecostNamespace, additionalConfigWatchers...)
+	configWatchers.AddWatcher(provider.ConfigWatcherFor(cloudProvider))
+	configWatchers.AddWatcher(metrics.GetMetricsConfigWatcher())
+	configWatchers.Watch()
 
 	remoteEnabled := env.IsRemoteEnabled()
 	if remoteEnabled {

+ 1 - 1
pkg/metrics/metricsconfig.go

@@ -7,8 +7,8 @@ import (
 	"path"
 	"sync"
 
-	"github.com/opencost/opencost/core/pkg/util/watcher"
 	"github.com/opencost/opencost/pkg/env"
+	"github.com/opencost/opencost/pkg/util/watcher"
 )
 
 var (

+ 10 - 6
core/pkg/util/watcher/configwatcher_test.go → pkg/util/watcher/configwatcher_test.go

@@ -40,8 +40,8 @@ func TestConfigWatcherSingleHandler(t *testing.T) {
 	// triggered
 	var didRun bool = false
 
-	w := NewConfigMapWatchers(newTestWatcher(t, TestConfigMapName, "single", &didRun))
-	f := w.ToWatchFunc()
+	w := NewConfigMapWatchers(nil, "", newTestWatcher(t, TestConfigMapName, "single", &didRun))
+	f := w.toWatchFunc()
 
 	// Execute watch func with a 'test-config' configmap
 	f(newConfigMap(TestConfigMapName, "testing 1 2 3"))
@@ -57,10 +57,12 @@ func TestConfigWatcherMultipleHandlers(t *testing.T) {
 	var secondDidRun bool = false
 
 	w := NewConfigMapWatchers(
+		nil,
+		"",
 		newTestWatcher(t, TestConfigMapName, "single", &firstDidRun),
 		newTestWatcher(t, AlternateTestConfigMapName, "alternate", &secondDidRun))
 
-	f := w.ToWatchFunc()
+	f := w.toWatchFunc()
 
 	// Execute watch func with a 'alternate-test-config' configmap
 	f(newConfigMap(AlternateTestConfigMapName, "oof!"))
@@ -82,13 +84,15 @@ func TestConfigWatcherMultipleHandlersForSameConfig(t *testing.T) {
 	var thirdDidRun bool = false
 
 	w := NewConfigMapWatchers(
+		nil,
+		"",
 		newTestWatcher(t, TestConfigMapName, "first", &firstDidRun),
 		newTestWatcher(t, AlternateTestConfigMapName, "alternate", &secondDidRun),
 		// third watcher watches for the same configmap as "first"
 		newTestWatcher(t, TestConfigMapName, "third", &thirdDidRun),
 	)
 
-	f := w.ToWatchFunc()
+	f := w.toWatchFunc()
 
 	// Execute watch func with a 'test-config' configmap
 	f(newConfigMap(TestConfigMapName, "double trouble"))
@@ -118,12 +122,12 @@ func TestConfigMapWatcherWithAdd(t *testing.T) {
 		// third watcher watches for the same configmap as "first"
 		newTestWatcher(t, TestConfigMapName, "third", &thirdDidRun)
 
-	w := NewConfigMapWatchers()
+	w := NewConfigMapWatchers(nil, "")
 	w.AddWatcher(a)
 	w.AddWatcher(b)
 	w.Add(c.ConfigMapName, c.WatchFunc)
 
-	f := w.ToWatchFunc()
+	f := w.toWatchFunc()
 
 	// Execute watch func with a 'test-config' configmap
 	f(newConfigMap(TestConfigMapName, "double trouble"))

+ 135 - 0
pkg/util/watcher/configwatchers.go

@@ -0,0 +1,135 @@
+package watcher
+
+import (
+	"context"
+	"sync/atomic"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/pkg/clustercache"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+
+	"k8s.io/client-go/kubernetes"
+)
+
// ConfigMapWatcher represents a single configmap watcher
type ConfigMapWatcher struct {
	// ConfigMapName is the name of the configmap whose updates this watcher handles.
	ConfigMapName string
	// WatchFunc is invoked with the configmap's name and data whenever the
	// watched configmap is seen or updated. A returned error is logged by the
	// dispatcher but does not prevent other watchers from running.
	WatchFunc func(string, map[string]string) error
}
+
// ConfigMapWatchers multiplexes configmap update events from a single
// kubernetes watch to any number of registered ConfigMapWatcher handlers.
type ConfigMapWatchers struct {
	// kubeClientset is used for the initial configmap fetch in Watch; it may
	// be nil (e.g. in tests), in which case Watch is a no-op.
	kubeClientset kubernetes.Interface
	// namespace is the namespace whose configmaps are watched.
	namespace string
	// watchers maps a configmap name to the handlers registered for it.
	watchers map[string][]*ConfigMapWatcher
	// watchController is the underlying caching watch over configmaps; nil
	// when kubeClientset is nil.
	watchController clustercache.WatchController
	// started flips to true on the first Watch call; further registration is
	// rejected afterwards.
	started atomic.Bool
	// stop, when closed, terminates the watch controller goroutine.
	stop chan struct{}
}
+
+func NewConfigMapWatchers(kubeClientset kubernetes.Interface, namespace string, watchers ...*ConfigMapWatcher) *ConfigMapWatchers {
+	var stopCh chan struct{}
+	var watchController clustercache.WatchController
+
+	if kubeClientset != nil {
+		coreRestClient := kubeClientset.CoreV1().RESTClient()
+		watchController = clustercache.NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, namespace, fields.Everything())
+		stopCh = make(chan struct{})
+
+		// a bit awkward here, but since we'll mostly be deferring adding a watcher after initializing k8s,
+		// we'll warmup and start the actual watcher here
+		watchController.WarmUp(stopCh)
+		go watchController.Run(1, stopCh)
+	}
+
+	cmw := &ConfigMapWatchers{
+		kubeClientset:   kubeClientset,
+		namespace:       namespace,
+		watchController: watchController,
+		watchers:        make(map[string][]*ConfigMapWatcher),
+		stop:            stopCh,
+	}
+
+	for _, w := range watchers {
+		cmw.AddWatcher(w)
+	}
+
+	return cmw
+}
+
+func (cmw *ConfigMapWatchers) AddWatcher(watcher *ConfigMapWatcher) {
+	if cmw.started.Load() {
+		log.Warnf("Cannot add watcher %s after starting", watcher.ConfigMapName)
+		return
+	}
+
+	if watcher == nil {
+		return
+	}
+
+	name := watcher.ConfigMapName
+	cmw.watchers[name] = append(cmw.watchers[name], watcher)
+}
+
+func (cmw *ConfigMapWatchers) Add(configMapName string, watchFunc func(string, map[string]string) error) {
+	cmw.AddWatcher(&ConfigMapWatcher{
+		ConfigMapName: configMapName,
+		WatchFunc:     watchFunc,
+	})
+}
+
// Watch performs an initial synchronous dispatch of each watched configmap's
// current contents, then installs the update handler on the underlying watch
// controller so registered watchers receive future updates. It is a no-op
// when no kube client was provided; only the first call has any effect.
func (cmw *ConfigMapWatchers) Watch() {
	if cmw.kubeClientset == nil {
		return
	}

	// Atomically transition to started; once set, repeat Watch calls and new
	// AddWatcher registrations are rejected.
	if !cmw.started.CompareAndSwap(false, true) {
		log.Warnf("Already started")
		return
	}

	watchConfigFunc := cmw.toWatchFunc()

	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
	for cw := range cmw.watchers {
		configs, err := cmw.kubeClientset.CoreV1().ConfigMaps(cmw.namespace).Get(context.Background(), cw, metav1.GetOptions{})
		if err != nil {
			log.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
		} else {
			log.Infof("Found configmap %s, watching...", configs.Name)
			watchConfigFunc(configs)
		}
	}

	// watchController is non-nil here: it is always created when
	// kubeClientset is non-nil (see NewConfigMapWatchers).
	cmw.watchController.SetUpdateHandler(watchConfigFunc)
}
+
// Stop terminates the underlying watch controller goroutine. It is a no-op
// when no watch was ever started (nil kube client) or when already stopped.
// NOTE(review): not safe for concurrent callers — two simultaneous Stop calls
// could both observe a non-nil channel and double-close it (panic); confirm
// callers stop at most once, or guard with sync.Once.
func (cmw *ConfigMapWatchers) Stop() {
	if cmw.stop == nil {
		return
	}

	close(cmw.stop)
	cmw.stop = nil
}
+
+func (cmw *ConfigMapWatchers) toWatchFunc() func(any) {
+	return func(c any) {
+		conf, ok := c.(*v1.ConfigMap)
+		if !ok {
+			return
+		}
+
+		name := conf.GetName()
+		data := conf.Data
+		if watchers, ok := cmw.watchers[name]; ok {
+			for _, cw := range watchers {
+				err := cw.WatchFunc(name, data)
+				if err != nil {
+					log.Infof("ERROR UPDATING %s CONFIG: %s", name, err.Error())
+				}
+			}
+		}
+	}
+}