Просмотр исходного кода

Add ResourceQuota to ClusterCache (#3274)

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Co-authored-by: Niko Kovacevic <nikovacevic@gmail.com>
Matt Bolt 6 месяцев назад
Родитель
Commit
111cd37677

+ 19 - 0
core/pkg/clustercache/clustercache.go

@@ -160,6 +160,13 @@ type ReplicaSet struct {
 	Spec            appsv1.ReplicaSetSpec
 }
 
+type ResourceQuota struct {
+	Name      string
+	Namespace string
+	Spec      v1.ResourceQuotaSpec
+	Status    v1.ResourceQuotaStatus
+}
+
 type Volume struct {
 }
 
@@ -381,6 +388,15 @@ func TransformReplicaSet(input *appsv1.ReplicaSet) *ReplicaSet {
 	}
 }
 
+func TransformResourceQuota(input *v1.ResourceQuota) *ResourceQuota {
+	return &ResourceQuota{
+		Name:      input.Name,
+		Namespace: input.Namespace,
+		Spec:      input.Spec,
+		Status:    input.Status,
+	}
+}
+
 // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
 // up to date resources using watchers
 type ClusterCache interface {
@@ -431,4 +447,7 @@ type ClusterCache interface {
 
 	// GetAllReplicationControllers returns all cached replication controllers
 	GetAllReplicationControllers() []*ReplicationController
+
+	// GetAllResourceQuotas returns all cached resource quotas
+	GetAllResourceQuotas() []*ResourceQuota
 }

+ 4 - 0
core/pkg/nodestats/nodes_test.go

@@ -133,3 +133,7 @@ func (tcc *NodesOnlyClusterCache) GetAllPodDisruptionBudgets() []*clustercache.P
 func (tcc *NodesOnlyClusterCache) GetAllReplicationControllers() []*clustercache.ReplicationController {
 	return nil
 }
+
+func (tcc *NodesOnlyClusterCache) GetAllResourceQuotas() []*clustercache.ResourceQuota {
+	return nil
+}

+ 15 - 1
pkg/clustercache/clustercache.go

@@ -34,6 +34,7 @@ type KubernetesClusterCache struct {
 	jobsWatch                  WatchController
 	pdbWatch                   WatchController
 	replicationControllerWatch WatchController
+	resourceQuotasWatch        WatchController
 	stop                       chan struct{}
 }
 
@@ -75,13 +76,14 @@ func NewKubernetesClusterCacheV1(client kubernetes.Interface) cc.ClusterCache {
 		jobsWatch:                  NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
 		pdbWatch:                   NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
 		replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
+		resourceQuotasWatch:        NewCachingWatcher(coreRestClient, "resourcequotas", &v1.ResourceQuota{}, "", fields.Everything()),
 	}
 
 	// Wait for each caching watcher to initialize
 	cancel := make(chan struct{})
 	var wg sync.WaitGroup
 	if env.HasKubernetesResourceAccess() {
-		wg.Add(14)
+		wg.Add(15)
 		go initializeCache(kcc.namespaceWatch, &wg, cancel)
 		go initializeCache(kcc.nodeWatch, &wg, cancel)
 		go initializeCache(kcc.podWatch, &wg, cancel)
@@ -96,6 +98,7 @@ func NewKubernetesClusterCacheV1(client kubernetes.Interface) cc.ClusterCache {
 		go initializeCache(kcc.jobsWatch, &wg, cancel)
 		go initializeCache(kcc.pdbWatch, &wg, cancel)
 		go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
+		go initializeCache(kcc.resourceQuotasWatch, &wg, cancel)
 	}
 
 	wg.Wait()
@@ -125,6 +128,7 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.jobsWatch.Run(1, stopCh)
 	go kcc.pdbWatch.Run(1, stopCh)
 	go kcc.replicationControllerWatch.Run(1, stopCh)
+	go kcc.resourceQuotasWatch.Run(1, stopCh)
 
 	kcc.stop = stopCh
 }
@@ -263,3 +267,13 @@ func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*cc.Replicat
 	}
 	return rcs
 }
+
+// GetAllResourceQuotas returns all cached resource quotas
+func (kcc *KubernetesClusterCache) GetAllResourceQuotas() []*cc.ResourceQuota {
+	var rqs []*cc.ResourceQuota
+	items := kcc.resourceQuotasWatch.GetAll()
+	for _, rq := range items {
+		rqs = append(rqs, cc.TransformResourceQuota(rq.(*v1.ResourceQuota)))
+	}
+	return rqs
+}

+ 8 - 1
pkg/clustercache/clustercache2.go

@@ -28,6 +28,7 @@ type KubernetesClusterCacheV2 struct {
 	replicationControllerStore *GenericStore[*v1.ReplicationController, *cc.ReplicationController]
 	replicaSetStore            *GenericStore[*appsv1.ReplicaSet, *cc.ReplicaSet]
 	pdbStore                   *GenericStore[*policyv1.PodDisruptionBudget, *cc.PodDisruptionBudget]
+	resourceQuotasStore        *GenericStore[*v1.ResourceQuota, *cc.ResourceQuota]
 	stopCh                     chan struct{}
 }
 
@@ -47,6 +48,7 @@ func NewKubernetesClusterCacheV2(clientset kubernetes.Interface) *KubernetesClus
 		storageClassStore:          CreateStore(clientset.StorageV1().RESTClient(), "storageclasses", cc.TransformStorageClass),
 		jobStore:                   CreateStore(clientset.BatchV1().RESTClient(), "jobs", cc.TransformJob),
 		pdbStore:                   CreateStore(clientset.PolicyV1().RESTClient(), "poddisruptionbudgets", cc.TransformPodDisruptionBudget),
+		resourceQuotasStore:        CreateStore(clientset.CoreV1().RESTClient(), "resourcequotas", cc.TransformResourceQuota),
 		stopCh:                     make(chan struct{}),
 	}
 }
@@ -54,8 +56,8 @@ func NewKubernetesClusterCacheV2(clientset kubernetes.Interface) *KubernetesClus
 func (kcc *KubernetesClusterCacheV2) Run() {
 	var wg sync.WaitGroup
 
-	wg.Add(14)
 	if env.HasKubernetesResourceAccess() {
+		wg.Add(15)
 		kcc.namespaceStore.Watch(kcc.stopCh, wg.Done)
 		kcc.nodeStore.Watch(kcc.stopCh, wg.Done)
 		kcc.persistentVolumeClaimStore.Watch(kcc.stopCh, wg.Done)
@@ -70,6 +72,7 @@ func (kcc *KubernetesClusterCacheV2) Run() {
 		kcc.storageClassStore.Watch(kcc.stopCh, wg.Done)
 		kcc.jobStore.Watch(kcc.stopCh, wg.Done)
 		kcc.pdbStore.Watch(kcc.stopCh, wg.Done)
+		kcc.resourceQuotasStore.Watch(kcc.stopCh, wg.Done)
 	}
 	wg.Wait()
 }
@@ -137,3 +140,7 @@ func (kcc *KubernetesClusterCacheV2) GetAllReplicaSets() []*cc.ReplicaSet {
 func (kcc *KubernetesClusterCacheV2) GetAllPodDisruptionBudgets() []*cc.PodDisruptionBudget {
 	return kcc.pdbStore.GetAll()
 }
+
+func (kcc *KubernetesClusterCacheV2) GetAllResourceQuotas() []*cc.ResourceQuota {
+	return kcc.resourceQuotasStore.GetAll()
+}

+ 2 - 0
pkg/clustercache/clusterexporter.go

@@ -26,6 +26,7 @@ type clusterEncoding struct {
 	Jobs                   []*cc.Job                   `json:"jobs,omitempty"`
 	PodDisruptionBudgets   []*cc.PodDisruptionBudget   `json:"podDisruptionBudgets,omitempty"`
 	ReplicationControllers []*cc.ReplicationController `json:"replicationController,omitempty"`
+	ResourceQuotas         []*cc.ResourceQuota         `json:"resourceQuotas,omitempty"`
 }
 
 // ClusterExporter manages and runs an file export process which dumps the local kubernetes cluster to a target location.
@@ -97,6 +98,7 @@ func (ce *ClusterExporter) Export() error {
 		Jobs:                   c.GetAllJobs(),
 		PodDisruptionBudgets:   c.GetAllPodDisruptionBudgets(),
 		ReplicationControllers: c.GetAllReplicationControllers(),
+		ResourceQuotas:         c.GetAllResourceQuotas(),
 	}
 
 	data, err := json.Marshal(encoding)

+ 7 - 0
pkg/clustercache/clusterimporter.go

@@ -197,3 +197,10 @@ func (ci *ClusterImporter) GetAllReplicationControllers() []*cc.ReplicationContr
 
 	return slices.Clone(ci.data.ReplicationControllers)
 }
+
+func (ci *ClusterImporter) GetAllResourceQuotas() []*cc.ResourceQuota {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	return slices.Clone(ci.data.ResourceQuotas)
+}