Ajay Tripathy пре 4 година
родитељ
комит
a8a567f6a7

+ 43 - 34
pkg/clustercache/clustercache.go

@@ -78,22 +78,23 @@ type ClusterCache interface {
 type KubernetesClusterCache struct {
 	client kubernetes.Interface
 
-	namespaceWatch         WatchController
-	nodeWatch              WatchController
-	podWatch               WatchController
-	kubecostConfigMapWatch WatchController
-	serviceWatch           WatchController
-	daemonsetsWatch        WatchController
-	deploymentsWatch       WatchController
-	statefulsetWatch       WatchController
-	replicasetWatch        WatchController
-	pvWatch                WatchController
-	pvcWatch               WatchController
-	storageClassWatch      WatchController
-	jobsWatch              WatchController
-	hpaWatch               WatchController
-	pdbWatch               WatchController
-	stop                   chan struct{}
+	namespaceWatch             WatchController
+	nodeWatch                  WatchController
+	podWatch                   WatchController
+	kubecostConfigMapWatch     WatchController
+	serviceWatch               WatchController
+	daemonsetsWatch            WatchController
+	deploymentsWatch           WatchController
+	statefulsetWatch           WatchController
+	replicasetWatch            WatchController
+	pvWatch                    WatchController
+	pvcWatch                   WatchController
+	storageClassWatch          WatchController
+	jobsWatch                  WatchController
+	hpaWatch                   WatchController
+	pdbWatch                   WatchController
+	replicationControllerWatch WatchController
+	stop                       chan struct{}
 }
 
 func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
@@ -113,27 +114,28 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	klog.Infof("NAMESPACE: %s", kubecostNamespace)
 
 	kcc := &KubernetesClusterCache{
-		client:                 client,
-		namespaceWatch:         NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
-		nodeWatch:              NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
-		podWatch:               NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
-		kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
-		serviceWatch:           NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
-		daemonsetsWatch:        NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
-		deploymentsWatch:       NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
-		statefulsetWatch:       NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
-		replicasetWatch:        NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
-		pvWatch:                NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
-		pvcWatch:               NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
-		storageClassWatch:      NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
-		jobsWatch:              NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
-		hpaWatch:               NewCachingWatcher(autoscalingClient, "horizontalpodautoscalers", &autoscaling.HorizontalPodAutoscaler{}, "", fields.Everything()),
-		pdbWatch:               NewCachingWatcher(pdbClient, "poddisruptionbudgets", &v1beta1.PodDisruptionBudget{}, "", fields.Everything()),
+		client:                     client,
+		namespaceWatch:             NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
+		nodeWatch:                  NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
+		podWatch:                   NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
+		kubecostConfigMapWatch:     NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
+		serviceWatch:               NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
+		daemonsetsWatch:            NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
+		deploymentsWatch:           NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
+		statefulsetWatch:           NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
+		replicasetWatch:            NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
+		pvWatch:                    NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
+		pvcWatch:                   NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
+		storageClassWatch:          NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
+		jobsWatch:                  NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
+		hpaWatch:                   NewCachingWatcher(autoscalingClient, "horizontalpodautoscalers", &autoscaling.HorizontalPodAutoscaler{}, "", fields.Everything()),
+		pdbWatch:                   NewCachingWatcher(pdbClient, "poddisruptionbudgets", &v1beta1.PodDisruptionBudget{}, "", fields.Everything()),
+		replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
 	}
 
 	// Wait for each caching watcher to initialize
 	var wg sync.WaitGroup
-	wg.Add(15)
+	wg.Add(16)
 
 	cancel := make(chan struct{})
 
@@ -152,9 +154,12 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	go initializeCache(kcc.jobsWatch, &wg, cancel)
 	go initializeCache(kcc.hpaWatch, &wg, cancel)
 	go initializeCache(kcc.podWatch, &wg, cancel)
+	go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
 
 	wg.Wait()
 
+	klog.Infof("Done waiting")
+
 	return kcc
 }
 
@@ -179,6 +184,7 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.jobsWatch.Run(1, stopCh)
 	go kcc.hpaWatch.Run(1, stopCh)
 	go kcc.pdbWatch.Run(1, stopCh)
+	go kcc.replicationControllerWatch.Run(1, stopCh)
 
 	kcc.stop = stopCh
 }
@@ -320,7 +326,10 @@ func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*v1beta1.PodDi
 
 func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*v1.ReplicationController {
 	var rcs []*v1.ReplicationController
-	//TODO: Implement
+	items := kcc.replicationControllerWatch.GetAll()
+	for _, rc := range items {
+		rcs = append(rcs, rc.(*v1.ReplicationController))
+	}
 	return rcs
 }
 

+ 1 - 0
pkg/clustercache/clusterexporter.go

@@ -32,6 +32,7 @@ type clusterEncoding struct {
 	Jobs                     []*batchv1.Job                         `json:"jobs,omitempty"`
 	HorizontalPodAutoscalers []*autoscaling.HorizontalPodAutoscaler `json:"horizontalPodAutoscalers,omitempty"`
 	PodDisruptionBudgets     []*v1beta1.PodDisruptionBudget         `json:"podDisruptionBudgets,omitEmpty"`
+	ReplicationControllers   []*v1.ReplicationController            `json:"replicationController,omitEmpty"`
 }
 
 // ClusterExporter manages and runs an file export process which dumps the local kubernetes cluster to a target location.

+ 11 - 3
pkg/clustercache/clusterimporter.go

@@ -302,9 +302,17 @@ func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*v1beta1.PodDisruption
 }
 
 func (ci *ClusterImporter) GetAllReplicationControllers() []*v1.ReplicationController {
-	var rcs []*v1.ReplicationController
-	//TODO: Implement
-	return rcs
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	rcs := ci.data.ReplicationControllers
+	cloneList := make([]*v1.ReplicationController, 0, len(rcs))
+	for _, v := range rcs {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
 }
 
 // SetConfigMapUpdateFunc sets the configmap update function