clustercache.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/pkg/env"
  6. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7. "k8s.io/apimachinery/pkg/types"
  8. appsv1 "k8s.io/api/apps/v1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. policyv1 "k8s.io/api/policy/v1"
  12. stv1 "k8s.io/api/storage/v1"
  13. "k8s.io/apimachinery/pkg/fields"
  14. "k8s.io/client-go/kubernetes"
  15. )
  16. type Namespace struct {
  17. Name string
  18. Labels map[string]string
  19. Annotations map[string]string
  20. }
  21. func transformNamespace(input *v1.Namespace) *Namespace {
  22. return &Namespace{
  23. Name: input.Name,
  24. Annotations: input.Annotations,
  25. Labels: input.Labels,
  26. }
  27. }
  28. type Pod struct {
  29. UID types.UID
  30. Name string
  31. Namespace string
  32. Labels map[string]string
  33. Annotations map[string]string
  34. OwnerReferences []metav1.OwnerReference
  35. Status PodStatus
  36. Spec PodSpec
  37. }
  38. type PodStatus struct {
  39. Phase v1.PodPhase
  40. ContainerStatuses []v1.ContainerStatus
  41. }
  42. type PodSpec struct {
  43. NodeName string
  44. Containers []Container
  45. Volumes []v1.Volume
  46. }
  47. type Container struct {
  48. Name string
  49. Resources v1.ResourceRequirements
  50. }
  51. func transformPodContainer(input v1.Container) Container {
  52. return Container{
  53. Name: input.Name,
  54. Resources: input.Resources,
  55. }
  56. }
  57. func transformPodStatus(input v1.PodStatus) PodStatus {
  58. return PodStatus{
  59. Phase: input.Phase,
  60. ContainerStatuses: input.ContainerStatuses,
  61. }
  62. }
  63. func transformPodSpec(input v1.PodSpec) PodSpec {
  64. containers := make([]Container, len(input.Containers))
  65. for i, container := range input.Containers {
  66. containers[i] = transformPodContainer(container)
  67. }
  68. return PodSpec{
  69. NodeName: input.NodeName,
  70. Containers: containers,
  71. Volumes: input.Volumes,
  72. }
  73. }
  74. func transformPod(input *v1.Pod) *Pod {
  75. return &Pod{
  76. UID: input.UID,
  77. Name: input.Name,
  78. Namespace: input.Namespace,
  79. Labels: input.Labels,
  80. Annotations: input.Annotations,
  81. OwnerReferences: input.OwnerReferences,
  82. Spec: transformPodSpec(input.Spec),
  83. Status: transformPodStatus(input.Status),
  84. }
  85. }
  86. type Node struct {
  87. Name string
  88. Labels map[string]string
  89. Annotations map[string]string
  90. Status v1.NodeStatus
  91. SpecProviderID string
  92. }
  93. func transformNode(input *v1.Node) *Node {
  94. return &Node{
  95. Name: input.Name,
  96. Labels: input.Labels,
  97. Annotations: input.Annotations,
  98. Status: input.Status,
  99. SpecProviderID: input.Spec.ProviderID,
  100. }
  101. }
  102. type Service struct {
  103. Name string
  104. Namespace string
  105. Selector map[string]string
  106. Type v1.ServiceType
  107. Status v1.ServiceStatus
  108. }
  109. func transformService(input *v1.Service) *Service {
  110. return &Service{
  111. Name: input.Name,
  112. Namespace: input.Namespace,
  113. Selector: input.Spec.Selector,
  114. Type: input.Spec.Type,
  115. Status: input.Status,
  116. }
  117. }
  118. type DaemonSet struct {
  119. Name string
  120. Namespace string
  121. Labels map[string]string
  122. SpecContainers []v1.Container
  123. }
  124. func transformDaemonSet(input *appsv1.DaemonSet) *DaemonSet {
  125. return &DaemonSet{
  126. Name: input.Name,
  127. Namespace: input.Namespace,
  128. Labels: input.Labels,
  129. SpecContainers: input.Spec.Template.Spec.Containers,
  130. }
  131. }
  132. type Deployment struct {
  133. Name string
  134. Namespace string
  135. Labels map[string]string
  136. MatchLabels map[string]string
  137. SpecSelector *metav1.LabelSelector
  138. SpecReplicas *int32
  139. StatusAvailableReplicas int32
  140. }
  141. func transformDeployment(input *appsv1.Deployment) *Deployment {
  142. return &Deployment{
  143. Name: input.Name,
  144. Namespace: input.Namespace,
  145. Labels: input.Labels,
  146. MatchLabels: input.Spec.Selector.MatchLabels,
  147. SpecReplicas: input.Spec.Replicas,
  148. SpecSelector: input.Spec.Selector,
  149. StatusAvailableReplicas: input.Status.AvailableReplicas,
  150. }
  151. }
  152. type StatefulSet struct {
  153. Name string
  154. Namespace string
  155. SpecSelector *metav1.LabelSelector
  156. }
  157. func transformStatefulSet(input *appsv1.StatefulSet) *StatefulSet {
  158. return &StatefulSet{
  159. Name: input.Name,
  160. Namespace: input.Namespace,
  161. SpecSelector: input.Spec.Selector,
  162. }
  163. }
  164. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  165. // up to date resources using watchers
  166. type ClusterCache interface {
  167. // Run starts the watcher processes
  168. Run()
  169. // Stops the watcher processes
  170. Stop()
  171. // GetAllNamespaces returns all the cached namespaces
  172. GetAllNamespaces() []*Namespace
  173. // GetAllNodes returns all the cached nodes
  174. GetAllNodes() []*Node
  175. // GetAllPods returns all the cached pods
  176. GetAllPods() []*Pod
  177. // GetAllServices returns all the cached services
  178. GetAllServices() []*Service
  179. // GetAllDaemonSets returns all the cached DaemonSets
  180. GetAllDaemonSets() []*DaemonSet
  181. // GetAllDeployments returns all the cached deployments
  182. GetAllDeployments() []*Deployment
  183. // GetAllStatfulSets returns all the cached StatefulSets
  184. GetAllStatefulSets() []*StatefulSet
  185. // GetAllReplicaSets returns all the cached ReplicaSets
  186. GetAllReplicaSets() []*appsv1.ReplicaSet
  187. // GetAllPersistentVolumes returns all the cached persistent volumes
  188. GetAllPersistentVolumes() []*v1.PersistentVolume
  189. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  190. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  191. // GetAllStorageClasses returns all the cached storage classes
  192. GetAllStorageClasses() []*stv1.StorageClass
  193. // GetAllJobs returns all the cached jobs
  194. GetAllJobs() []*batchv1.Job
  195. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  196. GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget
  197. // GetAllReplicationControllers returns all cached replication controllers
  198. GetAllReplicationControllers() []*v1.ReplicationController
  199. // SetConfigMapUpdateFunc sets the configmap update function
  200. SetConfigMapUpdateFunc(func(interface{}))
  201. }
  202. // KubernetesClusterCache is the implementation of ClusterCache
  203. type KubernetesClusterCache struct {
  204. client kubernetes.Interface
  205. namespaceWatch WatchController
  206. nodeWatch WatchController
  207. podWatch WatchController
  208. kubecostConfigMapWatch WatchController
  209. serviceWatch WatchController
  210. daemonsetsWatch WatchController
  211. deploymentsWatch WatchController
  212. statefulsetWatch WatchController
  213. replicasetWatch WatchController
  214. pvWatch WatchController
  215. pvcWatch WatchController
  216. storageClassWatch WatchController
  217. jobsWatch WatchController
  218. pdbWatch WatchController
  219. replicationControllerWatch WatchController
  220. stop chan struct{}
  221. }
  222. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  223. defer wg.Done()
  224. wc.WarmUp(cancel)
  225. }
  226. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  227. coreRestClient := client.CoreV1().RESTClient()
  228. appsRestClient := client.AppsV1().RESTClient()
  229. storageRestClient := client.StorageV1().RESTClient()
  230. batchClient := client.BatchV1().RESTClient()
  231. pdbClient := client.PolicyV1().RESTClient()
  232. kubecostNamespace := env.GetKubecostNamespace()
  233. log.Infof("NAMESPACE: %s", kubecostNamespace)
  234. kcc := &KubernetesClusterCache{
  235. client: client,
  236. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  237. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  238. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  239. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  240. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  241. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  242. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  243. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  244. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  245. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  246. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  247. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  248. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  249. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  250. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  251. }
  252. // Wait for each caching watcher to initialize
  253. cancel := make(chan struct{})
  254. var wg sync.WaitGroup
  255. if env.IsETLReadOnlyMode() {
  256. wg.Add(1)
  257. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  258. } else {
  259. wg.Add(15)
  260. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  261. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  262. go initializeCache(kcc.nodeWatch, &wg, cancel)
  263. go initializeCache(kcc.podWatch, &wg, cancel)
  264. go initializeCache(kcc.serviceWatch, &wg, cancel)
  265. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  266. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  267. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  268. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  269. go initializeCache(kcc.pvWatch, &wg, cancel)
  270. go initializeCache(kcc.pvcWatch, &wg, cancel)
  271. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  272. go initializeCache(kcc.jobsWatch, &wg, cancel)
  273. go initializeCache(kcc.pdbWatch, &wg, cancel)
  274. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  275. }
  276. wg.Wait()
  277. log.Infof("Done waiting")
  278. return kcc
  279. }
  280. func (kcc *KubernetesClusterCache) Run() {
  281. if kcc.stop != nil {
  282. return
  283. }
  284. stopCh := make(chan struct{})
  285. go kcc.namespaceWatch.Run(1, stopCh)
  286. go kcc.nodeWatch.Run(1, stopCh)
  287. go kcc.podWatch.Run(1, stopCh)
  288. go kcc.serviceWatch.Run(1, stopCh)
  289. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  290. go kcc.daemonsetsWatch.Run(1, stopCh)
  291. go kcc.deploymentsWatch.Run(1, stopCh)
  292. go kcc.statefulsetWatch.Run(1, stopCh)
  293. go kcc.replicasetWatch.Run(1, stopCh)
  294. go kcc.pvWatch.Run(1, stopCh)
  295. go kcc.pvcWatch.Run(1, stopCh)
  296. go kcc.storageClassWatch.Run(1, stopCh)
  297. go kcc.jobsWatch.Run(1, stopCh)
  298. go kcc.pdbWatch.Run(1, stopCh)
  299. go kcc.replicationControllerWatch.Run(1, stopCh)
  300. kcc.stop = stopCh
  301. }
  302. func (kcc *KubernetesClusterCache) Stop() {
  303. if kcc.stop == nil {
  304. return
  305. }
  306. close(kcc.stop)
  307. kcc.stop = nil
  308. }
  309. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*Namespace {
  310. var namespaces []*Namespace
  311. items := kcc.namespaceWatch.GetAll()
  312. for _, ns := range items {
  313. namespaces = append(namespaces, transformNamespace(ns.(*v1.Namespace)))
  314. }
  315. return namespaces
  316. }
  317. func (kcc *KubernetesClusterCache) GetAllNodes() []*Node {
  318. var nodes []*Node
  319. items := kcc.nodeWatch.GetAll()
  320. for _, node := range items {
  321. nodes = append(nodes, transformNode(node.(*v1.Node)))
  322. }
  323. return nodes
  324. }
  325. func (kcc *KubernetesClusterCache) GetAllPods() []*Pod {
  326. var pods []*Pod
  327. items := kcc.podWatch.GetAll()
  328. for _, pod := range items {
  329. pods = append(pods, transformPod(pod.(*v1.Pod)))
  330. }
  331. return pods
  332. }
  333. func (kcc *KubernetesClusterCache) GetAllServices() []*Service {
  334. var services []*Service
  335. items := kcc.serviceWatch.GetAll()
  336. for _, service := range items {
  337. services = append(services, transformService(service.(*v1.Service)))
  338. }
  339. return services
  340. }
  341. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*DaemonSet {
  342. var daemonsets []*DaemonSet
  343. items := kcc.daemonsetsWatch.GetAll()
  344. for _, daemonset := range items {
  345. daemonsets = append(daemonsets, transformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  346. }
  347. return daemonsets
  348. }
  349. func (kcc *KubernetesClusterCache) GetAllDeployments() []*Deployment {
  350. var deployments []*Deployment
  351. items := kcc.deploymentsWatch.GetAll()
  352. for _, deployment := range items {
  353. deployments = append(deployments, transformDeployment(deployment.(*appsv1.Deployment)))
  354. }
  355. return deployments
  356. }
  357. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*StatefulSet {
  358. var statefulsets []*StatefulSet
  359. items := kcc.statefulsetWatch.GetAll()
  360. for _, statefulset := range items {
  361. statefulsets = append(statefulsets, transformStatefulSet(statefulset.(*appsv1.StatefulSet)))
  362. }
  363. return statefulsets
  364. }
  365. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  366. var replicasets []*appsv1.ReplicaSet
  367. items := kcc.replicasetWatch.GetAll()
  368. for _, replicaset := range items {
  369. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  370. }
  371. return replicasets
  372. }
  373. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  374. var pvs []*v1.PersistentVolume
  375. items := kcc.pvWatch.GetAll()
  376. for _, pv := range items {
  377. pvs = append(pvs, pv.(*v1.PersistentVolume))
  378. }
  379. return pvs
  380. }
  381. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  382. var pvcs []*v1.PersistentVolumeClaim
  383. items := kcc.pvcWatch.GetAll()
  384. for _, pvc := range items {
  385. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  386. }
  387. return pvcs
  388. }
  389. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  390. var storageClasses []*stv1.StorageClass
  391. items := kcc.storageClassWatch.GetAll()
  392. for _, stc := range items {
  393. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  394. }
  395. return storageClasses
  396. }
  397. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  398. var jobs []*batchv1.Job
  399. items := kcc.jobsWatch.GetAll()
  400. for _, job := range items {
  401. jobs = append(jobs, job.(*batchv1.Job))
  402. }
  403. return jobs
  404. }
  405. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget {
  406. var pdbs []*policyv1.PodDisruptionBudget
  407. items := kcc.pdbWatch.GetAll()
  408. for _, pdb := range items {
  409. pdbs = append(pdbs, pdb.(*policyv1.PodDisruptionBudget))
  410. }
  411. return pdbs
  412. }
  413. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*v1.ReplicationController {
  414. var rcs []*v1.ReplicationController
  415. items := kcc.replicationControllerWatch.GetAll()
  416. for _, rc := range items {
  417. rcs = append(rcs, rc.(*v1.ReplicationController))
  418. }
  419. return rcs
  420. }
  421. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  422. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  423. }