clustercache.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/kubecost/cost-model/pkg/env"
  5. "k8s.io/klog"
  6. appsv1 "k8s.io/api/apps/v1"
  7. batchv1 "k8s.io/api/batch/v1"
  8. v1 "k8s.io/api/core/v1"
  9. stv1 "k8s.io/api/storage/v1"
  10. "k8s.io/apimachinery/pkg/fields"
  11. "k8s.io/client-go/kubernetes"
  12. )
  13. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  14. // up to date resources using watchers
  15. type ClusterCache interface {
  16. // Run starts the watcher processes
  17. Run()
  18. // Stops the watcher processes
  19. Stop()
  20. // Gets the underlying clientset
  21. // TODO: Remove once we support all cached cluster components
  22. GetClient() kubernetes.Interface
  23. // GetAllNamespaces returns all the cached namespaces
  24. GetAllNamespaces() []*v1.Namespace
  25. // GetAllNodes returns all the cached nodes
  26. GetAllNodes() []*v1.Node
  27. // GetAllPods returns all the cached pods
  28. GetAllPods() []*v1.Pod
  29. // GetAllServices returns all the cached services
  30. GetAllServices() []*v1.Service
  31. // GetAllDaemonSets returns all the cached DaemonSets
  32. GetAllDaemonSets() []*appsv1.DaemonSet
  33. // GetAllDeployments returns all the cached deployments
  34. GetAllDeployments() []*appsv1.Deployment
  35. // GetAllStatfulSets returns all the cached StatefulSets
  36. GetAllStatefulSets() []*appsv1.StatefulSet
  37. // GetAllReplicaSets returns all the cached ReplicaSets
  38. GetAllReplicaSets() []*appsv1.ReplicaSet
  39. // GetAllPersistentVolumes returns all the cached persistent volumes
  40. GetAllPersistentVolumes() []*v1.PersistentVolume
  41. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  42. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  43. // GetAllStorageClasses returns all the cached storage classes
  44. GetAllStorageClasses() []*stv1.StorageClass
  45. // GetAllJobs returns all the cached jobs
  46. GetAllJobs() []*batchv1.Job
  47. // SetConfigMapUpdateFunc sets the configmap update function
  48. SetConfigMapUpdateFunc(func(interface{}))
  49. }
  50. // KubernetesClusterCache is the implementation of ClusterCache
  51. type KubernetesClusterCache struct {
  52. client kubernetes.Interface
  53. namespaceWatch WatchController
  54. nodeWatch WatchController
  55. podWatch WatchController
  56. kubecostConfigMapWatch WatchController
  57. serviceWatch WatchController
  58. daemonsetsWatch WatchController
  59. deploymentsWatch WatchController
  60. statefulsetWatch WatchController
  61. replicasetWatch WatchController
  62. pvWatch WatchController
  63. pvcWatch WatchController
  64. storageClassWatch WatchController
  65. jobsWatch WatchController
  66. stop chan struct{}
  67. }
  68. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  69. defer wg.Done()
  70. wc.WarmUp(cancel)
  71. }
  72. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  73. coreRestClient := client.CoreV1().RESTClient()
  74. appsRestClient := client.AppsV1().RESTClient()
  75. storageRestClient := client.StorageV1().RESTClient()
  76. batchClient := client.BatchV1().RESTClient()
  77. kubecostNamespace := env.GetKubecostNamespace()
  78. klog.Infof("NAMESPACE: %s", kubecostNamespace)
  79. kcc := &KubernetesClusterCache{
  80. client: client,
  81. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  82. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  83. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  84. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  85. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  86. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  87. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  88. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  89. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  90. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  91. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  92. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  93. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  94. }
  95. // Wait for each caching watcher to initialize
  96. var wg sync.WaitGroup
  97. wg.Add(13)
  98. cancel := make(chan struct{})
  99. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  100. go initializeCache(kcc.nodeWatch, &wg, cancel)
  101. go initializeCache(kcc.podWatch, &wg, cancel)
  102. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  103. go initializeCache(kcc.serviceWatch, &wg, cancel)
  104. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  105. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  106. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  107. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  108. go initializeCache(kcc.pvWatch, &wg, cancel)
  109. go initializeCache(kcc.pvcWatch, &wg, cancel)
  110. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  111. go initializeCache(kcc.jobsWatch, &wg, cancel)
  112. wg.Wait()
  113. return kcc
  114. }
  115. func (kcc *KubernetesClusterCache) Run() {
  116. if kcc.stop != nil {
  117. return
  118. }
  119. stopCh := make(chan struct{})
  120. go kcc.namespaceWatch.Run(1, stopCh)
  121. go kcc.nodeWatch.Run(1, stopCh)
  122. go kcc.podWatch.Run(1, stopCh)
  123. go kcc.serviceWatch.Run(1, stopCh)
  124. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  125. go kcc.daemonsetsWatch.Run(1, stopCh)
  126. go kcc.deploymentsWatch.Run(1, stopCh)
  127. go kcc.statefulsetWatch.Run(1, stopCh)
  128. go kcc.replicasetWatch.Run(1, stopCh)
  129. go kcc.pvWatch.Run(1, stopCh)
  130. go kcc.pvcWatch.Run(1, stopCh)
  131. go kcc.storageClassWatch.Run(1, stopCh)
  132. go kcc.jobsWatch.Run(1, stopCh)
  133. kcc.stop = stopCh
  134. }
  135. func (kcc *KubernetesClusterCache) Stop() {
  136. if kcc.stop == nil {
  137. return
  138. }
  139. close(kcc.stop)
  140. kcc.stop = nil
  141. }
  142. func (kcc *KubernetesClusterCache) GetClient() kubernetes.Interface {
  143. return kcc.client
  144. }
  145. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  146. var namespaces []*v1.Namespace
  147. items := kcc.namespaceWatch.GetAll()
  148. for _, ns := range items {
  149. namespaces = append(namespaces, ns.(*v1.Namespace))
  150. }
  151. return namespaces
  152. }
  153. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  154. var nodes []*v1.Node
  155. items := kcc.nodeWatch.GetAll()
  156. for _, node := range items {
  157. nodes = append(nodes, node.(*v1.Node))
  158. }
  159. return nodes
  160. }
  161. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  162. var pods []*v1.Pod
  163. items := kcc.podWatch.GetAll()
  164. for _, pod := range items {
  165. pods = append(pods, pod.(*v1.Pod))
  166. }
  167. return pods
  168. }
  169. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  170. var services []*v1.Service
  171. items := kcc.serviceWatch.GetAll()
  172. for _, service := range items {
  173. services = append(services, service.(*v1.Service))
  174. }
  175. return services
  176. }
  177. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  178. var daemonsets []*appsv1.DaemonSet
  179. items := kcc.daemonsetsWatch.GetAll()
  180. for _, daemonset := range items {
  181. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  182. }
  183. return daemonsets
  184. }
  185. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  186. var deployments []*appsv1.Deployment
  187. items := kcc.deploymentsWatch.GetAll()
  188. for _, deployment := range items {
  189. deployments = append(deployments, deployment.(*appsv1.Deployment))
  190. }
  191. return deployments
  192. }
  193. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  194. var statefulsets []*appsv1.StatefulSet
  195. items := kcc.statefulsetWatch.GetAll()
  196. for _, statefulset := range items {
  197. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  198. }
  199. return statefulsets
  200. }
  201. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  202. var replicasets []*appsv1.ReplicaSet
  203. items := kcc.replicasetWatch.GetAll()
  204. for _, replicaset := range items {
  205. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  206. }
  207. return replicasets
  208. }
  209. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  210. var pvs []*v1.PersistentVolume
  211. items := kcc.pvWatch.GetAll()
  212. for _, pv := range items {
  213. pvs = append(pvs, pv.(*v1.PersistentVolume))
  214. }
  215. return pvs
  216. }
  217. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  218. var pvcs []*v1.PersistentVolumeClaim
  219. items := kcc.pvcWatch.GetAll()
  220. for _, pvc := range items {
  221. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  222. }
  223. return pvcs
  224. }
  225. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  226. var storageClasses []*stv1.StorageClass
  227. items := kcc.storageClassWatch.GetAll()
  228. for _, stc := range items {
  229. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  230. }
  231. return storageClasses
  232. }
  233. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  234. var jobs []*batchv1.Job
  235. items := kcc.jobsWatch.GetAll()
  236. for _, job := range items {
  237. jobs = append(jobs, job.(*batchv1.Job))
  238. }
  239. return jobs
  240. }
  241. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  242. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  243. }