clustercache.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. package clustercache
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/pkg/env"
  7. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  8. "k8s.io/apimachinery/pkg/types"
  9. "k8s.io/utils/ptr"
  10. appsv1 "k8s.io/api/apps/v1"
  11. batchv1 "k8s.io/api/batch/v1"
  12. v1 "k8s.io/api/core/v1"
  13. policyv1 "k8s.io/api/policy/v1"
  14. stv1 "k8s.io/api/storage/v1"
  15. "k8s.io/apimachinery/pkg/fields"
  16. "k8s.io/client-go/kubernetes"
  17. )
  18. type Namespace struct {
  19. Name string
  20. Labels map[string]string
  21. Annotations map[string]string
  22. }
  23. type Pod struct {
  24. UID types.UID
  25. Name string
  26. Namespace string
  27. Labels map[string]string
  28. Annotations map[string]string
  29. OwnerReferences []metav1.OwnerReference
  30. Status PodStatus
  31. Spec PodSpec
  32. DeletionTimestamp *time.Time
  33. }
  34. type PodStatus struct {
  35. Phase v1.PodPhase
  36. ContainerStatuses []v1.ContainerStatus
  37. }
  38. type PodSpec struct {
  39. NodeName string
  40. Containers []Container
  41. Volumes []v1.Volume
  42. RestartPolicy v1.RestartPolicy
  43. }
  44. type Container struct {
  45. Name string
  46. Resources v1.ResourceRequirements
  47. }
  48. type Node struct {
  49. Name string
  50. Labels map[string]string
  51. Annotations map[string]string
  52. Status v1.NodeStatus
  53. SpecProviderID string
  54. }
  55. type Service struct {
  56. Name string
  57. Namespace string
  58. SpecSelector map[string]string
  59. Type v1.ServiceType
  60. Status v1.ServiceStatus
  61. }
  62. type DaemonSet struct {
  63. Name string
  64. Namespace string
  65. Labels map[string]string
  66. SpecContainers []v1.Container
  67. }
  68. type Deployment struct {
  69. Name string
  70. Namespace string
  71. Labels map[string]string
  72. Annotations map[string]string
  73. MatchLabels map[string]string
  74. SpecSelector *metav1.LabelSelector
  75. SpecReplicas *int32
  76. SpecStrategy appsv1.DeploymentStrategy
  77. StatusAvailableReplicas int32
  78. PodSpec PodSpec
  79. }
  80. type StatefulSet struct {
  81. Name string
  82. Namespace string
  83. Labels map[string]string
  84. Annotations map[string]string
  85. SpecSelector *metav1.LabelSelector
  86. SpecReplicas *int32
  87. PodSpec PodSpec
  88. }
  89. type PersistentVolumeClaim struct {
  90. Name string
  91. Namespace string
  92. Spec v1.PersistentVolumeClaimSpec
  93. Labels map[string]string
  94. Annotations map[string]string
  95. }
  96. type StorageClass struct {
  97. Name string
  98. Labels map[string]string
  99. Annotations map[string]string
  100. Parameters map[string]string
  101. Provisioner string
  102. TypeMeta metav1.TypeMeta
  103. Size int
  104. }
  105. type Job struct {
  106. Name string
  107. Namespace string
  108. Status batchv1.JobStatus
  109. }
  110. type PersistentVolume struct {
  111. Name string
  112. Namespace string
  113. Labels map[string]string
  114. Annotations map[string]string
  115. Spec v1.PersistentVolumeSpec
  116. Status v1.PersistentVolumeStatus
  117. }
  118. type ReplicationController struct {
  119. Name string
  120. Namespace string
  121. Spec v1.ReplicationControllerSpec
  122. }
  123. type PodDisruptionBudget struct {
  124. Name string
  125. Namespace string
  126. Spec policyv1.PodDisruptionBudgetSpec
  127. Status policyv1.PodDisruptionBudgetStatus
  128. }
  129. type ReplicaSet struct {
  130. Name string
  131. Namespace string
  132. SpecSelector *metav1.LabelSelector
  133. Spec appsv1.ReplicaSetSpec
  134. }
  135. type Volume struct {
  136. }
  137. // GetControllerOf returns a pointer to a copy of the controllerRef if controllee has a controller
  138. func GetControllerOf(pod *Pod) *metav1.OwnerReference {
  139. ref := GetControllerOfNoCopy(pod)
  140. if ref == nil {
  141. return nil
  142. }
  143. cp := *ref
  144. cp.Controller = ptr.To(*ref.Controller)
  145. if ref.BlockOwnerDeletion != nil {
  146. cp.BlockOwnerDeletion = ptr.To(*ref.BlockOwnerDeletion)
  147. }
  148. return &cp
  149. }
  150. // GetControllerOfNoCopy returns a pointer to the controllerRef if controllee has a controller
  151. func GetControllerOfNoCopy(pod *Pod) *metav1.OwnerReference {
  152. refs := pod.OwnerReferences
  153. for i := range refs {
  154. if refs[i].Controller != nil && *refs[i].Controller {
  155. return &refs[i]
  156. }
  157. }
  158. return nil
  159. }
  160. func transformNamespace(input *v1.Namespace) *Namespace {
  161. return &Namespace{
  162. Name: input.Name,
  163. Annotations: input.Annotations,
  164. Labels: input.Labels,
  165. }
  166. }
  167. func transformPodContainer(input v1.Container) Container {
  168. return Container{
  169. Name: input.Name,
  170. Resources: input.Resources,
  171. }
  172. }
  173. func transformPodStatus(input v1.PodStatus) PodStatus {
  174. return PodStatus{
  175. Phase: input.Phase,
  176. ContainerStatuses: input.ContainerStatuses,
  177. }
  178. }
  179. func transformPodSpec(input v1.PodSpec) PodSpec {
  180. containers := make([]Container, len(input.Containers))
  181. for i, container := range input.Containers {
  182. containers[i] = transformPodContainer(container)
  183. }
  184. return PodSpec{
  185. NodeName: input.NodeName,
  186. Containers: containers,
  187. Volumes: input.Volumes,
  188. RestartPolicy: input.RestartPolicy,
  189. }
  190. }
  191. func transformTimestamp(input *metav1.Time) *time.Time {
  192. if input == nil {
  193. return nil
  194. }
  195. t := input.Time
  196. return &t
  197. }
  198. func transformPod(input *v1.Pod) *Pod {
  199. return &Pod{
  200. UID: input.UID,
  201. Name: input.Name,
  202. Namespace: input.Namespace,
  203. Labels: input.Labels,
  204. Annotations: input.Annotations,
  205. OwnerReferences: input.OwnerReferences,
  206. Spec: transformPodSpec(input.Spec),
  207. Status: transformPodStatus(input.Status),
  208. DeletionTimestamp: transformTimestamp(input.DeletionTimestamp),
  209. }
  210. }
  211. func transformNode(input *v1.Node) *Node {
  212. return &Node{
  213. Name: input.Name,
  214. Labels: input.Labels,
  215. Annotations: input.Annotations,
  216. Status: input.Status,
  217. SpecProviderID: input.Spec.ProviderID,
  218. }
  219. }
  220. func transformService(input *v1.Service) *Service {
  221. return &Service{
  222. Name: input.Name,
  223. Namespace: input.Namespace,
  224. SpecSelector: input.Spec.Selector,
  225. Type: input.Spec.Type,
  226. Status: input.Status,
  227. }
  228. }
  229. func transformDaemonSet(input *appsv1.DaemonSet) *DaemonSet {
  230. return &DaemonSet{
  231. Name: input.Name,
  232. Namespace: input.Namespace,
  233. Labels: input.Labels,
  234. SpecContainers: input.Spec.Template.Spec.Containers,
  235. }
  236. }
  237. func transformDeployment(input *appsv1.Deployment) *Deployment {
  238. return &Deployment{
  239. Name: input.Name,
  240. Namespace: input.Namespace,
  241. Labels: input.Labels,
  242. MatchLabels: input.Spec.Selector.MatchLabels,
  243. SpecReplicas: input.Spec.Replicas,
  244. SpecSelector: input.Spec.Selector,
  245. SpecStrategy: input.Spec.Strategy,
  246. StatusAvailableReplicas: input.Status.AvailableReplicas,
  247. PodSpec: transformPodSpec(input.Spec.Template.Spec),
  248. }
  249. }
  250. func transformStatefulSet(input *appsv1.StatefulSet) *StatefulSet {
  251. return &StatefulSet{
  252. Name: input.Name,
  253. Namespace: input.Namespace,
  254. SpecSelector: input.Spec.Selector,
  255. SpecReplicas: input.Spec.Replicas,
  256. PodSpec: transformPodSpec(input.Spec.Template.Spec),
  257. }
  258. }
  259. func transformPersistentVolume(input *v1.PersistentVolume) *PersistentVolume {
  260. return &PersistentVolume{
  261. Name: input.Name,
  262. Namespace: input.Namespace,
  263. Labels: input.Labels,
  264. Annotations: input.Annotations,
  265. Spec: input.Spec,
  266. Status: input.Status,
  267. }
  268. }
  269. func transformPersistentVolumeClaim(input *v1.PersistentVolumeClaim) *PersistentVolumeClaim {
  270. return &PersistentVolumeClaim{
  271. Name: input.Name,
  272. Namespace: input.Namespace,
  273. Spec: input.Spec,
  274. Labels: input.Labels,
  275. Annotations: input.Annotations,
  276. }
  277. }
  278. func transformStorageClass(input *stv1.StorageClass) *StorageClass {
  279. return &StorageClass{
  280. Name: input.Name,
  281. Annotations: input.Annotations,
  282. Labels: input.Labels,
  283. Parameters: input.Parameters,
  284. Provisioner: input.Provisioner,
  285. TypeMeta: input.TypeMeta,
  286. Size: input.Size(),
  287. }
  288. }
  289. func transformJob(input *batchv1.Job) *Job {
  290. return &Job{
  291. Name: input.Name,
  292. Namespace: input.Namespace,
  293. Status: input.Status,
  294. }
  295. }
  296. func transformReplicationController(input *v1.ReplicationController) *ReplicationController {
  297. return &ReplicationController{
  298. Name: input.Name,
  299. Namespace: input.Namespace,
  300. Spec: input.Spec,
  301. }
  302. }
  303. func transformPodDisruptionBudget(input *policyv1.PodDisruptionBudget) *PodDisruptionBudget {
  304. return &PodDisruptionBudget{
  305. Name: input.Name,
  306. Namespace: input.Namespace,
  307. Spec: input.Spec,
  308. Status: input.Status,
  309. }
  310. }
  311. func transformReplicaSet(input *appsv1.ReplicaSet) *ReplicaSet {
  312. return &ReplicaSet{
  313. Name: input.Name,
  314. Namespace: input.Namespace,
  315. Spec: input.Spec,
  316. SpecSelector: input.Spec.Selector,
  317. }
  318. }
  319. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  320. // up to date resources using watchers
  321. type ClusterCache interface {
  322. // Run starts the watcher processes
  323. Run()
  324. // Stops the watcher processes
  325. Stop()
  326. // GetAllNamespaces returns all the cached namespaces
  327. GetAllNamespaces() []*Namespace
  328. // GetAllNodes returns all the cached nodes
  329. GetAllNodes() []*Node
  330. // GetAllPods returns all the cached pods
  331. GetAllPods() []*Pod
  332. // GetAllServices returns all the cached services
  333. GetAllServices() []*Service
  334. // GetAllDaemonSets returns all the cached DaemonSets
  335. GetAllDaemonSets() []*DaemonSet
  336. // GetAllDeployments returns all the cached deployments
  337. GetAllDeployments() []*Deployment
  338. // GetAllStatfulSets returns all the cached StatefulSets
  339. GetAllStatefulSets() []*StatefulSet
  340. // GetAllReplicaSets returns all the cached ReplicaSets
  341. GetAllReplicaSets() []*ReplicaSet
  342. // GetAllPersistentVolumes returns all the cached persistent volumes
  343. GetAllPersistentVolumes() []*PersistentVolume
  344. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  345. GetAllPersistentVolumeClaims() []*PersistentVolumeClaim
  346. // GetAllStorageClasses returns all the cached storage classes
  347. GetAllStorageClasses() []*StorageClass
  348. // GetAllJobs returns all the cached jobs
  349. GetAllJobs() []*Job
  350. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  351. GetAllPodDisruptionBudgets() []*PodDisruptionBudget
  352. // GetAllReplicationControllers returns all cached replication controllers
  353. GetAllReplicationControllers() []*ReplicationController
  354. }
  355. // KubernetesClusterCache is the implementation of ClusterCache
  356. type KubernetesClusterCache struct {
  357. client kubernetes.Interface
  358. namespaceWatch WatchController
  359. nodeWatch WatchController
  360. podWatch WatchController
  361. serviceWatch WatchController
  362. daemonsetsWatch WatchController
  363. deploymentsWatch WatchController
  364. statefulsetWatch WatchController
  365. replicasetWatch WatchController
  366. pvWatch WatchController
  367. pvcWatch WatchController
  368. storageClassWatch WatchController
  369. jobsWatch WatchController
  370. pdbWatch WatchController
  371. replicationControllerWatch WatchController
  372. stop chan struct{}
  373. }
  374. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  375. defer wg.Done()
  376. wc.WarmUp(cancel)
  377. }
  378. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  379. if env.GetUseCacheV1() {
  380. return NewKubernetesClusterCacheV1(client)
  381. }
  382. return NewKubernetesClusterCacheV2(client)
  383. }
  384. func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
  385. coreRestClient := client.CoreV1().RESTClient()
  386. appsRestClient := client.AppsV1().RESTClient()
  387. storageRestClient := client.StorageV1().RESTClient()
  388. batchClient := client.BatchV1().RESTClient()
  389. pdbClient := client.PolicyV1().RESTClient()
  390. kubecostNamespace := env.GetKubecostNamespace()
  391. log.Infof("NAMESPACE: %s", kubecostNamespace)
  392. kcc := &KubernetesClusterCache{
  393. client: client,
  394. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  395. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  396. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  397. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  398. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  399. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  400. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  401. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  402. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  403. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  404. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  405. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  406. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  407. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  408. }
  409. // Wait for each caching watcher to initialize
  410. cancel := make(chan struct{})
  411. var wg sync.WaitGroup
  412. if !env.IsETLReadOnlyMode() {
  413. wg.Add(14)
  414. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  415. go initializeCache(kcc.nodeWatch, &wg, cancel)
  416. go initializeCache(kcc.podWatch, &wg, cancel)
  417. go initializeCache(kcc.serviceWatch, &wg, cancel)
  418. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  419. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  420. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  421. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  422. go initializeCache(kcc.pvWatch, &wg, cancel)
  423. go initializeCache(kcc.pvcWatch, &wg, cancel)
  424. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  425. go initializeCache(kcc.jobsWatch, &wg, cancel)
  426. go initializeCache(kcc.pdbWatch, &wg, cancel)
  427. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  428. }
  429. wg.Wait()
  430. log.Infof("Done waiting")
  431. return kcc
  432. }
  433. func (kcc *KubernetesClusterCache) Run() {
  434. if kcc.stop != nil {
  435. return
  436. }
  437. stopCh := make(chan struct{})
  438. go kcc.namespaceWatch.Run(1, stopCh)
  439. go kcc.nodeWatch.Run(1, stopCh)
  440. go kcc.podWatch.Run(1, stopCh)
  441. go kcc.serviceWatch.Run(1, stopCh)
  442. go kcc.daemonsetsWatch.Run(1, stopCh)
  443. go kcc.deploymentsWatch.Run(1, stopCh)
  444. go kcc.statefulsetWatch.Run(1, stopCh)
  445. go kcc.replicasetWatch.Run(1, stopCh)
  446. go kcc.pvWatch.Run(1, stopCh)
  447. go kcc.pvcWatch.Run(1, stopCh)
  448. go kcc.storageClassWatch.Run(1, stopCh)
  449. go kcc.jobsWatch.Run(1, stopCh)
  450. go kcc.pdbWatch.Run(1, stopCh)
  451. go kcc.replicationControllerWatch.Run(1, stopCh)
  452. kcc.stop = stopCh
  453. }
  454. func (kcc *KubernetesClusterCache) Stop() {
  455. if kcc.stop == nil {
  456. return
  457. }
  458. close(kcc.stop)
  459. kcc.stop = nil
  460. }
  461. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*Namespace {
  462. var namespaces []*Namespace
  463. items := kcc.namespaceWatch.GetAll()
  464. for _, ns := range items {
  465. namespaces = append(namespaces, transformNamespace(ns.(*v1.Namespace)))
  466. }
  467. return namespaces
  468. }
  469. func (kcc *KubernetesClusterCache) GetAllNodes() []*Node {
  470. var nodes []*Node
  471. items := kcc.nodeWatch.GetAll()
  472. for _, node := range items {
  473. nodes = append(nodes, transformNode(node.(*v1.Node)))
  474. }
  475. return nodes
  476. }
  477. func (kcc *KubernetesClusterCache) GetAllPods() []*Pod {
  478. var pods []*Pod
  479. items := kcc.podWatch.GetAll()
  480. for _, pod := range items {
  481. pods = append(pods, transformPod(pod.(*v1.Pod)))
  482. }
  483. return pods
  484. }
  485. func (kcc *KubernetesClusterCache) GetAllServices() []*Service {
  486. var services []*Service
  487. items := kcc.serviceWatch.GetAll()
  488. for _, service := range items {
  489. services = append(services, transformService(service.(*v1.Service)))
  490. }
  491. return services
  492. }
  493. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*DaemonSet {
  494. var daemonsets []*DaemonSet
  495. items := kcc.daemonsetsWatch.GetAll()
  496. for _, daemonset := range items {
  497. daemonsets = append(daemonsets, transformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  498. }
  499. return daemonsets
  500. }
  501. func (kcc *KubernetesClusterCache) GetAllDeployments() []*Deployment {
  502. var deployments []*Deployment
  503. items := kcc.deploymentsWatch.GetAll()
  504. for _, deployment := range items {
  505. deployments = append(deployments, transformDeployment(deployment.(*appsv1.Deployment)))
  506. }
  507. return deployments
  508. }
  509. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*StatefulSet {
  510. var statefulsets []*StatefulSet
  511. items := kcc.statefulsetWatch.GetAll()
  512. for _, statefulset := range items {
  513. statefulsets = append(statefulsets, transformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  514. }
  515. return statefulsets
  516. }
  517. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*ReplicaSet {
  518. var replicasets []*ReplicaSet
  519. items := kcc.replicasetWatch.GetAll()
  520. for _, replicaset := range items {
  521. replicasets = append(replicasets, transformReplicaSet(replicaset.(*appsv1.ReplicaSet)))
  522. }
  523. return replicasets
  524. }
  525. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*PersistentVolume {
  526. var pvs []*PersistentVolume
  527. items := kcc.pvWatch.GetAll()
  528. for _, pv := range items {
  529. pvs = append(pvs, transformPersistentVolume(pv.(*v1.PersistentVolume)))
  530. }
  531. return pvs
  532. }
  533. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
  534. var pvcs []*PersistentVolumeClaim
  535. items := kcc.pvcWatch.GetAll()
  536. for _, pvc := range items {
  537. pvcs = append(pvcs, transformPersistentVolumeClaim(pvc.(*v1.PersistentVolumeClaim)))
  538. }
  539. return pvcs
  540. }
  541. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*StorageClass {
  542. var storageClasses []*StorageClass
  543. items := kcc.storageClassWatch.GetAll()
  544. for _, stc := range items {
  545. storageClasses = append(storageClasses, transformStorageClass(stc.(*stv1.StorageClass)))
  546. }
  547. return storageClasses
  548. }
  549. func (kcc *KubernetesClusterCache) GetAllJobs() []*Job {
  550. var jobs []*Job
  551. items := kcc.jobsWatch.GetAll()
  552. for _, job := range items {
  553. jobs = append(jobs, transformJob(job.(*batchv1.Job)))
  554. }
  555. return jobs
  556. }
  557. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*PodDisruptionBudget {
  558. var pdbs []*PodDisruptionBudget
  559. items := kcc.pdbWatch.GetAll()
  560. for _, pdb := range items {
  561. pdbs = append(pdbs, transformPodDisruptionBudget(pdb.(*policyv1.PodDisruptionBudget)))
  562. }
  563. return pdbs
  564. }
  565. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*ReplicationController {
  566. var rcs []*ReplicationController
  567. items := kcc.replicationControllerWatch.GetAll()
  568. for _, rc := range items {
  569. rcs = append(rcs, transformReplicationController(rc.(*v1.ReplicationController)))
  570. }
  571. return rcs
  572. }