Просмотр исходного кода

Updates to caching to wait for HasSynced on each controller during CacheCluster creation.

Matt Bolt 6 лет назад
Родитель
Commit
60920ec740
2 измененных файлов с 39 добавлено и 9 удалено
  1. 26 1
      costmodel/clustercache.go
  2. 13 8
      costmodel/watchcontroller.go

+ 26 - 1
costmodel/clustercache.go

@@ -1,6 +1,8 @@
 package costmodel
 
 import (
+	"sync"
+
 	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	stv1 "k8s.io/api/storage/v1"
@@ -49,12 +51,17 @@ type KubernetesClusterCache struct {
 	storageClassWatch WatchController
 }
 
+func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
+	defer wg.Done()
+	wc.WarmUp(cancel)
+}
+
 func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	coreRestClient := client.CoreV1().RESTClient()
 	appsRestClient := client.AppsV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 
-	return &KubernetesClusterCache{
+	kcc := &KubernetesClusterCache{
 		client:            client,
 		namespaceWatch:    NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
 		nodeWatch:         NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
@@ -64,6 +71,24 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 		pvWatch:           NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
 		storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
 	}
+
+	// Wait for each caching watcher to initialize
+	var wg sync.WaitGroup
+	wg.Add(7)
+
+	cancel := make(chan struct{})
+
+	go initializeCache(kcc.namespaceWatch, &wg, cancel)
+	go initializeCache(kcc.nodeWatch, &wg, cancel)
+	go initializeCache(kcc.podWatch, &wg, cancel)
+	go initializeCache(kcc.serviceWatch, &wg, cancel)
+	go initializeCache(kcc.deploymentsWatch, &wg, cancel)
+	go initializeCache(kcc.pvWatch, &wg, cancel)
+	go initializeCache(kcc.storageClassWatch, &wg, cancel)
+
+	wg.Wait()
+
+	return kcc
 }
 
 func (kcc *KubernetesClusterCache) Run(stopCh chan struct{}) {

+ 13 - 8
costmodel/watchcontroller.go

@@ -23,6 +23,9 @@ type WatchHandler = func(interface{})
 // WatchController defines a contract for an object which watches a specific resource set for
 // add, updates, and removals
 type WatchController interface {
+	// Initializes the cache
+	WarmUp(chan struct{})
+
 	// Run starts the watching process
 	Run(int, chan struct{})
 
@@ -167,6 +170,16 @@ func (c *CachingWatchController) handleErr(err error, key interface{}) {
 	klog.Infof("Dropping %s %q out of the queue: %v", c.resourceType, key, err)
 }
 
+func (c *CachingWatchController) WarmUp(cancelCh chan struct{}) {
+	go c.informer.Run(cancelCh)
+
+	// Wait for all involved caches to be synced, before processing items from the queue is started
+	if !cache.WaitForCacheSync(cancelCh, c.informer.HasSynced) {
+		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
+		return
+	}
+}
+
 func (c *CachingWatchController) Run(threadiness int, stopCh chan struct{}) {
 	defer runtime.HandleCrash()
 
@@ -174,14 +187,6 @@ func (c *CachingWatchController) Run(threadiness int, stopCh chan struct{}) {
 	defer c.queue.ShutDown()
 	klog.V(3).Infof("Starting %s controller", c.resourceType)
 
-	go c.informer.Run(stopCh)
-
-	// Wait for all involved caches to be synced, before processing items from the queue is started
-	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
-		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
-		return
-	}
-
 	for i := 0; i < threadiness; i++ {
 		go wait.Until(c.runWorker, time.Second, stopCh)
 	}