clustercache.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package clustercache
  2. import (
  3. "sync"
  4. cc "github.com/opencost/opencost/core/pkg/clustercache"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/pkg/env"
  7. appsv1 "k8s.io/api/apps/v1"
  8. batchv1 "k8s.io/api/batch/v1"
  9. v1 "k8s.io/api/core/v1"
  10. policyv1 "k8s.io/api/policy/v1"
  11. stv1 "k8s.io/api/storage/v1"
  12. "k8s.io/apimachinery/pkg/fields"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. // KubernetesClusterCache is the implementation of ClusterCache
  16. type KubernetesClusterCache struct {
  17. client kubernetes.Interface
  18. namespaceWatch WatchController
  19. nodeWatch WatchController
  20. podWatch WatchController
  21. serviceWatch WatchController
  22. daemonsetsWatch WatchController
  23. deploymentsWatch WatchController
  24. statefulsetWatch WatchController
  25. replicasetWatch WatchController
  26. pvWatch WatchController
  27. pvcWatch WatchController
  28. storageClassWatch WatchController
  29. jobsWatch WatchController
  30. pdbWatch WatchController
  31. replicationControllerWatch WatchController
  32. resourceQuotasWatch WatchController
  33. stop chan struct{}
  34. }
  35. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  36. defer wg.Done()
  37. wc.WarmUp(cancel)
  38. }
  39. func NewKubernetesClusterCache(client kubernetes.Interface) cc.ClusterCache {
  40. if env.GetUseCacheV1() {
  41. return NewKubernetesClusterCacheV1(client)
  42. }
  43. return NewKubernetesClusterCacheV2(client)
  44. }
  45. func NewKubernetesClusterCacheV1(client kubernetes.Interface) cc.ClusterCache {
  46. coreRestClient := client.CoreV1().RESTClient()
  47. appsRestClient := client.AppsV1().RESTClient()
  48. storageRestClient := client.StorageV1().RESTClient()
  49. batchClient := client.BatchV1().RESTClient()
  50. pdbClient := client.PolicyV1().RESTClient()
  51. installNamespace := env.GetOpencostNamespace()
  52. log.Infof("NAMESPACE: %s", installNamespace)
  53. kcc := &KubernetesClusterCache{
  54. client: client,
  55. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  56. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  57. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  58. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  59. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  60. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  61. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  62. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  63. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  64. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  65. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  66. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  67. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  68. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  69. resourceQuotasWatch: NewCachingWatcher(coreRestClient, "resourcequotas", &v1.ResourceQuota{}, "", fields.Everything()),
  70. }
  71. // Wait for each caching watcher to initialize
  72. cancel := make(chan struct{})
  73. var wg sync.WaitGroup
  74. if env.HasKubernetesResourceAccess() {
  75. wg.Add(15)
  76. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  77. go initializeCache(kcc.nodeWatch, &wg, cancel)
  78. go initializeCache(kcc.podWatch, &wg, cancel)
  79. go initializeCache(kcc.serviceWatch, &wg, cancel)
  80. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  81. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  82. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  83. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  84. go initializeCache(kcc.pvWatch, &wg, cancel)
  85. go initializeCache(kcc.pvcWatch, &wg, cancel)
  86. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  87. go initializeCache(kcc.jobsWatch, &wg, cancel)
  88. go initializeCache(kcc.pdbWatch, &wg, cancel)
  89. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  90. go initializeCache(kcc.resourceQuotasWatch, &wg, cancel)
  91. }
  92. wg.Wait()
  93. log.Infof("Done waiting")
  94. return kcc
  95. }
  96. func (kcc *KubernetesClusterCache) Run() {
  97. if kcc.stop != nil {
  98. return
  99. }
  100. stopCh := make(chan struct{})
  101. go kcc.namespaceWatch.Run(1, stopCh)
  102. go kcc.nodeWatch.Run(1, stopCh)
  103. go kcc.podWatch.Run(1, stopCh)
  104. go kcc.serviceWatch.Run(1, stopCh)
  105. go kcc.daemonsetsWatch.Run(1, stopCh)
  106. go kcc.deploymentsWatch.Run(1, stopCh)
  107. go kcc.statefulsetWatch.Run(1, stopCh)
  108. go kcc.replicasetWatch.Run(1, stopCh)
  109. go kcc.pvWatch.Run(1, stopCh)
  110. go kcc.pvcWatch.Run(1, stopCh)
  111. go kcc.storageClassWatch.Run(1, stopCh)
  112. go kcc.jobsWatch.Run(1, stopCh)
  113. go kcc.pdbWatch.Run(1, stopCh)
  114. go kcc.replicationControllerWatch.Run(1, stopCh)
  115. go kcc.resourceQuotasWatch.Run(1, stopCh)
  116. kcc.stop = stopCh
  117. }
  118. func (kcc *KubernetesClusterCache) Stop() {
  119. if kcc.stop == nil {
  120. return
  121. }
  122. close(kcc.stop)
  123. kcc.stop = nil
  124. }
  125. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*cc.Namespace {
  126. var namespaces []*cc.Namespace
  127. items := kcc.namespaceWatch.GetAll()
  128. for _, ns := range items {
  129. namespaces = append(namespaces, cc.TransformNamespace(ns.(*v1.Namespace)))
  130. }
  131. return namespaces
  132. }
  133. func (kcc *KubernetesClusterCache) GetAllNodes() []*cc.Node {
  134. var nodes []*cc.Node
  135. items := kcc.nodeWatch.GetAll()
  136. for _, node := range items {
  137. nodes = append(nodes, cc.TransformNode(node.(*v1.Node)))
  138. }
  139. return nodes
  140. }
  141. func (kcc *KubernetesClusterCache) GetAllPods() []*cc.Pod {
  142. var pods []*cc.Pod
  143. items := kcc.podWatch.GetAll()
  144. for _, pod := range items {
  145. pods = append(pods, cc.TransformPod(pod.(*v1.Pod)))
  146. }
  147. return pods
  148. }
  149. func (kcc *KubernetesClusterCache) GetAllServices() []*cc.Service {
  150. var services []*cc.Service
  151. items := kcc.serviceWatch.GetAll()
  152. for _, service := range items {
  153. services = append(services, cc.TransformService(service.(*v1.Service)))
  154. }
  155. return services
  156. }
  157. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*cc.DaemonSet {
  158. var daemonsets []*cc.DaemonSet
  159. items := kcc.daemonsetsWatch.GetAll()
  160. for _, daemonset := range items {
  161. daemonsets = append(daemonsets, cc.TransformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  162. }
  163. return daemonsets
  164. }
  165. func (kcc *KubernetesClusterCache) GetAllDeployments() []*cc.Deployment {
  166. var deployments []*cc.Deployment
  167. items := kcc.deploymentsWatch.GetAll()
  168. for _, deployment := range items {
  169. deployments = append(deployments, cc.TransformDeployment(deployment.(*appsv1.Deployment)))
  170. }
  171. return deployments
  172. }
  173. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*cc.StatefulSet {
  174. var statefulsets []*cc.StatefulSet
  175. items := kcc.statefulsetWatch.GetAll()
  176. for _, statefulset := range items {
  177. statefulsets = append(statefulsets, cc.TransformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  178. }
  179. return statefulsets
  180. }
  181. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*cc.ReplicaSet {
  182. var replicasets []*cc.ReplicaSet
  183. items := kcc.replicasetWatch.GetAll()
  184. for _, replicaset := range items {
  185. replicasets = append(replicasets, cc.TransformReplicaSet(replicaset.(*appsv1.ReplicaSet)))
  186. }
  187. return replicasets
  188. }
  189. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*cc.PersistentVolume {
  190. var pvs []*cc.PersistentVolume
  191. items := kcc.pvWatch.GetAll()
  192. for _, pv := range items {
  193. pvs = append(pvs, cc.TransformPersistentVolume(pv.(*v1.PersistentVolume)))
  194. }
  195. return pvs
  196. }
  197. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*cc.PersistentVolumeClaim {
  198. var pvcs []*cc.PersistentVolumeClaim
  199. items := kcc.pvcWatch.GetAll()
  200. for _, pvc := range items {
  201. pvcs = append(pvcs, cc.TransformPersistentVolumeClaim(pvc.(*v1.PersistentVolumeClaim)))
  202. }
  203. return pvcs
  204. }
  205. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*cc.StorageClass {
  206. var storageClasses []*cc.StorageClass
  207. items := kcc.storageClassWatch.GetAll()
  208. for _, stc := range items {
  209. storageClasses = append(storageClasses, cc.TransformStorageClass(stc.(*stv1.StorageClass)))
  210. }
  211. return storageClasses
  212. }
  213. func (kcc *KubernetesClusterCache) GetAllJobs() []*cc.Job {
  214. var jobs []*cc.Job
  215. items := kcc.jobsWatch.GetAll()
  216. for _, job := range items {
  217. jobs = append(jobs, cc.TransformJob(job.(*batchv1.Job)))
  218. }
  219. return jobs
  220. }
  221. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*cc.PodDisruptionBudget {
  222. var pdbs []*cc.PodDisruptionBudget
  223. items := kcc.pdbWatch.GetAll()
  224. for _, pdb := range items {
  225. pdbs = append(pdbs, cc.TransformPodDisruptionBudget(pdb.(*policyv1.PodDisruptionBudget)))
  226. }
  227. return pdbs
  228. }
  229. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*cc.ReplicationController {
  230. var rcs []*cc.ReplicationController
  231. items := kcc.replicationControllerWatch.GetAll()
  232. for _, rc := range items {
  233. rcs = append(rcs, cc.TransformReplicationController(rc.(*v1.ReplicationController)))
  234. }
  235. return rcs
  236. }
  237. // GetAllResourceQuotas returns all cached resource quotas
  238. func (kcc *KubernetesClusterCache) GetAllResourceQuotas() []*cc.ResourceQuota {
  239. var rqs []*cc.ResourceQuota
  240. items := kcc.resourceQuotasWatch.GetAll()
  241. for _, rq := range items {
  242. rqs = append(rqs, cc.TransformResourceQuota(rq.(*v1.ResourceQuota)))
  243. }
  244. return rqs
  245. }