clustercache.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/pkg/env"
  6. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7. "k8s.io/apimachinery/pkg/types"
  8. appsv1 "k8s.io/api/apps/v1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. policyv1 "k8s.io/api/policy/v1"
  12. stv1 "k8s.io/api/storage/v1"
  13. "k8s.io/apimachinery/pkg/fields"
  14. "k8s.io/client-go/kubernetes"
  15. )
  16. type Namespace struct {
  17. Name string
  18. Labels map[string]string
  19. Annotations map[string]string
  20. }
  21. func transformNamespace(input *v1.Namespace) *Namespace {
  22. return &Namespace{
  23. Name: input.Name,
  24. Annotations: input.Annotations,
  25. Labels: input.Labels,
  26. }
  27. }
  28. type Pod struct {
  29. UID types.UID
  30. Name string
  31. Namespace string
  32. Labels map[string]string
  33. Annotations map[string]string
  34. OwnerReferences []metav1.OwnerReference
  35. Status PodStatus
  36. Spec PodSpec
  37. }
  38. type PodStatus struct {
  39. Phase v1.PodPhase
  40. ContainerStatuses []v1.ContainerStatus
  41. }
  42. type PodSpec struct {
  43. NodeName string
  44. Containers []Container
  45. Volumes []v1.Volume
  46. }
  47. type Container struct {
  48. Name string
  49. Resources v1.ResourceRequirements
  50. }
  51. func transformPodContainer(input v1.Container) Container {
  52. return Container{
  53. Name: input.Name,
  54. Resources: input.Resources,
  55. }
  56. }
  57. func transformPodStatus(input v1.PodStatus) PodStatus {
  58. return PodStatus{
  59. Phase: input.Phase,
  60. ContainerStatuses: input.ContainerStatuses,
  61. }
  62. }
  63. func transformPodSpec(input v1.PodSpec) PodSpec {
  64. containers := make([]Container, len(input.Containers))
  65. for i, container := range input.Containers {
  66. containers[i] = transformPodContainer(container)
  67. }
  68. return PodSpec{
  69. NodeName: input.NodeName,
  70. Containers: containers,
  71. Volumes: input.Volumes,
  72. }
  73. }
  74. func transformPod(input *v1.Pod) *Pod {
  75. return &Pod{
  76. UID: input.UID,
  77. Name: input.Name,
  78. Namespace: input.Namespace,
  79. Labels: input.Labels,
  80. Annotations: input.Annotations,
  81. OwnerReferences: input.OwnerReferences,
  82. Spec: transformPodSpec(input.Spec),
  83. Status: transformPodStatus(input.Status),
  84. }
  85. }
  86. type Node struct {
  87. Name string
  88. Labels map[string]string
  89. Annotations map[string]string
  90. Status v1.NodeStatus
  91. SpecProviderID string
  92. }
  93. func transformNode(input *v1.Node) *Node {
  94. return &Node{
  95. Name: input.Name,
  96. Labels: input.Labels,
  97. Annotations: input.Annotations,
  98. Status: input.Status,
  99. SpecProviderID: input.Spec.ProviderID,
  100. }
  101. }
  102. type Service struct {
  103. Name string
  104. Namespace string
  105. Selector map[string]string
  106. Type v1.ServiceType
  107. Status v1.ServiceStatus
  108. }
  109. func transformService(input *v1.Service) *Service {
  110. return &Service{
  111. Name: input.Name,
  112. Namespace: input.Namespace,
  113. Selector: input.Spec.Selector,
  114. Type: input.Spec.Type,
  115. Status: input.Status,
  116. }
  117. }
  118. type DaemonSet struct {
  119. Name string
  120. Namespace string
  121. Labels map[string]string
  122. SpecContainers []v1.Container
  123. }
  124. func transformDaemonSet(input *appsv1.DaemonSet) *DaemonSet {
  125. return &DaemonSet{
  126. Name: input.Name,
  127. Namespace: input.Namespace,
  128. Labels: input.Labels,
  129. SpecContainers: input.Spec.Template.Spec.Containers,
  130. }
  131. }
  132. type Deployment struct {
  133. Name string
  134. Namespace string
  135. Labels map[string]string
  136. MatchLabels map[string]string
  137. SpecSelector *metav1.LabelSelector
  138. SpecReplicas *int32
  139. StatusAvailableReplicas int32
  140. }
  141. func transformDeployment(input *appsv1.Deployment) *Deployment {
  142. return &Deployment{
  143. Name: input.Name,
  144. Namespace: input.Namespace,
  145. Labels: input.Labels,
  146. MatchLabels: input.Spec.Selector.MatchLabels,
  147. SpecReplicas: input.Spec.Replicas,
  148. SpecSelector: input.Spec.Selector,
  149. StatusAvailableReplicas: input.Status.AvailableReplicas,
  150. }
  151. }
  152. type StatefulSet struct {
  153. Name string
  154. Namespace string
  155. SpecSelector *metav1.LabelSelector
  156. }
  157. func transformStatefulSet(input *appsv1.StatefulSet) *StatefulSet {
  158. return &StatefulSet{
  159. Name: input.Name,
  160. Namespace: input.Namespace,
  161. SpecSelector: input.Spec.Selector,
  162. }
  163. }
  164. type PersistentVolume struct {
  165. Name string
  166. Namespace string
  167. Labels map[string]string
  168. Annotations map[string]string
  169. Spec v1.PersistentVolumeSpec
  170. Status v1.PersistentVolumeStatus
  171. }
  172. func transformPersistentVolume(input *v1.PersistentVolume) *PersistentVolume {
  173. return &PersistentVolume{
  174. Name: input.Name,
  175. Namespace: input.Namespace,
  176. Labels: input.Labels,
  177. Annotations: input.Annotations,
  178. Spec: input.Spec,
  179. Status: input.Status,
  180. }
  181. }
  182. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  183. // up to date resources using watchers
  184. type ClusterCache interface {
  185. // Run starts the watcher processes
  186. Run()
  187. // Stops the watcher processes
  188. Stop()
  189. // GetAllNamespaces returns all the cached namespaces
  190. GetAllNamespaces() []*Namespace
  191. // GetAllNodes returns all the cached nodes
  192. GetAllNodes() []*Node
  193. // GetAllPods returns all the cached pods
  194. GetAllPods() []*Pod
  195. // GetAllServices returns all the cached services
  196. GetAllServices() []*Service
  197. // GetAllDaemonSets returns all the cached DaemonSets
  198. GetAllDaemonSets() []*DaemonSet
  199. // GetAllDeployments returns all the cached deployments
  200. GetAllDeployments() []*Deployment
  201. // GetAllStatfulSets returns all the cached StatefulSets
  202. GetAllStatefulSets() []*StatefulSet
  203. // GetAllPersistentVolumes returns all the cached persistent volumes
  204. GetAllPersistentVolumes() []*PersistentVolume
  205. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  206. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  207. // GetAllStorageClasses returns all the cached storage classes
  208. GetAllStorageClasses() []*stv1.StorageClass
  209. // GetAllJobs returns all the cached jobs
  210. GetAllJobs() []*batchv1.Job
  211. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  212. GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget
  213. // SetConfigMapUpdateFunc sets the configmap update function
  214. SetConfigMapUpdateFunc(func(interface{}))
  215. }
  216. // KubernetesClusterCache is the implementation of ClusterCache
  217. type KubernetesClusterCache struct {
  218. client kubernetes.Interface
  219. namespaceWatch WatchController
  220. nodeWatch WatchController
  221. podWatch WatchController
  222. kubecostConfigMapWatch WatchController
  223. serviceWatch WatchController
  224. daemonsetsWatch WatchController
  225. deploymentsWatch WatchController
  226. statefulsetWatch WatchController
  227. pvWatch WatchController
  228. pvcWatch WatchController
  229. storageClassWatch WatchController
  230. jobsWatch WatchController
  231. pdbWatch WatchController
  232. stop chan struct{}
  233. }
  234. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  235. defer wg.Done()
  236. wc.WarmUp(cancel)
  237. }
  238. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  239. coreRestClient := client.CoreV1().RESTClient()
  240. appsRestClient := client.AppsV1().RESTClient()
  241. storageRestClient := client.StorageV1().RESTClient()
  242. batchClient := client.BatchV1().RESTClient()
  243. pdbClient := client.PolicyV1().RESTClient()
  244. kubecostNamespace := env.GetKubecostNamespace()
  245. log.Infof("NAMESPACE: %s", kubecostNamespace)
  246. kcc := &KubernetesClusterCache{
  247. client: client,
  248. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  249. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  250. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  251. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  252. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  253. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  254. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  255. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  256. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  257. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  258. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  259. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  260. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  261. }
  262. // Wait for each caching watcher to initialize
  263. cancel := make(chan struct{})
  264. var wg sync.WaitGroup
  265. if env.IsETLReadOnlyMode() {
  266. wg.Add(1)
  267. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  268. } else {
  269. wg.Add(13)
  270. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  271. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  272. go initializeCache(kcc.nodeWatch, &wg, cancel)
  273. go initializeCache(kcc.podWatch, &wg, cancel)
  274. go initializeCache(kcc.serviceWatch, &wg, cancel)
  275. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  276. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  277. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  278. go initializeCache(kcc.pvWatch, &wg, cancel)
  279. go initializeCache(kcc.pvcWatch, &wg, cancel)
  280. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  281. go initializeCache(kcc.jobsWatch, &wg, cancel)
  282. go initializeCache(kcc.pdbWatch, &wg, cancel)
  283. }
  284. wg.Wait()
  285. log.Infof("Done waiting")
  286. return kcc
  287. }
  288. func (kcc *KubernetesClusterCache) Run() {
  289. if kcc.stop != nil {
  290. return
  291. }
  292. stopCh := make(chan struct{})
  293. go kcc.namespaceWatch.Run(1, stopCh)
  294. go kcc.nodeWatch.Run(1, stopCh)
  295. go kcc.podWatch.Run(1, stopCh)
  296. go kcc.serviceWatch.Run(1, stopCh)
  297. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  298. go kcc.daemonsetsWatch.Run(1, stopCh)
  299. go kcc.deploymentsWatch.Run(1, stopCh)
  300. go kcc.statefulsetWatch.Run(1, stopCh)
  301. go kcc.pvWatch.Run(1, stopCh)
  302. go kcc.pvcWatch.Run(1, stopCh)
  303. go kcc.storageClassWatch.Run(1, stopCh)
  304. go kcc.jobsWatch.Run(1, stopCh)
  305. go kcc.pdbWatch.Run(1, stopCh)
  306. kcc.stop = stopCh
  307. }
  308. func (kcc *KubernetesClusterCache) Stop() {
  309. if kcc.stop == nil {
  310. return
  311. }
  312. close(kcc.stop)
  313. kcc.stop = nil
  314. }
  315. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*Namespace {
  316. var namespaces []*Namespace
  317. items := kcc.namespaceWatch.GetAll()
  318. for _, ns := range items {
  319. namespaces = append(namespaces, transformNamespace(ns.(*v1.Namespace)))
  320. }
  321. return namespaces
  322. }
  323. func (kcc *KubernetesClusterCache) GetAllNodes() []*Node {
  324. var nodes []*Node
  325. items := kcc.nodeWatch.GetAll()
  326. for _, node := range items {
  327. nodes = append(nodes, transformNode(node.(*v1.Node)))
  328. }
  329. return nodes
  330. }
  331. func (kcc *KubernetesClusterCache) GetAllPods() []*Pod {
  332. var pods []*Pod
  333. items := kcc.podWatch.GetAll()
  334. for _, pod := range items {
  335. pods = append(pods, transformPod(pod.(*v1.Pod)))
  336. }
  337. return pods
  338. }
  339. func (kcc *KubernetesClusterCache) GetAllServices() []*Service {
  340. var services []*Service
  341. items := kcc.serviceWatch.GetAll()
  342. for _, service := range items {
  343. services = append(services, transformService(service.(*v1.Service)))
  344. }
  345. return services
  346. }
  347. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*DaemonSet {
  348. var daemonsets []*DaemonSet
  349. items := kcc.daemonsetsWatch.GetAll()
  350. for _, daemonset := range items {
  351. daemonsets = append(daemonsets, transformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  352. }
  353. return daemonsets
  354. }
  355. func (kcc *KubernetesClusterCache) GetAllDeployments() []*Deployment {
  356. var deployments []*Deployment
  357. items := kcc.deploymentsWatch.GetAll()
  358. for _, deployment := range items {
  359. deployments = append(deployments, transformDeployment(deployment.(*appsv1.Deployment)))
  360. }
  361. return deployments
  362. }
  363. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*StatefulSet {
  364. var statefulsets []*StatefulSet
  365. items := kcc.statefulsetWatch.GetAll()
  366. for _, statefulset := range items {
  367. statefulsets = append(statefulsets, transformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  368. }
  369. return statefulsets
  370. }
  371. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*PersistentVolume {
  372. var pvs []*PersistentVolume
  373. items := kcc.pvWatch.GetAll()
  374. for _, pv := range items {
  375. pvs = append(pvs, transformPersistentVolume(pv.(*v1.PersistentVolume)))
  376. }
  377. return pvs
  378. }
  379. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  380. var pvcs []*v1.PersistentVolumeClaim
  381. items := kcc.pvcWatch.GetAll()
  382. for _, pvc := range items {
  383. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  384. }
  385. return pvcs
  386. }
  387. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  388. var storageClasses []*stv1.StorageClass
  389. items := kcc.storageClassWatch.GetAll()
  390. for _, stc := range items {
  391. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  392. }
  393. return storageClasses
  394. }
  395. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  396. var jobs []*batchv1.Job
  397. items := kcc.jobsWatch.GetAll()
  398. for _, job := range items {
  399. jobs = append(jobs, job.(*batchv1.Job))
  400. }
  401. return jobs
  402. }
  403. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget {
  404. var pdbs []*policyv1.PodDisruptionBudget
  405. items := kcc.pdbWatch.GetAll()
  406. for _, pdb := range items {
  407. pdbs = append(pdbs, pdb.(*policyv1.PodDisruptionBudget))
  408. }
  409. return pdbs
  410. }
  411. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  412. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  413. }