clustercache.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/pkg/env"
  6. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7. "k8s.io/apimachinery/pkg/types"
  8. appsv1 "k8s.io/api/apps/v1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. stv1 "k8s.io/api/storage/v1"
  12. "k8s.io/apimachinery/pkg/fields"
  13. "k8s.io/client-go/kubernetes"
  14. )
  15. type Namespace struct {
  16. Name string
  17. Labels map[string]string
  18. Annotations map[string]string
  19. }
  20. func transformNamespace(input *v1.Namespace) *Namespace {
  21. return &Namespace{
  22. Name: input.Name,
  23. Annotations: input.Annotations,
  24. Labels: input.Labels,
  25. }
  26. }
  27. type Pod struct {
  28. UID types.UID
  29. Name string
  30. Namespace string
  31. Labels map[string]string
  32. Annotations map[string]string
  33. OwnerReferences []metav1.OwnerReference
  34. Status PodStatus
  35. Spec PodSpec
  36. }
  37. type PodStatus struct {
  38. Phase v1.PodPhase
  39. ContainerStatuses []v1.ContainerStatus
  40. }
  41. type PodSpec struct {
  42. NodeName string
  43. Containers []Container
  44. Volumes []v1.Volume
  45. }
  46. type Container struct {
  47. Name string
  48. Resources v1.ResourceRequirements
  49. }
  50. func transformPodContainer(input v1.Container) Container {
  51. return Container{
  52. Name: input.Name,
  53. Resources: input.Resources,
  54. }
  55. }
  56. func transformPodStatus(input v1.PodStatus) PodStatus {
  57. return PodStatus{
  58. Phase: input.Phase,
  59. ContainerStatuses: input.ContainerStatuses,
  60. }
  61. }
  62. func transformPodSpec(input v1.PodSpec) PodSpec {
  63. containers := make([]Container, len(input.Containers))
  64. for i, container := range input.Containers {
  65. containers[i] = transformPodContainer(container)
  66. }
  67. return PodSpec{
  68. NodeName: input.NodeName,
  69. Containers: containers,
  70. Volumes: input.Volumes,
  71. }
  72. }
  73. func transformPod(input *v1.Pod) *Pod {
  74. return &Pod{
  75. UID: input.UID,
  76. Name: input.Name,
  77. Namespace: input.Namespace,
  78. Labels: input.Labels,
  79. Annotations: input.Annotations,
  80. OwnerReferences: input.OwnerReferences,
  81. Spec: transformPodSpec(input.Spec),
  82. Status: transformPodStatus(input.Status),
  83. }
  84. }
  85. type Node struct {
  86. Name string
  87. Labels map[string]string
  88. Annotations map[string]string
  89. Status v1.NodeStatus
  90. SpecProviderID string
  91. }
  92. func transformNode(input *v1.Node) *Node {
  93. return &Node{
  94. Name: input.Name,
  95. Labels: input.Labels,
  96. Annotations: input.Annotations,
  97. Status: input.Status,
  98. SpecProviderID: input.Spec.ProviderID,
  99. }
  100. }
  101. type Service struct {
  102. Name string
  103. Namespace string
  104. Selector map[string]string
  105. Type v1.ServiceType
  106. Status v1.ServiceStatus
  107. }
  108. func transformService(input *v1.Service) *Service {
  109. return &Service{
  110. Name: input.Name,
  111. Namespace: input.Namespace,
  112. Selector: input.Spec.Selector,
  113. Type: input.Spec.Type,
  114. Status: input.Status,
  115. }
  116. }
  117. type DaemonSet struct {
  118. Name string
  119. Namespace string
  120. Labels map[string]string
  121. SpecContainers []v1.Container
  122. }
  123. func transformDaemonSet(input *appsv1.DaemonSet) *DaemonSet {
  124. return &DaemonSet{
  125. Name: input.Name,
  126. Namespace: input.Namespace,
  127. Labels: input.Labels,
  128. SpecContainers: input.Spec.Template.Spec.Containers,
  129. }
  130. }
  131. type Deployment struct {
  132. Name string
  133. Namespace string
  134. Labels map[string]string
  135. MatchLabels map[string]string
  136. SpecSelector *metav1.LabelSelector
  137. SpecReplicas *int32
  138. StatusAvailableReplicas int32
  139. }
  140. func transformDeployment(input *appsv1.Deployment) *Deployment {
  141. return &Deployment{
  142. Name: input.Name,
  143. Namespace: input.Namespace,
  144. Labels: input.Labels,
  145. MatchLabels: input.Spec.Selector.MatchLabels,
  146. SpecReplicas: input.Spec.Replicas,
  147. SpecSelector: input.Spec.Selector,
  148. StatusAvailableReplicas: input.Status.AvailableReplicas,
  149. }
  150. }
  151. type StatefulSet struct {
  152. Name string
  153. Namespace string
  154. SpecSelector *metav1.LabelSelector
  155. }
  156. func transformStatefulSet(input *appsv1.StatefulSet) *StatefulSet {
  157. return &StatefulSet{
  158. Name: input.Name,
  159. Namespace: input.Namespace,
  160. SpecSelector: input.Spec.Selector,
  161. }
  162. }
  163. type PersistentVolume struct {
  164. Name string
  165. Namespace string
  166. Labels map[string]string
  167. Annotations map[string]string
  168. Spec v1.PersistentVolumeSpec
  169. Status v1.PersistentVolumeStatus
  170. }
  171. func transformPersistentVolume(input *v1.PersistentVolume) *PersistentVolume {
  172. return &PersistentVolume{
  173. Name: input.Name,
  174. Namespace: input.Namespace,
  175. Labels: input.Labels,
  176. Annotations: input.Annotations,
  177. Spec: input.Spec,
  178. Status: input.Status,
  179. }
  180. }
  181. type PersistentVolumeClaim struct {
  182. Name string
  183. Namespace string
  184. Spec v1.PersistentVolumeClaimSpec
  185. Annotations map[string]string
  186. }
  187. func transformPersistentVolumeClaim(input *v1.PersistentVolumeClaim) *PersistentVolumeClaim {
  188. return &PersistentVolumeClaim{
  189. Name: input.Name,
  190. Namespace: input.Namespace,
  191. Spec: input.Spec,
  192. Annotations: input.Annotations,
  193. }
  194. }
  195. type StorageClass struct {
  196. Name string
  197. Annotations map[string]string
  198. Parameters map[string]string
  199. Provisioner string
  200. }
  201. func transformStorageClass(input *stv1.StorageClass) *StorageClass {
  202. return &StorageClass{
  203. Name: input.Name,
  204. Annotations: input.Annotations,
  205. Parameters: input.Parameters,
  206. Provisioner: input.Provisioner,
  207. }
  208. }
  209. type Job struct {
  210. Name string
  211. Namespace string
  212. Status batchv1.JobStatus
  213. }
  214. func transformJob(input *batchv1.Job) *Job {
  215. return &Job{
  216. Name: input.Name,
  217. Namespace: input.Namespace,
  218. Status: input.Status,
  219. }
  220. }
  221. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  222. // up to date resources using watchers
  223. type ClusterCache interface {
  224. // Run starts the watcher processes
  225. Run()
  226. // Stops the watcher processes
  227. Stop()
  228. // GetAllNamespaces returns all the cached namespaces
  229. GetAllNamespaces() []*Namespace
  230. // GetAllNodes returns all the cached nodes
  231. GetAllNodes() []*Node
  232. // GetAllPods returns all the cached pods
  233. GetAllPods() []*Pod
  234. // GetAllServices returns all the cached services
  235. GetAllServices() []*Service
  236. // GetAllDaemonSets returns all the cached DaemonSets
  237. GetAllDaemonSets() []*DaemonSet
  238. // GetAllDeployments returns all the cached deployments
  239. GetAllDeployments() []*Deployment
  240. // GetAllStatfulSets returns all the cached StatefulSets
  241. GetAllStatefulSets() []*StatefulSet
  242. // GetAllPersistentVolumes returns all the cached persistent volumes
  243. GetAllPersistentVolumes() []*PersistentVolume
  244. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  245. GetAllPersistentVolumeClaims() []*PersistentVolumeClaim
  246. // GetAllStorageClasses returns all the cached storage classes
  247. GetAllStorageClasses() []*StorageClass
  248. // GetAllJobs returns all the cached jobs
  249. GetAllJobs() []*Job
  250. // SetConfigMapUpdateFunc sets the configmap update function
  251. SetConfigMapUpdateFunc(func(interface{}))
  252. }
  253. // KubernetesClusterCache is the implementation of ClusterCache
  254. type KubernetesClusterCache struct {
  255. client kubernetes.Interface
  256. namespaceWatch WatchController
  257. nodeWatch WatchController
  258. podWatch WatchController
  259. kubecostConfigMapWatch WatchController
  260. serviceWatch WatchController
  261. daemonsetsWatch WatchController
  262. deploymentsWatch WatchController
  263. statefulsetWatch WatchController
  264. pvWatch WatchController
  265. pvcWatch WatchController
  266. storageClassWatch WatchController
  267. jobsWatch WatchController
  268. stop chan struct{}
  269. }
  270. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  271. defer wg.Done()
  272. wc.WarmUp(cancel)
  273. }
  274. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  275. return NewKubernetesClusterCache2(client)
  276. }
  277. func NewKubernetesClusterCacheV1(client kubernetes.Interface) ClusterCache {
  278. coreRestClient := client.CoreV1().RESTClient()
  279. appsRestClient := client.AppsV1().RESTClient()
  280. storageRestClient := client.StorageV1().RESTClient()
  281. batchClient := client.BatchV1().RESTClient()
  282. kubecostNamespace := env.GetKubecostNamespace()
  283. log.Infof("NAMESPACE: %s", kubecostNamespace)
  284. kcc := &KubernetesClusterCache{
  285. client: client,
  286. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  287. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  288. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  289. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  290. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  291. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  292. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  293. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  294. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  295. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  296. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  297. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  298. }
  299. // Wait for each caching watcher to initialize
  300. cancel := make(chan struct{})
  301. var wg sync.WaitGroup
  302. if env.IsETLReadOnlyMode() {
  303. wg.Add(1)
  304. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  305. } else {
  306. wg.Add(12)
  307. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  308. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  309. go initializeCache(kcc.nodeWatch, &wg, cancel)
  310. go initializeCache(kcc.podWatch, &wg, cancel)
  311. go initializeCache(kcc.serviceWatch, &wg, cancel)
  312. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  313. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  314. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  315. go initializeCache(kcc.pvWatch, &wg, cancel)
  316. go initializeCache(kcc.pvcWatch, &wg, cancel)
  317. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  318. go initializeCache(kcc.jobsWatch, &wg, cancel)
  319. }
  320. wg.Wait()
  321. log.Infof("Done waiting")
  322. return kcc
  323. }
  324. func (kcc *KubernetesClusterCache) Run() {
  325. if kcc.stop != nil {
  326. return
  327. }
  328. stopCh := make(chan struct{})
  329. go kcc.namespaceWatch.Run(1, stopCh)
  330. go kcc.nodeWatch.Run(1, stopCh)
  331. go kcc.podWatch.Run(1, stopCh)
  332. go kcc.serviceWatch.Run(1, stopCh)
  333. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  334. go kcc.daemonsetsWatch.Run(1, stopCh)
  335. go kcc.deploymentsWatch.Run(1, stopCh)
  336. go kcc.statefulsetWatch.Run(1, stopCh)
  337. go kcc.pvWatch.Run(1, stopCh)
  338. go kcc.pvcWatch.Run(1, stopCh)
  339. go kcc.storageClassWatch.Run(1, stopCh)
  340. go kcc.jobsWatch.Run(1, stopCh)
  341. kcc.stop = stopCh
  342. }
  343. func (kcc *KubernetesClusterCache) Stop() {
  344. if kcc.stop == nil {
  345. return
  346. }
  347. close(kcc.stop)
  348. kcc.stop = nil
  349. }
  350. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*Namespace {
  351. var namespaces []*Namespace
  352. items := kcc.namespaceWatch.GetAll()
  353. for _, ns := range items {
  354. namespaces = append(namespaces, transformNamespace(ns.(*v1.Namespace)))
  355. }
  356. return namespaces
  357. }
  358. func (kcc *KubernetesClusterCache) GetAllNodes() []*Node {
  359. var nodes []*Node
  360. items := kcc.nodeWatch.GetAll()
  361. for _, node := range items {
  362. nodes = append(nodes, transformNode(node.(*v1.Node)))
  363. }
  364. return nodes
  365. }
  366. func (kcc *KubernetesClusterCache) GetAllPods() []*Pod {
  367. var pods []*Pod
  368. items := kcc.podWatch.GetAll()
  369. for _, pod := range items {
  370. pods = append(pods, transformPod(pod.(*v1.Pod)))
  371. }
  372. return pods
  373. }
  374. func (kcc *KubernetesClusterCache) GetAllServices() []*Service {
  375. var services []*Service
  376. items := kcc.serviceWatch.GetAll()
  377. for _, service := range items {
  378. services = append(services, transformService(service.(*v1.Service)))
  379. }
  380. return services
  381. }
  382. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*DaemonSet {
  383. var daemonsets []*DaemonSet
  384. items := kcc.daemonsetsWatch.GetAll()
  385. for _, daemonset := range items {
  386. daemonsets = append(daemonsets, transformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  387. }
  388. return daemonsets
  389. }
  390. func (kcc *KubernetesClusterCache) GetAllDeployments() []*Deployment {
  391. var deployments []*Deployment
  392. items := kcc.deploymentsWatch.GetAll()
  393. for _, deployment := range items {
  394. deployments = append(deployments, transformDeployment(deployment.(*appsv1.Deployment)))
  395. }
  396. return deployments
  397. }
  398. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*StatefulSet {
  399. var statefulsets []*StatefulSet
  400. items := kcc.statefulsetWatch.GetAll()
  401. for _, statefulset := range items {
  402. statefulsets = append(statefulsets, transformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  403. }
  404. return statefulsets
  405. }
  406. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*PersistentVolume {
  407. var pvs []*PersistentVolume
  408. items := kcc.pvWatch.GetAll()
  409. for _, pv := range items {
  410. pvs = append(pvs, transformPersistentVolume(pv.(*v1.PersistentVolume)))
  411. }
  412. return pvs
  413. }
  414. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
  415. var pvcs []*PersistentVolumeClaim
  416. items := kcc.pvcWatch.GetAll()
  417. for _, pvc := range items {
  418. pvcs = append(pvcs, transformPersistentVolumeClaim(pvc.(*v1.PersistentVolumeClaim)))
  419. }
  420. return pvcs
  421. }
  422. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*StorageClass {
  423. var storageClasses []*StorageClass
  424. items := kcc.storageClassWatch.GetAll()
  425. for _, stc := range items {
  426. storageClasses = append(storageClasses, transformStorageClass(stc.(*stv1.StorageClass)))
  427. }
  428. return storageClasses
  429. }
  430. func (kcc *KubernetesClusterCache) GetAllJobs() []*Job {
  431. var jobs []*Job
  432. items := kcc.jobsWatch.GetAll()
  433. for _, job := range items {
  434. jobs = append(jobs, transformJob(job.(*batchv1.Job)))
  435. }
  436. return jobs
  437. }
  438. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  439. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  440. }