clustercache.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/pkg/env"
  6. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7. "k8s.io/apimachinery/pkg/types"
  8. appsv1 "k8s.io/api/apps/v1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. policyv1 "k8s.io/api/policy/v1"
  12. stv1 "k8s.io/api/storage/v1"
  13. "k8s.io/apimachinery/pkg/fields"
  14. "k8s.io/client-go/kubernetes"
  15. )
// Namespace is the reduced, cached view of a v1.Namespace. transformNamespace
// populates it with only the name, labels, and annotations of the source object.
type Namespace struct {
	// Name is the namespace's object name.
	Name string
	// Labels is the label map taken from the source object.
	Labels map[string]string
	// Annotations is the annotation map taken from the source object.
	Annotations map[string]string
}
  21. func transformNamespace(input *v1.Namespace) *Namespace {
  22. return &Namespace{
  23. Name: input.Name,
  24. Annotations: input.Annotations,
  25. Labels: input.Labels,
  26. }
  27. }
// Pod is the reduced, cached view of a v1.Pod. transformPod fills it from the
// source object's metadata, spec, and status.
type Pod struct {
	// UID is the source object's UID.
	UID types.UID
	// Name is the pod's object name.
	Name string
	// Namespace is the namespace the pod belongs to.
	Namespace string
	// Labels is the label map taken from the source object.
	Labels map[string]string
	// Annotations is the annotation map taken from the source object.
	Annotations map[string]string
	// OwnerReferences is the owner reference list taken from the source object.
	OwnerReferences []metav1.OwnerReference
	// Status is the reduced pod status (see PodStatus).
	Status PodStatus
	// Spec is the reduced pod spec (see PodSpec).
	Spec PodSpec
}
// PodStatus is the subset of v1.PodStatus retained by the cache: the pod phase
// and the container statuses.
type PodStatus struct {
	// Phase is the pod phase taken from the source status.
	Phase v1.PodPhase
	// ContainerStatuses is the container status list taken from the source status.
	ContainerStatuses []v1.ContainerStatus
}
// PodSpec is the subset of v1.PodSpec retained by the cache: the node name,
// the reduced containers, and the volumes.
type PodSpec struct {
	// NodeName is the node name taken from the source spec.
	NodeName string
	// Containers holds one reduced Container per entry in the source spec's Containers.
	Containers []Container
	// Volumes is the volume list taken from the source spec.
	Volumes []v1.Volume
}
// Container is the subset of v1.Container retained by the cache: the container
// name and its resource requirements.
type Container struct {
	// Name is the container name.
	Name string
	// Resources is the resource requirements value taken from the source container.
	Resources v1.ResourceRequirements
}
  51. func transformPodContainer(input v1.Container) Container {
  52. return Container{
  53. Name: input.Name,
  54. Resources: input.Resources,
  55. }
  56. }
  57. func transformPodStatus(input v1.PodStatus) PodStatus {
  58. return PodStatus{
  59. Phase: input.Phase,
  60. ContainerStatuses: input.ContainerStatuses,
  61. }
  62. }
  63. func transformPodSpec(input v1.PodSpec) PodSpec {
  64. containers := make([]Container, len(input.Containers))
  65. for i, container := range input.Containers {
  66. containers[i] = transformPodContainer(container)
  67. }
  68. return PodSpec{
  69. NodeName: input.NodeName,
  70. Containers: containers,
  71. Volumes: input.Volumes,
  72. }
  73. }
  74. func transformPod(input *v1.Pod) *Pod {
  75. return &Pod{
  76. UID: input.UID,
  77. Name: input.Name,
  78. Namespace: input.Namespace,
  79. Labels: input.Labels,
  80. Annotations: input.Annotations,
  81. OwnerReferences: input.OwnerReferences,
  82. Spec: transformPodSpec(input.Spec),
  83. Status: transformPodStatus(input.Status),
  84. }
  85. }
// Node is the reduced, cached view of a v1.Node. transformNode fills it from
// the source object's metadata, status, and spec provider ID.
type Node struct {
	// Name is the node's object name.
	Name string
	// Labels is the label map taken from the source object.
	Labels map[string]string
	// Annotations is the annotation map taken from the source object.
	Annotations map[string]string
	// Status is the full node status value taken from the source object.
	Status v1.NodeStatus
	// SpecProviderID is the source object's Spec.ProviderID.
	SpecProviderID string
}
  93. func transformNode(input *v1.Node) *Node {
  94. return &Node{
  95. Name: input.Name,
  96. Labels: input.Labels,
  97. Annotations: input.Annotations,
  98. Status: input.Status,
  99. SpecProviderID: input.Spec.ProviderID,
  100. }
  101. }
// Service is the reduced, cached view of a v1.Service. transformService fills
// it from the source object's metadata, spec, and status.
type Service struct {
	// Name is the service's object name.
	Name string
	// Namespace is the namespace the service belongs to.
	Namespace string
	// Selector is the source object's Spec.Selector.
	Selector map[string]string
	// Type is the source object's Spec.Type.
	Type v1.ServiceType
	// Status is the full service status value taken from the source object.
	Status v1.ServiceStatus
}
  109. func transformService(input *v1.Service) *Service {
  110. return &Service{
  111. Name: input.Name,
  112. Namespace: input.Namespace,
  113. Selector: input.Spec.Selector,
  114. Type: input.Spec.Type,
  115. Status: input.Status,
  116. }
  117. }
// DaemonSet is the reduced, cached view of an appsv1.DaemonSet.
// transformDaemonSet fills it from the source object's metadata and pod
// template.
type DaemonSet struct {
	// Name is the daemon set's object name.
	Name string
	// Namespace is the namespace the daemon set belongs to.
	Namespace string
	// Labels is the label map taken from the source object.
	Labels map[string]string
	// SpecContainers is the source object's Spec.Template.Spec.Containers.
	SpecContainers []v1.Container
}
  124. func transformDaemonSet(input *appsv1.DaemonSet) *DaemonSet {
  125. return &DaemonSet{
  126. Name: input.Name,
  127. Namespace: input.Namespace,
  128. Labels: input.Labels,
  129. SpecContainers: input.Spec.Template.Spec.Containers,
  130. }
  131. }
// Deployment is the reduced, cached view of an appsv1.Deployment.
// transformDeployment fills it from the source object's metadata, spec, and
// status.
type Deployment struct {
	// Name is the deployment's object name.
	Name string
	// Namespace is the namespace the deployment belongs to.
	Namespace string
	// Labels is the label map taken from the source object.
	Labels map[string]string
	// MatchLabels is the MatchLabels map of the source object's Spec.Selector.
	MatchLabels map[string]string
	// SpecSelector is the source object's Spec.Selector pointer.
	SpecSelector *metav1.LabelSelector
	// SpecReplicas is the source object's Spec.Replicas pointer.
	SpecReplicas *int32
	// StatusAvailableReplicas is the source object's Status.AvailableReplicas.
	StatusAvailableReplicas int32
}
  141. func transformDeployment(input *appsv1.Deployment) *Deployment {
  142. return &Deployment{
  143. Name: input.Name,
  144. Namespace: input.Namespace,
  145. Labels: input.Labels,
  146. MatchLabels: input.Spec.Selector.MatchLabels,
  147. SpecReplicas: input.Spec.Replicas,
  148. SpecSelector: input.Spec.Selector,
  149. StatusAvailableReplicas: input.Status.AvailableReplicas,
  150. }
  151. }
// StatefulSet is the reduced, cached view of an appsv1.StatefulSet.
// transformStatefulSet fills it from the source object's metadata and spec.
type StatefulSet struct {
	// Name is the stateful set's object name.
	Name string
	// Namespace is the namespace the stateful set belongs to.
	Namespace string
	// SpecSelector is the source object's Spec.Selector pointer.
	SpecSelector *metav1.LabelSelector
}
  157. func transformStatefulSet(input *appsv1.StatefulSet) *StatefulSet {
  158. return &StatefulSet{
  159. Name: input.Name,
  160. Namespace: input.Namespace,
  161. SpecSelector: input.Spec.Selector,
  162. }
  163. }
// PersistentVolume is the reduced, cached view of a v1.PersistentVolume.
// transformPersistentVolume fills it from the source object's metadata, spec,
// and status.
type PersistentVolume struct {
	// Name is the persistent volume's object name.
	Name string
	// Namespace is the source object's metadata namespace.
	Namespace string
	// Labels is the label map taken from the source object.
	Labels map[string]string
	// Annotations is the annotation map taken from the source object.
	Annotations map[string]string
	// Spec is the full spec value taken from the source object.
	Spec v1.PersistentVolumeSpec
	// Status is the full status value taken from the source object.
	Status v1.PersistentVolumeStatus
}
  172. func transformPersistentVolume(input *v1.PersistentVolume) *PersistentVolume {
  173. return &PersistentVolume{
  174. Name: input.Name,
  175. Namespace: input.Namespace,
  176. Labels: input.Labels,
  177. Annotations: input.Annotations,
  178. Spec: input.Spec,
  179. Status: input.Status,
  180. }
  181. }
// PersistentVolumeClaim is the reduced, cached view of a
// v1.PersistentVolumeClaim. transformPersistentVolumeClaim fills it from the
// source object's metadata and spec.
type PersistentVolumeClaim struct {
	// Name is the claim's object name.
	Name string
	// Namespace is the namespace the claim belongs to.
	Namespace string
	// Spec is the full spec value taken from the source object.
	Spec v1.PersistentVolumeClaimSpec
	// Annotations is the annotation map taken from the source object.
	Annotations map[string]string
}
  188. func transformPersistentVolumeClaim(input *v1.PersistentVolumeClaim) *PersistentVolumeClaim {
  189. return &PersistentVolumeClaim{
  190. Name: input.Name,
  191. Namespace: input.Namespace,
  192. Spec: input.Spec,
  193. Annotations: input.Annotations,
  194. }
  195. }
// StorageClass is the reduced, cached view of a stv1.StorageClass.
// transformStorageClass fills it from the source object's metadata,
// parameters, and provisioner.
type StorageClass struct {
	// Name is the storage class's object name.
	Name string
	// Annotations is the annotation map taken from the source object.
	Annotations map[string]string
	// Parameters is the source object's Parameters map.
	Parameters map[string]string
	// Provisioner is the source object's Provisioner string.
	Provisioner string
}
  202. func transformStorageClass(input *stv1.StorageClass) *StorageClass {
  203. return &StorageClass{
  204. Name: input.Name,
  205. Annotations: input.Annotations,
  206. Parameters: input.Parameters,
  207. Provisioner: input.Provisioner,
  208. }
  209. }
// Job is the reduced, cached view of a batchv1.Job. transformJob fills it from
// the source object's metadata and status.
type Job struct {
	// Name is the job's object name.
	Name string
	// Namespace is the namespace the job belongs to.
	Namespace string
	// Status is the full job status value taken from the source object.
	Status batchv1.JobStatus
}
  215. func transformJob(input *batchv1.Job) *Job {
  216. return &Job{
  217. Name: input.Name,
  218. Namespace: input.Namespace,
  219. Status: input.Status,
  220. }
  221. }
// ReplicationController is the cached view of a v1.ReplicationController. It
// currently carries no fields: transformReplicationController returns an empty
// value for every source object.
type ReplicationController struct{}
  223. func transformReplicationController(input *v1.ReplicationController) *ReplicationController {
  224. return &ReplicationController{}
  225. }
// PodDisruptionBudget is the cached view of a policyv1.PodDisruptionBudget. It
// currently carries no fields: transformPodDisruptionBudget returns an empty
// value for every source object.
type PodDisruptionBudget struct{}
  227. func transformPodDisruptionBudget(input *policyv1.PodDisruptionBudget) *PodDisruptionBudget {
  228. return &PodDisruptionBudget{}
  229. }
// ReplicaSet is the cached view of an appsv1.ReplicaSet. It currently carries
// no fields: transformReplicaSet returns an empty value for every source
// object.
type ReplicaSet struct{}
  231. func transformReplicaSet(input *appsv1.ReplicaSet) *ReplicaSet {
  232. return &ReplicaSet{}
  233. }
// ClusterCache defines a contract for an object which caches components within a cluster, ensuring
// up-to-date resources using watchers.
type ClusterCache interface {
	// Run starts the watcher processes.
	Run()

	// Stop stops the watcher processes.
	Stop()

	// GetAllNamespaces returns all the cached namespaces.
	GetAllNamespaces() []*Namespace

	// GetAllNodes returns all the cached nodes.
	GetAllNodes() []*Node

	// GetAllPods returns all the cached pods.
	GetAllPods() []*Pod

	// GetAllServices returns all the cached services.
	GetAllServices() []*Service

	// GetAllDaemonSets returns all the cached DaemonSets.
	GetAllDaemonSets() []*DaemonSet

	// GetAllDeployments returns all the cached deployments.
	GetAllDeployments() []*Deployment

	// GetAllStatefulSets returns all the cached StatefulSets.
	GetAllStatefulSets() []*StatefulSet

	// GetAllReplicaSets returns all the cached ReplicaSets.
	GetAllReplicaSets() []*ReplicaSet

	// GetAllPersistentVolumes returns all the cached persistent volumes.
	GetAllPersistentVolumes() []*PersistentVolume

	// GetAllPersistentVolumeClaims returns all the cached persistent volume claims.
	GetAllPersistentVolumeClaims() []*PersistentVolumeClaim

	// GetAllStorageClasses returns all the cached storage classes.
	GetAllStorageClasses() []*StorageClass

	// GetAllJobs returns all the cached jobs.
	GetAllJobs() []*Job

	// GetAllPodDisruptionBudgets returns all cached pod disruption budgets.
	GetAllPodDisruptionBudgets() []*PodDisruptionBudget

	// GetAllReplicationControllers returns all cached replication controllers.
	GetAllReplicationControllers() []*ReplicationController

	// SetConfigMapUpdateFunc sets the configmap update function.
	SetConfigMapUpdateFunc(func(interface{}))
}
// KubernetesClusterCache is the implementation of ClusterCache. It holds one
// WatchController per cached resource kind plus the channel used to stop them.
type KubernetesClusterCache struct {
	// client is the Kubernetes client the cache was constructed with.
	client kubernetes.Interface

	// One watcher per resource kind; each GetAll* method reads from its
	// corresponding watcher.
	namespaceWatch WatchController
	nodeWatch      WatchController
	podWatch       WatchController
	// kubecostConfigMapWatch is the watcher whose update handler is set by
	// SetConfigMapUpdateFunc.
	kubecostConfigMapWatch     WatchController
	serviceWatch               WatchController
	daemonsetsWatch            WatchController
	deploymentsWatch           WatchController
	statefulsetWatch           WatchController
	replicasetWatch            WatchController
	pvWatch                    WatchController
	pvcWatch                   WatchController
	storageClassWatch          WatchController
	jobsWatch                  WatchController
	pdbWatch                   WatchController
	replicationControllerWatch WatchController

	// stop is the channel passed to every watcher's Run. It is nil while the
	// cache is not running; Run sets it and Stop closes and clears it.
	stop chan struct{}
}
// initializeCache calls wc.WarmUp(cancel) and marks one unit of wg as done when
// WarmUp returns. It is launched as a goroutine by the cache constructor, which
// waits on wg for all warm-ups to finish.
func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
	// Deferred first so the wait group is released on every exit path from WarmUp.
	defer wg.Done()

	wc.WarmUp(cancel)
}
// NewKubernetesClusterCache creates a ClusterCache for the given client by
// delegating to NewKubernetesClusterCacheV2.
func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
	return NewKubernetesClusterCacheV2(client)
}
  299. func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
  300. coreRestClient := client.CoreV1().RESTClient()
  301. appsRestClient := client.AppsV1().RESTClient()
  302. storageRestClient := client.StorageV1().RESTClient()
  303. batchClient := client.BatchV1().RESTClient()
  304. pdbClient := client.PolicyV1().RESTClient()
  305. kubecostNamespace := env.GetKubecostNamespace()
  306. log.Infof("NAMESPACE: %s", kubecostNamespace)
  307. kcc := &KubernetesClusterCache{
  308. client: client,
  309. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  310. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  311. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  312. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  313. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  314. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  315. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  316. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  317. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  318. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  319. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  320. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  321. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  322. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  323. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  324. }
  325. // Wait for each caching watcher to initialize
  326. cancel := make(chan struct{})
  327. var wg sync.WaitGroup
  328. if env.IsETLReadOnlyMode() {
  329. wg.Add(1)
  330. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  331. } else {
  332. wg.Add(15)
  333. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  334. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  335. go initializeCache(kcc.nodeWatch, &wg, cancel)
  336. go initializeCache(kcc.podWatch, &wg, cancel)
  337. go initializeCache(kcc.serviceWatch, &wg, cancel)
  338. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  339. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  340. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  341. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  342. go initializeCache(kcc.pvWatch, &wg, cancel)
  343. go initializeCache(kcc.pvcWatch, &wg, cancel)
  344. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  345. go initializeCache(kcc.jobsWatch, &wg, cancel)
  346. go initializeCache(kcc.pdbWatch, &wg, cancel)
  347. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  348. }
  349. wg.Wait()
  350. log.Infof("Done waiting")
  351. return kcc
  352. }
  353. func (kcc *KubernetesClusterCache) Run() {
  354. if kcc.stop != nil {
  355. return
  356. }
  357. stopCh := make(chan struct{})
  358. go kcc.namespaceWatch.Run(1, stopCh)
  359. go kcc.nodeWatch.Run(1, stopCh)
  360. go kcc.podWatch.Run(1, stopCh)
  361. go kcc.serviceWatch.Run(1, stopCh)
  362. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  363. go kcc.daemonsetsWatch.Run(1, stopCh)
  364. go kcc.deploymentsWatch.Run(1, stopCh)
  365. go kcc.statefulsetWatch.Run(1, stopCh)
  366. go kcc.replicasetWatch.Run(1, stopCh)
  367. go kcc.pvWatch.Run(1, stopCh)
  368. go kcc.pvcWatch.Run(1, stopCh)
  369. go kcc.storageClassWatch.Run(1, stopCh)
  370. go kcc.jobsWatch.Run(1, stopCh)
  371. go kcc.pdbWatch.Run(1, stopCh)
  372. go kcc.replicationControllerWatch.Run(1, stopCh)
  373. kcc.stop = stopCh
  374. }
  375. func (kcc *KubernetesClusterCache) Stop() {
  376. if kcc.stop == nil {
  377. return
  378. }
  379. close(kcc.stop)
  380. kcc.stop = nil
  381. }
  382. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*Namespace {
  383. var namespaces []*Namespace
  384. items := kcc.namespaceWatch.GetAll()
  385. for _, ns := range items {
  386. namespaces = append(namespaces, transformNamespace(ns.(*v1.Namespace)))
  387. }
  388. return namespaces
  389. }
  390. func (kcc *KubernetesClusterCache) GetAllNodes() []*Node {
  391. var nodes []*Node
  392. items := kcc.nodeWatch.GetAll()
  393. for _, node := range items {
  394. nodes = append(nodes, transformNode(node.(*v1.Node)))
  395. }
  396. return nodes
  397. }
  398. func (kcc *KubernetesClusterCache) GetAllPods() []*Pod {
  399. var pods []*Pod
  400. items := kcc.podWatch.GetAll()
  401. for _, pod := range items {
  402. pods = append(pods, transformPod(pod.(*v1.Pod)))
  403. }
  404. return pods
  405. }
  406. func (kcc *KubernetesClusterCache) GetAllServices() []*Service {
  407. var services []*Service
  408. items := kcc.serviceWatch.GetAll()
  409. for _, service := range items {
  410. services = append(services, transformService(service.(*v1.Service)))
  411. }
  412. return services
  413. }
  414. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*DaemonSet {
  415. var daemonsets []*DaemonSet
  416. items := kcc.daemonsetsWatch.GetAll()
  417. for _, daemonset := range items {
  418. daemonsets = append(daemonsets, transformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  419. }
  420. return daemonsets
  421. }
  422. func (kcc *KubernetesClusterCache) GetAllDeployments() []*Deployment {
  423. var deployments []*Deployment
  424. items := kcc.deploymentsWatch.GetAll()
  425. for _, deployment := range items {
  426. deployments = append(deployments, transformDeployment(deployment.(*appsv1.Deployment)))
  427. }
  428. return deployments
  429. }
  430. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*StatefulSet {
  431. var statefulsets []*StatefulSet
  432. items := kcc.statefulsetWatch.GetAll()
  433. for _, statefulset := range items {
  434. statefulsets = append(statefulsets, transformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  435. }
  436. return statefulsets
  437. }
  438. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*ReplicaSet {
  439. var replicasets []*ReplicaSet
  440. items := kcc.replicasetWatch.GetAll()
  441. for _, replicaset := range items {
  442. replicasets = append(replicasets, transformReplicaSet(replicaset.(*appsv1.ReplicaSet)))
  443. }
  444. return replicasets
  445. }
  446. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*PersistentVolume {
  447. var pvs []*PersistentVolume
  448. items := kcc.pvWatch.GetAll()
  449. for _, pv := range items {
  450. pvs = append(pvs, transformPersistentVolume(pv.(*v1.PersistentVolume)))
  451. }
  452. return pvs
  453. }
  454. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
  455. var pvcs []*PersistentVolumeClaim
  456. items := kcc.pvcWatch.GetAll()
  457. for _, pvc := range items {
  458. pvcs = append(pvcs, transformPersistentVolumeClaim(pvc.(*v1.PersistentVolumeClaim)))
  459. }
  460. return pvcs
  461. }
  462. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*StorageClass {
  463. var storageClasses []*StorageClass
  464. items := kcc.storageClassWatch.GetAll()
  465. for _, stc := range items {
  466. storageClasses = append(storageClasses, transformStorageClass(stc.(*stv1.StorageClass)))
  467. }
  468. return storageClasses
  469. }
  470. func (kcc *KubernetesClusterCache) GetAllJobs() []*Job {
  471. var jobs []*Job
  472. items := kcc.jobsWatch.GetAll()
  473. for _, job := range items {
  474. jobs = append(jobs, transformJob(job.(*batchv1.Job)))
  475. }
  476. return jobs
  477. }
  478. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*PodDisruptionBudget {
  479. var pdbs []*PodDisruptionBudget
  480. items := kcc.pdbWatch.GetAll()
  481. for _, pdb := range items {
  482. pdbs = append(pdbs, transformPodDisruptionBudget(pdb.(*policyv1.PodDisruptionBudget)))
  483. }
  484. return pdbs
  485. }
  486. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*ReplicationController {
  487. var rcs []*ReplicationController
  488. items := kcc.replicationControllerWatch.GetAll()
  489. for _, rc := range items {
  490. rcs = append(rcs, transformReplicationController(rc.(*v1.ReplicationController)))
  491. }
  492. return rcs
  493. }
// SetConfigMapUpdateFunc registers f as the update handler of the config map
// watcher by passing it to the watcher's SetUpdateHandler.
func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
	kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
}