clustercache.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/kubecost/cost-model/pkg/env"
  5. "k8s.io/klog"
  6. appsv1 "k8s.io/api/apps/v1"
  7. autoscaling "k8s.io/api/autoscaling/v2beta1"
  8. batchv1 "k8s.io/api/batch/v1"
  9. v1 "k8s.io/api/core/v1"
  10. stv1 "k8s.io/api/storage/v1"
  11. "k8s.io/apimachinery/pkg/fields"
  12. "k8s.io/client-go/kubernetes"
  13. )
  14. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  15. // up to date resources using watchers
  16. type ClusterCache interface {
  17. // Run starts the watcher processes
  18. Run()
  19. // Stops the watcher processes
  20. Stop()
  21. // Gets the underlying clientset
  22. // TODO: Remove once we support all cached cluster components
  23. GetClient() kubernetes.Interface
  24. // GetAllNamespaces returns all the cached namespaces
  25. GetAllNamespaces() []*v1.Namespace
  26. // GetAllNodes returns all the cached nodes
  27. GetAllNodes() []*v1.Node
  28. // GetAllPods returns all the cached pods
  29. GetAllPods() []*v1.Pod
  30. // GetAllServices returns all the cached services
  31. GetAllServices() []*v1.Service
  32. // GetAllDaemonSets returns all the cached DaemonSets
  33. GetAllDaemonSets() []*appsv1.DaemonSet
  34. // GetAllDeployments returns all the cached deployments
  35. GetAllDeployments() []*appsv1.Deployment
  36. // GetAllStatfulSets returns all the cached StatefulSets
  37. GetAllStatefulSets() []*appsv1.StatefulSet
  38. // GetAllReplicaSets returns all the cached ReplicaSets
  39. GetAllReplicaSets() []*appsv1.ReplicaSet
  40. // GetAllPersistentVolumes returns all the cached persistent volumes
  41. GetAllPersistentVolumes() []*v1.PersistentVolume
  42. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  43. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  44. // GetAllStorageClasses returns all the cached storage classes
  45. GetAllStorageClasses() []*stv1.StorageClass
  46. // GetAllJobs returns all the cached jobs
  47. GetAllJobs() []*batchv1.Job
  48. // GetAllHorizontalPodAutoscalers() returns all cached horizontal pod autoscalers
  49. GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler
  50. // SetConfigMapUpdateFunc sets the configmap update function
  51. SetConfigMapUpdateFunc(func(interface{}))
  52. }
  53. // KubernetesClusterCache is the implementation of ClusterCache
  54. type KubernetesClusterCache struct {
  55. client kubernetes.Interface
  56. namespaceWatch WatchController
  57. nodeWatch WatchController
  58. podWatch WatchController
  59. kubecostConfigMapWatch WatchController
  60. serviceWatch WatchController
  61. daemonsetsWatch WatchController
  62. deploymentsWatch WatchController
  63. statefulsetWatch WatchController
  64. replicasetWatch WatchController
  65. pvWatch WatchController
  66. pvcWatch WatchController
  67. storageClassWatch WatchController
  68. jobsWatch WatchController
  69. hpaWatch WatchController
  70. stop chan struct{}
  71. }
  72. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  73. defer wg.Done()
  74. wc.WarmUp(cancel)
  75. }
  76. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  77. coreRestClient := client.CoreV1().RESTClient()
  78. appsRestClient := client.AppsV1().RESTClient()
  79. storageRestClient := client.StorageV1().RESTClient()
  80. batchClient := client.BatchV1().RESTClient()
  81. autoscalingClient := client.AutoscalingV2beta1().RESTClient()
  82. kubecostNamespace := env.GetKubecostNamespace()
  83. klog.Infof("NAMESPACE: %s", kubecostNamespace)
  84. kcc := &KubernetesClusterCache{
  85. client: client,
  86. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  87. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  88. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  89. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  90. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  91. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  92. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  93. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  94. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  95. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  96. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  97. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  98. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  99. hpaWatch: NewCachingWatcher(autoscalingClient, "horizontalpodautoscalers", &autoscaling.HorizontalPodAutoscaler{}, "", fields.Everything()),
  100. }
  101. // Wait for each caching watcher to initialize
  102. var wg sync.WaitGroup
  103. wg.Add(14)
  104. cancel := make(chan struct{})
  105. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  106. go initializeCache(kcc.nodeWatch, &wg, cancel)
  107. go initializeCache(kcc.podWatch, &wg, cancel)
  108. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  109. go initializeCache(kcc.serviceWatch, &wg, cancel)
  110. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  111. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  112. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  113. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  114. go initializeCache(kcc.pvWatch, &wg, cancel)
  115. go initializeCache(kcc.pvcWatch, &wg, cancel)
  116. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  117. go initializeCache(kcc.jobsWatch, &wg, cancel)
  118. go initializeCache(kcc.hpaWatch, &wg, cancel)
  119. wg.Wait()
  120. return kcc
  121. }
  122. func (kcc *KubernetesClusterCache) Run() {
  123. if kcc.stop != nil {
  124. return
  125. }
  126. stopCh := make(chan struct{})
  127. go kcc.namespaceWatch.Run(1, stopCh)
  128. go kcc.nodeWatch.Run(1, stopCh)
  129. go kcc.podWatch.Run(1, stopCh)
  130. go kcc.serviceWatch.Run(1, stopCh)
  131. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  132. go kcc.daemonsetsWatch.Run(1, stopCh)
  133. go kcc.deploymentsWatch.Run(1, stopCh)
  134. go kcc.statefulsetWatch.Run(1, stopCh)
  135. go kcc.replicasetWatch.Run(1, stopCh)
  136. go kcc.pvWatch.Run(1, stopCh)
  137. go kcc.pvcWatch.Run(1, stopCh)
  138. go kcc.storageClassWatch.Run(1, stopCh)
  139. go kcc.jobsWatch.Run(1, stopCh)
  140. go kcc.hpaWatch.Run(1, stopCh)
  141. kcc.stop = stopCh
  142. }
  143. func (kcc *KubernetesClusterCache) Stop() {
  144. if kcc.stop == nil {
  145. return
  146. }
  147. close(kcc.stop)
  148. kcc.stop = nil
  149. }
  150. func (kcc *KubernetesClusterCache) GetClient() kubernetes.Interface {
  151. return kcc.client
  152. }
  153. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  154. var namespaces []*v1.Namespace
  155. items := kcc.namespaceWatch.GetAll()
  156. for _, ns := range items {
  157. namespaces = append(namespaces, ns.(*v1.Namespace))
  158. }
  159. return namespaces
  160. }
  161. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  162. var nodes []*v1.Node
  163. items := kcc.nodeWatch.GetAll()
  164. for _, node := range items {
  165. nodes = append(nodes, node.(*v1.Node))
  166. }
  167. return nodes
  168. }
  169. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  170. var pods []*v1.Pod
  171. items := kcc.podWatch.GetAll()
  172. for _, pod := range items {
  173. pods = append(pods, pod.(*v1.Pod))
  174. }
  175. return pods
  176. }
  177. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  178. var services []*v1.Service
  179. items := kcc.serviceWatch.GetAll()
  180. for _, service := range items {
  181. services = append(services, service.(*v1.Service))
  182. }
  183. return services
  184. }
  185. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  186. var daemonsets []*appsv1.DaemonSet
  187. items := kcc.daemonsetsWatch.GetAll()
  188. for _, daemonset := range items {
  189. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  190. }
  191. return daemonsets
  192. }
  193. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  194. var deployments []*appsv1.Deployment
  195. items := kcc.deploymentsWatch.GetAll()
  196. for _, deployment := range items {
  197. deployments = append(deployments, deployment.(*appsv1.Deployment))
  198. }
  199. return deployments
  200. }
  201. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  202. var statefulsets []*appsv1.StatefulSet
  203. items := kcc.statefulsetWatch.GetAll()
  204. for _, statefulset := range items {
  205. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  206. }
  207. return statefulsets
  208. }
  209. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  210. var replicasets []*appsv1.ReplicaSet
  211. items := kcc.replicasetWatch.GetAll()
  212. for _, replicaset := range items {
  213. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  214. }
  215. return replicasets
  216. }
  217. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  218. var pvs []*v1.PersistentVolume
  219. items := kcc.pvWatch.GetAll()
  220. for _, pv := range items {
  221. pvs = append(pvs, pv.(*v1.PersistentVolume))
  222. }
  223. return pvs
  224. }
  225. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  226. var pvcs []*v1.PersistentVolumeClaim
  227. items := kcc.pvcWatch.GetAll()
  228. for _, pvc := range items {
  229. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  230. }
  231. return pvcs
  232. }
  233. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  234. var storageClasses []*stv1.StorageClass
  235. items := kcc.storageClassWatch.GetAll()
  236. for _, stc := range items {
  237. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  238. }
  239. return storageClasses
  240. }
  241. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  242. var jobs []*batchv1.Job
  243. items := kcc.jobsWatch.GetAll()
  244. for _, job := range items {
  245. jobs = append(jobs, job.(*batchv1.Job))
  246. }
  247. return jobs
  248. }
  249. func (kcc *KubernetesClusterCache) GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler {
  250. var hpas []*autoscaling.HorizontalPodAutoscaler
  251. items := kcc.hpaWatch.GetAll()
  252. for _, hpa := range items {
  253. hpas = append(hpas, hpa.(*autoscaling.HorizontalPodAutoscaler))
  254. }
  255. return hpas
  256. }
  257. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  258. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  259. }