clustercache.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/kubecost/cost-model/pkg/env"
  5. "k8s.io/klog"
  6. appsv1 "k8s.io/api/apps/v1"
  7. autoscaling "k8s.io/api/autoscaling/v2beta1"
  8. batchv1 "k8s.io/api/batch/v1"
  9. v1 "k8s.io/api/core/v1"
  10. "k8s.io/api/policy/v1beta1"
  11. stv1 "k8s.io/api/storage/v1"
  12. "k8s.io/apimachinery/pkg/fields"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  16. // up to date resources using watchers
  17. type ClusterCache interface {
  18. // Run starts the watcher processes
  19. Run()
  20. // Stops the watcher processes
  21. Stop()
  22. // GetAllNamespaces returns all the cached namespaces
  23. GetAllNamespaces() []*v1.Namespace
  24. // GetAllNodes returns all the cached nodes
  25. GetAllNodes() []*v1.Node
  26. // GetAllPods returns all the cached pods
  27. GetAllPods() []*v1.Pod
  28. // GetAllServices returns all the cached services
  29. GetAllServices() []*v1.Service
  30. // GetAllDaemonSets returns all the cached DaemonSets
  31. GetAllDaemonSets() []*appsv1.DaemonSet
  32. // GetAllDeployments returns all the cached deployments
  33. GetAllDeployments() []*appsv1.Deployment
  34. // GetAllStatfulSets returns all the cached StatefulSets
  35. GetAllStatefulSets() []*appsv1.StatefulSet
  36. // GetAllReplicaSets returns all the cached ReplicaSets
  37. GetAllReplicaSets() []*appsv1.ReplicaSet
  38. // GetAllPersistentVolumes returns all the cached persistent volumes
  39. GetAllPersistentVolumes() []*v1.PersistentVolume
  40. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  41. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  42. // GetAllStorageClasses returns all the cached storage classes
  43. GetAllStorageClasses() []*stv1.StorageClass
  44. // GetAllJobs returns all the cached jobs
  45. GetAllJobs() []*batchv1.Job
  46. // GetAllHorizontalPodAutoscalers returns all cached horizontal pod autoscalers
  47. GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler
  48. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  49. GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget
  50. // SetConfigMapUpdateFunc sets the configmap update function
  51. SetConfigMapUpdateFunc(func(interface{}))
  52. }
  53. // KubernetesClusterCache is the implementation of ClusterCache
  54. type KubernetesClusterCache struct {
  55. client kubernetes.Interface
  56. namespaceWatch WatchController
  57. nodeWatch WatchController
  58. podWatch WatchController
  59. kubecostConfigMapWatch WatchController
  60. serviceWatch WatchController
  61. daemonsetsWatch WatchController
  62. deploymentsWatch WatchController
  63. statefulsetWatch WatchController
  64. replicasetWatch WatchController
  65. pvWatch WatchController
  66. pvcWatch WatchController
  67. storageClassWatch WatchController
  68. jobsWatch WatchController
  69. hpaWatch WatchController
  70. pdbWatch WatchController
  71. stop chan struct{}
  72. }
  73. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  74. defer wg.Done()
  75. wc.WarmUp(cancel)
  76. }
  77. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  78. coreRestClient := client.CoreV1().RESTClient()
  79. appsRestClient := client.AppsV1().RESTClient()
  80. storageRestClient := client.StorageV1().RESTClient()
  81. batchClient := client.BatchV1().RESTClient()
  82. autoscalingClient := client.AutoscalingV2beta1().RESTClient()
  83. pdbClient := client.PolicyV1beta1().RESTClient()
  84. kubecostNamespace := env.GetKubecostNamespace()
  85. klog.Infof("NAMESPACE: %s", kubecostNamespace)
  86. kcc := &KubernetesClusterCache{
  87. client: client,
  88. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  89. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  90. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  91. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  92. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  93. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  94. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  95. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  96. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  97. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  98. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  99. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  100. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  101. hpaWatch: NewCachingWatcher(autoscalingClient, "horizontalpodautoscalers", &autoscaling.HorizontalPodAutoscaler{}, "", fields.Everything()),
  102. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &v1beta1.PodDisruptionBudget{}, "", fields.Everything()),
  103. }
  104. // Wait for each caching watcher to initialize
  105. var wg sync.WaitGroup
  106. wg.Add(15)
  107. cancel := make(chan struct{})
  108. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  109. go initializeCache(kcc.nodeWatch, &wg, cancel)
  110. go initializeCache(kcc.podWatch, &wg, cancel)
  111. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  112. go initializeCache(kcc.serviceWatch, &wg, cancel)
  113. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  114. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  115. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  116. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  117. go initializeCache(kcc.pvWatch, &wg, cancel)
  118. go initializeCache(kcc.pvcWatch, &wg, cancel)
  119. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  120. go initializeCache(kcc.jobsWatch, &wg, cancel)
  121. go initializeCache(kcc.hpaWatch, &wg, cancel)
  122. go initializeCache(kcc.podWatch, &wg, cancel)
  123. wg.Wait()
  124. return kcc
  125. }
  126. func (kcc *KubernetesClusterCache) Run() {
  127. if kcc.stop != nil {
  128. return
  129. }
  130. stopCh := make(chan struct{})
  131. go kcc.namespaceWatch.Run(1, stopCh)
  132. go kcc.nodeWatch.Run(1, stopCh)
  133. go kcc.podWatch.Run(1, stopCh)
  134. go kcc.serviceWatch.Run(1, stopCh)
  135. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  136. go kcc.daemonsetsWatch.Run(1, stopCh)
  137. go kcc.deploymentsWatch.Run(1, stopCh)
  138. go kcc.statefulsetWatch.Run(1, stopCh)
  139. go kcc.replicasetWatch.Run(1, stopCh)
  140. go kcc.pvWatch.Run(1, stopCh)
  141. go kcc.pvcWatch.Run(1, stopCh)
  142. go kcc.storageClassWatch.Run(1, stopCh)
  143. go kcc.jobsWatch.Run(1, stopCh)
  144. go kcc.hpaWatch.Run(1, stopCh)
  145. go kcc.pdbWatch.Run(1, stopCh)
  146. kcc.stop = stopCh
  147. }
  148. func (kcc *KubernetesClusterCache) Stop() {
  149. if kcc.stop == nil {
  150. return
  151. }
  152. close(kcc.stop)
  153. kcc.stop = nil
  154. }
  155. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  156. var namespaces []*v1.Namespace
  157. items := kcc.namespaceWatch.GetAll()
  158. for _, ns := range items {
  159. namespaces = append(namespaces, ns.(*v1.Namespace))
  160. }
  161. return namespaces
  162. }
  163. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  164. var nodes []*v1.Node
  165. items := kcc.nodeWatch.GetAll()
  166. for _, node := range items {
  167. nodes = append(nodes, node.(*v1.Node))
  168. }
  169. return nodes
  170. }
  171. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  172. var pods []*v1.Pod
  173. items := kcc.podWatch.GetAll()
  174. for _, pod := range items {
  175. pods = append(pods, pod.(*v1.Pod))
  176. }
  177. return pods
  178. }
  179. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  180. var services []*v1.Service
  181. items := kcc.serviceWatch.GetAll()
  182. for _, service := range items {
  183. services = append(services, service.(*v1.Service))
  184. }
  185. return services
  186. }
  187. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  188. var daemonsets []*appsv1.DaemonSet
  189. items := kcc.daemonsetsWatch.GetAll()
  190. for _, daemonset := range items {
  191. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  192. }
  193. return daemonsets
  194. }
  195. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  196. var deployments []*appsv1.Deployment
  197. items := kcc.deploymentsWatch.GetAll()
  198. for _, deployment := range items {
  199. deployments = append(deployments, deployment.(*appsv1.Deployment))
  200. }
  201. return deployments
  202. }
  203. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  204. var statefulsets []*appsv1.StatefulSet
  205. items := kcc.statefulsetWatch.GetAll()
  206. for _, statefulset := range items {
  207. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  208. }
  209. return statefulsets
  210. }
  211. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  212. var replicasets []*appsv1.ReplicaSet
  213. items := kcc.replicasetWatch.GetAll()
  214. for _, replicaset := range items {
  215. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  216. }
  217. return replicasets
  218. }
  219. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  220. var pvs []*v1.PersistentVolume
  221. items := kcc.pvWatch.GetAll()
  222. for _, pv := range items {
  223. pvs = append(pvs, pv.(*v1.PersistentVolume))
  224. }
  225. return pvs
  226. }
  227. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  228. var pvcs []*v1.PersistentVolumeClaim
  229. items := kcc.pvcWatch.GetAll()
  230. for _, pvc := range items {
  231. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  232. }
  233. return pvcs
  234. }
  235. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  236. var storageClasses []*stv1.StorageClass
  237. items := kcc.storageClassWatch.GetAll()
  238. for _, stc := range items {
  239. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  240. }
  241. return storageClasses
  242. }
  243. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  244. var jobs []*batchv1.Job
  245. items := kcc.jobsWatch.GetAll()
  246. for _, job := range items {
  247. jobs = append(jobs, job.(*batchv1.Job))
  248. }
  249. return jobs
  250. }
  251. func (kcc *KubernetesClusterCache) GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler {
  252. var hpas []*autoscaling.HorizontalPodAutoscaler
  253. items := kcc.hpaWatch.GetAll()
  254. for _, hpa := range items {
  255. hpas = append(hpas, hpa.(*autoscaling.HorizontalPodAutoscaler))
  256. }
  257. return hpas
  258. }
  259. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget {
  260. var pdbs []*v1beta1.PodDisruptionBudget
  261. items := kcc.pdbWatch.GetAll()
  262. for _, pdb := range items {
  263. pdbs = append(pdbs, pdb.(*v1beta1.PodDisruptionBudget))
  264. }
  265. return pdbs
  266. }
  267. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  268. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  269. }