Просмотр исходного кода

add blocking behavior on Run() to pre-initialize resource lists.

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Matt Bolt 1 год назад
Родитель
Сommit
9f5261ef31
2 измененных файлов с 71 добавлено и 24 удалено
  1. 41 16
      pkg/clustercache/clustercache2.go
  2. 30 8
      pkg/clustercache/store.go

+ 41 - 16
pkg/clustercache/clustercache2.go

@@ -1,7 +1,7 @@
 package clustercache
 
 import (
-	"context"
+	"sync"
 
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
@@ -26,32 +26,57 @@ type KubernetesClusterCacheV2 struct {
 	replicationControllerStore *GenericStore[*v1.ReplicationController, *ReplicationController]
 	replicaSetStore            *GenericStore[*appsv1.ReplicaSet, *ReplicaSet]
 	pdbStore                   *GenericStore[*policyv1.PodDisruptionBudget, *PodDisruptionBudget]
+	stopCh                     chan struct{}
 }
 
 func NewKubernetesClusterCacheV2(clientset kubernetes.Interface) *KubernetesClusterCacheV2 {
-	ctx := context.TODO()
 	return &KubernetesClusterCacheV2{
-		namespaceStore:             CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "namespaces", transformNamespace),
-		nodeStore:                  CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "nodes", transformNode),
-		persistentVolumeClaimStore: CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "persistentvolumeclaims", transformPersistentVolumeClaim),
-		persistentVolumeStore:      CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "persistentvolumes", transformPersistentVolume),
-		podStore:                   CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "pods", transformPod),
-		replicationControllerStore: CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "replicationcontrollers", transformReplicationController),
-		serviceStore:               CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "services", transformService),
-		daemonSetStore:             CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "daemonsets", transformDaemonSet),
-		deploymentStore:            CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "deployments", transformDeployment),
-		replicaSetStore:            CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "replicasets", transformReplicaSet),
-		statefulSetStore:           CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "statefulsets", transformStatefulSet),
-		storageClassStore:          CreateStoreAndWatch(ctx, clientset.StorageV1().RESTClient(), "storageclasses", transformStorageClass),
-		jobStore:                   CreateStoreAndWatch(ctx, clientset.BatchV1().RESTClient(), "jobs", transformJob),
-		pdbStore:                   CreateStoreAndWatch(ctx, clientset.PolicyV1().RESTClient(), "poddisruptionbudgets", transformPodDisruptionBudget),
+		namespaceStore:             CreateStore(clientset.CoreV1().RESTClient(), "namespaces", transformNamespace),
+		nodeStore:                  CreateStore(clientset.CoreV1().RESTClient(), "nodes", transformNode),
+		persistentVolumeClaimStore: CreateStore(clientset.CoreV1().RESTClient(), "persistentvolumeclaims", transformPersistentVolumeClaim),
+		persistentVolumeStore:      CreateStore(clientset.CoreV1().RESTClient(), "persistentvolumes", transformPersistentVolume),
+		podStore:                   CreateStore(clientset.CoreV1().RESTClient(), "pods", transformPod),
+		replicationControllerStore: CreateStore(clientset.CoreV1().RESTClient(), "replicationcontrollers", transformReplicationController),
+		serviceStore:               CreateStore(clientset.CoreV1().RESTClient(), "services", transformService),
+		daemonSetStore:             CreateStore(clientset.AppsV1().RESTClient(), "daemonsets", transformDaemonSet),
+		deploymentStore:            CreateStore(clientset.AppsV1().RESTClient(), "deployments", transformDeployment),
+		replicaSetStore:            CreateStore(clientset.AppsV1().RESTClient(), "replicasets", transformReplicaSet),
+		statefulSetStore:           CreateStore(clientset.AppsV1().RESTClient(), "statefulsets", transformStatefulSet),
+		storageClassStore:          CreateStore(clientset.StorageV1().RESTClient(), "storageclasses", transformStorageClass),
+		jobStore:                   CreateStore(clientset.BatchV1().RESTClient(), "jobs", transformJob),
+		pdbStore:                   CreateStore(clientset.PolicyV1().RESTClient(), "poddisruptionbudgets", transformPodDisruptionBudget),
+		stopCh:                     make(chan struct{}),
 	}
 }
 
 func (kcc *KubernetesClusterCacheV2) Run() {
+	var wg sync.WaitGroup
+	wg.Add(14)
+
+	kcc.namespaceStore.Watch(kcc.stopCh, wg.Done)
+	kcc.nodeStore.Watch(kcc.stopCh, wg.Done)
+	kcc.persistentVolumeClaimStore.Watch(kcc.stopCh, wg.Done)
+	kcc.persistentVolumeStore.Watch(kcc.stopCh, wg.Done)
+	kcc.podStore.Watch(kcc.stopCh, wg.Done)
+	kcc.replicationControllerStore.Watch(kcc.stopCh, wg.Done)
+	kcc.serviceStore.Watch(kcc.stopCh, wg.Done)
+	kcc.daemonSetStore.Watch(kcc.stopCh, wg.Done)
+	kcc.deploymentStore.Watch(kcc.stopCh, wg.Done)
+	kcc.replicaSetStore.Watch(kcc.stopCh, wg.Done)
+	kcc.statefulSetStore.Watch(kcc.stopCh, wg.Done)
+	kcc.storageClassStore.Watch(kcc.stopCh, wg.Done)
+	kcc.jobStore.Watch(kcc.stopCh, wg.Done)
+	kcc.pdbStore.Watch(kcc.stopCh, wg.Done)
+
+	wg.Wait()
 }
 
 func (kcc *KubernetesClusterCacheV2) Stop() {
+	if kcc.stopCh != nil {
+		close(kcc.stopCh)
+
+		kcc.stopCh = nil
+	}
 }
 
 func (kcc *KubernetesClusterCacheV2) GetAllNamespaces() []*Namespace {

+ 30 - 8
pkg/clustercache/store.go

@@ -1,7 +1,6 @@
 package clustercache
 
 import (
-	"context"
 	"sync"
 
 	v1 "k8s.io/api/core/v1"
@@ -17,6 +16,10 @@ type GenericStore[Input UIDGetter, Output any] struct {
 	mutex         sync.RWMutex
 	items         map[types.UID]Output
 	transformFunc func(input Input) Output
+
+	// storing this cyclic reflector allows us to defer watching
+	reflector *cache.Reflector
+	onInit    func()
 }
 
 // NewGenericStore creates a new instance of GenericStore.
@@ -31,8 +34,7 @@ type UIDGetter interface {
 	GetUID() types.UID
 }
 
-func CreateStoreAndWatch[Input UIDGetter, Output any](
-	ctx context.Context,
+func CreateStore[Input UIDGetter, Output any](
 	restClient rest.Interface,
 	resource string,
 	transformFunc func(input Input) Output,
@@ -40,11 +42,19 @@ func CreateStoreAndWatch[Input UIDGetter, Output any](
 	lw := cache.NewListWatchFromClient(restClient, resource, v1.NamespaceAll, fields.Everything())
 	store := NewGenericStore(transformFunc)
 	var zeroValue Input
-	reflector := cache.NewReflector(lw, zeroValue, store, 0)
-	go reflector.Run(ctx.Done())
+	store.reflector = cache.NewReflector(lw, zeroValue, store, 0)
+
 	return store
 }
 
+func (s *GenericStore[Input, Output]) Watch(stopCh <-chan struct{}, onInit func()) {
+	s.onInit = onInit
+
+	// reflector.Run() will eventually call Replace() on the store with the initial contents
+	// of the resource list. we'll call onInit after that happens the _first_ time
+	go s.reflector.Run(stopCh)
+}
+
 // Add inserts an object into the store.
 func (s *GenericStore[Input, Output]) Add(obj any) error {
 	return s.Update(obj)
@@ -96,16 +106,28 @@ func (s *GenericStore[Input, Output]) Replace(list []any, _ string) error {
 		}
 	}
 
+	// call onInit after the initial list has been processed
+	if s.onInit != nil {
+		s.onInit()
+		s.onInit = nil
+	}
+
 	return nil
 }
 
 // Stubs to satisfy the cache.Store interface
-func (s *GenericStore[Input, Output]) List() []interface{} { return nil }
-func (s *GenericStore[Input, Output]) ListKeys() []string  { return nil }
+func (s *GenericStore[Input, Output]) List() []interface{} {
+	return nil
+}
+func (s *GenericStore[Input, Output]) ListKeys() []string {
+	return nil
+}
 func (s *GenericStore[Input, Output]) Get(_ interface{}) (item interface{}, exists bool, err error) {
 	return nil, false, nil
 }
 func (s *GenericStore[Input, Output]) GetByKey(_ string) (item interface{}, exists bool, err error) {
 	return nil, false, nil
 }
-func (s *GenericStore[Input, Output]) Resync() error { return nil }
+func (s *GenericStore[Input, Output]) Resync() error {
+	return nil
+}