Selaa lähdekoodia

Add HPAs to cluster cache for Kaelan's incoming metric work.

Matt Bolt 4 vuotta sitten
vanhempi
sitoutus
350c20d740
1 muutettua tiedostoa jossa 19 lisäystä ja 1 poistoa
  1. 19 1
      pkg/clustercache/clustercache.go

+ 19 - 1
pkg/clustercache/clustercache.go

@@ -7,6 +7,7 @@ import (
 	"k8s.io/klog"
 
 	appsv1 "k8s.io/api/apps/v1"
+	autoscaling "k8s.io/api/autoscaling/v2beta1"
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	stv1 "k8s.io/api/storage/v1"
@@ -63,6 +64,9 @@ type ClusterCache interface {
 	// GetAllJobs returns all the cached jobs
 	GetAllJobs() []*batchv1.Job
 
+	// GetAllHorizontalPodAutoscalers() returns all cached horizontal pod autoscalers
+	GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler
+
 	// SetConfigMapUpdateFunc sets the configmap update function
 	SetConfigMapUpdateFunc(func(interface{}))
 }
@@ -84,6 +88,7 @@ type KubernetesClusterCache struct {
 	pvcWatch               WatchController
 	storageClassWatch      WatchController
 	jobsWatch              WatchController
+	hpaWatch               WatchController
 	stop                   chan struct{}
 }
 
@@ -97,6 +102,7 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	appsRestClient := client.AppsV1().RESTClient()
 	storageRestClient := client.StorageV1().RESTClient()
 	batchClient := client.BatchV1().RESTClient()
+	autoscalingClient := client.AutoscalingV2beta1().RESTClient()
 
 	kubecostNamespace := env.GetKubecostNamespace()
 	klog.Infof("NAMESPACE: %s", kubecostNamespace)
@@ -116,11 +122,12 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 		pvcWatch:               NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
 		storageClassWatch:      NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
 		jobsWatch:              NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
+		hpaWatch:               NewCachingWatcher(autoscalingClient, "horizontalpodautoscalers", &autoscaling.HorizontalPodAutoscaler{}, "", fields.Everything()),
 	}
 
 	// Wait for each caching watcher to initialize
 	var wg sync.WaitGroup
-	wg.Add(13)
+	wg.Add(14)
 
 	cancel := make(chan struct{})
 
@@ -137,6 +144,7 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	go initializeCache(kcc.pvcWatch, &wg, cancel)
 	go initializeCache(kcc.storageClassWatch, &wg, cancel)
 	go initializeCache(kcc.jobsWatch, &wg, cancel)
+	go initializeCache(kcc.hpaWatch, &wg, cancel)
 
 	wg.Wait()
 
@@ -162,6 +170,7 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.pvcWatch.Run(1, stopCh)
 	go kcc.storageClassWatch.Run(1, stopCh)
 	go kcc.jobsWatch.Run(1, stopCh)
+	go kcc.hpaWatch.Run(1, stopCh)
 
 	kcc.stop = stopCh
 }
@@ -287,6 +296,15 @@ func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
 	return jobs
 }
 
+func (kcc *KubernetesClusterCache) GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler {
+	var hpas []*autoscaling.HorizontalPodAutoscaler
+	items := kcc.hpaWatch.GetAll()
+	for _, hpa := range items {
+		hpas = append(hpas, hpa.(*autoscaling.HorizontalPodAutoscaler))
+	}
+	return hpas
+}
+
 func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
 	kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
 }