clustercache.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/kubecost/cost-model/pkg/env"
  5. "k8s.io/klog"
  6. appsv1 "k8s.io/api/apps/v1"
  7. autoscaling "k8s.io/api/autoscaling/v2beta1"
  8. batchv1 "k8s.io/api/batch/v1"
  9. v1 "k8s.io/api/core/v1"
  10. "k8s.io/api/policy/v1beta1"
  11. stv1 "k8s.io/api/storage/v1"
  12. "k8s.io/apimachinery/pkg/fields"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  16. // up to date resources using watchers
  17. type ClusterCache interface {
  18. // Run starts the watcher processes
  19. Run()
  20. // Stops the watcher processes
  21. Stop()
  22. // GetAllNamespaces returns all the cached namespaces
  23. GetAllNamespaces() []*v1.Namespace
  24. // GetAllNodes returns all the cached nodes
  25. GetAllNodes() []*v1.Node
  26. // GetAllPods returns all the cached pods
  27. GetAllPods() []*v1.Pod
  28. // GetAllServices returns all the cached services
  29. GetAllServices() []*v1.Service
  30. // GetAllDaemonSets returns all the cached DaemonSets
  31. GetAllDaemonSets() []*appsv1.DaemonSet
  32. // GetAllDeployments returns all the cached deployments
  33. GetAllDeployments() []*appsv1.Deployment
  34. // GetAllStatfulSets returns all the cached StatefulSets
  35. GetAllStatefulSets() []*appsv1.StatefulSet
  36. // GetAllReplicaSets returns all the cached ReplicaSets
  37. GetAllReplicaSets() []*appsv1.ReplicaSet
  38. // GetAllPersistentVolumes returns all the cached persistent volumes
  39. GetAllPersistentVolumes() []*v1.PersistentVolume
  40. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  41. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  42. // GetAllStorageClasses returns all the cached storage classes
  43. GetAllStorageClasses() []*stv1.StorageClass
  44. // GetAllJobs returns all the cached jobs
  45. GetAllJobs() []*batchv1.Job
  46. // GetAllHorizontalPodAutoscalers returns all cached horizontal pod autoscalers
  47. GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler
  48. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  49. GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget
  50. // GetAllReplicationControllers returns all cached replication controllers
  51. GetAllReplicationControllers() []*v1.ReplicationController
  52. // SetConfigMapUpdateFunc sets the configmap update function
  53. SetConfigMapUpdateFunc(func(interface{}))
  54. }
  55. // KubernetesClusterCache is the implementation of ClusterCache
  56. type KubernetesClusterCache struct {
  57. client kubernetes.Interface
  58. namespaceWatch WatchController
  59. nodeWatch WatchController
  60. podWatch WatchController
  61. kubecostConfigMapWatch WatchController
  62. serviceWatch WatchController
  63. daemonsetsWatch WatchController
  64. deploymentsWatch WatchController
  65. statefulsetWatch WatchController
  66. replicasetWatch WatchController
  67. pvWatch WatchController
  68. pvcWatch WatchController
  69. storageClassWatch WatchController
  70. jobsWatch WatchController
  71. hpaWatch WatchController
  72. pdbWatch WatchController
  73. stop chan struct{}
  74. }
  75. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  76. defer wg.Done()
  77. wc.WarmUp(cancel)
  78. }
  79. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  80. coreRestClient := client.CoreV1().RESTClient()
  81. appsRestClient := client.AppsV1().RESTClient()
  82. storageRestClient := client.StorageV1().RESTClient()
  83. batchClient := client.BatchV1().RESTClient()
  84. autoscalingClient := client.AutoscalingV2beta1().RESTClient()
  85. pdbClient := client.PolicyV1beta1().RESTClient()
  86. kubecostNamespace := env.GetKubecostNamespace()
  87. klog.Infof("NAMESPACE: %s", kubecostNamespace)
  88. kcc := &KubernetesClusterCache{
  89. client: client,
  90. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  91. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  92. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  93. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  94. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  95. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  96. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  97. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  98. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  99. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  100. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  101. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  102. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  103. hpaWatch: NewCachingWatcher(autoscalingClient, "horizontalpodautoscalers", &autoscaling.HorizontalPodAutoscaler{}, "", fields.Everything()),
  104. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &v1beta1.PodDisruptionBudget{}, "", fields.Everything()),
  105. }
  106. // Wait for each caching watcher to initialize
  107. var wg sync.WaitGroup
  108. wg.Add(15)
  109. cancel := make(chan struct{})
  110. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  111. go initializeCache(kcc.nodeWatch, &wg, cancel)
  112. go initializeCache(kcc.podWatch, &wg, cancel)
  113. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  114. go initializeCache(kcc.serviceWatch, &wg, cancel)
  115. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  116. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  117. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  118. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  119. go initializeCache(kcc.pvWatch, &wg, cancel)
  120. go initializeCache(kcc.pvcWatch, &wg, cancel)
  121. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  122. go initializeCache(kcc.jobsWatch, &wg, cancel)
  123. go initializeCache(kcc.hpaWatch, &wg, cancel)
  124. go initializeCache(kcc.podWatch, &wg, cancel)
  125. wg.Wait()
  126. return kcc
  127. }
  128. func (kcc *KubernetesClusterCache) Run() {
  129. if kcc.stop != nil {
  130. return
  131. }
  132. stopCh := make(chan struct{})
  133. go kcc.namespaceWatch.Run(1, stopCh)
  134. go kcc.nodeWatch.Run(1, stopCh)
  135. go kcc.podWatch.Run(1, stopCh)
  136. go kcc.serviceWatch.Run(1, stopCh)
  137. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  138. go kcc.daemonsetsWatch.Run(1, stopCh)
  139. go kcc.deploymentsWatch.Run(1, stopCh)
  140. go kcc.statefulsetWatch.Run(1, stopCh)
  141. go kcc.replicasetWatch.Run(1, stopCh)
  142. go kcc.pvWatch.Run(1, stopCh)
  143. go kcc.pvcWatch.Run(1, stopCh)
  144. go kcc.storageClassWatch.Run(1, stopCh)
  145. go kcc.jobsWatch.Run(1, stopCh)
  146. go kcc.hpaWatch.Run(1, stopCh)
  147. go kcc.pdbWatch.Run(1, stopCh)
  148. kcc.stop = stopCh
  149. }
  150. func (kcc *KubernetesClusterCache) Stop() {
  151. if kcc.stop == nil {
  152. return
  153. }
  154. close(kcc.stop)
  155. kcc.stop = nil
  156. }
  157. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
  158. var namespaces []*v1.Namespace
  159. items := kcc.namespaceWatch.GetAll()
  160. for _, ns := range items {
  161. namespaces = append(namespaces, ns.(*v1.Namespace))
  162. }
  163. return namespaces
  164. }
  165. func (kcc *KubernetesClusterCache) GetAllNodes() []*v1.Node {
  166. var nodes []*v1.Node
  167. items := kcc.nodeWatch.GetAll()
  168. for _, node := range items {
  169. nodes = append(nodes, node.(*v1.Node))
  170. }
  171. return nodes
  172. }
  173. func (kcc *KubernetesClusterCache) GetAllPods() []*v1.Pod {
  174. var pods []*v1.Pod
  175. items := kcc.podWatch.GetAll()
  176. for _, pod := range items {
  177. pods = append(pods, pod.(*v1.Pod))
  178. }
  179. return pods
  180. }
  181. func (kcc *KubernetesClusterCache) GetAllServices() []*v1.Service {
  182. var services []*v1.Service
  183. items := kcc.serviceWatch.GetAll()
  184. for _, service := range items {
  185. services = append(services, service.(*v1.Service))
  186. }
  187. return services
  188. }
  189. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  190. var daemonsets []*appsv1.DaemonSet
  191. items := kcc.daemonsetsWatch.GetAll()
  192. for _, daemonset := range items {
  193. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  194. }
  195. return daemonsets
  196. }
  197. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  198. var deployments []*appsv1.Deployment
  199. items := kcc.deploymentsWatch.GetAll()
  200. for _, deployment := range items {
  201. deployments = append(deployments, deployment.(*appsv1.Deployment))
  202. }
  203. return deployments
  204. }
  205. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  206. var statefulsets []*appsv1.StatefulSet
  207. items := kcc.statefulsetWatch.GetAll()
  208. for _, statefulset := range items {
  209. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  210. }
  211. return statefulsets
  212. }
  213. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  214. var replicasets []*appsv1.ReplicaSet
  215. items := kcc.replicasetWatch.GetAll()
  216. for _, replicaset := range items {
  217. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  218. }
  219. return replicasets
  220. }
  221. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  222. var pvs []*v1.PersistentVolume
  223. items := kcc.pvWatch.GetAll()
  224. for _, pv := range items {
  225. pvs = append(pvs, pv.(*v1.PersistentVolume))
  226. }
  227. return pvs
  228. }
  229. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  230. var pvcs []*v1.PersistentVolumeClaim
  231. items := kcc.pvcWatch.GetAll()
  232. for _, pvc := range items {
  233. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  234. }
  235. return pvcs
  236. }
  237. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  238. var storageClasses []*stv1.StorageClass
  239. items := kcc.storageClassWatch.GetAll()
  240. for _, stc := range items {
  241. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  242. }
  243. return storageClasses
  244. }
  245. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  246. var jobs []*batchv1.Job
  247. items := kcc.jobsWatch.GetAll()
  248. for _, job := range items {
  249. jobs = append(jobs, job.(*batchv1.Job))
  250. }
  251. return jobs
  252. }
  253. func (kcc *KubernetesClusterCache) GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler {
  254. var hpas []*autoscaling.HorizontalPodAutoscaler
  255. items := kcc.hpaWatch.GetAll()
  256. for _, hpa := range items {
  257. hpas = append(hpas, hpa.(*autoscaling.HorizontalPodAutoscaler))
  258. }
  259. return hpas
  260. }
  261. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget {
  262. var pdbs []*v1beta1.PodDisruptionBudget
  263. items := kcc.pdbWatch.GetAll()
  264. for _, pdb := range items {
  265. pdbs = append(pdbs, pdb.(*v1beta1.PodDisruptionBudget))
  266. }
  267. return pdbs
  268. }
  269. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*v1.ReplicationController {
  270. var rcs []*v1.ReplicationController
  271. //TODO: Implement
  272. return rcs
  273. }
  274. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  275. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  276. }