clustercache2.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/pkg/env"
  5. appsv1 "k8s.io/api/apps/v1"
  6. batchv1 "k8s.io/api/batch/v1"
  7. v1 "k8s.io/api/core/v1"
  8. policyv1 "k8s.io/api/policy/v1"
  9. stv1 "k8s.io/api/storage/v1"
  10. "k8s.io/client-go/kubernetes"
  11. )
  12. type KubernetesClusterCacheV2 struct {
  13. namespaceStore *GenericStore[*v1.Namespace, *Namespace]
  14. nodeStore *GenericStore[*v1.Node, *Node]
  15. podStore *GenericStore[*v1.Pod, *Pod]
  16. serviceStore *GenericStore[*v1.Service, *Service]
  17. daemonSetStore *GenericStore[*appsv1.DaemonSet, *DaemonSet]
  18. deploymentStore *GenericStore[*appsv1.Deployment, *Deployment]
  19. statefulSetStore *GenericStore[*appsv1.StatefulSet, *StatefulSet]
  20. persistentVolumeStore *GenericStore[*v1.PersistentVolume, *PersistentVolume]
  21. persistentVolumeClaimStore *GenericStore[*v1.PersistentVolumeClaim, *PersistentVolumeClaim]
  22. storageClassStore *GenericStore[*stv1.StorageClass, *StorageClass]
  23. jobStore *GenericStore[*batchv1.Job, *Job]
  24. replicationControllerStore *GenericStore[*v1.ReplicationController, *ReplicationController]
  25. replicaSetStore *GenericStore[*appsv1.ReplicaSet, *ReplicaSet]
  26. pdbStore *GenericStore[*policyv1.PodDisruptionBudget, *PodDisruptionBudget]
  27. stopCh chan struct{}
  28. }
  29. func NewKubernetesClusterCacheV2(clientset kubernetes.Interface) *KubernetesClusterCacheV2 {
  30. return &KubernetesClusterCacheV2{
  31. namespaceStore: CreateStore(clientset.CoreV1().RESTClient(), "namespaces", transformNamespace),
  32. nodeStore: CreateStore(clientset.CoreV1().RESTClient(), "nodes", transformNode),
  33. persistentVolumeClaimStore: CreateStore(clientset.CoreV1().RESTClient(), "persistentvolumeclaims", transformPersistentVolumeClaim),
  34. persistentVolumeStore: CreateStore(clientset.CoreV1().RESTClient(), "persistentvolumes", transformPersistentVolume),
  35. podStore: CreateStore(clientset.CoreV1().RESTClient(), "pods", transformPod),
  36. replicationControllerStore: CreateStore(clientset.CoreV1().RESTClient(), "replicationcontrollers", transformReplicationController),
  37. serviceStore: CreateStore(clientset.CoreV1().RESTClient(), "services", transformService),
  38. daemonSetStore: CreateStore(clientset.AppsV1().RESTClient(), "daemonsets", transformDaemonSet),
  39. deploymentStore: CreateStore(clientset.AppsV1().RESTClient(), "deployments", transformDeployment),
  40. replicaSetStore: CreateStore(clientset.AppsV1().RESTClient(), "replicasets", transformReplicaSet),
  41. statefulSetStore: CreateStore(clientset.AppsV1().RESTClient(), "statefulsets", transformStatefulSet),
  42. storageClassStore: CreateStore(clientset.StorageV1().RESTClient(), "storageclasses", transformStorageClass),
  43. jobStore: CreateStore(clientset.BatchV1().RESTClient(), "jobs", transformJob),
  44. pdbStore: CreateStore(clientset.PolicyV1().RESTClient(), "poddisruptionbudgets", transformPodDisruptionBudget),
  45. stopCh: make(chan struct{}),
  46. }
  47. }
  48. func (kcc *KubernetesClusterCacheV2) Run() {
  49. var wg sync.WaitGroup
  50. if !env.IsETLReadOnlyMode() {
  51. wg.Add(14)
  52. kcc.namespaceStore.Watch(kcc.stopCh, wg.Done)
  53. kcc.nodeStore.Watch(kcc.stopCh, wg.Done)
  54. kcc.persistentVolumeClaimStore.Watch(kcc.stopCh, wg.Done)
  55. kcc.persistentVolumeStore.Watch(kcc.stopCh, wg.Done)
  56. kcc.podStore.Watch(kcc.stopCh, wg.Done)
  57. kcc.replicationControllerStore.Watch(kcc.stopCh, wg.Done)
  58. kcc.serviceStore.Watch(kcc.stopCh, wg.Done)
  59. kcc.daemonSetStore.Watch(kcc.stopCh, wg.Done)
  60. kcc.deploymentStore.Watch(kcc.stopCh, wg.Done)
  61. kcc.replicaSetStore.Watch(kcc.stopCh, wg.Done)
  62. kcc.statefulSetStore.Watch(kcc.stopCh, wg.Done)
  63. kcc.storageClassStore.Watch(kcc.stopCh, wg.Done)
  64. kcc.jobStore.Watch(kcc.stopCh, wg.Done)
  65. kcc.pdbStore.Watch(kcc.stopCh, wg.Done)
  66. }
  67. wg.Wait()
  68. }
  69. func (kcc *KubernetesClusterCacheV2) Stop() {
  70. if kcc.stopCh != nil {
  71. close(kcc.stopCh)
  72. kcc.stopCh = nil
  73. }
  74. }
  75. func (kcc *KubernetesClusterCacheV2) GetAllNamespaces() []*Namespace {
  76. return kcc.namespaceStore.GetAll()
  77. }
  78. func (kcc *KubernetesClusterCacheV2) GetAllNodes() []*Node {
  79. return kcc.nodeStore.GetAll()
  80. }
  81. func (kcc *KubernetesClusterCacheV2) GetAllPods() []*Pod {
  82. return kcc.podStore.GetAll()
  83. }
  84. func (kcc *KubernetesClusterCacheV2) GetAllServices() []*Service {
  85. return kcc.serviceStore.GetAll()
  86. }
  87. func (kcc *KubernetesClusterCacheV2) GetAllDaemonSets() []*DaemonSet {
  88. return kcc.daemonSetStore.GetAll()
  89. }
  90. func (kcc *KubernetesClusterCacheV2) GetAllDeployments() []*Deployment {
  91. return kcc.deploymentStore.GetAll()
  92. }
  93. func (kcc *KubernetesClusterCacheV2) GetAllStatefulSets() []*StatefulSet {
  94. return kcc.statefulSetStore.GetAll()
  95. }
  96. func (kcc *KubernetesClusterCacheV2) GetAllPersistentVolumes() []*PersistentVolume {
  97. return kcc.persistentVolumeStore.GetAll()
  98. }
  99. func (kcc *KubernetesClusterCacheV2) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
  100. return kcc.persistentVolumeClaimStore.GetAll()
  101. }
  102. func (kcc *KubernetesClusterCacheV2) GetAllStorageClasses() []*StorageClass {
  103. return kcc.storageClassStore.GetAll()
  104. }
  105. func (kcc *KubernetesClusterCacheV2) GetAllJobs() []*Job {
  106. return kcc.jobStore.GetAll()
  107. }
  108. func (kcc *KubernetesClusterCacheV2) GetAllReplicationControllers() []*ReplicationController {
  109. return kcc.replicationControllerStore.GetAll()
  110. }
  111. func (kcc *KubernetesClusterCacheV2) GetAllReplicaSets() []*ReplicaSet {
  112. return kcc.replicaSetStore.GetAll()
  113. }
  114. func (kcc *KubernetesClusterCacheV2) GetAllPodDisruptionBudgets() []*PodDisruptionBudget {
  115. return kcc.pdbStore.GetAll()
  116. }