clustercache.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/kubecost/cost-model/pkg/env"
  5. "k8s.io/klog"
  6. appsv1 "k8s.io/api/apps/v1"
  7. v1 "k8s.io/api/core/v1"
  8. stv1 "k8s.io/api/storage/v1"
  9. "k8s.io/apimachinery/pkg/fields"
  10. "k8s.io/client-go/kubernetes"
  11. )
  12. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  13. // up to date resources using watchers
  14. type ClusterCache interface {
  15. // Run starts the watcher processes
  16. Run()
  17. // Stops the watcher processes
  18. Stop()
  19. // Gets the underlying clientset
  20. // TODO: Remove once we support all cached cluster components
  21. GetClient() kubernetes.Interface
  22. // GetAllNamespaces returns all the cached namespaces
  23. GetAllNamespaces() []*v1.Namespace
  24. // GetAllNodes returns all the cached nodes
  25. GetAllNodes() []*v1.Node
  26. // GetAllPods returns all the cached pods
  27. GetAllPods() []*v1.Pod
  28. // GetAllServices returns all the cached services
  29. GetAllServices() []*v1.Service
  30. // GetAllDaemonSets returns all the cached DaemonSets
  31. GetAllDaemonSets() []*appsv1.DaemonSet
  32. // GetAllDeployments returns all the cached deployments
  33. GetAllDeployments() []*appsv1.Deployment
  34. // GetAllStatfulSets returns all the cached StatefulSets
  35. GetAllStatefulSets() []*appsv1.StatefulSet
  36. // GetAllReplicaSets returns all the cached ReplicaSets
  37. GetAllReplicaSets() []*appsv1.ReplicaSet
  38. // GetAllPersistentVolumes returns all the cached persistent volumes
  39. GetAllPersistentVolumes() []*v1.PersistentVolume
  40. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  41. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  42. // GetAllStorageClasses returns all the cached storage classes
  43. GetAllStorageClasses() []*stv1.StorageClass
  44. // SetConfigMapUpdateFunc sets the configmap update function
  45. SetConfigMapUpdateFunc(func(interface{}))
  46. }
  47. // KubernetesClusterCache is the implementation of ClusterCache
  48. type KubernetesClusterCache struct {
  49. client kubernetes.Interface
  50. namespaceWatch WatchController
  51. nodeWatch WatchController
  52. podWatch WatchController
  53. kubecostConfigMapWatch WatchController
  54. serviceWatch WatchController
  55. daemonsetsWatch WatchController
  56. deploymentsWatch WatchController
  57. statefulsetWatch WatchController
  58. replicasetWatch WatchController
  59. pvWatch WatchController
  60. pvcWatch WatchController
  61. storageClassWatch WatchController
  62. stop chan struct{}
  63. }
  64. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  65. defer wg.Done()
  66. wc.WarmUp(cancel)
  67. }
  68. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  69. coreRestClient := client.CoreV1().RESTClient()
  70. appsRestClient := client.AppsV1().RESTClient()
  71. storageRestClient := client.StorageV1().RESTClient()
  72. kubecostNamespace := env.GetKubecostNamespace()
  73. klog.Infof("NAMESPACE: %s", kubecostNamespace)
  74. kcc := &KubernetesClusterCache{
  75. client: client,
  76. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  77. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  78. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  79. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  80. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  81. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  82. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  83. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  84. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  85. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  86. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  87. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  88. }
  89. // Wait for each caching watcher to initialize
  90. var wg sync.WaitGroup
  91. wg.Add(12)
  92. cancel := make(chan struct{})
  93. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  94. go initializeCache(kcc.nodeWatch, &wg, cancel)
  95. go initializeCache(kcc.podWatch, &wg, cancel)
  96. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  97. go initializeCache(kcc.serviceWatch, &wg, cancel)
  98. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  99. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  100. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  101. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  102. go initializeCache(kcc.pvWatch, &wg, cancel)
  103. go initializeCache(kcc.pvcWatch, &wg, cancel)
  104. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  105. wg.Wait()
  106. return kcc
  107. }
  108. func (kcc *KubernetesClusterCache) Run() {
  109. if kcc.stop != nil {
  110. return
  111. }
  112. stopCh := make(chan struct{})
  113. go kcc.namespaceWatch.Run(1, stopCh)
  114. go kcc.nodeWatch.Run(1, stopCh)
  115. go kcc.podWatch.Run(1, stopCh)
  116. go kcc.serviceWatch.Run(1, stopCh)
  117. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  118. go kcc.daemonsetsWatch.Run(1, stopCh)
  119. go kcc.deploymentsWatch.Run(1, stopCh)
  120. go kcc.statefulsetWatch.Run(1, stopCh)
  121. go kcc.replicasetWatch.Run(1, stopCh)
  122. go kcc.pvWatch.Run(1, stopCh)
  123. go kcc.pvcWatch.Run(1, stopCh)
  124. go kcc.storageClassWatch.Run(1, stopCh)
  125. kcc.stop = stopCh
  126. }
  127. func (kcc *KubernetesClusterCache) Stop() {
  128. if kcc.stop == nil {
  129. return
  130. }
  131. close(kcc.stop)
  132. kcc.stop = nil
  133. }
  134. func (kcc *KubernetesClusterCache) GetClient() kubernetes.Interface {
  135. return kcc.client
  136. }
  137. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  138. var namespaces []*v1.Namespace
  139. items := kcc.namespaceWatch.GetAll()
  140. for _, ns := range items {
  141. namespaces = append(namespaces, ns.(*v1.Namespace))
  142. }
  143. return namespaces
  144. }
  145. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  146. var nodes []*v1.Node
  147. items := kcc.nodeWatch.GetAll()
  148. for _, node := range items {
  149. nodes = append(nodes, node.(*v1.Node))
  150. }
  151. return nodes
  152. }
  153. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  154. var pods []*v1.Pod
  155. items := kcc.podWatch.GetAll()
  156. for _, pod := range items {
  157. pods = append(pods, pod.(*v1.Pod))
  158. }
  159. return pods
  160. }
  161. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  162. var services []*v1.Service
  163. items := kcc.serviceWatch.GetAll()
  164. for _, service := range items {
  165. services = append(services, service.(*v1.Service))
  166. }
  167. return services
  168. }
  169. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  170. var daemonsets []*appsv1.DaemonSet
  171. items := kcc.daemonsetsWatch.GetAll()
  172. for _, daemonset := range items {
  173. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  174. }
  175. return daemonsets
  176. }
  177. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  178. var deployments []*appsv1.Deployment
  179. items := kcc.deploymentsWatch.GetAll()
  180. for _, deployment := range items {
  181. deployments = append(deployments, deployment.(*appsv1.Deployment))
  182. }
  183. return deployments
  184. }
  185. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  186. var statefulsets []*appsv1.StatefulSet
  187. items := kcc.statefulsetWatch.GetAll()
  188. for _, statefulset := range items {
  189. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  190. }
  191. return statefulsets
  192. }
  193. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  194. var replicasets []*appsv1.ReplicaSet
  195. items := kcc.replicasetWatch.GetAll()
  196. for _, replicaset := range items {
  197. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  198. }
  199. return replicasets
  200. }
  201. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  202. var pvs []*v1.PersistentVolume
  203. items := kcc.pvWatch.GetAll()
  204. for _, pv := range items {
  205. pvs = append(pvs, pv.(*v1.PersistentVolume))
  206. }
  207. return pvs
  208. }
  209. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  210. var pvcs []*v1.PersistentVolumeClaim
  211. items := kcc.pvcWatch.GetAll()
  212. for _, pvc := range items {
  213. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  214. }
  215. return pvcs
  216. }
  217. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  218. var storageClasses []*stv1.StorageClass
  219. items := kcc.storageClassWatch.GetAll()
  220. for _, stc := range items {
  221. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  222. }
  223. return storageClasses
  224. }
  225. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  226. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  227. }