소스 검색

add replicationControllerStore, replicaSetStore, pdbStore

Signed-off-by: r2k1 <yokree@gmail.com>
r2k1 2 년 전
부모
커밋
17b58df28e
2개의 변경된 파일115개의 추가작업 그리고 27개의 파일을 삭제
  1. 96 27
      pkg/clustercache/clustercache.go
  2. 19 0
      pkg/clustercache/clustercache2.go

+ 96 - 27
pkg/clustercache/clustercache.go

@@ -11,6 +11,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1"
 	stv1 "k8s.io/api/storage/v1"
 	stv1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes"
@@ -251,6 +252,24 @@ func transformJob(input *batchv1.Job) *Job {
 	}
 	}
 }
 }
 
 
+type ReplicationController struct{}
+
+func transformReplicationController(input *v1.ReplicationController) *ReplicationController {
+	return &ReplicationController{}
+}
+
+type PodDisruptionBudget struct{}
+
+func transformPodDisruptionBudget(input *policyv1.PodDisruptionBudget) *PodDisruptionBudget {
+	return &PodDisruptionBudget{}
+}
+
+type ReplicaSet struct{}
+
+func transformReplicaSet(input *appsv1.ReplicaSet) *ReplicaSet {
+	return &ReplicaSet{}
+}
+
 // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
 // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
 // up to date resources using watchers
 // up to date resources using watchers
 type ClusterCache interface {
 type ClusterCache interface {
@@ -281,6 +300,9 @@ type ClusterCache interface {
 	// GetAllStatfulSets returns all the cached StatefulSets
 	// GetAllStatfulSets returns all the cached StatefulSets
 	GetAllStatefulSets() []*StatefulSet
 	GetAllStatefulSets() []*StatefulSet
 
 
+	// GetAllReplicaSets returns all the cached ReplicaSets
+	GetAllReplicaSets() []*ReplicaSet
+
 	// GetAllPersistentVolumes returns all the cached persistent volumes
 	// GetAllPersistentVolumes returns all the cached persistent volumes
 	GetAllPersistentVolumes() []*PersistentVolume
 	GetAllPersistentVolumes() []*PersistentVolume
 
 
@@ -292,6 +314,13 @@ type ClusterCache interface {
 
 
 	// GetAllJobs returns all the cached jobs
 	// GetAllJobs returns all the cached jobs
 	GetAllJobs() []*Job
 	GetAllJobs() []*Job
+
+	// GetAllPodDisruptionBudgets returns all cached pod disruption budgets
+	GetAllPodDisruptionBudgets() []*PodDisruptionBudget
+
+	// GetAllReplicationControllers returns all cached replication controllers
+	GetAllReplicationControllers() []*ReplicationController
+
 	// SetConfigMapUpdateFunc sets the configmap update function
 	// SetConfigMapUpdateFunc sets the configmap update function
 	SetConfigMapUpdateFunc(func(interface{}))
 	SetConfigMapUpdateFunc(func(interface{}))
 }
 }
@@ -300,19 +329,22 @@ type ClusterCache interface {
 type KubernetesClusterCache struct {
 type KubernetesClusterCache struct {
 	client kubernetes.Interface
 	client kubernetes.Interface
 
 
-	namespaceWatch         WatchController
-	nodeWatch              WatchController
-	podWatch               WatchController
-	kubecostConfigMapWatch WatchController
-	serviceWatch           WatchController
-	daemonsetsWatch        WatchController
-	deploymentsWatch       WatchController
-	statefulsetWatch       WatchController
-	pvWatch                WatchController
-	pvcWatch               WatchController
-	storageClassWatch      WatchController
-	jobsWatch              WatchController
-	stop                   chan struct{}
+	namespaceWatch             WatchController
+	nodeWatch                  WatchController
+	podWatch                   WatchController
+	kubecostConfigMapWatch     WatchController
+	serviceWatch               WatchController
+	daemonsetsWatch            WatchController
+	deploymentsWatch           WatchController
+	statefulsetWatch           WatchController
+	replicasetWatch            WatchController
+	pvWatch                    WatchController
+	pvcWatch                   WatchController
+	storageClassWatch          WatchController
+	jobsWatch                  WatchController
+	pdbWatch                   WatchController
+	replicationControllerWatch WatchController
+	stop                       chan struct{}
 }
 }
 
 
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
@@ -329,24 +361,28 @@ func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
 	appsRestClient := client.AppsV1().RESTClient()
 	appsRestClient := client.AppsV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 	batchClient := client.BatchV1().RESTClient()
 	batchClient := client.BatchV1().RESTClient()
+	pdbClient := client.PolicyV1().RESTClient()
 
 
 	kubecostNamespace := env.GetKubecostNamespace()
 	kubecostNamespace := env.GetKubecostNamespace()
 	log.Infof("NAMESPACE: %s", kubecostNamespace)
 	log.Infof("NAMESPACE: %s", kubecostNamespace)
 
 
 	kcc := &KubernetesClusterCache{
 	kcc := &KubernetesClusterCache{
-		client:                 client,
-		namespaceWatch:         NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
-		nodeWatch:              NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
-		podWatch:               NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
-		kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
-		serviceWatch:           NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
-		daemonsetsWatch:        NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
-		deploymentsWatch:       NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
-		statefulsetWatch:       NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
-		pvWatch:                NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
-		pvcWatch:               NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
-		storageClassWatch:      NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
-		jobsWatch:              NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
+		client:                     client,
+		namespaceWatch:             NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
+		nodeWatch:                  NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
+		podWatch:                   NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
+		kubecostConfigMapWatch:     NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
+		serviceWatch:               NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
+		daemonsetsWatch:            NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
+		deploymentsWatch:           NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
+		statefulsetWatch:           NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
+		replicasetWatch:            NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
+		pvWatch:                    NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
+		pvcWatch:                   NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
+		storageClassWatch:          NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
+		jobsWatch:                  NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
+		pdbWatch:                   NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
+		replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
 	}
 	}
 
 
 	// Wait for each caching watcher to initialize
 	// Wait for each caching watcher to initialize
@@ -356,7 +392,7 @@ func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
 		wg.Add(1)
 		wg.Add(1)
 		go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
 		go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
 	} else {
 	} else {
-		wg.Add(12)
+		wg.Add(15)
 		go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
 		go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
 		go initializeCache(kcc.namespaceWatch, &wg, cancel)
 		go initializeCache(kcc.namespaceWatch, &wg, cancel)
 		go initializeCache(kcc.nodeWatch, &wg, cancel)
 		go initializeCache(kcc.nodeWatch, &wg, cancel)
@@ -365,10 +401,13 @@ func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
 		go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
 		go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
 		go initializeCache(kcc.deploymentsWatch, &wg, cancel)
 		go initializeCache(kcc.deploymentsWatch, &wg, cancel)
 		go initializeCache(kcc.statefulsetWatch, &wg, cancel)
 		go initializeCache(kcc.statefulsetWatch, &wg, cancel)
+		go initializeCache(kcc.replicasetWatch, &wg, cancel)
 		go initializeCache(kcc.pvWatch, &wg, cancel)
 		go initializeCache(kcc.pvWatch, &wg, cancel)
 		go initializeCache(kcc.pvcWatch, &wg, cancel)
 		go initializeCache(kcc.pvcWatch, &wg, cancel)
 		go initializeCache(kcc.storageClassWatch, &wg, cancel)
 		go initializeCache(kcc.storageClassWatch, &wg, cancel)
 		go initializeCache(kcc.jobsWatch, &wg, cancel)
 		go initializeCache(kcc.jobsWatch, &wg, cancel)
+		go initializeCache(kcc.pdbWatch, &wg, cancel)
+		go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
 	}
 	}
 
 
 	wg.Wait()
 	wg.Wait()
@@ -392,10 +431,13 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.daemonsetsWatch.Run(1, stopCh)
 	go kcc.daemonsetsWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
 	go kcc.statefulsetWatch.Run(1, stopCh)
 	go kcc.statefulsetWatch.Run(1, stopCh)
+	go kcc.replicasetWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.pvcWatch.Run(1, stopCh)
 	go kcc.pvcWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
 	go kcc.jobsWatch.Run(1, stopCh)
 	go kcc.jobsWatch.Run(1, stopCh)
+	go kcc.pdbWatch.Run(1, stopCh)
+	go kcc.replicationControllerWatch.Run(1, stopCh)
 
 
 	kcc.stop = stopCh
 	kcc.stop = stopCh
 }
 }
@@ -472,6 +514,15 @@ func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*StatefulSet {
 	return statefulsets
 	return statefulsets
 }
 }
 
 
+func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*ReplicaSet {
+	var replicasets []*ReplicaSet
+	items := kcc.replicasetWatch.GetAll()
+	for _, replicaset := range items {
+		replicasets = append(replicasets, transformReplicaSet(replicaset.(*appsv1.ReplicaSet)))
+	}
+	return replicasets
+}
+
 func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*PersistentVolume {
 func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*PersistentVolume {
 	var pvs []*PersistentVolume
 	var pvs []*PersistentVolume
 	items := kcc.pvWatch.GetAll()
 	items := kcc.pvWatch.GetAll()
@@ -508,6 +559,24 @@ func (kcc *KubernetesClusterCache) GetAllJobs() []*Job {
 	return jobs
 	return jobs
 }
 }
 
 
+func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*PodDisruptionBudget {
+	var pdbs []*PodDisruptionBudget
+	items := kcc.pdbWatch.GetAll()
+	for _, pdb := range items {
+		pdbs = append(pdbs, transformPodDisruptionBudget(pdb.(*policyv1.PodDisruptionBudget)))
+	}
+	return pdbs
+}
+
+func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*ReplicationController {
+	var rcs []*ReplicationController
+	items := kcc.replicationControllerWatch.GetAll()
+	for _, rc := range items {
+		rcs = append(rcs, transformReplicationController(rc.(*v1.ReplicationController)))
+	}
+	return rcs
+}
+
 func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
 func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
 	kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
 	kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
 }
 }

+ 19 - 0
pkg/clustercache/clustercache2.go

@@ -6,6 +6,7 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	appsv1 "k8s.io/api/apps/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1"
 	stv1 "k8s.io/api/storage/v1"
 	stv1 "k8s.io/api/storage/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes"
 )
 )
@@ -22,6 +23,9 @@ type KubernetesClusterCacheV2 struct {
 	persistentVolumeClaimStore *GenericStore[*v1.PersistentVolumeClaim, *PersistentVolumeClaim]
 	persistentVolumeClaimStore *GenericStore[*v1.PersistentVolumeClaim, *PersistentVolumeClaim]
 	storageClassStore          *GenericStore[*stv1.StorageClass, *StorageClass]
 	storageClassStore          *GenericStore[*stv1.StorageClass, *StorageClass]
 	jobStore                   *GenericStore[*batchv1.Job, *Job]
 	jobStore                   *GenericStore[*batchv1.Job, *Job]
+	replicationControllerStore *GenericStore[*v1.ReplicationController, *ReplicationController]
+	replicaSetStore            *GenericStore[*appsv1.ReplicaSet, *ReplicaSet]
+	pdbStore                   *GenericStore[*policyv1.PodDisruptionBudget, *PodDisruptionBudget]
 }
 }
 
 
 func NewKubernetesClusterCacheV2(clientset kubernetes.Interface) *KubernetesClusterCacheV2 {
 func NewKubernetesClusterCacheV2(clientset kubernetes.Interface) *KubernetesClusterCacheV2 {
@@ -31,6 +35,8 @@ func NewKubernetesClusterCacheV2(clientset kubernetes.Interface) *KubernetesClus
 		nodeStore:                  CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "nodes", transformNode),
 		nodeStore:                  CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "nodes", transformNode),
 		podStore:                   CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "pods", transformPod),
 		podStore:                   CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "pods", transformPod),
 		serviceStore:               CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "services", transformService),
 		serviceStore:               CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "services", transformService),
+		replicationControllerStore: CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "replicationcontrollers", transformReplicationController),
+		replicaSetStore:            CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "replicasets", transformReplicaSet),
 		daemonSetStore:             CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "daemonsets", transformDaemonSet),
 		daemonSetStore:             CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "daemonsets", transformDaemonSet),
 		deploymentStore:            CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "deployments", transformDeployment),
 		deploymentStore:            CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "deployments", transformDeployment),
 		statefulSetStore:           CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "statefulsets", transformStatefulSet),
 		statefulSetStore:           CreateStoreAndWatch(ctx, clientset.AppsV1().RESTClient(), "statefulsets", transformStatefulSet),
@@ -38,6 +44,7 @@ func NewKubernetesClusterCacheV2(clientset kubernetes.Interface) *KubernetesClus
 		persistentVolumeClaimStore: CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "persistentvolumeclaims", transformPersistentVolumeClaim),
 		persistentVolumeClaimStore: CreateStoreAndWatch(ctx, clientset.CoreV1().RESTClient(), "persistentvolumeclaims", transformPersistentVolumeClaim),
 		storageClassStore:          CreateStoreAndWatch(ctx, clientset.StorageV1().RESTClient(), "storageclasses", transformStorageClass),
 		storageClassStore:          CreateStoreAndWatch(ctx, clientset.StorageV1().RESTClient(), "storageclasses", transformStorageClass),
 		jobStore:                   CreateStoreAndWatch(ctx, clientset.BatchV1().RESTClient(), "jobs", transformJob),
 		jobStore:                   CreateStoreAndWatch(ctx, clientset.BatchV1().RESTClient(), "jobs", transformJob),
+		pdbStore:                   CreateStoreAndWatch(ctx, clientset.PolicyV1beta1().RESTClient(), "poddisruptionbudgets", transformPodDisruptionBudget),
 	}
 	}
 }
 }
 
 
@@ -92,3 +99,15 @@ func (kcc *KubernetesClusterCacheV2) GetAllStorageClasses() []*StorageClass {
 func (kcc *KubernetesClusterCacheV2) GetAllJobs() []*Job {
 func (kcc *KubernetesClusterCacheV2) GetAllJobs() []*Job {
 	return kcc.jobStore.GetAll()
 	return kcc.jobStore.GetAll()
 }
 }
+
+func (kcc *KubernetesClusterCacheV2) GetAllReplicationControllers() []*ReplicationController {
+	return kcc.replicationControllerStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllReplicaSets() []*ReplicaSet {
+	return kcc.replicaSetStore.GetAll()
+}
+
+func (kcc *KubernetesClusterCacheV2) GetAllPodDisruptionBudgets() []*PodDisruptionBudget {
+	return kcc.pdbStore.GetAll()
+}