Ver Fonte

Add DaemonSets and ReplicaSets to clustercache watcher; WIP jobs;

Niko Kovacevic há 6 anos atrás
pai
commit
578ca1f43b
1 ficheiros alterados com 57 adições e 7 exclusões
  1. 57 7
      pkg/clustercache/clustercache.go

+ 57 - 7
pkg/clustercache/clustercache.go

@@ -7,6 +7,7 @@ import (
 	"k8s.io/klog"
 
 	appsv1 "k8s.io/api/apps/v1"
+	// batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	stv1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/fields"
@@ -38,12 +39,21 @@ type ClusterCache interface {
 	// GetAllServices returns all the cached services
 	GetAllServices() []*v1.Service
 
+	// GetAllDaemonSets returns all the cached DaemonSets
+	GetAllDaemonSets() []*appsv1.DaemonSet
+
 	// GetAllDeployments returns all the cached deployments
 	GetAllDeployments() []*appsv1.Deployment
 
-	// GetAllDeployments returns all the cached deployments
+	// GetAllStatfulSets returns all the cached StatefulSets
 	GetAllStatefulSets() []*appsv1.StatefulSet
 
+	// GetAllReplicaSets returns all the cached ReplicaSets
+	GetAllReplicaSets() []*appsv1.ReplicaSet
+
+	// GetAllJobs returns all the cached Jobs
+	// GetAllJobs() []*batchv1.Job
+
 	// GetAllPersistentVolumes returns all the cached persistent volumes
 	GetAllPersistentVolumes() []*v1.PersistentVolume
 
@@ -63,11 +73,14 @@ type KubernetesClusterCache struct {
 	podWatch               WatchController
 	kubecostConfigMapWatch WatchController
 	serviceWatch           WatchController
+	daemonsetsWatch        WatchController
 	deploymentsWatch       WatchController
 	statefulsetWatch       WatchController
-	pvWatch                WatchController
-	storageClassWatch      WatchController
-	stop                   chan struct{}
+	replicasetWatch        WatchController
+	// jobWatch               WatchController
+	pvWatch           WatchController
+	storageClassWatch WatchController
+	stop              chan struct{}
 }
 
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
@@ -90,15 +103,19 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 		podWatch:               NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
 		kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
 		serviceWatch:           NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
+		daemonsetsWatch:        NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
 		deploymentsWatch:       NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
 		statefulsetWatch:       NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
-		pvWatch:                NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
-		storageClassWatch:      NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
+		replicasetWatch:        NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
+		// jobWatch:               NewCachingWatcher(appsRestClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
+		pvWatch:           NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
+		storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
 	}
 
 	// Wait for each caching watcher to initialize
 	var wg sync.WaitGroup
-	wg.Add(9)
+	// wg.Add(12)
+	wg.Add(11)
 
 	cancel := make(chan struct{})
 
@@ -107,8 +124,11 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	go initializeCache(kcc.podWatch, &wg, cancel)
 	go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
 	go initializeCache(kcc.serviceWatch, &wg, cancel)
+	go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
 	go initializeCache(kcc.deploymentsWatch, &wg, cancel)
 	go initializeCache(kcc.statefulsetWatch, &wg, cancel)
+	go initializeCache(kcc.replicasetWatch, &wg, cancel)
+	// go initializeCache(kcc.jobWatch, &wg, cancel)
 	go initializeCache(kcc.pvWatch, &wg, cancel)
 	go initializeCache(kcc.storageClassWatch, &wg, cancel)
 
@@ -128,8 +148,11 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.podWatch.Run(1, stopCh)
 	go kcc.serviceWatch.Run(1, stopCh)
 	go kcc.kubecostConfigMapWatch.Run(1, stopCh)
+	go kcc.daemonsetsWatch.Run(1, stopCh)
 	go kcc.deploymentsWatch.Run(1, stopCh)
 	go kcc.statefulsetWatch.Run(1, stopCh)
+	go kcc.replicasetWatch.Run(1, stopCh)
+	// go kcc.jobWatch.Run(1, stopCh)
 	go kcc.pvWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
 
@@ -185,6 +208,15 @@ func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
 	return services
 }
 
+func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
+	var daemonsets []*appsv1.DaemonSet
+	items := kcc.daemonsetsWatch.GetAll()
+	for _, daemonset := range items {
+		daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
+	}
+	return daemonsets
+}
+
 func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
 	var deployments []*appsv1.Deployment
 	items := kcc.deploymentsWatch.GetAll()
@@ -203,6 +235,24 @@ func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
 	return statefulsets
 }
 
+func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
+	var replicasets []*appsv1.ReplicaSet
+	items := kcc.replicasetWatch.GetAll()
+	for _, replicaset := range items {
+		replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
+	}
+	return replicasets
+}
+
+// func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
+// 	var jobs []*batchv1.Job
+// 	items := kcc.jobWatch.GetAll()
+// 	for _, job := range items {
+// 		jobs = append(jobs, job.(*batchv1.Job))
+// 	}
+// 	return jobs
+// }
+
 func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
 	var pvs []*v1.PersistentVolume
 	items := kcc.pvWatch.GetAll()