clustercache.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package clustercache
  2. import (
  3. "os"
  4. "sync"
  5. "k8s.io/klog"
  6. appsv1 "k8s.io/api/apps/v1"
  7. // batchv1 "k8s.io/api/batch/v1"
  8. v1 "k8s.io/api/core/v1"
  9. stv1 "k8s.io/api/storage/v1"
  10. "k8s.io/apimachinery/pkg/fields"
  11. "k8s.io/client-go/kubernetes"
  12. )
  13. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  14. // up to date resources using watchers
  15. type ClusterCache interface {
  16. // Run starts the watcher processes
  17. Run()
  18. // Stops the watcher processes
  19. Stop()
  20. // Gets the underlying clientset
  21. // TODO: Remove once we support all cached cluster components
  22. GetClient() kubernetes.Interface
  23. // GetAllNamespaces returns all the cached namespaces
  24. GetAllNamespaces() []*v1.Namespace
  25. // GetAllNodes returns all the cached nodes
  26. GetAllNodes() []*v1.Node
  27. // GetAllPods returns all the cached pods
  28. GetAllPods() []*v1.Pod
  29. // GetAllServices returns all the cached services
  30. GetAllServices() []*v1.Service
  31. // GetAllDaemonSets returns all the cached DaemonSets
  32. GetAllDaemonSets() []*appsv1.DaemonSet
  33. // GetAllDeployments returns all the cached deployments
  34. GetAllDeployments() []*appsv1.Deployment
  35. // GetAllStatfulSets returns all the cached StatefulSets
  36. GetAllStatefulSets() []*appsv1.StatefulSet
  37. // GetAllReplicaSets returns all the cached ReplicaSets
  38. GetAllReplicaSets() []*appsv1.ReplicaSet
  39. // GetAllJobs returns all the cached Jobs
  40. // GetAllJobs() []*batchv1.Job
  41. // GetAllPersistentVolumes returns all the cached persistent volumes
  42. GetAllPersistentVolumes() []*v1.PersistentVolume
  43. // GetAllStorageClasses returns all the cached storage classes
  44. GetAllStorageClasses() []*stv1.StorageClass
  45. // SetConfigMapUpdateFunc sets the configmap update function
  46. SetConfigMapUpdateFunc(func(interface{}))
  47. }
  48. // KubernetesClusterCache is the implementation of ClusterCache
  49. type KubernetesClusterCache struct {
  50. client kubernetes.Interface
  51. namespaceWatch WatchController
  52. nodeWatch WatchController
  53. podWatch WatchController
  54. kubecostConfigMapWatch WatchController
  55. serviceWatch WatchController
  56. daemonsetsWatch WatchController
  57. deploymentsWatch WatchController
  58. statefulsetWatch WatchController
  59. replicasetWatch WatchController
  60. // jobWatch WatchController
  61. pvWatch WatchController
  62. storageClassWatch WatchController
  63. stop chan struct{}
  64. }
  65. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  66. defer wg.Done()
  67. wc.WarmUp(cancel)
  68. }
  69. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  70. coreRestClient := client.CoreV1().RESTClient()
  71. appsRestClient := client.AppsV1().RESTClient()
  72. storageRestClient := client.StorageV1().RESTClient()
  73. kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
  74. klog.Infof("NAMESPACE: %s", kubecostNamespace)
  75. kcc := &KubernetesClusterCache{
  76. client: client,
  77. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  78. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  79. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  80. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  81. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  82. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  83. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  84. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  85. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  86. // jobWatch: NewCachingWatcher(appsRestClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  87. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  88. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  89. }
  90. // Wait for each caching watcher to initialize
  91. var wg sync.WaitGroup
  92. // wg.Add(12)
  93. wg.Add(11)
  94. cancel := make(chan struct{})
  95. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  96. go initializeCache(kcc.nodeWatch, &wg, cancel)
  97. go initializeCache(kcc.podWatch, &wg, cancel)
  98. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  99. go initializeCache(kcc.serviceWatch, &wg, cancel)
  100. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  101. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  102. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  103. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  104. // go initializeCache(kcc.jobWatch, &wg, cancel)
  105. go initializeCache(kcc.pvWatch, &wg, cancel)
  106. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  107. wg.Wait()
  108. return kcc
  109. }
  110. func (kcc *KubernetesClusterCache) Run() {
  111. if kcc.stop != nil {
  112. return
  113. }
  114. stopCh := make(chan struct{})
  115. go kcc.namespaceWatch.Run(1, stopCh)
  116. go kcc.nodeWatch.Run(1, stopCh)
  117. go kcc.podWatch.Run(1, stopCh)
  118. go kcc.serviceWatch.Run(1, stopCh)
  119. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  120. go kcc.daemonsetsWatch.Run(1, stopCh)
  121. go kcc.deploymentsWatch.Run(1, stopCh)
  122. go kcc.statefulsetWatch.Run(1, stopCh)
  123. go kcc.replicasetWatch.Run(1, stopCh)
  124. // go kcc.jobWatch.Run(1, stopCh)
  125. go kcc.pvWatch.Run(1, stopCh)
  126. go kcc.storageClassWatch.Run(1, stopCh)
  127. kcc.stop = stopCh
  128. }
  129. func (kcc *KubernetesClusterCache) Stop() {
  130. if kcc.stop == nil {
  131. return
  132. }
  133. close(kcc.stop)
  134. kcc.stop = nil
  135. }
  136. func (kcc *KubernetesClusterCache) GetClient() kubernetes.Interface {
  137. return kcc.client
  138. }
  139. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  140. var namespaces []*v1.Namespace
  141. items := kcc.namespaceWatch.GetAll()
  142. for _, ns := range items {
  143. namespaces = append(namespaces, ns.(*v1.Namespace))
  144. }
  145. return namespaces
  146. }
  147. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  148. var nodes []*v1.Node
  149. items := kcc.nodeWatch.GetAll()
  150. for _, node := range items {
  151. nodes = append(nodes, node.(*v1.Node))
  152. }
  153. return nodes
  154. }
  155. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  156. var pods []*v1.Pod
  157. items := kcc.podWatch.GetAll()
  158. for _, pod := range items {
  159. pods = append(pods, pod.(*v1.Pod))
  160. }
  161. return pods
  162. }
  163. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  164. var services []*v1.Service
  165. items := kcc.serviceWatch.GetAll()
  166. for _, service := range items {
  167. services = append(services, service.(*v1.Service))
  168. }
  169. return services
  170. }
  171. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  172. var daemonsets []*appsv1.DaemonSet
  173. items := kcc.daemonsetsWatch.GetAll()
  174. for _, daemonset := range items {
  175. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  176. }
  177. return daemonsets
  178. }
  179. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  180. var deployments []*appsv1.Deployment
  181. items := kcc.deploymentsWatch.GetAll()
  182. for _, deployment := range items {
  183. deployments = append(deployments, deployment.(*appsv1.Deployment))
  184. }
  185. return deployments
  186. }
  187. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  188. var statefulsets []*appsv1.StatefulSet
  189. items := kcc.statefulsetWatch.GetAll()
  190. for _, statefulset := range items {
  191. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  192. }
  193. return statefulsets
  194. }
  195. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  196. var replicasets []*appsv1.ReplicaSet
  197. items := kcc.replicasetWatch.GetAll()
  198. for _, replicaset := range items {
  199. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  200. }
  201. return replicasets
  202. }
  203. // func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  204. // var jobs []*batchv1.Job
  205. // items := kcc.jobWatch.GetAll()
  206. // for _, job := range items {
  207. // jobs = append(jobs, job.(*batchv1.Job))
  208. // }
  209. // return jobs
  210. // }
  211. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  212. var pvs []*v1.PersistentVolume
  213. items := kcc.pvWatch.GetAll()
  214. for _, pv := range items {
  215. pvs = append(pvs, pv.(*v1.PersistentVolume))
  216. }
  217. return pvs
  218. }
  219. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  220. var storageClasses []*stv1.StorageClass
  221. items := kcc.storageClassWatch.GetAll()
  222. for _, stc := range items {
  223. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  224. }
  225. return storageClasses
  226. }
  227. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  228. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  229. }