clustercache.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/pkg/env"
  6. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7. "k8s.io/apimachinery/pkg/types"
  8. appsv1 "k8s.io/api/apps/v1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. policyv1 "k8s.io/api/policy/v1"
  12. stv1 "k8s.io/api/storage/v1"
  13. "k8s.io/apimachinery/pkg/fields"
  14. "k8s.io/client-go/kubernetes"
  15. )
// Namespace is the trimmed-down view of a v1.Namespace that the cache hands out.
// It is populated by transformNamespace.
type Namespace struct {
	// Name is the namespace's object name.
	Name string
	// Labels is the source object's label map (shared, not copied, by transformNamespace).
	Labels map[string]string
	// Annotations is the source object's annotation map (shared, not copied, by transformNamespace).
	Annotations map[string]string
}
// Pod is the trimmed-down view of a v1.Pod that the cache hands out.
// It is populated by transformPod.
type Pod struct {
	// UID is the source pod's UID.
	UID types.UID
	// Name is the pod's object name.
	Name string
	// Namespace is the namespace the pod object reports.
	Namespace string
	// Labels is the source pod's label map.
	Labels map[string]string
	// Annotations is the source pod's annotation map.
	Annotations map[string]string
	// OwnerReferences is the source pod's owner reference list.
	OwnerReferences []metav1.OwnerReference
	// Status holds the reduced pod status (see PodStatus).
	Status PodStatus
	// Spec holds the reduced pod spec (see PodSpec).
	Spec PodSpec
}
// PodStatus is the subset of v1.PodStatus kept by the cache.
// It is populated by transformPodStatus.
type PodStatus struct {
	// Phase is copied from v1.PodStatus.Phase.
	Phase v1.PodPhase
	// ContainerStatuses is the source status' ContainerStatuses slice.
	ContainerStatuses []v1.ContainerStatus
}
// PodSpec is the subset of v1.PodSpec kept by the cache.
// It is populated by transformPodSpec.
type PodSpec struct {
	// NodeName is copied from v1.PodSpec.NodeName.
	NodeName string
	// Containers holds one reduced Container per entry of v1.PodSpec.Containers.
	Containers []Container
	// Volumes is the source spec's Volumes slice.
	Volumes []v1.Volume
}
// Container is the subset of v1.Container kept by the cache: its name and
// resource requirements. It is populated by transformPodContainer.
type Container struct {
	// Name is the container's name.
	Name string
	// Resources is copied from v1.Container.Resources.
	Resources v1.ResourceRequirements
}
// Node is the trimmed-down view of a v1.Node that the cache hands out.
// It is populated by transformNode.
type Node struct {
	// Name is the node's object name.
	Name string
	// Labels is the source node's label map.
	Labels map[string]string
	// Annotations is the source node's annotation map.
	Annotations map[string]string
	// Status is the complete v1.NodeStatus of the source node.
	Status v1.NodeStatus
	// SpecProviderID is copied from the source node's Spec.ProviderID.
	SpecProviderID string
}
// Service is the trimmed-down view of a v1.Service that the cache hands out.
// It is populated by transformService.
type Service struct {
	// Name is the service's object name.
	Name string
	// Namespace is the namespace the service object reports.
	Namespace string
	// SpecSelector is copied from the source service's Spec.Selector.
	SpecSelector map[string]string
	// Type is copied from the source service's Spec.Type.
	Type v1.ServiceType
	// Status is the complete v1.ServiceStatus of the source service.
	Status v1.ServiceStatus
}
// DaemonSet is the trimmed-down view of an appsv1.DaemonSet that the cache hands out.
// It is populated by transformDaemonSet.
type DaemonSet struct {
	// Name is the daemon set's object name.
	Name string
	// Namespace is the namespace the daemon set object reports.
	Namespace string
	// Labels is the source daemon set's label map.
	Labels map[string]string
	// SpecContainers is copied from the source's Spec.Template.Spec.Containers.
	SpecContainers []v1.Container
}
// Deployment is the trimmed-down view of an appsv1.Deployment that the cache hands out.
// It is populated by transformDeployment.
type Deployment struct {
	// Name is the deployment's object name.
	Name string
	// Namespace is the namespace the deployment object reports.
	Namespace string
	// Labels is the source deployment's label map.
	Labels map[string]string
	// MatchLabels is copied from the source's Spec.Selector.MatchLabels, so it
	// duplicates the MatchLabels reachable through SpecSelector.
	MatchLabels map[string]string
	// SpecSelector is the source's Spec.Selector pointer.
	SpecSelector *metav1.LabelSelector
	// SpecReplicas is the source's Spec.Replicas pointer.
	SpecReplicas *int32
	// StatusAvailableReplicas is copied from the source's Status.AvailableReplicas.
	StatusAvailableReplicas int32
}
// StatefulSet is the trimmed-down view of an appsv1.StatefulSet that the cache hands out.
// It is populated by transformStatefulSet.
type StatefulSet struct {
	// Name is the stateful set's object name.
	Name string
	// Namespace is the namespace the stateful set object reports.
	Namespace string
	// SpecSelector is the source's Spec.Selector pointer.
	SpecSelector *metav1.LabelSelector
}
// PersistentVolumeClaim is the trimmed-down view of a v1.PersistentVolumeClaim
// that the cache hands out. It is populated by transformPersistentVolumeClaim.
type PersistentVolumeClaim struct {
	// Name is the claim's object name.
	Name string
	// Namespace is the namespace the claim object reports.
	Namespace string
	// Spec is the complete v1.PersistentVolumeClaimSpec of the source claim.
	Spec v1.PersistentVolumeClaimSpec
	// Annotations is the source claim's annotation map.
	Annotations map[string]string
}
// StorageClass is the trimmed-down view of a storage/v1 StorageClass that the
// cache hands out. It is populated by transformStorageClass.
type StorageClass struct {
	// Name is the storage class' object name.
	Name string
	// Annotations is the source storage class' annotation map.
	Annotations map[string]string
	// Parameters is copied from the source's Parameters field.
	Parameters map[string]string
	// Provisioner is copied from the source's Provisioner field.
	Provisioner string
}
// Job is the trimmed-down view of a batchv1.Job that the cache hands out.
// It is populated by transformJob.
type Job struct {
	// Name is the job's object name.
	Name string
	// Namespace is the namespace the job object reports.
	Namespace string
	// Status is the complete batchv1.JobStatus of the source job.
	Status batchv1.JobStatus
}
// PersistentVolume is the trimmed-down view of a v1.PersistentVolume that the
// cache hands out. It is populated by transformPersistentVolume.
type PersistentVolume struct {
	// Name is the volume's object name.
	Name string
	// Namespace is copied from the source object's Namespace field.
	// NOTE(review): PersistentVolumes are cluster-scoped in Kubernetes, so this
	// is presumably always empty - confirm before relying on it.
	Namespace string
	// Labels is the source volume's label map.
	Labels map[string]string
	// Annotations is the source volume's annotation map.
	Annotations map[string]string
	// Spec is the complete v1.PersistentVolumeSpec of the source volume.
	Spec v1.PersistentVolumeSpec
	// Status is the complete v1.PersistentVolumeStatus of the source volume.
	Status v1.PersistentVolumeStatus
}
// ReplicationController is the cache's view of a v1.ReplicationController.
// It has no fields: transformReplicationController carries nothing over from
// the source object, so only the number of entries is meaningful.
type ReplicationController struct{}
// PodDisruptionBudget is the cache's view of a policyv1.PodDisruptionBudget.
// It has no fields: transformPodDisruptionBudget carries nothing over from
// the source object, so only the number of entries is meaningful.
type PodDisruptionBudget struct{}
// ReplicaSet is the trimmed-down view of an appsv1.ReplicaSet that the cache
// hands out. It is populated by transformReplicaSet.
type ReplicaSet struct {
	// SpecSelector is the source's Spec.Selector pointer.
	SpecSelector *metav1.LabelSelector
}
  108. func transformNamespace(input *v1.Namespace) *Namespace {
  109. return &Namespace{
  110. Name: input.Name,
  111. Annotations: input.Annotations,
  112. Labels: input.Labels,
  113. }
  114. }
  115. func transformPodContainer(input v1.Container) Container {
  116. return Container{
  117. Name: input.Name,
  118. Resources: input.Resources,
  119. }
  120. }
  121. func transformPodStatus(input v1.PodStatus) PodStatus {
  122. return PodStatus{
  123. Phase: input.Phase,
  124. ContainerStatuses: input.ContainerStatuses,
  125. }
  126. }
  127. func transformPodSpec(input v1.PodSpec) PodSpec {
  128. containers := make([]Container, len(input.Containers))
  129. for i, container := range input.Containers {
  130. containers[i] = transformPodContainer(container)
  131. }
  132. return PodSpec{
  133. NodeName: input.NodeName,
  134. Containers: containers,
  135. Volumes: input.Volumes,
  136. }
  137. }
  138. func transformPod(input *v1.Pod) *Pod {
  139. return &Pod{
  140. UID: input.UID,
  141. Name: input.Name,
  142. Namespace: input.Namespace,
  143. Labels: input.Labels,
  144. Annotations: input.Annotations,
  145. OwnerReferences: input.OwnerReferences,
  146. Spec: transformPodSpec(input.Spec),
  147. Status: transformPodStatus(input.Status),
  148. }
  149. }
  150. func transformNode(input *v1.Node) *Node {
  151. return &Node{
  152. Name: input.Name,
  153. Labels: input.Labels,
  154. Annotations: input.Annotations,
  155. Status: input.Status,
  156. SpecProviderID: input.Spec.ProviderID,
  157. }
  158. }
  159. func transformService(input *v1.Service) *Service {
  160. return &Service{
  161. Name: input.Name,
  162. Namespace: input.Namespace,
  163. SpecSelector: input.Spec.Selector,
  164. Type: input.Spec.Type,
  165. Status: input.Status,
  166. }
  167. }
  168. func transformDaemonSet(input *appsv1.DaemonSet) *DaemonSet {
  169. return &DaemonSet{
  170. Name: input.Name,
  171. Namespace: input.Namespace,
  172. Labels: input.Labels,
  173. SpecContainers: input.Spec.Template.Spec.Containers,
  174. }
  175. }
  176. func transformDeployment(input *appsv1.Deployment) *Deployment {
  177. return &Deployment{
  178. Name: input.Name,
  179. Namespace: input.Namespace,
  180. Labels: input.Labels,
  181. MatchLabels: input.Spec.Selector.MatchLabels,
  182. SpecReplicas: input.Spec.Replicas,
  183. SpecSelector: input.Spec.Selector,
  184. StatusAvailableReplicas: input.Status.AvailableReplicas,
  185. }
  186. }
  187. func transformStatefulSet(input *appsv1.StatefulSet) *StatefulSet {
  188. return &StatefulSet{
  189. Name: input.Name,
  190. Namespace: input.Namespace,
  191. SpecSelector: input.Spec.Selector,
  192. }
  193. }
  194. func transformPersistentVolume(input *v1.PersistentVolume) *PersistentVolume {
  195. return &PersistentVolume{
  196. Name: input.Name,
  197. Namespace: input.Namespace,
  198. Labels: input.Labels,
  199. Annotations: input.Annotations,
  200. Spec: input.Spec,
  201. Status: input.Status,
  202. }
  203. }
  204. func transformPersistentVolumeClaim(input *v1.PersistentVolumeClaim) *PersistentVolumeClaim {
  205. return &PersistentVolumeClaim{
  206. Name: input.Name,
  207. Namespace: input.Namespace,
  208. Spec: input.Spec,
  209. Annotations: input.Annotations,
  210. }
  211. }
  212. func transformStorageClass(input *stv1.StorageClass) *StorageClass {
  213. return &StorageClass{
  214. Name: input.Name,
  215. Annotations: input.Annotations,
  216. Parameters: input.Parameters,
  217. Provisioner: input.Provisioner,
  218. }
  219. }
  220. func transformJob(input *batchv1.Job) *Job {
  221. return &Job{
  222. Name: input.Name,
  223. Namespace: input.Namespace,
  224. Status: input.Status,
  225. }
  226. }
  227. func transformReplicationController(input *v1.ReplicationController) *ReplicationController {
  228. return &ReplicationController{}
  229. }
  230. func transformPodDisruptionBudget(input *policyv1.PodDisruptionBudget) *PodDisruptionBudget {
  231. return &PodDisruptionBudget{}
  232. }
  233. func transformReplicaSet(input *appsv1.ReplicaSet) *ReplicaSet {
  234. return &ReplicaSet{
  235. SpecSelector: input.Spec.Selector,
  236. }
  237. }
// ClusterCache defines a contract for an object which caches components within a cluster, ensuring
// up to date resources using watchers.
type ClusterCache interface {
	// Run starts the watcher processes
	Run()

	// Stop stops the watcher processes
	Stop()

	// GetAllNamespaces returns all the cached namespaces
	GetAllNamespaces() []*Namespace

	// GetAllNodes returns all the cached nodes
	GetAllNodes() []*Node

	// GetAllPods returns all the cached pods
	GetAllPods() []*Pod

	// GetAllServices returns all the cached services
	GetAllServices() []*Service

	// GetAllDaemonSets returns all the cached DaemonSets
	GetAllDaemonSets() []*DaemonSet

	// GetAllDeployments returns all the cached deployments
	GetAllDeployments() []*Deployment

	// GetAllStatefulSets returns all the cached StatefulSets
	GetAllStatefulSets() []*StatefulSet

	// GetAllReplicaSets returns all the cached ReplicaSets
	GetAllReplicaSets() []*ReplicaSet

	// GetAllPersistentVolumes returns all the cached persistent volumes
	GetAllPersistentVolumes() []*PersistentVolume

	// GetAllPersistentVolumeClaims returns all the cached persistent volume claims
	GetAllPersistentVolumeClaims() []*PersistentVolumeClaim

	// GetAllStorageClasses returns all the cached storage classes
	GetAllStorageClasses() []*StorageClass

	// GetAllJobs returns all the cached jobs
	GetAllJobs() []*Job

	// GetAllPodDisruptionBudgets returns all cached pod disruption budgets
	GetAllPodDisruptionBudgets() []*PodDisruptionBudget

	// GetAllReplicationControllers returns all cached replication controllers
	GetAllReplicationControllers() []*ReplicationController

	// SetConfigMapUpdateFunc sets the configmap update function
	SetConfigMapUpdateFunc(func(interface{}))
}
// KubernetesClusterCache is the implementation of ClusterCache. It holds one
// WatchController per watched resource kind and the stop channel shared by
// the running watchers.
type KubernetesClusterCache struct {
	// client is the Kubernetes client the cache was constructed with.
	client kubernetes.Interface

	// One watcher per resource kind; each GetAll* method reads from its
	// corresponding watcher.
	namespaceWatch             WatchController
	nodeWatch                  WatchController
	podWatch                   WatchController
	kubecostConfigMapWatch     WatchController
	serviceWatch               WatchController
	daemonsetsWatch            WatchController
	deploymentsWatch           WatchController
	statefulsetWatch           WatchController
	replicasetWatch            WatchController
	pvWatch                    WatchController
	pvcWatch                   WatchController
	storageClassWatch          WatchController
	jobsWatch                  WatchController
	pdbWatch                   WatchController
	replicationControllerWatch WatchController

	// stop is non-nil while the watchers are running: Run sets it and Stop
	// closes it and resets it to nil.
	stop chan struct{}
}
// initializeCache calls wc.WarmUp(cancel) and then marks one unit of work done
// on wg. It is meant to be launched as a goroutine by a caller that has
// already called wg.Add for it.
func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
	// Deferred so the wait group is released even if WarmUp panics.
	defer wg.Done()
	wc.WarmUp(cancel)
}
// NewKubernetesClusterCache returns the default ClusterCache implementation
// for the given client. It currently delegates to NewKubernetesClusterCacheV2.
func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
	return NewKubernetesClusterCacheV2(client)
}
  303. func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
  304. coreRestClient := client.CoreV1().RESTClient()
  305. appsRestClient := client.AppsV1().RESTClient()
  306. storageRestClient := client.StorageV1().RESTClient()
  307. batchClient := client.BatchV1().RESTClient()
  308. pdbClient := client.PolicyV1().RESTClient()
  309. kubecostNamespace := env.GetKubecostNamespace()
  310. log.Infof("NAMESPACE: %s", kubecostNamespace)
  311. kcc := &KubernetesClusterCache{
  312. client: client,
  313. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  314. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  315. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  316. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  317. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  318. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  319. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  320. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  321. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  322. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  323. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  324. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  325. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  326. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  327. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  328. }
  329. // Wait for each caching watcher to initialize
  330. cancel := make(chan struct{})
  331. var wg sync.WaitGroup
  332. if env.IsETLReadOnlyMode() {
  333. wg.Add(1)
  334. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  335. } else {
  336. wg.Add(15)
  337. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  338. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  339. go initializeCache(kcc.nodeWatch, &wg, cancel)
  340. go initializeCache(kcc.podWatch, &wg, cancel)
  341. go initializeCache(kcc.serviceWatch, &wg, cancel)
  342. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  343. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  344. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  345. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  346. go initializeCache(kcc.pvWatch, &wg, cancel)
  347. go initializeCache(kcc.pvcWatch, &wg, cancel)
  348. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  349. go initializeCache(kcc.jobsWatch, &wg, cancel)
  350. go initializeCache(kcc.pdbWatch, &wg, cancel)
  351. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  352. }
  353. wg.Wait()
  354. log.Infof("Done waiting")
  355. return kcc
  356. }
  357. func (kcc *KubernetesClusterCache) Run() {
  358. if kcc.stop != nil {
  359. return
  360. }
  361. stopCh := make(chan struct{})
  362. go kcc.namespaceWatch.Run(1, stopCh)
  363. go kcc.nodeWatch.Run(1, stopCh)
  364. go kcc.podWatch.Run(1, stopCh)
  365. go kcc.serviceWatch.Run(1, stopCh)
  366. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  367. go kcc.daemonsetsWatch.Run(1, stopCh)
  368. go kcc.deploymentsWatch.Run(1, stopCh)
  369. go kcc.statefulsetWatch.Run(1, stopCh)
  370. go kcc.replicasetWatch.Run(1, stopCh)
  371. go kcc.pvWatch.Run(1, stopCh)
  372. go kcc.pvcWatch.Run(1, stopCh)
  373. go kcc.storageClassWatch.Run(1, stopCh)
  374. go kcc.jobsWatch.Run(1, stopCh)
  375. go kcc.pdbWatch.Run(1, stopCh)
  376. go kcc.replicationControllerWatch.Run(1, stopCh)
  377. kcc.stop = stopCh
  378. }
  379. func (kcc *KubernetesClusterCache) Stop() {
  380. if kcc.stop == nil {
  381. return
  382. }
  383. close(kcc.stop)
  384. kcc.stop = nil
  385. }
  386. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*Namespace {
  387. var namespaces []*Namespace
  388. items := kcc.namespaceWatch.GetAll()
  389. for _, ns := range items {
  390. namespaces = append(namespaces, transformNamespace(ns.(*v1.Namespace)))
  391. }
  392. return namespaces
  393. }
  394. func (kcc *KubernetesClusterCache) GetAllNodes() []*Node {
  395. var nodes []*Node
  396. items := kcc.nodeWatch.GetAll()
  397. for _, node := range items {
  398. nodes = append(nodes, transformNode(node.(*v1.Node)))
  399. }
  400. return nodes
  401. }
  402. func (kcc *KubernetesClusterCache) GetAllPods() []*Pod {
  403. var pods []*Pod
  404. items := kcc.podWatch.GetAll()
  405. for _, pod := range items {
  406. pods = append(pods, transformPod(pod.(*v1.Pod)))
  407. }
  408. return pods
  409. }
  410. func (kcc *KubernetesClusterCache) GetAllServices() []*Service {
  411. var services []*Service
  412. items := kcc.serviceWatch.GetAll()
  413. for _, service := range items {
  414. services = append(services, transformService(service.(*v1.Service)))
  415. }
  416. return services
  417. }
  418. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*DaemonSet {
  419. var daemonsets []*DaemonSet
  420. items := kcc.daemonsetsWatch.GetAll()
  421. for _, daemonset := range items {
  422. daemonsets = append(daemonsets, transformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  423. }
  424. return daemonsets
  425. }
  426. func (kcc *KubernetesClusterCache) GetAllDeployments() []*Deployment {
  427. var deployments []*Deployment
  428. items := kcc.deploymentsWatch.GetAll()
  429. for _, deployment := range items {
  430. deployments = append(deployments, transformDeployment(deployment.(*appsv1.Deployment)))
  431. }
  432. return deployments
  433. }
  434. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*StatefulSet {
  435. var statefulsets []*StatefulSet
  436. items := kcc.statefulsetWatch.GetAll()
  437. for _, statefulset := range items {
  438. statefulsets = append(statefulsets, transformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  439. }
  440. return statefulsets
  441. }
  442. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*ReplicaSet {
  443. var replicasets []*ReplicaSet
  444. items := kcc.replicasetWatch.GetAll()
  445. for _, replicaset := range items {
  446. replicasets = append(replicasets, transformReplicaSet(replicaset.(*appsv1.ReplicaSet)))
  447. }
  448. return replicasets
  449. }
  450. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*PersistentVolume {
  451. var pvs []*PersistentVolume
  452. items := kcc.pvWatch.GetAll()
  453. for _, pv := range items {
  454. pvs = append(pvs, transformPersistentVolume(pv.(*v1.PersistentVolume)))
  455. }
  456. return pvs
  457. }
  458. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
  459. var pvcs []*PersistentVolumeClaim
  460. items := kcc.pvcWatch.GetAll()
  461. for _, pvc := range items {
  462. pvcs = append(pvcs, transformPersistentVolumeClaim(pvc.(*v1.PersistentVolumeClaim)))
  463. }
  464. return pvcs
  465. }
  466. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*StorageClass {
  467. var storageClasses []*StorageClass
  468. items := kcc.storageClassWatch.GetAll()
  469. for _, stc := range items {
  470. storageClasses = append(storageClasses, transformStorageClass(stc.(*stv1.StorageClass)))
  471. }
  472. return storageClasses
  473. }
  474. func (kcc *KubernetesClusterCache) GetAllJobs() []*Job {
  475. var jobs []*Job
  476. items := kcc.jobsWatch.GetAll()
  477. for _, job := range items {
  478. jobs = append(jobs, transformJob(job.(*batchv1.Job)))
  479. }
  480. return jobs
  481. }
  482. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*PodDisruptionBudget {
  483. var pdbs []*PodDisruptionBudget
  484. items := kcc.pdbWatch.GetAll()
  485. for _, pdb := range items {
  486. pdbs = append(pdbs, transformPodDisruptionBudget(pdb.(*policyv1.PodDisruptionBudget)))
  487. }
  488. return pdbs
  489. }
  490. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*ReplicationController {
  491. var rcs []*ReplicationController
  492. items := kcc.replicationControllerWatch.GetAll()
  493. for _, rc := range items {
  494. rcs = append(rcs, transformReplicationController(rc.(*v1.ReplicationController)))
  495. }
  496. return rcs
  497. }
// SetConfigMapUpdateFunc registers f as the update handler on the configmap
// watcher by forwarding it to the watcher's SetUpdateHandler.
func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
	kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
}