clustercache.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/kubecost/cost-model/pkg/env"
  5. "k8s.io/klog"
  6. appsv1 "k8s.io/api/apps/v1"
  7. v1 "k8s.io/api/core/v1"
  8. stv1 "k8s.io/api/storage/v1"
  9. "k8s.io/apimachinery/pkg/fields"
  10. "k8s.io/client-go/kubernetes"
  11. )
  12. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  13. // up to date resources using watchers
  14. type ClusterCache interface {
  15. // Run starts the watcher processes
  16. Run()
  17. // Stops the watcher processes
  18. Stop()
  19. // Gets the underlying clientset
  20. // TODO: Remove once we support all cached cluster components
  21. GetClient() kubernetes.Interface
  22. // GetAllNamespaces returns all the cached namespaces
  23. GetAllNamespaces() []*v1.Namespace
  24. // GetAllNodes returns all the cached nodes
  25. GetAllNodes() []*v1.Node
  26. // GetAllPods returns all the cached pods
  27. GetAllPods() []*v1.Pod
  28. // GetAllServices returns all the cached services
  29. GetAllServices() []*v1.Service
  30. // GetAllDaemonSets returns all the cached DaemonSets
  31. GetAllDaemonSets() []*appsv1.DaemonSet
  32. // GetAllDeployments returns all the cached deployments
  33. GetAllDeployments() []*appsv1.Deployment
  34. // GetAllStatfulSets returns all the cached StatefulSets
  35. GetAllStatefulSets() []*appsv1.StatefulSet
  36. // GetAllReplicaSets returns all the cached ReplicaSets
  37. GetAllReplicaSets() []*appsv1.ReplicaSet
  38. // GetAllPersistentVolumes returns all the cached persistent volumes
  39. GetAllPersistentVolumes() []*v1.PersistentVolume
  40. // GetAllStorageClasses returns all the cached storage classes
  41. GetAllStorageClasses() []*stv1.StorageClass
  42. // SetConfigMapUpdateFunc sets the configmap update function
  43. SetConfigMapUpdateFunc(func(interface{}))
  44. }
  45. // KubernetesClusterCache is the implementation of ClusterCache
  46. type KubernetesClusterCache struct {
  47. client kubernetes.Interface
  48. namespaceWatch WatchController
  49. nodeWatch WatchController
  50. podWatch WatchController
  51. kubecostConfigMapWatch WatchController
  52. serviceWatch WatchController
  53. daemonsetsWatch WatchController
  54. deploymentsWatch WatchController
  55. statefulsetWatch WatchController
  56. replicasetWatch WatchController
  57. pvWatch WatchController
  58. storageClassWatch WatchController
  59. stop chan struct{}
  60. }
  61. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  62. defer wg.Done()
  63. wc.WarmUp(cancel)
  64. }
  65. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  66. coreRestClient := client.CoreV1().RESTClient()
  67. appsRestClient := client.AppsV1().RESTClient()
  68. storageRestClient := client.StorageV1().RESTClient()
  69. kubecostNamespace := env.GetKubecostNamespace()
  70. klog.Infof("NAMESPACE: %s", kubecostNamespace)
  71. kcc := &KubernetesClusterCache{
  72. client: client,
  73. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  74. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  75. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  76. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  77. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  78. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  79. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  80. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  81. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  82. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  83. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  84. }
  85. // Wait for each caching watcher to initialize
  86. var wg sync.WaitGroup
  87. wg.Add(11)
  88. cancel := make(chan struct{})
  89. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  90. go initializeCache(kcc.nodeWatch, &wg, cancel)
  91. go initializeCache(kcc.podWatch, &wg, cancel)
  92. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  93. go initializeCache(kcc.serviceWatch, &wg, cancel)
  94. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  95. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  96. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  97. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  98. go initializeCache(kcc.pvWatch, &wg, cancel)
  99. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  100. wg.Wait()
  101. return kcc
  102. }
  103. func (kcc *KubernetesClusterCache) Run() {
  104. if kcc.stop != nil {
  105. return
  106. }
  107. stopCh := make(chan struct{})
  108. go kcc.namespaceWatch.Run(1, stopCh)
  109. go kcc.nodeWatch.Run(1, stopCh)
  110. go kcc.podWatch.Run(1, stopCh)
  111. go kcc.serviceWatch.Run(1, stopCh)
  112. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  113. go kcc.daemonsetsWatch.Run(1, stopCh)
  114. go kcc.deploymentsWatch.Run(1, stopCh)
  115. go kcc.statefulsetWatch.Run(1, stopCh)
  116. go kcc.replicasetWatch.Run(1, stopCh)
  117. go kcc.pvWatch.Run(1, stopCh)
  118. go kcc.storageClassWatch.Run(1, stopCh)
  119. kcc.stop = stopCh
  120. }
  121. func (kcc *KubernetesClusterCache) Stop() {
  122. if kcc.stop == nil {
  123. return
  124. }
  125. close(kcc.stop)
  126. kcc.stop = nil
  127. }
  128. func (kcc *KubernetesClusterCache) GetClient() kubernetes.Interface {
  129. return kcc.client
  130. }
  131. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  132. var namespaces []*v1.Namespace
  133. items := kcc.namespaceWatch.GetAll()
  134. for _, ns := range items {
  135. namespaces = append(namespaces, ns.(*v1.Namespace))
  136. }
  137. return namespaces
  138. }
  139. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  140. var nodes []*v1.Node
  141. items := kcc.nodeWatch.GetAll()
  142. for _, node := range items {
  143. nodes = append(nodes, node.(*v1.Node))
  144. }
  145. return nodes
  146. }
  147. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  148. var pods []*v1.Pod
  149. items := kcc.podWatch.GetAll()
  150. for _, pod := range items {
  151. pods = append(pods, pod.(*v1.Pod))
  152. }
  153. return pods
  154. }
  155. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  156. var services []*v1.Service
  157. items := kcc.serviceWatch.GetAll()
  158. for _, service := range items {
  159. services = append(services, service.(*v1.Service))
  160. }
  161. return services
  162. }
  163. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  164. var daemonsets []*appsv1.DaemonSet
  165. items := kcc.daemonsetsWatch.GetAll()
  166. for _, daemonset := range items {
  167. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  168. }
  169. return daemonsets
  170. }
  171. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  172. var deployments []*appsv1.Deployment
  173. items := kcc.deploymentsWatch.GetAll()
  174. for _, deployment := range items {
  175. deployments = append(deployments, deployment.(*appsv1.Deployment))
  176. }
  177. return deployments
  178. }
  179. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  180. var statefulsets []*appsv1.StatefulSet
  181. items := kcc.statefulsetWatch.GetAll()
  182. for _, statefulset := range items {
  183. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  184. }
  185. return statefulsets
  186. }
  187. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  188. var replicasets []*appsv1.ReplicaSet
  189. items := kcc.replicasetWatch.GetAll()
  190. for _, replicaset := range items {
  191. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  192. }
  193. return replicasets
  194. }
  195. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  196. var pvs []*v1.PersistentVolume
  197. items := kcc.pvWatch.GetAll()
  198. for _, pv := range items {
  199. pvs = append(pvs, pv.(*v1.PersistentVolume))
  200. }
  201. return pvs
  202. }
  203. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  204. var storageClasses []*stv1.StorageClass
  205. items := kcc.storageClassWatch.GetAll()
  206. for _, stc := range items {
  207. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  208. }
  209. return storageClasses
  210. }
  211. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  212. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  213. }