clustercache.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package clustercache
  2. import (
  3. "sync"
  4. cc "github.com/opencost/opencost/core/pkg/clustercache"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/pkg/env"
  7. appsv1 "k8s.io/api/apps/v1"
  8. batchv1 "k8s.io/api/batch/v1"
  9. v1 "k8s.io/api/core/v1"
  10. policyv1 "k8s.io/api/policy/v1"
  11. stv1 "k8s.io/api/storage/v1"
  12. "k8s.io/apimachinery/pkg/fields"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. // KubernetesClusterCache is the implementation of ClusterCache
  16. type KubernetesClusterCache struct {
  17. client kubernetes.Interface
  18. namespaceWatch WatchController
  19. nodeWatch WatchController
  20. podWatch WatchController
  21. serviceWatch WatchController
  22. daemonsetsWatch WatchController
  23. deploymentsWatch WatchController
  24. statefulsetWatch WatchController
  25. replicasetWatch WatchController
  26. pvWatch WatchController
  27. pvcWatch WatchController
  28. storageClassWatch WatchController
  29. jobsWatch WatchController
  30. cronjobsWatch WatchController
  31. pdbWatch WatchController
  32. replicationControllerWatch WatchController
  33. resourceQuotasWatch WatchController
  34. stop chan struct{}
  35. }
  36. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  37. defer wg.Done()
  38. wc.WarmUp(cancel)
  39. }
  40. func NewKubernetesClusterCache(client kubernetes.Interface) cc.ClusterCache {
  41. if env.GetUseCacheV1() {
  42. return NewKubernetesClusterCacheV1(client)
  43. }
  44. return NewKubernetesClusterCacheV2(client)
  45. }
  46. func NewKubernetesClusterCacheV1(client kubernetes.Interface) cc.ClusterCache {
  47. coreRestClient := client.CoreV1().RESTClient()
  48. appsRestClient := client.AppsV1().RESTClient()
  49. storageRestClient := client.StorageV1().RESTClient()
  50. batchClient := client.BatchV1().RESTClient()
  51. pdbClient := client.PolicyV1().RESTClient()
  52. installNamespace := env.GetOpencostNamespace()
  53. log.Infof("NAMESPACE: %s", installNamespace)
  54. kcc := &KubernetesClusterCache{
  55. client: client,
  56. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  57. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  58. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  59. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  60. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  61. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  62. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  63. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  64. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  65. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  66. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  67. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  68. cronjobsWatch: NewCachingWatcher(batchClient, "cronjobs", &batchv1.CronJob{}, "", fields.Everything()),
  69. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  70. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  71. resourceQuotasWatch: NewCachingWatcher(coreRestClient, "resourcequotas", &v1.ResourceQuota{}, "", fields.Everything()),
  72. }
  73. // Wait for each caching watcher to initialize
  74. cancel := make(chan struct{})
  75. var wg sync.WaitGroup
  76. if env.HasKubernetesResourceAccess() {
  77. wg.Add(16)
  78. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  79. go initializeCache(kcc.nodeWatch, &wg, cancel)
  80. go initializeCache(kcc.podWatch, &wg, cancel)
  81. go initializeCache(kcc.serviceWatch, &wg, cancel)
  82. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  83. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  84. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  85. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  86. go initializeCache(kcc.pvWatch, &wg, cancel)
  87. go initializeCache(kcc.pvcWatch, &wg, cancel)
  88. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  89. go initializeCache(kcc.jobsWatch, &wg, cancel)
  90. go initializeCache(kcc.cronjobsWatch, &wg, cancel)
  91. go initializeCache(kcc.pdbWatch, &wg, cancel)
  92. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  93. go initializeCache(kcc.resourceQuotasWatch, &wg, cancel)
  94. }
  95. wg.Wait()
  96. log.Infof("Done waiting")
  97. return kcc
  98. }
  99. func (kcc *KubernetesClusterCache) Run() {
  100. if kcc.stop != nil {
  101. return
  102. }
  103. stopCh := make(chan struct{})
  104. go kcc.namespaceWatch.Run(1, stopCh)
  105. go kcc.nodeWatch.Run(1, stopCh)
  106. go kcc.podWatch.Run(1, stopCh)
  107. go kcc.serviceWatch.Run(1, stopCh)
  108. go kcc.daemonsetsWatch.Run(1, stopCh)
  109. go kcc.deploymentsWatch.Run(1, stopCh)
  110. go kcc.statefulsetWatch.Run(1, stopCh)
  111. go kcc.replicasetWatch.Run(1, stopCh)
  112. go kcc.pvWatch.Run(1, stopCh)
  113. go kcc.pvcWatch.Run(1, stopCh)
  114. go kcc.storageClassWatch.Run(1, stopCh)
  115. go kcc.jobsWatch.Run(1, stopCh)
  116. go kcc.cronjobsWatch.Run(1, stopCh)
  117. go kcc.pdbWatch.Run(1, stopCh)
  118. go kcc.replicationControllerWatch.Run(1, stopCh)
  119. go kcc.resourceQuotasWatch.Run(1, stopCh)
  120. kcc.stop = stopCh
  121. }
  122. func (kcc *KubernetesClusterCache) Stop() {
  123. if kcc.stop == nil {
  124. return
  125. }
  126. close(kcc.stop)
  127. kcc.stop = nil
  128. }
  129. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*cc.Namespace {
  130. var namespaces []*cc.Namespace
  131. items := kcc.namespaceWatch.GetAll()
  132. for _, ns := range items {
  133. namespaces = append(namespaces, cc.TransformNamespace(ns.(*v1.Namespace)))
  134. }
  135. return namespaces
  136. }
  137. func (kcc *KubernetesClusterCache) GetAllNodes() []*cc.Node {
  138. var nodes []*cc.Node
  139. items := kcc.nodeWatch.GetAll()
  140. for _, node := range items {
  141. nodes = append(nodes, cc.TransformNode(node.(*v1.Node)))
  142. }
  143. return nodes
  144. }
  145. func (kcc *KubernetesClusterCache) GetAllPods() []*cc.Pod {
  146. var pods []*cc.Pod
  147. items := kcc.podWatch.GetAll()
  148. for _, pod := range items {
  149. pods = append(pods, cc.TransformPod(pod.(*v1.Pod)))
  150. }
  151. return pods
  152. }
  153. func (kcc *KubernetesClusterCache) GetAllServices() []*cc.Service {
  154. var services []*cc.Service
  155. items := kcc.serviceWatch.GetAll()
  156. for _, service := range items {
  157. services = append(services, cc.TransformService(service.(*v1.Service)))
  158. }
  159. return services
  160. }
  161. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*cc.DaemonSet {
  162. var daemonsets []*cc.DaemonSet
  163. items := kcc.daemonsetsWatch.GetAll()
  164. for _, daemonset := range items {
  165. daemonsets = append(daemonsets, cc.TransformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  166. }
  167. return daemonsets
  168. }
  169. func (kcc *KubernetesClusterCache) GetAllDeployments() []*cc.Deployment {
  170. var deployments []*cc.Deployment
  171. items := kcc.deploymentsWatch.GetAll()
  172. for _, deployment := range items {
  173. deployments = append(deployments, cc.TransformDeployment(deployment.(*appsv1.Deployment)))
  174. }
  175. return deployments
  176. }
  177. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*cc.StatefulSet {
  178. var statefulsets []*cc.StatefulSet
  179. items := kcc.statefulsetWatch.GetAll()
  180. for _, statefulset := range items {
  181. statefulsets = append(statefulsets, cc.TransformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  182. }
  183. return statefulsets
  184. }
  185. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*cc.ReplicaSet {
  186. var replicasets []*cc.ReplicaSet
  187. items := kcc.replicasetWatch.GetAll()
  188. for _, replicaset := range items {
  189. replicasets = append(replicasets, cc.TransformReplicaSet(replicaset.(*appsv1.ReplicaSet)))
  190. }
  191. return replicasets
  192. }
  193. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*cc.PersistentVolume {
  194. var pvs []*cc.PersistentVolume
  195. items := kcc.pvWatch.GetAll()
  196. for _, pv := range items {
  197. pvs = append(pvs, cc.TransformPersistentVolume(pv.(*v1.PersistentVolume)))
  198. }
  199. return pvs
  200. }
  201. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*cc.PersistentVolumeClaim {
  202. var pvcs []*cc.PersistentVolumeClaim
  203. items := kcc.pvcWatch.GetAll()
  204. for _, pvc := range items {
  205. pvcs = append(pvcs, cc.TransformPersistentVolumeClaim(pvc.(*v1.PersistentVolumeClaim)))
  206. }
  207. return pvcs
  208. }
  209. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*cc.StorageClass {
  210. var storageClasses []*cc.StorageClass
  211. items := kcc.storageClassWatch.GetAll()
  212. for _, stc := range items {
  213. storageClasses = append(storageClasses, cc.TransformStorageClass(stc.(*stv1.StorageClass)))
  214. }
  215. return storageClasses
  216. }
  217. func (kcc *KubernetesClusterCache) GetAllJobs() []*cc.Job {
  218. var jobs []*cc.Job
  219. items := kcc.jobsWatch.GetAll()
  220. for _, job := range items {
  221. jobs = append(jobs, cc.TransformJob(job.(*batchv1.Job)))
  222. }
  223. return jobs
  224. }
  225. func (kcc *KubernetesClusterCache) GetAllCronJobs() []*cc.CronJob {
  226. var cronjobs []*cc.CronJob
  227. items := kcc.cronjobsWatch.GetAll()
  228. for _, cronjob := range items {
  229. cronjobs = append(cronjobs, cc.TransformCronJob(cronjob.(*batchv1.CronJob)))
  230. }
  231. return cronjobs
  232. }
  233. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*cc.PodDisruptionBudget {
  234. var pdbs []*cc.PodDisruptionBudget
  235. items := kcc.pdbWatch.GetAll()
  236. for _, pdb := range items {
  237. pdbs = append(pdbs, cc.TransformPodDisruptionBudget(pdb.(*policyv1.PodDisruptionBudget)))
  238. }
  239. return pdbs
  240. }
  241. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*cc.ReplicationController {
  242. var rcs []*cc.ReplicationController
  243. items := kcc.replicationControllerWatch.GetAll()
  244. for _, rc := range items {
  245. rcs = append(rcs, cc.TransformReplicationController(rc.(*v1.ReplicationController)))
  246. }
  247. return rcs
  248. }
  249. // GetAllResourceQuotas returns all cached resource quotas
  250. func (kcc *KubernetesClusterCache) GetAllResourceQuotas() []*cc.ResourceQuota {
  251. var rqs []*cc.ResourceQuota
  252. items := kcc.resourceQuotasWatch.GetAll()
  253. for _, rq := range items {
  254. rqs = append(rqs, cc.TransformResourceQuota(rq.(*v1.ResourceQuota)))
  255. }
  256. return rqs
  257. }