clustercache.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package clustercache
  2. import (
  3. "sync"
  4. appsv1 "k8s.io/api/apps/v1"
  5. v1 "k8s.io/api/core/v1"
  6. stv1 "k8s.io/api/storage/v1"
  7. "k8s.io/apimachinery/pkg/fields"
  8. "k8s.io/client-go/kubernetes"
  9. )
  10. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  11. // up to date resources using watchers
  12. type ClusterCache interface {
  13. // Run starts the watcher processes
  14. Run()
  15. // Stops the watcher processes
  16. Stop()
  17. // Gets the underlying clientset
  18. // TODO: Remove once we support all cached cluster components
  19. GetClient() kubernetes.Interface
  20. // GetAllNamespaces returns all the cached namespaces
  21. GetAllNamespaces() []*v1.Namespace
  22. // GetAllNodes returns all the cached nodes
  23. GetAllNodes() []*v1.Node
  24. // GetAllPods returns all the cached pods
  25. GetAllPods() []*v1.Pod
  26. // GetAllServices returns all the cached services
  27. GetAllServices() []*v1.Service
  28. // GetAllDeployments returns all the cached deployments
  29. GetAllDeployments() []*appsv1.Deployment
  30. // GetAllPersistentVolumes returns all the cached persistent volumes
  31. GetAllPersistentVolumes() []*v1.PersistentVolume
  32. // GetAllStorageClasses returns all the cached storage classes
  33. GetAllStorageClasses() []*stv1.StorageClass
  34. }
  35. // KubernetesClusterCache is the implementation of ClusterCache
  36. type KubernetesClusterCache struct {
  37. client kubernetes.Interface
  38. namespaceWatch WatchController
  39. nodeWatch WatchController
  40. podWatch WatchController
  41. serviceWatch WatchController
  42. deploymentsWatch WatchController
  43. pvWatch WatchController
  44. storageClassWatch WatchController
  45. stop chan struct{}
  46. }
  47. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  48. defer wg.Done()
  49. wc.WarmUp(cancel)
  50. }
  51. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  52. coreRestClient := client.CoreV1().RESTClient()
  53. appsRestClient := client.AppsV1().RESTClient()
  54. storageRestClient := client.StorageV1().RESTClient()
  55. kcc := &KubernetesClusterCache{
  56. client: client,
  57. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  58. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  59. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  60. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  61. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  62. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  63. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  64. }
  65. // Wait for each caching watcher to initialize
  66. var wg sync.WaitGroup
  67. wg.Add(7)
  68. cancel := make(chan struct{})
  69. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  70. go initializeCache(kcc.nodeWatch, &wg, cancel)
  71. go initializeCache(kcc.podWatch, &wg, cancel)
  72. go initializeCache(kcc.serviceWatch, &wg, cancel)
  73. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  74. go initializeCache(kcc.pvWatch, &wg, cancel)
  75. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  76. wg.Wait()
  77. return kcc
  78. }
  79. func (kcc *KubernetesClusterCache) Run() {
  80. if kcc.stop != nil {
  81. return
  82. }
  83. stopCh := make(chan struct{})
  84. go kcc.namespaceWatch.Run(1, stopCh)
  85. go kcc.nodeWatch.Run(1, stopCh)
  86. go kcc.podWatch.Run(1, stopCh)
  87. go kcc.serviceWatch.Run(1, stopCh)
  88. go kcc.deploymentsWatch.Run(1, stopCh)
  89. go kcc.pvWatch.Run(1, stopCh)
  90. go kcc.storageClassWatch.Run(1, stopCh)
  91. kcc.stop = stopCh
  92. }
  93. func (kcc *KubernetesClusterCache) Stop() {
  94. if kcc.stop == nil {
  95. return
  96. }
  97. close(kcc.stop)
  98. kcc.stop = nil
  99. }
  100. func (kcc *KubernetesClusterCache) GetClient() kubernetes.Interface {
  101. return kcc.client
  102. }
  103. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  104. var namespaces []*v1.Namespace
  105. items := kcc.namespaceWatch.GetAll()
  106. for _, ns := range items {
  107. namespaces = append(namespaces, ns.(*v1.Namespace))
  108. }
  109. return namespaces
  110. }
  111. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  112. var nodes []*v1.Node
  113. items := kcc.nodeWatch.GetAll()
  114. for _, node := range items {
  115. nodes = append(nodes, node.(*v1.Node))
  116. }
  117. return nodes
  118. }
  119. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  120. var pods []*v1.Pod
  121. items := kcc.podWatch.GetAll()
  122. for _, pod := range items {
  123. pods = append(pods, pod.(*v1.Pod))
  124. }
  125. return pods
  126. }
  127. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  128. var services []*v1.Service
  129. items := kcc.serviceWatch.GetAll()
  130. for _, service := range items {
  131. services = append(services, service.(*v1.Service))
  132. }
  133. return services
  134. }
  135. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  136. var deployments []*appsv1.Deployment
  137. items := kcc.deploymentsWatch.GetAll()
  138. for _, deployment := range items {
  139. deployments = append(deployments, deployment.(*appsv1.Deployment))
  140. }
  141. return deployments
  142. }
  143. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  144. var pvs []*v1.PersistentVolume
  145. items := kcc.pvWatch.GetAll()
  146. for _, pv := range items {
  147. pvs = append(pvs, pv.(*v1.PersistentVolume))
  148. }
  149. return pvs
  150. }
  151. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  152. var storageClasses []*stv1.StorageClass
  153. items := kcc.storageClassWatch.GetAll()
  154. for _, stc := range items {
  155. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  156. }
  157. return storageClasses
  158. }