clustercache.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. package clustercache
  2. import (
  3. "sync"
  4. cc "github.com/opencost/opencost/core/pkg/clustercache"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/pkg/env"
  7. appsv1 "k8s.io/api/apps/v1"
  8. batchv1 "k8s.io/api/batch/v1"
  9. v1 "k8s.io/api/core/v1"
  10. policyv1 "k8s.io/api/policy/v1"
  11. stv1 "k8s.io/api/storage/v1"
  12. "k8s.io/apimachinery/pkg/fields"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. // KubernetesClusterCache is the implementation of ClusterCache
  16. type KubernetesClusterCache struct {
  17. client kubernetes.Interface
  18. namespaceWatch WatchController
  19. nodeWatch WatchController
  20. podWatch WatchController
  21. serviceWatch WatchController
  22. daemonsetsWatch WatchController
  23. deploymentsWatch WatchController
  24. statefulsetWatch WatchController
  25. replicasetWatch WatchController
  26. pvWatch WatchController
  27. pvcWatch WatchController
  28. storageClassWatch WatchController
  29. jobsWatch WatchController
  30. pdbWatch WatchController
  31. replicationControllerWatch WatchController
  32. stop chan struct{}
  33. }
  34. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  35. defer wg.Done()
  36. wc.WarmUp(cancel)
  37. }
  38. func NewKubernetesClusterCache(client kubernetes.Interface) cc.ClusterCache {
  39. if env.GetUseCacheV1() {
  40. return NewKubernetesClusterCacheV1(client)
  41. }
  42. return NewKubernetesClusterCacheV2(client)
  43. }
  44. func NewKubernetesClusterCacheV1(client kubernetes.Interface) cc.ClusterCache {
  45. coreRestClient := client.CoreV1().RESTClient()
  46. appsRestClient := client.AppsV1().RESTClient()
  47. storageRestClient := client.StorageV1().RESTClient()
  48. batchClient := client.BatchV1().RESTClient()
  49. pdbClient := client.PolicyV1().RESTClient()
  50. installNamespace := env.GetInstallNamespace()
  51. log.Infof("NAMESPACE: %s", installNamespace)
  52. kcc := &KubernetesClusterCache{
  53. client: client,
  54. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  55. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  56. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  57. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  58. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  59. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  60. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  61. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  62. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  63. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  64. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  65. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  66. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  67. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  68. }
  69. // Wait for each caching watcher to initialize
  70. cancel := make(chan struct{})
  71. var wg sync.WaitGroup
  72. if !env.IsETLReadOnlyMode() {
  73. wg.Add(14)
  74. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  75. go initializeCache(kcc.nodeWatch, &wg, cancel)
  76. go initializeCache(kcc.podWatch, &wg, cancel)
  77. go initializeCache(kcc.serviceWatch, &wg, cancel)
  78. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  79. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  80. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  81. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  82. go initializeCache(kcc.pvWatch, &wg, cancel)
  83. go initializeCache(kcc.pvcWatch, &wg, cancel)
  84. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  85. go initializeCache(kcc.jobsWatch, &wg, cancel)
  86. go initializeCache(kcc.pdbWatch, &wg, cancel)
  87. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  88. }
  89. wg.Wait()
  90. log.Infof("Done waiting")
  91. return kcc
  92. }
  93. func (kcc *KubernetesClusterCache) Run() {
  94. if kcc.stop != nil {
  95. return
  96. }
  97. stopCh := make(chan struct{})
  98. go kcc.namespaceWatch.Run(1, stopCh)
  99. go kcc.nodeWatch.Run(1, stopCh)
  100. go kcc.podWatch.Run(1, stopCh)
  101. go kcc.serviceWatch.Run(1, stopCh)
  102. go kcc.daemonsetsWatch.Run(1, stopCh)
  103. go kcc.deploymentsWatch.Run(1, stopCh)
  104. go kcc.statefulsetWatch.Run(1, stopCh)
  105. go kcc.replicasetWatch.Run(1, stopCh)
  106. go kcc.pvWatch.Run(1, stopCh)
  107. go kcc.pvcWatch.Run(1, stopCh)
  108. go kcc.storageClassWatch.Run(1, stopCh)
  109. go kcc.jobsWatch.Run(1, stopCh)
  110. go kcc.pdbWatch.Run(1, stopCh)
  111. go kcc.replicationControllerWatch.Run(1, stopCh)
  112. kcc.stop = stopCh
  113. }
  114. func (kcc *KubernetesClusterCache) Stop() {
  115. if kcc.stop == nil {
  116. return
  117. }
  118. close(kcc.stop)
  119. kcc.stop = nil
  120. }
  121. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*cc.Namespace {
  122. var namespaces []*cc.Namespace
  123. items := kcc.namespaceWatch.GetAll()
  124. for _, ns := range items {
  125. namespaces = append(namespaces, cc.TransformNamespace(ns.(*v1.Namespace)))
  126. }
  127. return namespaces
  128. }
  129. func (kcc *KubernetesClusterCache) GetAllNodes() []*cc.Node {
  130. var nodes []*cc.Node
  131. items := kcc.nodeWatch.GetAll()
  132. for _, node := range items {
  133. nodes = append(nodes, cc.TransformNode(node.(*v1.Node)))
  134. }
  135. return nodes
  136. }
  137. func (kcc *KubernetesClusterCache) GetAllPods() []*cc.Pod {
  138. var pods []*cc.Pod
  139. items := kcc.podWatch.GetAll()
  140. for _, pod := range items {
  141. pods = append(pods, cc.TransformPod(pod.(*v1.Pod)))
  142. }
  143. return pods
  144. }
  145. func (kcc *KubernetesClusterCache) GetAllServices() []*cc.Service {
  146. var services []*cc.Service
  147. items := kcc.serviceWatch.GetAll()
  148. for _, service := range items {
  149. services = append(services, cc.TransformService(service.(*v1.Service)))
  150. }
  151. return services
  152. }
  153. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*cc.DaemonSet {
  154. var daemonsets []*cc.DaemonSet
  155. items := kcc.daemonsetsWatch.GetAll()
  156. for _, daemonset := range items {
  157. daemonsets = append(daemonsets, cc.TransformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  158. }
  159. return daemonsets
  160. }
  161. func (kcc *KubernetesClusterCache) GetAllDeployments() []*cc.Deployment {
  162. var deployments []*cc.Deployment
  163. items := kcc.deploymentsWatch.GetAll()
  164. for _, deployment := range items {
  165. deployments = append(deployments, cc.TransformDeployment(deployment.(*appsv1.Deployment)))
  166. }
  167. return deployments
  168. }
  169. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*cc.StatefulSet {
  170. var statefulsets []*cc.StatefulSet
  171. items := kcc.statefulsetWatch.GetAll()
  172. for _, statefulset := range items {
  173. statefulsets = append(statefulsets, cc.TransformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  174. }
  175. return statefulsets
  176. }
  177. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*cc.ReplicaSet {
  178. var replicasets []*cc.ReplicaSet
  179. items := kcc.replicasetWatch.GetAll()
  180. for _, replicaset := range items {
  181. replicasets = append(replicasets, cc.TransformReplicaSet(replicaset.(*appsv1.ReplicaSet)))
  182. }
  183. return replicasets
  184. }
  185. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*cc.PersistentVolume {
  186. var pvs []*cc.PersistentVolume
  187. items := kcc.pvWatch.GetAll()
  188. for _, pv := range items {
  189. pvs = append(pvs, cc.TransformPersistentVolume(pv.(*v1.PersistentVolume)))
  190. }
  191. return pvs
  192. }
  193. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*cc.PersistentVolumeClaim {
  194. var pvcs []*cc.PersistentVolumeClaim
  195. items := kcc.pvcWatch.GetAll()
  196. for _, pvc := range items {
  197. pvcs = append(pvcs, cc.TransformPersistentVolumeClaim(pvc.(*v1.PersistentVolumeClaim)))
  198. }
  199. return pvcs
  200. }
  201. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*cc.StorageClass {
  202. var storageClasses []*cc.StorageClass
  203. items := kcc.storageClassWatch.GetAll()
  204. for _, stc := range items {
  205. storageClasses = append(storageClasses, cc.TransformStorageClass(stc.(*stv1.StorageClass)))
  206. }
  207. return storageClasses
  208. }
  209. func (kcc *KubernetesClusterCache) GetAllJobs() []*cc.Job {
  210. var jobs []*cc.Job
  211. items := kcc.jobsWatch.GetAll()
  212. for _, job := range items {
  213. jobs = append(jobs, cc.TransformJob(job.(*batchv1.Job)))
  214. }
  215. return jobs
  216. }
  217. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*cc.PodDisruptionBudget {
  218. var pdbs []*cc.PodDisruptionBudget
  219. items := kcc.pdbWatch.GetAll()
  220. for _, pdb := range items {
  221. pdbs = append(pdbs, cc.TransformPodDisruptionBudget(pdb.(*policyv1.PodDisruptionBudget)))
  222. }
  223. return pdbs
  224. }
  225. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*cc.ReplicationController {
  226. var rcs []*cc.ReplicationController
  227. items := kcc.replicationControllerWatch.GetAll()
  228. for _, rc := range items {
  229. rcs = append(rcs, cc.TransformReplicationController(rc.(*v1.ReplicationController)))
  230. }
  231. return rcs
  232. }