clustercache.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/pkg/env"
  5. "github.com/opencost/opencost/pkg/log"
  6. appsv1 "k8s.io/api/apps/v1"
  7. autoscaling "k8s.io/api/autoscaling/v2beta1"
  8. batchv1 "k8s.io/api/batch/v1"
  9. v1 "k8s.io/api/core/v1"
  10. "k8s.io/api/policy/v1beta1"
  11. stv1 "k8s.io/api/storage/v1"
  12. "k8s.io/apimachinery/pkg/fields"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  16. // up to date resources using watchers
  17. type ClusterCache interface {
  18. // Run starts the watcher processes
  19. Run()
  20. // Stops the watcher processes
  21. Stop()
  22. // GetAllNamespaces returns all the cached namespaces
  23. GetAllNamespaces() []*v1.Namespace
  24. // GetAllNodes returns all the cached nodes
  25. GetAllNodes() []*v1.Node
  26. // GetAllPods returns all the cached pods
  27. GetAllPods() []*v1.Pod
  28. // GetAllServices returns all the cached services
  29. GetAllServices() []*v1.Service
  30. // GetAllDaemonSets returns all the cached DaemonSets
  31. GetAllDaemonSets() []*appsv1.DaemonSet
  32. // GetAllDeployments returns all the cached deployments
  33. GetAllDeployments() []*appsv1.Deployment
  34. // GetAllStatfulSets returns all the cached StatefulSets
  35. GetAllStatefulSets() []*appsv1.StatefulSet
  36. // GetAllReplicaSets returns all the cached ReplicaSets
  37. GetAllReplicaSets() []*appsv1.ReplicaSet
  38. // GetAllPersistentVolumes returns all the cached persistent volumes
  39. GetAllPersistentVolumes() []*v1.PersistentVolume
  40. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  41. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  42. // GetAllStorageClasses returns all the cached storage classes
  43. GetAllStorageClasses() []*stv1.StorageClass
  44. // GetAllJobs returns all the cached jobs
  45. GetAllJobs() []*batchv1.Job
  46. // GetAllHorizontalPodAutoscalers returns all cached horizontal pod autoscalers
  47. GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler
  48. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  49. GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget
  50. // GetAllReplicationControllers returns all cached replication controllers
  51. GetAllReplicationControllers() []*v1.ReplicationController
  52. // SetConfigMapUpdateFunc sets the configmap update function
  53. SetConfigMapUpdateFunc(func(interface{}))
  54. }
  55. // KubernetesClusterCache is the implementation of ClusterCache
  56. type KubernetesClusterCache struct {
  57. client kubernetes.Interface
  58. namespaceWatch WatchController
  59. nodeWatch WatchController
  60. podWatch WatchController
  61. kubecostConfigMapWatch WatchController
  62. serviceWatch WatchController
  63. daemonsetsWatch WatchController
  64. deploymentsWatch WatchController
  65. statefulsetWatch WatchController
  66. replicasetWatch WatchController
  67. pvWatch WatchController
  68. pvcWatch WatchController
  69. storageClassWatch WatchController
  70. jobsWatch WatchController
  71. hpaWatch WatchController
  72. pdbWatch WatchController
  73. replicationControllerWatch WatchController
  74. stop chan struct{}
  75. }
  76. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  77. defer wg.Done()
  78. wc.WarmUp(cancel)
  79. }
  80. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  81. coreRestClient := client.CoreV1().RESTClient()
  82. appsRestClient := client.AppsV1().RESTClient()
  83. storageRestClient := client.StorageV1().RESTClient()
  84. batchClient := client.BatchV1().RESTClient()
  85. autoscalingClient := client.AutoscalingV2beta1().RESTClient()
  86. pdbClient := client.PolicyV1beta1().RESTClient()
  87. kubecostNamespace := env.GetKubecostNamespace()
  88. log.Infof("NAMESPACE: %s", kubecostNamespace)
  89. kcc := &KubernetesClusterCache{
  90. client: client,
  91. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  92. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  93. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  94. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  95. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  96. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  97. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  98. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  99. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  100. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  101. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  102. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  103. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  104. hpaWatch: NewCachingWatcher(autoscalingClient, "horizontalpodautoscalers", &autoscaling.HorizontalPodAutoscaler{}, "", fields.Everything()),
  105. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &v1beta1.PodDisruptionBudget{}, "", fields.Everything()),
  106. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  107. }
  108. // Wait for each caching watcher to initialize
  109. cancel := make(chan struct{})
  110. var wg sync.WaitGroup
  111. if env.IsETLReadOnlyMode() {
  112. wg.Add(1)
  113. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  114. } else {
  115. wg.Add(16)
  116. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  117. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  118. go initializeCache(kcc.nodeWatch, &wg, cancel)
  119. go initializeCache(kcc.podWatch, &wg, cancel)
  120. go initializeCache(kcc.serviceWatch, &wg, cancel)
  121. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  122. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  123. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  124. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  125. go initializeCache(kcc.pvWatch, &wg, cancel)
  126. go initializeCache(kcc.pvcWatch, &wg, cancel)
  127. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  128. go initializeCache(kcc.jobsWatch, &wg, cancel)
  129. go initializeCache(kcc.hpaWatch, &wg, cancel)
  130. go initializeCache(kcc.podWatch, &wg, cancel)
  131. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  132. }
  133. wg.Wait()
  134. log.Infof("Done waiting")
  135. return kcc
  136. }
  137. func (kcc *KubernetesClusterCache) Run() {
  138. if kcc.stop != nil {
  139. return
  140. }
  141. stopCh := make(chan struct{})
  142. go kcc.namespaceWatch.Run(1, stopCh)
  143. go kcc.nodeWatch.Run(1, stopCh)
  144. go kcc.podWatch.Run(1, stopCh)
  145. go kcc.serviceWatch.Run(1, stopCh)
  146. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  147. go kcc.daemonsetsWatch.Run(1, stopCh)
  148. go kcc.deploymentsWatch.Run(1, stopCh)
  149. go kcc.statefulsetWatch.Run(1, stopCh)
  150. go kcc.replicasetWatch.Run(1, stopCh)
  151. go kcc.pvWatch.Run(1, stopCh)
  152. go kcc.pvcWatch.Run(1, stopCh)
  153. go kcc.storageClassWatch.Run(1, stopCh)
  154. go kcc.jobsWatch.Run(1, stopCh)
  155. go kcc.hpaWatch.Run(1, stopCh)
  156. go kcc.pdbWatch.Run(1, stopCh)
  157. go kcc.replicationControllerWatch.Run(1, stopCh)
  158. kcc.stop = stopCh
  159. }
  160. func (kcc *KubernetesClusterCache) Stop() {
  161. if kcc.stop == nil {
  162. return
  163. }
  164. close(kcc.stop)
  165. kcc.stop = nil
  166. }
  167. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  168. var namespaces []*v1.Namespace
  169. items := kcc.namespaceWatch.GetAll()
  170. for _, ns := range items {
  171. namespaces = append(namespaces, ns.(*v1.Namespace))
  172. }
  173. return namespaces
  174. }
  175. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  176. var nodes []*v1.Node
  177. items := kcc.nodeWatch.GetAll()
  178. for _, node := range items {
  179. nodes = append(nodes, node.(*v1.Node))
  180. }
  181. return nodes
  182. }
  183. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  184. var pods []*v1.Pod
  185. items := kcc.podWatch.GetAll()
  186. for _, pod := range items {
  187. pods = append(pods, pod.(*v1.Pod))
  188. }
  189. return pods
  190. }
  191. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  192. var services []*v1.Service
  193. items := kcc.serviceWatch.GetAll()
  194. for _, service := range items {
  195. services = append(services, service.(*v1.Service))
  196. }
  197. return services
  198. }
  199. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  200. var daemonsets []*appsv1.DaemonSet
  201. items := kcc.daemonsetsWatch.GetAll()
  202. for _, daemonset := range items {
  203. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  204. }
  205. return daemonsets
  206. }
  207. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  208. var deployments []*appsv1.Deployment
  209. items := kcc.deploymentsWatch.GetAll()
  210. for _, deployment := range items {
  211. deployments = append(deployments, deployment.(*appsv1.Deployment))
  212. }
  213. return deployments
  214. }
  215. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  216. var statefulsets []*appsv1.StatefulSet
  217. items := kcc.statefulsetWatch.GetAll()
  218. for _, statefulset := range items {
  219. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  220. }
  221. return statefulsets
  222. }
  223. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  224. var replicasets []*appsv1.ReplicaSet
  225. items := kcc.replicasetWatch.GetAll()
  226. for _, replicaset := range items {
  227. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  228. }
  229. return replicasets
  230. }
  231. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  232. var pvs []*v1.PersistentVolume
  233. items := kcc.pvWatch.GetAll()
  234. for _, pv := range items {
  235. pvs = append(pvs, pv.(*v1.PersistentVolume))
  236. }
  237. return pvs
  238. }
  239. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  240. var pvcs []*v1.PersistentVolumeClaim
  241. items := kcc.pvcWatch.GetAll()
  242. for _, pvc := range items {
  243. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  244. }
  245. return pvcs
  246. }
  247. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  248. var storageClasses []*stv1.StorageClass
  249. items := kcc.storageClassWatch.GetAll()
  250. for _, stc := range items {
  251. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  252. }
  253. return storageClasses
  254. }
  255. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  256. var jobs []*batchv1.Job
  257. items := kcc.jobsWatch.GetAll()
  258. for _, job := range items {
  259. jobs = append(jobs, job.(*batchv1.Job))
  260. }
  261. return jobs
  262. }
  263. func (kcc *KubernetesClusterCache) GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler {
  264. var hpas []*autoscaling.HorizontalPodAutoscaler
  265. items := kcc.hpaWatch.GetAll()
  266. for _, hpa := range items {
  267. hpas = append(hpas, hpa.(*autoscaling.HorizontalPodAutoscaler))
  268. }
  269. return hpas
  270. }
  271. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget {
  272. var pdbs []*v1beta1.PodDisruptionBudget
  273. items := kcc.pdbWatch.GetAll()
  274. for _, pdb := range items {
  275. pdbs = append(pdbs, pdb.(*v1beta1.PodDisruptionBudget))
  276. }
  277. return pdbs
  278. }
  279. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*v1.ReplicationController {
  280. var rcs []*v1.ReplicationController
  281. items := kcc.replicationControllerWatch.GetAll()
  282. for _, rc := range items {
  283. rcs = append(rcs, rc.(*v1.ReplicationController))
  284. }
  285. return rcs
  286. }
  287. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  288. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  289. }