clustercache.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package costmodel
  2. import (
  3. "sync"
  4. appsv1 "k8s.io/api/apps/v1"
  5. v1 "k8s.io/api/core/v1"
  6. stv1 "k8s.io/api/storage/v1"
  7. "k8s.io/apimachinery/pkg/fields"
  8. "k8s.io/client-go/kubernetes"
  9. )
  10. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  11. // up to date resources using watchers
  12. type ClusterCache interface {
  13. // Run starts the watcher processes
  14. Run(stopCh chan struct{})
  15. // GetAllNamespaces returns all the cached namespaces
  16. GetAllNamespaces() []*v1.Namespace
  17. // GetAllNodes returns all the cached nodes
  18. GetAllNodes() []*v1.Node
  19. // GetAllPods returns all the cached pods
  20. GetAllPods() []*v1.Pod
  21. // GetAllServices returns all the cached services
  22. GetAllServices() []*v1.Service
  23. // GetAllDeployments returns all the cached deployments
  24. GetAllDeployments() []*appsv1.Deployment
  25. // GetAllPersistentVolumes returns all the cached persistent volumes
  26. GetAllPersistentVolumes() []*v1.PersistentVolume
  27. // GetAllStorageClasses returns all the cached storage classes
  28. GetAllStorageClasses() []*stv1.StorageClass
  29. }
  30. // KubernetesClusterCache is the implementation of ClusterCache
  31. type KubernetesClusterCache struct {
  32. client kubernetes.Interface
  33. namespaceWatch WatchController
  34. nodeWatch WatchController
  35. podWatch WatchController
  36. serviceWatch WatchController
  37. deploymentsWatch WatchController
  38. pvWatch WatchController
  39. storageClassWatch WatchController
  40. }
  41. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  42. defer wg.Done()
  43. wc.WarmUp(cancel)
  44. }
  45. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  46. coreRestClient := client.CoreV1().RESTClient()
  47. appsRestClient := client.AppsV1().RESTClient()
  48. storageRestClient := client.StorageV1().RESTClient()
  49. kcc := &KubernetesClusterCache{
  50. client: client,
  51. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  52. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  53. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  54. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  55. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  56. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  57. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  58. }
  59. // Wait for each caching watcher to initialize
  60. var wg sync.WaitGroup
  61. wg.Add(7)
  62. cancel := make(chan struct{})
  63. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  64. go initializeCache(kcc.nodeWatch, &wg, cancel)
  65. go initializeCache(kcc.podWatch, &wg, cancel)
  66. go initializeCache(kcc.serviceWatch, &wg, cancel)
  67. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  68. go initializeCache(kcc.pvWatch, &wg, cancel)
  69. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  70. wg.Wait()
  71. return kcc
  72. }
  73. func (kcc *KubernetesClusterCache) Run(stopCh chan struct{}) {
  74. go kcc.namespaceWatch.Run(1, stopCh)
  75. go kcc.nodeWatch.Run(1, stopCh)
  76. go kcc.podWatch.Run(1, stopCh)
  77. go kcc.serviceWatch.Run(1, stopCh)
  78. go kcc.deploymentsWatch.Run(1, stopCh)
  79. go kcc.pvWatch.Run(1, stopCh)
  80. go kcc.storageClassWatch.Run(1, stopCh)
  81. }
  82. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  83. var namespaces []*v1.Namespace
  84. items := kcc.namespaceWatch.GetAll()
  85. for _, ns := range items {
  86. namespaces = append(namespaces, ns.(*v1.Namespace))
  87. }
  88. return namespaces
  89. }
  90. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  91. var nodes []*v1.Node
  92. items := kcc.nodeWatch.GetAll()
  93. for _, node := range items {
  94. nodes = append(nodes, node.(*v1.Node))
  95. }
  96. return nodes
  97. }
  98. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  99. var pods []*v1.Pod
  100. items := kcc.podWatch.GetAll()
  101. for _, pod := range items {
  102. pods = append(pods, pod.(*v1.Pod))
  103. }
  104. return pods
  105. }
  106. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  107. var services []*v1.Service
  108. items := kcc.serviceWatch.GetAll()
  109. for _, service := range items {
  110. services = append(services, service.(*v1.Service))
  111. }
  112. return services
  113. }
  114. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  115. var deployments []*appsv1.Deployment
  116. items := kcc.deploymentsWatch.GetAll()
  117. for _, deployment := range items {
  118. deployments = append(deployments, deployment.(*appsv1.Deployment))
  119. }
  120. return deployments
  121. }
  122. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  123. var pvs []*v1.PersistentVolume
  124. items := kcc.pvWatch.GetAll()
  125. for _, pv := range items {
  126. pvs = append(pvs, pv.(*v1.PersistentVolume))
  127. }
  128. return pvs
  129. }
  130. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  131. var storageClasses []*stv1.StorageClass
  132. items := kcc.storageClassWatch.GetAll()
  133. for _, stc := range items {
  134. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  135. }
  136. return storageClasses
  137. }