clustercache.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/pkg/env"
  6. appsv1 "k8s.io/api/apps/v1"
  7. batchv1 "k8s.io/api/batch/v1"
  8. v1 "k8s.io/api/core/v1"
  9. policyv1 "k8s.io/api/policy/v1"
  10. stv1 "k8s.io/api/storage/v1"
  11. "k8s.io/apimachinery/pkg/fields"
  12. "k8s.io/client-go/kubernetes"
  13. )
  14. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  15. // up to date resources using watchers
  16. type ClusterCache interface {
  17. // Run starts the watcher processes
  18. Run()
  19. // Stops the watcher processes
  20. Stop()
  21. // GetAllNamespaces returns all the cached namespaces
  22. GetAllNamespaces() []*v1.Namespace
  23. // GetAllNodes returns all the cached nodes
  24. GetAllNodes() []*v1.Node
  25. // GetAllPods returns all the cached pods
  26. GetAllPods() []*v1.Pod
  27. // GetAllServices returns all the cached services
  28. GetAllServices() []*v1.Service
  29. // GetAllDaemonSets returns all the cached DaemonSets
  30. GetAllDaemonSets() []*appsv1.DaemonSet
  31. // GetAllDeployments returns all the cached deployments
  32. GetAllDeployments() []*appsv1.Deployment
  33. // GetAllStatfulSets returns all the cached StatefulSets
  34. GetAllStatefulSets() []*appsv1.StatefulSet
  35. // GetAllReplicaSets returns all the cached ReplicaSets
  36. GetAllReplicaSets() []*appsv1.ReplicaSet
  37. // GetAllPersistentVolumes returns all the cached persistent volumes
  38. GetAllPersistentVolumes() []*v1.PersistentVolume
  39. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  40. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  41. // GetAllStorageClasses returns all the cached storage classes
  42. GetAllStorageClasses() []*stv1.StorageClass
  43. // GetAllJobs returns all the cached jobs
  44. GetAllJobs() []*batchv1.Job
  45. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  46. GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget
  47. // GetAllReplicationControllers returns all cached replication controllers
  48. GetAllReplicationControllers() []*v1.ReplicationController
  49. // SetConfigMapUpdateFunc sets the configmap update function
  50. SetConfigMapUpdateFunc(func(interface{}))
  51. }
  52. // KubernetesClusterCache is the implementation of ClusterCache
  53. type KubernetesClusterCache struct {
  54. client kubernetes.Interface
  55. namespaceWatch WatchController
  56. nodeWatch WatchController
  57. podWatch WatchController
  58. kubecostConfigMapWatch WatchController
  59. serviceWatch WatchController
  60. daemonsetsWatch WatchController
  61. deploymentsWatch WatchController
  62. statefulsetWatch WatchController
  63. replicasetWatch WatchController
  64. pvWatch WatchController
  65. pvcWatch WatchController
  66. storageClassWatch WatchController
  67. jobsWatch WatchController
  68. pdbWatch WatchController
  69. replicationControllerWatch WatchController
  70. stop chan struct{}
  71. }
  72. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  73. defer wg.Done()
  74. wc.WarmUp(cancel)
  75. }
  76. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  77. coreRestClient := client.CoreV1().RESTClient()
  78. appsRestClient := client.AppsV1().RESTClient()
  79. storageRestClient := client.StorageV1().RESTClient()
  80. batchClient := client.BatchV1().RESTClient()
  81. pdbClient := client.PolicyV1().RESTClient()
  82. kubecostNamespace := env.GetKubecostNamespace()
  83. log.Infof("NAMESPACE: %s", kubecostNamespace)
  84. kcc := &KubernetesClusterCache{
  85. client: client,
  86. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  87. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  88. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  89. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  90. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  91. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  92. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  93. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  94. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  95. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  96. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  97. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  98. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  99. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  100. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  101. }
  102. // Wait for each caching watcher to initialize
  103. cancel := make(chan struct{})
  104. var wg sync.WaitGroup
  105. if env.IsETLReadOnlyMode() {
  106. wg.Add(1)
  107. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  108. } else {
  109. wg.Add(15)
  110. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  111. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  112. go initializeCache(kcc.nodeWatch, &wg, cancel)
  113. go initializeCache(kcc.podWatch, &wg, cancel)
  114. go initializeCache(kcc.serviceWatch, &wg, cancel)
  115. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  116. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  117. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  118. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  119. go initializeCache(kcc.pvWatch, &wg, cancel)
  120. go initializeCache(kcc.pvcWatch, &wg, cancel)
  121. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  122. go initializeCache(kcc.jobsWatch, &wg, cancel)
  123. go initializeCache(kcc.podWatch, &wg, cancel)
  124. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  125. }
  126. wg.Wait()
  127. log.Infof("Done waiting")
  128. return kcc
  129. }
  130. func (kcc *KubernetesClusterCache) Run() {
  131. if kcc.stop != nil {
  132. return
  133. }
  134. stopCh := make(chan struct{})
  135. go kcc.namespaceWatch.Run(1, stopCh)
  136. go kcc.nodeWatch.Run(1, stopCh)
  137. go kcc.podWatch.Run(1, stopCh)
  138. go kcc.serviceWatch.Run(1, stopCh)
  139. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  140. go kcc.daemonsetsWatch.Run(1, stopCh)
  141. go kcc.deploymentsWatch.Run(1, stopCh)
  142. go kcc.statefulsetWatch.Run(1, stopCh)
  143. go kcc.replicasetWatch.Run(1, stopCh)
  144. go kcc.pvWatch.Run(1, stopCh)
  145. go kcc.pvcWatch.Run(1, stopCh)
  146. go kcc.storageClassWatch.Run(1, stopCh)
  147. go kcc.jobsWatch.Run(1, stopCh)
  148. go kcc.pdbWatch.Run(1, stopCh)
  149. go kcc.replicationControllerWatch.Run(1, stopCh)
  150. kcc.stop = stopCh
  151. }
  152. func (kcc *KubernetesClusterCache) Stop() {
  153. if kcc.stop == nil {
  154. return
  155. }
  156. close(kcc.stop)
  157. kcc.stop = nil
  158. }
  159. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  160. var namespaces []*v1.Namespace
  161. items := kcc.namespaceWatch.GetAll()
  162. for _, ns := range items {
  163. namespaces = append(namespaces, ns.(*v1.Namespace))
  164. }
  165. return namespaces
  166. }
  167. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  168. var nodes []*v1.Node
  169. items := kcc.nodeWatch.GetAll()
  170. for _, node := range items {
  171. nodes = append(nodes, node.(*v1.Node))
  172. }
  173. return nodes
  174. }
  175. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  176. var pods []*v1.Pod
  177. items := kcc.podWatch.GetAll()
  178. for _, pod := range items {
  179. pods = append(pods, pod.(*v1.Pod))
  180. }
  181. return pods
  182. }
  183. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  184. var services []*v1.Service
  185. items := kcc.serviceWatch.GetAll()
  186. for _, service := range items {
  187. services = append(services, service.(*v1.Service))
  188. }
  189. return services
  190. }
  191. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  192. var daemonsets []*appsv1.DaemonSet
  193. items := kcc.daemonsetsWatch.GetAll()
  194. for _, daemonset := range items {
  195. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  196. }
  197. return daemonsets
  198. }
  199. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  200. var deployments []*appsv1.Deployment
  201. items := kcc.deploymentsWatch.GetAll()
  202. for _, deployment := range items {
  203. deployments = append(deployments, deployment.(*appsv1.Deployment))
  204. }
  205. return deployments
  206. }
  207. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  208. var statefulsets []*appsv1.StatefulSet
  209. items := kcc.statefulsetWatch.GetAll()
  210. for _, statefulset := range items {
  211. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  212. }
  213. return statefulsets
  214. }
  215. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  216. var replicasets []*appsv1.ReplicaSet
  217. items := kcc.replicasetWatch.GetAll()
  218. for _, replicaset := range items {
  219. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  220. }
  221. return replicasets
  222. }
  223. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  224. var pvs []*v1.PersistentVolume
  225. items := kcc.pvWatch.GetAll()
  226. for _, pv := range items {
  227. pvs = append(pvs, pv.(*v1.PersistentVolume))
  228. }
  229. return pvs
  230. }
  231. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  232. var pvcs []*v1.PersistentVolumeClaim
  233. items := kcc.pvcWatch.GetAll()
  234. for _, pvc := range items {
  235. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  236. }
  237. return pvcs
  238. }
  239. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  240. var storageClasses []*stv1.StorageClass
  241. items := kcc.storageClassWatch.GetAll()
  242. for _, stc := range items {
  243. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  244. }
  245. return storageClasses
  246. }
  247. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  248. var jobs []*batchv1.Job
  249. items := kcc.jobsWatch.GetAll()
  250. for _, job := range items {
  251. jobs = append(jobs, job.(*batchv1.Job))
  252. }
  253. return jobs
  254. }
  255. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget {
  256. var pdbs []*policyv1.PodDisruptionBudget
  257. items := kcc.pdbWatch.GetAll()
  258. for _, pdb := range items {
  259. pdbs = append(pdbs, pdb.(*policyv1.PodDisruptionBudget))
  260. }
  261. return pdbs
  262. }
  263. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*v1.ReplicationController {
  264. var rcs []*v1.ReplicationController
  265. items := kcc.replicationControllerWatch.GetAll()
  266. for _, rc := range items {
  267. rcs = append(rcs, rc.(*v1.ReplicationController))
  268. }
  269. return rcs
  270. }
  271. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  272. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  273. }