Parcourir la source

create reflector-based version of cluster cache

Signed-off-by: r2k1 <yokree@gmail.com>
r2k1 il y a 2 ans
Parent
commit
58ff00f42b
3 fichiers modifiés avec 219 ajouts et 0 suppressions
  1. 4 0
      pkg/clustercache/clustercache.go
  2. 94 0
      pkg/clustercache/clustercache2.go
  3. 121 0
      pkg/clustercache/store.go

+ 4 - 0
pkg/clustercache/clustercache.go

@@ -321,6 +321,10 @@ func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{
 }
 
 func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
+	return NewKubernetesClusterCache2(client)
+}
+
+func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
 	coreRestClient := client.CoreV1().RESTClient()
 	appsRestClient := client.AppsV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()

+ 94 - 0
pkg/clustercache/clustercache2.go

@@ -0,0 +1,94 @@
+package clustercache
+
+import (
+	"context"
+
+	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
+	v1 "k8s.io/api/core/v1"
+	stv1 "k8s.io/api/storage/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+type KubernetesClusterCacheV2 struct {
+	namespaceStore             *GenericStore[*v1.Namespace, *Namespace]
+	nodeStore                  *GenericStore[*v1.Node, *Node]
+	podStore                   *GenericStore[*v1.Pod, *Pod]
+	serviceStore               *GenericStore[*v1.Service, *Service]
+	daemonSetStore             *GenericStore[*appsv1.DaemonSet, *DaemonSet]
+	deploymentStore            *GenericStore[*appsv1.Deployment, *Deployment]
+	statefulSetStore           *GenericStore[*appsv1.StatefulSet, *StatefulSet]
+	persistentVolumeStore      *GenericStore[*v1.PersistentVolume, *PersistentVolume]
+	persistentVolumeClaimStore *GenericStore[*v1.PersistentVolumeClaim, *PersistentVolumeClaim]
+	storageClassStore          *GenericStore[*stv1.StorageClass, *StorageClass]
+	jobStore                   *GenericStore[*batchv1.Job, *Job]
+}
+
+func NewKubernetesClusterCache2(clientset kubernetes.Interface) *KubernetesClusterCacheV2 {
+	ctx := context.TODO()
+	return &KubernetesClusterCacheV2{
+		namespaceStore:             CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "namespaces", transformNamespace),
+		nodeStore:                  CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "nodes", transformNode),
+		podStore:                   CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "pods", transformPod),
+		serviceStore:               CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "services", transformService),
+		daemonSetStore:             CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "daemonsets", transformDaemonSet),
+		deploymentStore:            CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "deployments", transformDeployment),
+		statefulSetStore:           CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "statefulsets", transformStatefulSet),
+		persistentVolumeStore:      CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "persistentvolumes", transformPersistentVolume),
+		persistentVolumeClaimStore: CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "persistentvolumeclaims", transformPersistentVolumeClaim),
+		storageClassStore:          CreateStoreAndWatch(ctx, clientset.StorageV1().RESTClient(), "storageclasses", transformStorageClass),
+		jobStore:                   CreateStoreAndWatch(ctx, clientset.BatchV1().RESTClient(), "jobs", transformJob),
+	}
+}
+
+func (kcc *KubernetesClusterCacheV2) Run() {
+}
+
+func (kcc *KubernetesClusterCacheV2) Stop() {
+}
+
+func (kcc *KubernetesClusterCacheV2) SetConfigMapUpdateFunc(f func(interface{})) {}
+
+func (kcc *KubernetesClusterCacheV2) GetAllNamespaces() []*Namespace {
+	return kcc.namespaceStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllNodes() []*Node {
+	return kcc.nodeStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllPods() []*Pod {
+	return kcc.podStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllServices() []*Service {
+	return kcc.serviceStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllDaemonSets() []*DaemonSet {
+	return kcc.daemonSetStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllDeployments() []*Deployment {
+	return kcc.deploymentStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllStatefulSets() []*StatefulSet {
+	return kcc.statefulSetStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllPersistentVolumes() []*PersistentVolume {
+	return kcc.persistentVolumeStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
+	return kcc.persistentVolumeClaimStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllStorageClasses() []*StorageClass {
+	return kcc.storageClassStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllJobs() []*Job {
+	return kcc.jobStore.GetAll()
+}

+ 121 - 0
pkg/clustercache/store.go

@@ -0,0 +1,121 @@
+package clustercache
+
+import (
+	"context"
+	"sync"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+)
+
+// GenericStore is a generic store implementation. It converts objects to a different type using a transform function.
+// The main purpose is to reduce a memory footprint by storing only the necessary data.
+type GenericStore[Input any, Output any] struct {
+	mutex         sync.RWMutex
+	items         map[types.UID]Output
+	transformFunc func(input Input) Output
+}
+
+// NewGenericStore creates a new instance of GenericStore.
+func NewGenericStore[Input any, Output any](transformFunc func(input Input) Output) *GenericStore[Input, Output] {
+	return &GenericStore[Input, Output]{
+		items:         make(map[types.UID]Output),
+		transformFunc: transformFunc,
+	}
+}
+
+func CreateStoreAndWatch[Input any, Output any](
+	ctx context.Context,
+	restClient rest.Interface,
+	resource string,
+	transformFunc func(input Input) Output,
+) *GenericStore[Input, Output] {
+	lw := cache.NewListWatchFromClient(restClient, resource, v1.NamespaceAll, fields.Everything())
+	store := &GenericStore[Input, Output]{
+		items:         make(map[types.UID]Output),
+		transformFunc: transformFunc,
+	}
+	var zeroValue Input
+	reflector := cache.NewReflector(lw, zeroValue, store, 0)
+	go reflector.Run(ctx.Done())
+	return store
+}
+
+// Add inserts an object into the store.
+func (s *GenericStore[Input, Output]) Add(obj any) error {
+	return s.Update(obj)
+}
+
+// Update updates the existing entry in the store.
+func (s *GenericStore[Input, Output]) Update(obj any) error {
+	// The 'meta.Accessor' function can be used for types implementing 'metav1.Object'.
+	o, err := meta.Accessor(obj)
+	if err != nil {
+		return err
+	}
+	uid := o.GetUID()
+
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	s.items[uid] = s.transformFunc(obj.(Input))
+
+	return nil
+}
+
+// Delete removes an object from the store.
+func (s *GenericStore[Input, Output]) Delete(obj any) error {
+	o, err := meta.Accessor(obj)
+	if err != nil {
+		return err
+	}
+
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	delete(s.items, o.GetUID())
+
+	return nil
+}
+
+// GetAll returns all stored objects.
+func (s *GenericStore[Input, Output]) GetAll() []Output {
+	s.mutex.RLock()
+	defer s.mutex.RUnlock()
+	allItems := make([]Output, 0, len(s.items))
+	for _, item := range s.items {
+		allItems = append(allItems, item)
+	}
+	return allItems
+}
+
+// Replace replaces the current list of items in the store.
+func (s *GenericStore[Input, Output]) Replace(list []any, _ string) error {
+	s.mutex.Lock()
+	s.items = make(map[types.UID]Output, len(list))
+	s.mutex.Unlock()
+
+	for _, o := range list {
+		err := s.Add(o)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// Stubs to satisfy the cache.Store interface
+func (s *GenericStore[Input, Output]) List() []interface{} { return nil }
+func (s *GenericStore[Input, Output]) ListKeys() []string  { return nil }
+func (s *GenericStore[Input, Output]) Get(_ interface{}) (item interface{}, exists bool, err error) {
+	return nil, false, nil
+}
+func (s *GenericStore[Input, Output]) GetByKey(_ string) (item interface{}, exists bool, err error) {
+	return nil, false, nil
+}
+func (s *GenericStore[Input, Output]) Resync() error { return nil }