clustercache.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/kubecost/cost-model/pkg/env"
  5. "k8s.io/klog"
  6. appsv1 "k8s.io/api/apps/v1"
  7. autoscaling "k8s.io/api/autoscaling/v2beta1"
  8. batchv1 "k8s.io/api/batch/v1"
  9. v1 "k8s.io/api/core/v1"
  10. "k8s.io/api/policy/v1beta1"
  11. stv1 "k8s.io/api/storage/v1"
  12. "k8s.io/apimachinery/pkg/fields"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  16. // up to date resources using watchers
  17. type ClusterCache interface {
  18. // Run starts the watcher processes
  19. Run()
  20. // Stops the watcher processes
  21. Stop()
  22. // GetAllNamespaces returns all the cached namespaces
  23. GetAllNamespaces() []*v1.Namespace
  24. // GetAllNodes returns all the cached nodes
  25. GetAllNodes() []*v1.Node
  26. // GetAllPods returns all the cached pods
  27. GetAllPods() []*v1.Pod
  28. // GetAllServices returns all the cached services
  29. GetAllServices() []*v1.Service
  30. // GetAllDaemonSets returns all the cached DaemonSets
  31. GetAllDaemonSets() []*appsv1.DaemonSet
  32. // GetAllDeployments returns all the cached deployments
  33. GetAllDeployments() []*appsv1.Deployment
  34. // GetAllStatfulSets returns all the cached StatefulSets
  35. GetAllStatefulSets() []*appsv1.StatefulSet
  36. // GetAllReplicaSets returns all the cached ReplicaSets
  37. GetAllReplicaSets() []*appsv1.ReplicaSet
  38. // GetAllPersistentVolumes returns all the cached persistent volumes
  39. GetAllPersistentVolumes() []*v1.PersistentVolume
  40. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  41. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  42. // GetAllStorageClasses returns all the cached storage classes
  43. GetAllStorageClasses() []*stv1.StorageClass
  44. // GetAllJobs returns all the cached jobs
  45. GetAllJobs() []*batchv1.Job
  46. // GetAllHorizontalPodAutoscalers returns all cached horizontal pod autoscalers
  47. GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler
  48. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  49. GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget
  50. // GetAllReplicationControllers returns all cached replication controllers
  51. GetAllReplicationControllers() []*v1.ReplicationController
  52. // SetConfigMapUpdateFunc sets the configmap update function
  53. SetConfigMapUpdateFunc(func(interface{}))
  54. }
  55. // KubernetesClusterCache is the implementation of ClusterCache
  56. type KubernetesClusterCache struct {
  57. client kubernetes.Interface
  58. namespaceWatch WatchController
  59. nodeWatch WatchController
  60. podWatch WatchController
  61. kubecostConfigMapWatch WatchController
  62. serviceWatch WatchController
  63. daemonsetsWatch WatchController
  64. deploymentsWatch WatchController
  65. statefulsetWatch WatchController
  66. replicasetWatch WatchController
  67. pvWatch WatchController
  68. pvcWatch WatchController
  69. storageClassWatch WatchController
  70. jobsWatch WatchController
  71. hpaWatch WatchController
  72. pdbWatch WatchController
  73. replicationControllerWatch WatchController
  74. stop chan struct{}
  75. }
  76. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  77. defer wg.Done()
  78. wc.WarmUp(cancel)
  79. }
  80. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  81. coreRestClient := client.CoreV1().RESTClient()
  82. appsRestClient := client.AppsV1().RESTClient()
  83. storageRestClient := client.StorageV1().RESTClient()
  84. batchClient := client.BatchV1().RESTClient()
  85. autoscalingClient := client.AutoscalingV2beta1().RESTClient()
  86. pdbClient := client.PolicyV1beta1().RESTClient()
  87. kubecostNamespace := env.GetKubecostNamespace()
  88. klog.Infof("NAMESPACE: %s", kubecostNamespace)
  89. kcc := &KubernetesClusterCache{
  90. client: client,
  91. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  92. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  93. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  94. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  95. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  96. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  97. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  98. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  99. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  100. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  101. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  102. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  103. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  104. hpaWatch: NewCachingWatcher(autoscalingClient, "horizontalpodautoscalers", &autoscaling.HorizontalPodAutoscaler{}, "", fields.Everything()),
  105. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &v1beta1.PodDisruptionBudget{}, "", fields.Everything()),
  106. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  107. }
  108. // Wait for each caching watcher to initialize
  109. var wg sync.WaitGroup
  110. wg.Add(16)
  111. cancel := make(chan struct{})
  112. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  113. go initializeCache(kcc.nodeWatch, &wg, cancel)
  114. go initializeCache(kcc.podWatch, &wg, cancel)
  115. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  116. go initializeCache(kcc.serviceWatch, &wg, cancel)
  117. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  118. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  119. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  120. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  121. go initializeCache(kcc.pvWatch, &wg, cancel)
  122. go initializeCache(kcc.pvcWatch, &wg, cancel)
  123. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  124. go initializeCache(kcc.jobsWatch, &wg, cancel)
  125. go initializeCache(kcc.hpaWatch, &wg, cancel)
  126. go initializeCache(kcc.podWatch, &wg, cancel)
  127. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  128. wg.Wait()
  129. klog.Infof("Done waiting")
  130. return kcc
  131. }
  132. func (kcc *KubernetesClusterCache) Run() {
  133. if kcc.stop != nil {
  134. return
  135. }
  136. stopCh := make(chan struct{})
  137. go kcc.namespaceWatch.Run(1, stopCh)
  138. go kcc.nodeWatch.Run(1, stopCh)
  139. go kcc.podWatch.Run(1, stopCh)
  140. go kcc.serviceWatch.Run(1, stopCh)
  141. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  142. go kcc.daemonsetsWatch.Run(1, stopCh)
  143. go kcc.deploymentsWatch.Run(1, stopCh)
  144. go kcc.statefulsetWatch.Run(1, stopCh)
  145. go kcc.replicasetWatch.Run(1, stopCh)
  146. go kcc.pvWatch.Run(1, stopCh)
  147. go kcc.pvcWatch.Run(1, stopCh)
  148. go kcc.storageClassWatch.Run(1, stopCh)
  149. go kcc.jobsWatch.Run(1, stopCh)
  150. go kcc.hpaWatch.Run(1, stopCh)
  151. go kcc.pdbWatch.Run(1, stopCh)
  152. go kcc.replicationControllerWatch.Run(1, stopCh)
  153. kcc.stop = stopCh
  154. }
  155. func (kcc *KubernetesClusterCache) Stop() {
  156. if kcc.stop == nil {
  157. return
  158. }
  159. close(kcc.stop)
  160. kcc.stop = nil
  161. }
  162. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  163. var namespaces []*v1.Namespace
  164. items := kcc.namespaceWatch.GetAll()
  165. for _, ns := range items {
  166. namespaces = append(namespaces, ns.(*v1.Namespace))
  167. }
  168. return namespaces
  169. }
  170. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  171. var nodes []*v1.Node
  172. items := kcc.nodeWatch.GetAll()
  173. for _, node := range items {
  174. nodes = append(nodes, node.(*v1.Node))
  175. }
  176. return nodes
  177. }
  178. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  179. var pods []*v1.Pod
  180. items := kcc.podWatch.GetAll()
  181. for _, pod := range items {
  182. pods = append(pods, pod.(*v1.Pod))
  183. }
  184. return pods
  185. }
  186. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  187. var services []*v1.Service
  188. items := kcc.serviceWatch.GetAll()
  189. for _, service := range items {
  190. services = append(services, service.(*v1.Service))
  191. }
  192. return services
  193. }
  194. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  195. var daemonsets []*appsv1.DaemonSet
  196. items := kcc.daemonsetsWatch.GetAll()
  197. for _, daemonset := range items {
  198. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  199. }
  200. return daemonsets
  201. }
  202. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  203. var deployments []*appsv1.Deployment
  204. items := kcc.deploymentsWatch.GetAll()
  205. for _, deployment := range items {
  206. deployments = append(deployments, deployment.(*appsv1.Deployment))
  207. }
  208. return deployments
  209. }
  210. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  211. var statefulsets []*appsv1.StatefulSet
  212. items := kcc.statefulsetWatch.GetAll()
  213. for _, statefulset := range items {
  214. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  215. }
  216. return statefulsets
  217. }
  218. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  219. var replicasets []*appsv1.ReplicaSet
  220. items := kcc.replicasetWatch.GetAll()
  221. for _, replicaset := range items {
  222. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  223. }
  224. return replicasets
  225. }
  226. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  227. var pvs []*v1.PersistentVolume
  228. items := kcc.pvWatch.GetAll()
  229. for _, pv := range items {
  230. pvs = append(pvs, pv.(*v1.PersistentVolume))
  231. }
  232. return pvs
  233. }
  234. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  235. var pvcs []*v1.PersistentVolumeClaim
  236. items := kcc.pvcWatch.GetAll()
  237. for _, pvc := range items {
  238. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  239. }
  240. return pvcs
  241. }
  242. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  243. var storageClasses []*stv1.StorageClass
  244. items := kcc.storageClassWatch.GetAll()
  245. for _, stc := range items {
  246. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  247. }
  248. return storageClasses
  249. }
  250. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  251. var jobs []*batchv1.Job
  252. items := kcc.jobsWatch.GetAll()
  253. for _, job := range items {
  254. jobs = append(jobs, job.(*batchv1.Job))
  255. }
  256. return jobs
  257. }
  258. func (kcc *KubernetesClusterCache) GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler {
  259. var hpas []*autoscaling.HorizontalPodAutoscaler
  260. items := kcc.hpaWatch.GetAll()
  261. for _, hpa := range items {
  262. hpas = append(hpas, hpa.(*autoscaling.HorizontalPodAutoscaler))
  263. }
  264. return hpas
  265. }
  266. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget {
  267. var pdbs []*v1beta1.PodDisruptionBudget
  268. items := kcc.pdbWatch.GetAll()
  269. for _, pdb := range items {
  270. pdbs = append(pdbs, pdb.(*v1beta1.PodDisruptionBudget))
  271. }
  272. return pdbs
  273. }
  274. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*v1.ReplicationController {
  275. var rcs []*v1.ReplicationController
  276. items := kcc.replicationControllerWatch.GetAll()
  277. for _, rc := range items {
  278. rcs = append(rcs, rc.(*v1.ReplicationController))
  279. }
  280. return rcs
  281. }
  282. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  283. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  284. }