clustercache.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/pkg/env"
  6. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7. "k8s.io/apimachinery/pkg/types"
  8. appsv1 "k8s.io/api/apps/v1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. policyv1 "k8s.io/api/policy/v1"
  12. stv1 "k8s.io/api/storage/v1"
  13. "k8s.io/apimachinery/pkg/fields"
  14. "k8s.io/client-go/kubernetes"
  15. )
  16. type Namespace struct {
  17. Name string
  18. Labels map[string]string
  19. Annotations map[string]string
  20. }
  21. func transformNamespace(input *v1.Namespace) *Namespace {
  22. return &Namespace{
  23. Name: input.Name,
  24. Annotations: input.Annotations,
  25. Labels: input.Labels,
  26. }
  27. }
  28. type Pod struct {
  29. UID types.UID
  30. Name string
  31. Namespace string
  32. Labels map[string]string
  33. Annotations map[string]string
  34. OwnerReferences []metav1.OwnerReference
  35. Status PodStatus
  36. Spec PodSpec
  37. }
  38. type PodStatus struct {
  39. Phase v1.PodPhase
  40. ContainerStatuses []v1.ContainerStatus
  41. }
  42. type PodSpec struct {
  43. NodeName string
  44. Containers []Container
  45. Volumes []v1.Volume
  46. }
  47. type Container struct {
  48. Name string
  49. Resources v1.ResourceRequirements
  50. }
  51. func transformPodContainer(input v1.Container) Container {
  52. return Container{
  53. Name: input.Name,
  54. Resources: input.Resources,
  55. }
  56. }
  57. func transformPodStatus(input v1.PodStatus) PodStatus {
  58. return PodStatus{
  59. Phase: input.Phase,
  60. ContainerStatuses: input.ContainerStatuses,
  61. }
  62. }
  63. func transformPodSpec(input v1.PodSpec) PodSpec {
  64. containers := make([]Container, len(input.Containers))
  65. for i, container := range input.Containers {
  66. containers[i] = transformPodContainer(container)
  67. }
  68. return PodSpec{
  69. NodeName: input.NodeName,
  70. Containers: containers,
  71. Volumes: input.Volumes,
  72. }
  73. }
  74. func transformPod(input *v1.Pod) *Pod {
  75. return &Pod{
  76. UID: input.UID,
  77. Name: input.Name,
  78. Namespace: input.Namespace,
  79. Labels: input.Labels,
  80. Annotations: input.Annotations,
  81. OwnerReferences: input.OwnerReferences,
  82. Spec: transformPodSpec(input.Spec),
  83. Status: transformPodStatus(input.Status),
  84. }
  85. }
  86. type Node struct {
  87. Name string
  88. Labels map[string]string
  89. Annotations map[string]string
  90. Status v1.NodeStatus
  91. SpecProviderID string
  92. }
  93. func transformNode(input *v1.Node) *Node {
  94. return &Node{
  95. Name: input.Name,
  96. Labels: input.Labels,
  97. Annotations: input.Annotations,
  98. Status: input.Status,
  99. SpecProviderID: input.Spec.ProviderID,
  100. }
  101. }
  102. type Service struct {
  103. Name string
  104. Namespace string
  105. Selector map[string]string
  106. Type v1.ServiceType
  107. Status v1.ServiceStatus
  108. }
  109. func transformService(input *v1.Service) *Service {
  110. return &Service{
  111. Name: input.Name,
  112. Namespace: input.Namespace,
  113. Selector: input.Spec.Selector,
  114. Type: input.Spec.Type,
  115. Status: input.Status,
  116. }
  117. }
  118. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  119. // up to date resources using watchers
  120. type ClusterCache interface {
  121. // Run starts the watcher processes
  122. Run()
  123. // Stops the watcher processes
  124. Stop()
  125. // GetAllNamespaces returns all the cached namespaces
  126. GetAllNamespaces() []*Namespace
  127. // GetAllNodes returns all the cached nodes
  128. GetAllNodes() []*Node
  129. // GetAllPods returns all the cached pods
  130. GetAllPods() []*Pod
  131. // GetAllServices returns all the cached services
  132. GetAllServices() []*Service
  133. // GetAllDaemonSets returns all the cached DaemonSets
  134. GetAllDaemonSets() []*appsv1.DaemonSet
  135. // GetAllDeployments returns all the cached deployments
  136. GetAllDeployments() []*appsv1.Deployment
  137. // GetAllStatfulSets returns all the cached StatefulSets
  138. GetAllStatefulSets() []*appsv1.StatefulSet
  139. // GetAllReplicaSets returns all the cached ReplicaSets
  140. GetAllReplicaSets() []*appsv1.ReplicaSet
  141. // GetAllPersistentVolumes returns all the cached persistent volumes
  142. GetAllPersistentVolumes() []*v1.PersistentVolume
  143. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  144. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  145. // GetAllStorageClasses returns all the cached storage classes
  146. GetAllStorageClasses() []*stv1.StorageClass
  147. // GetAllJobs returns all the cached jobs
  148. GetAllJobs() []*batchv1.Job
  149. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  150. GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget
  151. // GetAllReplicationControllers returns all cached replication controllers
  152. GetAllReplicationControllers() []*v1.ReplicationController
  153. // SetConfigMapUpdateFunc sets the configmap update function
  154. SetConfigMapUpdateFunc(func(interface{}))
  155. }
  156. // KubernetesClusterCache is the implementation of ClusterCache
  157. type KubernetesClusterCache struct {
  158. client kubernetes.Interface
  159. namespaceWatch WatchController
  160. nodeWatch WatchController
  161. podWatch WatchController
  162. kubecostConfigMapWatch WatchController
  163. serviceWatch WatchController
  164. daemonsetsWatch WatchController
  165. deploymentsWatch WatchController
  166. statefulsetWatch WatchController
  167. replicasetWatch WatchController
  168. pvWatch WatchController
  169. pvcWatch WatchController
  170. storageClassWatch WatchController
  171. jobsWatch WatchController
  172. pdbWatch WatchController
  173. replicationControllerWatch WatchController
  174. stop chan struct{}
  175. }
  176. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  177. defer wg.Done()
  178. wc.WarmUp(cancel)
  179. }
  180. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  181. coreRestClient := client.CoreV1().RESTClient()
  182. appsRestClient := client.AppsV1().RESTClient()
  183. storageRestClient := client.StorageV1().RESTClient()
  184. batchClient := client.BatchV1().RESTClient()
  185. pdbClient := client.PolicyV1().RESTClient()
  186. kubecostNamespace := env.GetKubecostNamespace()
  187. log.Infof("NAMESPACE: %s", kubecostNamespace)
  188. kcc := &KubernetesClusterCache{
  189. client: client,
  190. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  191. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  192. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  193. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  194. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  195. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  196. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  197. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  198. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  199. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  200. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  201. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  202. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  203. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  204. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  205. }
  206. // Wait for each caching watcher to initialize
  207. cancel := make(chan struct{})
  208. var wg sync.WaitGroup
  209. if env.IsETLReadOnlyMode() {
  210. wg.Add(1)
  211. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  212. } else {
  213. wg.Add(15)
  214. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  215. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  216. go initializeCache(kcc.nodeWatch, &wg, cancel)
  217. go initializeCache(kcc.podWatch, &wg, cancel)
  218. go initializeCache(kcc.serviceWatch, &wg, cancel)
  219. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  220. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  221. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  222. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  223. go initializeCache(kcc.pvWatch, &wg, cancel)
  224. go initializeCache(kcc.pvcWatch, &wg, cancel)
  225. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  226. go initializeCache(kcc.jobsWatch, &wg, cancel)
  227. go initializeCache(kcc.pdbWatch, &wg, cancel)
  228. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  229. }
  230. wg.Wait()
  231. log.Infof("Done waiting")
  232. return kcc
  233. }
  234. func (kcc *KubernetesClusterCache) Run() {
  235. if kcc.stop != nil {
  236. return
  237. }
  238. stopCh := make(chan struct{})
  239. go kcc.namespaceWatch.Run(1, stopCh)
  240. go kcc.nodeWatch.Run(1, stopCh)
  241. go kcc.podWatch.Run(1, stopCh)
  242. go kcc.serviceWatch.Run(1, stopCh)
  243. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  244. go kcc.daemonsetsWatch.Run(1, stopCh)
  245. go kcc.deploymentsWatch.Run(1, stopCh)
  246. go kcc.statefulsetWatch.Run(1, stopCh)
  247. go kcc.replicasetWatch.Run(1, stopCh)
  248. go kcc.pvWatch.Run(1, stopCh)
  249. go kcc.pvcWatch.Run(1, stopCh)
  250. go kcc.storageClassWatch.Run(1, stopCh)
  251. go kcc.jobsWatch.Run(1, stopCh)
  252. go kcc.pdbWatch.Run(1, stopCh)
  253. go kcc.replicationControllerWatch.Run(1, stopCh)
  254. kcc.stop = stopCh
  255. }
  256. func (kcc *KubernetesClusterCache) Stop() {
  257. if kcc.stop == nil {
  258. return
  259. }
  260. close(kcc.stop)
  261. kcc.stop = nil
  262. }
  263. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*Namespace {
  264. var namespaces []*Namespace
  265. items := kcc.namespaceWatch.GetAll()
  266. for _, ns := range items {
  267. namespaces = append(namespaces, transformNamespace(ns.(*v1.Namespace)))
  268. }
  269. return namespaces
  270. }
  271. func (kcc *KubernetesClusterCache) GetAllNodes() []*Node {
  272. var nodes []*Node
  273. items := kcc.nodeWatch.GetAll()
  274. for _, node := range items {
  275. nodes = append(nodes, transformNode(node.(*v1.Node)))
  276. }
  277. return nodes
  278. }
  279. func (kcc *KubernetesClusterCache) GetAllPods() []*Pod {
  280. var pods []*Pod
  281. items := kcc.podWatch.GetAll()
  282. for _, pod := range items {
  283. pods = append(pods, transformPod(pod.(*v1.Pod)))
  284. }
  285. return pods
  286. }
  287. func (kcc *KubernetesClusterCache) GetAllServices() []*Service {
  288. var services []*Service
  289. items := kcc.serviceWatch.GetAll()
  290. for _, service := range items {
  291. services = append(services, transformService(service.(*v1.Service)))
  292. }
  293. return services
  294. }
  295. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*appsv1.DaemonSet {
  296. var daemonsets []*appsv1.DaemonSet
  297. items := kcc.daemonsetsWatch.GetAll()
  298. for _, daemonset := range items {
  299. daemonsets = append(daemonsets, daemonset.(*appsv1.DaemonSet))
  300. }
  301. return daemonsets
  302. }
  303. func (kcc *KubernetesClusterCache) GetAllDeployments() []*appsv1.Deployment {
  304. var deployments []*appsv1.Deployment
  305. items := kcc.deploymentsWatch.GetAll()
  306. for _, deployment := range items {
  307. deployments = append(deployments, deployment.(*appsv1.Deployment))
  308. }
  309. return deployments
  310. }
  311. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  312. var statefulsets []*appsv1.StatefulSet
  313. items := kcc.statefulsetWatch.GetAll()
  314. for _, statefulset := range items {
  315. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  316. }
  317. return statefulsets
  318. }
  319. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  320. var replicasets []*appsv1.ReplicaSet
  321. items := kcc.replicasetWatch.GetAll()
  322. for _, replicaset := range items {
  323. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  324. }
  325. return replicasets
  326. }
  327. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  328. var pvs []*v1.PersistentVolume
  329. items := kcc.pvWatch.GetAll()
  330. for _, pv := range items {
  331. pvs = append(pvs, pv.(*v1.PersistentVolume))
  332. }
  333. return pvs
  334. }
  335. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  336. var pvcs []*v1.PersistentVolumeClaim
  337. items := kcc.pvcWatch.GetAll()
  338. for _, pvc := range items {
  339. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  340. }
  341. return pvcs
  342. }
  343. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  344. var storageClasses []*stv1.StorageClass
  345. items := kcc.storageClassWatch.GetAll()
  346. for _, stc := range items {
  347. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  348. }
  349. return storageClasses
  350. }
  351. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  352. var jobs []*batchv1.Job
  353. items := kcc.jobsWatch.GetAll()
  354. for _, job := range items {
  355. jobs = append(jobs, job.(*batchv1.Job))
  356. }
  357. return jobs
  358. }
  359. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget {
  360. var pdbs []*policyv1.PodDisruptionBudget
  361. items := kcc.pdbWatch.GetAll()
  362. for _, pdb := range items {
  363. pdbs = append(pdbs, pdb.(*policyv1.PodDisruptionBudget))
  364. }
  365. return pdbs
  366. }
  367. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*v1.ReplicationController {
  368. var rcs []*v1.ReplicationController
  369. items := kcc.replicationControllerWatch.GetAll()
  370. for _, rc := range items {
  371. rcs = append(rcs, rc.(*v1.ReplicationController))
  372. }
  373. return rcs
  374. }
  375. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  376. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  377. }