2
0

clustercache2.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package clustercache
  2. import (
  3. "sync"
  4. cc "github.com/opencost/opencost/core/pkg/clustercache"
  5. "github.com/opencost/opencost/pkg/env"
  6. appsv1 "k8s.io/api/apps/v1"
  7. batchv1 "k8s.io/api/batch/v1"
  8. v1 "k8s.io/api/core/v1"
  9. policyv1 "k8s.io/api/policy/v1"
  10. stv1 "k8s.io/api/storage/v1"
  11. "k8s.io/client-go/kubernetes"
  12. )
  13. type KubernetesClusterCacheV2 struct {
  14. namespaceStore *GenericStore[*v1.Namespace, *cc.Namespace]
  15. nodeStore *GenericStore[*v1.Node, *cc.Node]
  16. podStore *GenericStore[*v1.Pod, *cc.Pod]
  17. serviceStore *GenericStore[*v1.Service, *cc.Service]
  18. daemonSetStore *GenericStore[*appsv1.DaemonSet, *cc.DaemonSet]
  19. deploymentStore *GenericStore[*appsv1.Deployment, *cc.Deployment]
  20. statefulSetStore *GenericStore[*appsv1.StatefulSet, *cc.StatefulSet]
  21. persistentVolumeStore *GenericStore[*v1.PersistentVolume, *cc.PersistentVolume]
  22. persistentVolumeClaimStore *GenericStore[*v1.PersistentVolumeClaim, *cc.PersistentVolumeClaim]
  23. storageClassStore *GenericStore[*stv1.StorageClass, *cc.StorageClass]
  24. jobStore *GenericStore[*batchv1.Job, *cc.Job]
  25. replicationControllerStore *GenericStore[*v1.ReplicationController, *cc.ReplicationController]
  26. replicaSetStore *GenericStore[*appsv1.ReplicaSet, *cc.ReplicaSet]
  27. pdbStore *GenericStore[*policyv1.PodDisruptionBudget, *cc.PodDisruptionBudget]
  28. resourceQuotasStore *GenericStore[*v1.ResourceQuota, *cc.ResourceQuota]
  29. stopCh chan struct{}
  30. }
  31. func NewKubernetesClusterCacheV2(clientset kubernetes.Interface) *KubernetesClusterCacheV2 {
  32. return &KubernetesClusterCacheV2{
  33. namespaceStore: CreateStore(clientset.CoreV1().RESTClient(), "namespaces", cc.TransformNamespace),
  34. nodeStore: CreateStore(clientset.CoreV1().RESTClient(), "nodes", cc.TransformNode),
  35. persistentVolumeClaimStore: CreateStore(clientset.CoreV1().RESTClient(), "persistentvolumeclaims", cc.TransformPersistentVolumeClaim),
  36. persistentVolumeStore: CreateStore(clientset.CoreV1().RESTClient(), "persistentvolumes", cc.TransformPersistentVolume),
  37. podStore: CreateStore(clientset.CoreV1().RESTClient(), "pods", cc.TransformPod),
  38. replicationControllerStore: CreateStore(clientset.CoreV1().RESTClient(), "replicationcontrollers", cc.TransformReplicationController),
  39. serviceStore: CreateStore(clientset.CoreV1().RESTClient(), "services", cc.TransformService),
  40. daemonSetStore: CreateStore(clientset.AppsV1().RESTClient(), "daemonsets", cc.TransformDaemonSet),
  41. deploymentStore: CreateStore(clientset.AppsV1().RESTClient(), "deployments", cc.TransformDeployment),
  42. replicaSetStore: CreateStore(clientset.AppsV1().RESTClient(), "replicasets", cc.TransformReplicaSet),
  43. statefulSetStore: CreateStore(clientset.AppsV1().RESTClient(), "statefulsets", cc.TransformStatefulSet),
  44. storageClassStore: CreateStore(clientset.StorageV1().RESTClient(), "storageclasses", cc.TransformStorageClass),
  45. jobStore: CreateStore(clientset.BatchV1().RESTClient(), "jobs", cc.TransformJob),
  46. pdbStore: CreateStore(clientset.PolicyV1().RESTClient(), "poddisruptionbudgets", cc.TransformPodDisruptionBudget),
  47. resourceQuotasStore: CreateStore(clientset.CoreV1().RESTClient(), "resourcequotas", cc.TransformResourceQuota),
  48. stopCh: make(chan struct{}),
  49. }
  50. }
  51. func (kcc *KubernetesClusterCacheV2) Run() {
  52. var wg sync.WaitGroup
  53. if env.HasKubernetesResourceAccess() {
  54. wg.Add(15)
  55. kcc.namespaceStore.Watch(kcc.stopCh, wg.Done)
  56. kcc.nodeStore.Watch(kcc.stopCh, wg.Done)
  57. kcc.persistentVolumeClaimStore.Watch(kcc.stopCh, wg.Done)
  58. kcc.persistentVolumeStore.Watch(kcc.stopCh, wg.Done)
  59. kcc.podStore.Watch(kcc.stopCh, wg.Done)
  60. kcc.replicationControllerStore.Watch(kcc.stopCh, wg.Done)
  61. kcc.serviceStore.Watch(kcc.stopCh, wg.Done)
  62. kcc.daemonSetStore.Watch(kcc.stopCh, wg.Done)
  63. kcc.deploymentStore.Watch(kcc.stopCh, wg.Done)
  64. kcc.replicaSetStore.Watch(kcc.stopCh, wg.Done)
  65. kcc.statefulSetStore.Watch(kcc.stopCh, wg.Done)
  66. kcc.storageClassStore.Watch(kcc.stopCh, wg.Done)
  67. kcc.jobStore.Watch(kcc.stopCh, wg.Done)
  68. kcc.pdbStore.Watch(kcc.stopCh, wg.Done)
  69. kcc.resourceQuotasStore.Watch(kcc.stopCh, wg.Done)
  70. }
  71. wg.Wait()
  72. }
  73. func (kcc *KubernetesClusterCacheV2) Stop() {
  74. if kcc.stopCh != nil {
  75. close(kcc.stopCh)
  76. kcc.stopCh = nil
  77. }
  78. }
  79. func (kcc *KubernetesClusterCacheV2) GetAllNamespaces() []*cc.Namespace {
  80. return kcc.namespaceStore.GetAll()
  81. }
  82. func (kcc *KubernetesClusterCacheV2) GetAllNodes() []*cc.Node {
  83. return kcc.nodeStore.GetAll()
  84. }
  85. func (kcc *KubernetesClusterCacheV2) GetAllPods() []*cc.Pod {
  86. return kcc.podStore.GetAll()
  87. }
  88. func (kcc *KubernetesClusterCacheV2) GetAllServices() []*cc.Service {
  89. return kcc.serviceStore.GetAll()
  90. }
  91. func (kcc *KubernetesClusterCacheV2) GetAllDaemonSets() []*cc.DaemonSet {
  92. return kcc.daemonSetStore.GetAll()
  93. }
  94. func (kcc *KubernetesClusterCacheV2) GetAllDeployments() []*cc.Deployment {
  95. return kcc.deploymentStore.GetAll()
  96. }
  97. func (kcc *KubernetesClusterCacheV2) GetAllStatefulSets() []*cc.StatefulSet {
  98. return kcc.statefulSetStore.GetAll()
  99. }
  100. func (kcc *KubernetesClusterCacheV2) GetAllPersistentVolumes() []*cc.PersistentVolume {
  101. return kcc.persistentVolumeStore.GetAll()
  102. }
  103. func (kcc *KubernetesClusterCacheV2) GetAllPersistentVolumeClaims() []*cc.PersistentVolumeClaim {
  104. return kcc.persistentVolumeClaimStore.GetAll()
  105. }
  106. func (kcc *KubernetesClusterCacheV2) GetAllStorageClasses() []*cc.StorageClass {
  107. return kcc.storageClassStore.GetAll()
  108. }
  109. func (kcc *KubernetesClusterCacheV2) GetAllJobs() []*cc.Job {
  110. return kcc.jobStore.GetAll()
  111. }
  112. func (kcc *KubernetesClusterCacheV2) GetAllReplicationControllers() []*cc.ReplicationController {
  113. return kcc.replicationControllerStore.GetAll()
  114. }
  115. func (kcc *KubernetesClusterCacheV2) GetAllReplicaSets() []*cc.ReplicaSet {
  116. return kcc.replicaSetStore.GetAll()
  117. }
  118. func (kcc *KubernetesClusterCacheV2) GetAllPodDisruptionBudgets() []*cc.PodDisruptionBudget {
  119. return kcc.pdbStore.GetAll()
  120. }
  121. func (kcc *KubernetesClusterCacheV2) GetAllResourceQuotas() []*cc.ResourceQuota {
  122. return kcc.resourceQuotasStore.GetAll()
  123. }