clustercache.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package clustercache
  2. import (
  3. "os"
  4. "sync"
  5. "k8s.io/klog"
  6. appsv1 "k8s.io/api/apps/v1"
  7. v1 "k8s.io/api/core/v1"
  8. stv1 "k8s.io/api/storage/v1"
  9. "k8s.io/apimachinery/pkg/fields"
  10. "k8s.io/client-go/kubernetes"
  11. )
  12. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  13. // up to date resources using watchers
  14. type ClusterCache interface {
  15. // Run starts the watcher processes
  16. Run()
  17. // Stops the watcher processes
  18. Stop()
  19. // Gets the underlying clientset
  20. // TODO: Remove once we support all cached cluster components
  21. GetClient() kubernetes.Interface
  22. // GetAllNamespaces returns all the cached namespaces
  23. GetAllNamespaces() []*v1.Namespace
  24. // GetAllNodes returns all the cached nodes
  25. GetAllNodes() []*v1.Node
  26. // GetAllPods returns all the cached pods
  27. GetAllPods() []*v1.Pod
  28. // GetAllServices returns all the cached services
  29. GetAllServices() []*v1.Service
  30. // GetAllDeployments returns all the cached deployments
  31. GetAllDeployments() []*appsv1.Deployment
  32. // GetAllDeployments returns all the cached deployments
  33. GetAllStatefulSets() []*appsv1.StatefulSet
  34. // GetAllPersistentVolumes returns all the cached persistent volumes
  35. GetAllPersistentVolumes() []*v1.PersistentVolume
  36. // GetAllStorageClasses returns all the cached storage classes
  37. GetAllStorageClasses() []*stv1.StorageClass
  38. // SetConfigMapUpdateFunc sets the configmap update function
  39. SetConfigMapUpdateFunc(func(interface{}))
  40. }
  41. // KubernetesClusterCache is the implementation of ClusterCache
  42. type KubernetesClusterCache struct {
  43. client kubernetes.Interface
  44. namespaceWatch WatchController
  45. nodeWatch WatchController
  46. podWatch WatchController
  47. kubecostConfigMapWatch WatchController
  48. serviceWatch WatchController
  49. deploymentsWatch WatchController
  50. statefulsetWatch WatchController
  51. pvWatch WatchController
  52. storageClassWatch WatchController
  53. stop chan struct{}
  54. }
  55. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  56. defer wg.Done()
  57. wc.WarmUp(cancel)
  58. }
  59. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  60. coreRestClient := client.CoreV1().RESTClient()
  61. appsRestClient := client.AppsV1().RESTClient()
  62. storageRestClient := client.StorageV1().RESTClient()
  63. kubecostNamespace := os.Getenv("KUBECOST_NAMESPACE")
  64. klog.Infof("NAMESPACE: %s", kubecostNamespace)
  65. kcc := &KubernetesClusterCache{
  66. client: client,
  67. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  68. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  69. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  70. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  71. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  72. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  73. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  74. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  75. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  76. }
  77. // Wait for each caching watcher to initialize
  78. var wg sync.WaitGroup
  79. wg.Add(9)
  80. cancel := make(chan struct{})
  81. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  82. go initializeCache(kcc.nodeWatch, &wg, cancel)
  83. go initializeCache(kcc.podWatch, &wg, cancel)
  84. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  85. go initializeCache(kcc.serviceWatch, &wg, cancel)
  86. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  87. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  88. go initializeCache(kcc.pvWatch, &wg, cancel)
  89. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  90. wg.Wait()
  91. return kcc
  92. }
  93. func (kcc *KubernetesClusterCache) Run() {
  94. if kcc.stop != nil {
  95. return
  96. }
  97. stopCh := make(chan struct{})
  98. go kcc.namespaceWatch.Run(1, stopCh)
  99. go kcc.nodeWatch.Run(1, stopCh)
  100. go kcc.podWatch.Run(1, stopCh)
  101. go kcc.serviceWatch.Run(1, stopCh)
  102. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  103. go kcc.deploymentsWatch.Run(1, stopCh)
  104. go kcc.statefulsetWatch.Run(1, stopCh)
  105. go kcc.pvWatch.Run(1, stopCh)
  106. go kcc.storageClassWatch.Run(1, stopCh)
  107. kcc.stop = stopCh
  108. }
  109. func (kcc *KubernetesClusterCache) Stop() {
  110. if kcc.stop == nil {
  111. return
  112. }
  113. close(kcc.stop)
  114. kcc.stop = nil
  115. }
  116. func (kcc *KubernetesClusterCache) GetClient() kubernetes.Interface {
  117. return kcc.client
  118. }
  119. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  120. var namespaces []*v1.Namespace
  121. items := kcc.namespaceWatch.GetAll()
  122. for _, ns := range items {
  123. namespaces = append(namespaces, ns.(*v1.Namespace))
  124. }
  125. return namespaces
  126. }
  127. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  128. var nodes []*v1.Node
  129. items := kcc.nodeWatch.GetAll()
  130. for _, node := range items {
  131. nodes = append(nodes, node.(*v1.Node))
  132. }
  133. return nodes
  134. }
  135. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  136. var pods []*v1.Pod
  137. items := kcc.podWatch.GetAll()
  138. for _, pod := range items {
  139. pods = append(pods, pod.(*v1.Pod))
  140. }
  141. return pods
  142. }
  143. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  144. var services []*v1.Service
  145. items := kcc.serviceWatch.GetAll()
  146. for _, service := range items {
  147. services = append(services, service.(*v1.Service))
  148. }
  149. return services
  150. }
  151. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  152. var deployments []*appsv1.Deployment
  153. items := kcc.deploymentsWatch.GetAll()
  154. for _, deployment := range items {
  155. deployments = append(deployments, deployment.(*appsv1.Deployment))
  156. }
  157. return deployments
  158. }
  159. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  160. var statefulsets []*appsv1.StatefulSet
  161. items := kcc.statefulsetWatch.GetAll()
  162. for _, statefulset := range items {
  163. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  164. }
  165. return statefulsets
  166. }
  167. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  168. var pvs []*v1.PersistentVolume
  169. items := kcc.pvWatch.GetAll()
  170. for _, pv := range items {
  171. pvs = append(pvs, pv.(*v1.PersistentVolume))
  172. }
  173. return pvs
  174. }
  175. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  176. var storageClasses []*stv1.StorageClass
  177. items := kcc.storageClassWatch.GetAll()
  178. for _, stc := range items {
  179. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  180. }
  181. return storageClasses
  182. }
  183. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  184. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  185. }