clustercache.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/pkg/env"
  6. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7. "k8s.io/apimachinery/pkg/types"
  8. appsv1 "k8s.io/api/apps/v1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. policyv1 "k8s.io/api/policy/v1"
  12. stv1 "k8s.io/api/storage/v1"
  13. "k8s.io/apimachinery/pkg/fields"
  14. "k8s.io/client-go/kubernetes"
  15. )
  16. type Namespace struct {
  17. Name string
  18. Labels map[string]string
  19. Annotations map[string]string
  20. }
  21. func transformNamespace(input *v1.Namespace) *Namespace {
  22. return &Namespace{
  23. Name: input.Name,
  24. Annotations: input.Annotations,
  25. Labels: input.Labels,
  26. }
  27. }
  28. type Pod struct {
  29. UID types.UID
  30. Name string
  31. Namespace string
  32. Labels map[string]string
  33. Annotations map[string]string
  34. OwnerReferences []metav1.OwnerReference
  35. Status PodStatus
  36. Spec PodSpec
  37. }
  38. type PodStatus struct {
  39. Phase v1.PodPhase
  40. ContainerStatuses []v1.ContainerStatus
  41. }
  42. type PodSpec struct {
  43. NodeName string
  44. Containers []Container
  45. Volumes []v1.Volume
  46. }
  47. type Container struct {
  48. Name string
  49. Resources v1.ResourceRequirements
  50. }
  51. func transformPodContainer(input v1.Container) Container {
  52. return Container{
  53. Name: input.Name,
  54. Resources: input.Resources,
  55. }
  56. }
  57. func transformPodStatus(input v1.PodStatus) PodStatus {
  58. return PodStatus{
  59. Phase: input.Phase,
  60. ContainerStatuses: input.ContainerStatuses,
  61. }
  62. }
  63. func transformPodSpec(input v1.PodSpec) PodSpec {
  64. containers := make([]Container, len(input.Containers))
  65. for i, container := range input.Containers {
  66. containers[i] = transformPodContainer(container)
  67. }
  68. return PodSpec{
  69. NodeName: input.NodeName,
  70. Containers: containers,
  71. Volumes: input.Volumes,
  72. }
  73. }
  74. func transformPod(input *v1.Pod) *Pod {
  75. return &Pod{
  76. UID: input.UID,
  77. Name: input.Name,
  78. Namespace: input.Namespace,
  79. Labels: input.Labels,
  80. Annotations: input.Annotations,
  81. OwnerReferences: input.OwnerReferences,
  82. Spec: transformPodSpec(input.Spec),
  83. Status: transformPodStatus(input.Status),
  84. }
  85. }
  86. type Node struct {
  87. Name string
  88. Labels map[string]string
  89. Annotations map[string]string
  90. Status v1.NodeStatus
  91. SpecProviderID string
  92. }
  93. func transformNode(input *v1.Node) *Node {
  94. return &Node{
  95. Name: input.Name,
  96. Labels: input.Labels,
  97. Annotations: input.Annotations,
  98. Status: input.Status,
  99. SpecProviderID: input.Spec.ProviderID,
  100. }
  101. }
  102. type Service struct {
  103. Name string
  104. Namespace string
  105. Selector map[string]string
  106. Type v1.ServiceType
  107. Status v1.ServiceStatus
  108. }
  109. func transformService(input *v1.Service) *Service {
  110. return &Service{
  111. Name: input.Name,
  112. Namespace: input.Namespace,
  113. Selector: input.Spec.Selector,
  114. Type: input.Spec.Type,
  115. Status: input.Status,
  116. }
  117. }
  118. type DaemonSet struct {
  119. Name string
  120. Namespace string
  121. Labels map[string]string
  122. SpecContainers []v1.Container
  123. }
  124. func transformDaemonSet(input *appsv1.DaemonSet) *DaemonSet {
  125. return &DaemonSet{
  126. Name: input.Name,
  127. Namespace: input.Namespace,
  128. Labels: input.Labels,
  129. SpecContainers: input.Spec.Template.Spec.Containers,
  130. }
  131. }
  132. type Deployment struct {
  133. Name string
  134. Namespace string
  135. Labels map[string]string
  136. MatchLabels map[string]string
  137. SpecSelector *metav1.LabelSelector
  138. SpecReplicas *int32
  139. StatusAvailableReplicas int32
  140. }
  141. func transformDeployment(input *appsv1.Deployment) *Deployment {
  142. return &Deployment{
  143. Name: input.Name,
  144. Namespace: input.Namespace,
  145. Labels: input.Labels,
  146. MatchLabels: input.Spec.Selector.MatchLabels,
  147. SpecReplicas: input.Spec.Replicas,
  148. SpecSelector: input.Spec.Selector,
  149. StatusAvailableReplicas: input.Status.AvailableReplicas,
  150. }
  151. }
  152. type StatefulSet struct {
  153. Name string
  154. Namespace string
  155. SpecSelector *metav1.LabelSelector
  156. }
  157. func transformStatefulSet(input *appsv1.StatefulSet) *StatefulSet {
  158. return &StatefulSet{
  159. Name: input.Name,
  160. Namespace: input.Namespace,
  161. SpecSelector: input.Spec.Selector,
  162. }
  163. }
  164. type PersistentVolume struct {
  165. Name string
  166. Namespace string
  167. Labels map[string]string
  168. Annotations map[string]string
  169. Spec v1.PersistentVolumeSpec
  170. Status v1.PersistentVolumeStatus
  171. }
  172. func transformPersistentVolume(input *v1.PersistentVolume) *PersistentVolume {
  173. return &PersistentVolume{
  174. Name: input.Name,
  175. Namespace: input.Namespace,
  176. Labels: input.Labels,
  177. Annotations: input.Annotations,
  178. Spec: input.Spec,
  179. Status: input.Status,
  180. }
  181. }
  182. type PersistentVolumeClaim struct {
  183. Name string
  184. Namespace string
  185. Spec v1.PersistentVolumeClaimSpec
  186. Annotations map[string]string
  187. }
  188. func transformPersistentVolumeClaim(input *v1.PersistentVolumeClaim) *PersistentVolumeClaim {
  189. return &PersistentVolumeClaim{
  190. Name: input.Name,
  191. Namespace: input.Namespace,
  192. Spec: input.Spec,
  193. Annotations: input.Annotations,
  194. }
  195. }
  196. type StorageClass struct {
  197. Name string
  198. Annotations map[string]string
  199. Parameters map[string]string
  200. Provisioner string
  201. }
  202. func transformStorageClass(input *stv1.StorageClass) *StorageClass {
  203. return &StorageClass{
  204. Name: input.Name,
  205. Annotations: input.Annotations,
  206. Parameters: input.Parameters,
  207. Provisioner: input.Provisioner,
  208. }
  209. }
  210. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  211. // up to date resources using watchers
  212. type ClusterCache interface {
  213. // Run starts the watcher processes
  214. Run()
  215. // Stops the watcher processes
  216. Stop()
  217. // GetAllNamespaces returns all the cached namespaces
  218. GetAllNamespaces() []*Namespace
  219. // GetAllNodes returns all the cached nodes
  220. GetAllNodes() []*Node
  221. // GetAllPods returns all the cached pods
  222. GetAllPods() []*Pod
  223. // GetAllServices returns all the cached services
  224. GetAllServices() []*Service
  225. // GetAllDaemonSets returns all the cached DaemonSets
  226. GetAllDaemonSets() []*DaemonSet
  227. // GetAllDeployments returns all the cached deployments
  228. GetAllDeployments() []*Deployment
  229. // GetAllStatfulSets returns all the cached StatefulSets
  230. GetAllStatefulSets() []*StatefulSet
  231. // GetAllPersistentVolumes returns all the cached persistent volumes
  232. GetAllPersistentVolumes() []*PersistentVolume
  233. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  234. GetAllPersistentVolumeClaims() []*PersistentVolumeClaim
  235. // GetAllStorageClasses returns all the cached storage classes
  236. GetAllStorageClasses() []*StorageClass
  237. // GetAllJobs returns all the cached jobs
  238. GetAllJobs() []*batchv1.Job
  239. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  240. GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget
  241. // SetConfigMapUpdateFunc sets the configmap update function
  242. SetConfigMapUpdateFunc(func(interface{}))
  243. }
  244. // KubernetesClusterCache is the implementation of ClusterCache
  245. type KubernetesClusterCache struct {
  246. client kubernetes.Interface
  247. namespaceWatch WatchController
  248. nodeWatch WatchController
  249. podWatch WatchController
  250. kubecostConfigMapWatch WatchController
  251. serviceWatch WatchController
  252. daemonsetsWatch WatchController
  253. deploymentsWatch WatchController
  254. statefulsetWatch WatchController
  255. pvWatch WatchController
  256. pvcWatch WatchController
  257. storageClassWatch WatchController
  258. jobsWatch WatchController
  259. pdbWatch WatchController
  260. stop chan struct{}
  261. }
  262. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  263. defer wg.Done()
  264. wc.WarmUp(cancel)
  265. }
  266. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  267. coreRestClient := client.CoreV1().RESTClient()
  268. appsRestClient := client.AppsV1().RESTClient()
  269. storageRestClient := client.StorageV1().RESTClient()
  270. batchClient := client.BatchV1().RESTClient()
  271. pdbClient := client.PolicyV1().RESTClient()
  272. kubecostNamespace := env.GetKubecostNamespace()
  273. log.Infof("NAMESPACE: %s", kubecostNamespace)
  274. kcc := &KubernetesClusterCache{
  275. client: client,
  276. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  277. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  278. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  279. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  280. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  281. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  282. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  283. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  284. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  285. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  286. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  287. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  288. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  289. }
  290. // Wait for each caching watcher to initialize
  291. cancel := make(chan struct{})
  292. var wg sync.WaitGroup
  293. if env.IsETLReadOnlyMode() {
  294. wg.Add(1)
  295. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  296. } else {
  297. wg.Add(13)
  298. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  299. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  300. go initializeCache(kcc.nodeWatch, &wg, cancel)
  301. go initializeCache(kcc.podWatch, &wg, cancel)
  302. go initializeCache(kcc.serviceWatch, &wg, cancel)
  303. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  304. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  305. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  306. go initializeCache(kcc.pvWatch, &wg, cancel)
  307. go initializeCache(kcc.pvcWatch, &wg, cancel)
  308. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  309. go initializeCache(kcc.jobsWatch, &wg, cancel)
  310. go initializeCache(kcc.pdbWatch, &wg, cancel)
  311. }
  312. wg.Wait()
  313. log.Infof("Done waiting")
  314. return kcc
  315. }
  316. func (kcc *KubernetesClusterCache) Run() {
  317. if kcc.stop != nil {
  318. return
  319. }
  320. stopCh := make(chan struct{})
  321. go kcc.namespaceWatch.Run(1, stopCh)
  322. go kcc.nodeWatch.Run(1, stopCh)
  323. go kcc.podWatch.Run(1, stopCh)
  324. go kcc.serviceWatch.Run(1, stopCh)
  325. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  326. go kcc.daemonsetsWatch.Run(1, stopCh)
  327. go kcc.deploymentsWatch.Run(1, stopCh)
  328. go kcc.statefulsetWatch.Run(1, stopCh)
  329. go kcc.pvWatch.Run(1, stopCh)
  330. go kcc.pvcWatch.Run(1, stopCh)
  331. go kcc.storageClassWatch.Run(1, stopCh)
  332. go kcc.jobsWatch.Run(1, stopCh)
  333. go kcc.pdbWatch.Run(1, stopCh)
  334. kcc.stop = stopCh
  335. }
  336. func (kcc *KubernetesClusterCache) Stop() {
  337. if kcc.stop == nil {
  338. return
  339. }
  340. close(kcc.stop)
  341. kcc.stop = nil
  342. }
  343. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*Namespace {
  344. var namespaces []*Namespace
  345. items := kcc.namespaceWatch.GetAll()
  346. for _, ns := range items {
  347. namespaces = append(namespaces, transformNamespace(ns.(*v1.Namespace)))
  348. }
  349. return namespaces
  350. }
  351. func (kcc *KubernetesClusterCache) GetAllNodes() []*Node {
  352. var nodes []*Node
  353. items := kcc.nodeWatch.GetAll()
  354. for _, node := range items {
  355. nodes = append(nodes, transformNode(node.(*v1.Node)))
  356. }
  357. return nodes
  358. }
  359. func (kcc *KubernetesClusterCache) GetAllPods() []*Pod {
  360. var pods []*Pod
  361. items := kcc.podWatch.GetAll()
  362. for _, pod := range items {
  363. pods = append(pods, transformPod(pod.(*v1.Pod)))
  364. }
  365. return pods
  366. }
  367. func (kcc *KubernetesClusterCache) GetAllServices() []*Service {
  368. var services []*Service
  369. items := kcc.serviceWatch.GetAll()
  370. for _, service := range items {
  371. services = append(services, transformService(service.(*v1.Service)))
  372. }
  373. return services
  374. }
  375. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*DaemonSet {
  376. var daemonsets []*DaemonSet
  377. items := kcc.daemonsetsWatch.GetAll()
  378. for _, daemonset := range items {
  379. daemonsets = append(daemonsets, transformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  380. }
  381. return daemonsets
  382. }
  383. func (kcc *KubernetesClusterCache) GetAllDeployments() []*Deployment {
  384. var deployments []*Deployment
  385. items := kcc.deploymentsWatch.GetAll()
  386. for _, deployment := range items {
  387. deployments = append(deployments, transformDeployment(deployment.(*appsv1.Deployment)))
  388. }
  389. return deployments
  390. }
  391. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*StatefulSet {
  392. var statefulsets []*StatefulSet
  393. items := kcc.statefulsetWatch.GetAll()
  394. for _, statefulset := range items {
  395. statefulsets = append(statefulsets, transformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  396. }
  397. return statefulsets
  398. }
  399. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*PersistentVolume {
  400. var pvs []*PersistentVolume
  401. items := kcc.pvWatch.GetAll()
  402. for _, pv := range items {
  403. pvs = append(pvs, transformPersistentVolume(pv.(*v1.PersistentVolume)))
  404. }
  405. return pvs
  406. }
  407. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
  408. var pvcs []*PersistentVolumeClaim
  409. items := kcc.pvcWatch.GetAll()
  410. for _, pvc := range items {
  411. pvcs = append(pvcs, transformPersistentVolumeClaim(pvc.(*v1.PersistentVolumeClaim)))
  412. }
  413. return pvcs
  414. }
  415. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*StorageClass {
  416. var storageClasses []*StorageClass
  417. items := kcc.storageClassWatch.GetAll()
  418. for _, stc := range items {
  419. storageClasses = append(storageClasses, transformStorageClass(stc.(*stv1.StorageClass)))
  420. }
  421. return storageClasses
  422. }
  423. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  424. var jobs []*batchv1.Job
  425. items := kcc.jobsWatch.GetAll()
  426. for _, job := range items {
  427. jobs = append(jobs, job.(*batchv1.Job))
  428. }
  429. return jobs
  430. }
  431. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget {
  432. var pdbs []*policyv1.PodDisruptionBudget
  433. items := kcc.pdbWatch.GetAll()
  434. for _, pdb := range items {
  435. pdbs = append(pdbs, pdb.(*policyv1.PodDisruptionBudget))
  436. }
  437. return pdbs
  438. }
  439. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  440. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  441. }