2
0

clustercache.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/pkg/env"
  6. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  7. "k8s.io/apimachinery/pkg/types"
  8. appsv1 "k8s.io/api/apps/v1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. policyv1 "k8s.io/api/policy/v1"
  12. stv1 "k8s.io/api/storage/v1"
  13. "k8s.io/apimachinery/pkg/fields"
  14. "k8s.io/client-go/kubernetes"
  15. )
  16. type Namespace struct {
  17. Name string
  18. Labels map[string]string
  19. Annotations map[string]string
  20. }
  21. func transformNamespace(input *v1.Namespace) *Namespace {
  22. return &Namespace{
  23. Name: input.Name,
  24. Annotations: input.Annotations,
  25. Labels: input.Labels,
  26. }
  27. }
  28. type Pod struct {
  29. UID types.UID
  30. Name string
  31. Namespace string
  32. Labels map[string]string
  33. Annotations map[string]string
  34. OwnerReferences []metav1.OwnerReference
  35. Status PodStatus
  36. Spec PodSpec
  37. }
  38. type PodStatus struct {
  39. Phase v1.PodPhase
  40. ContainerStatuses []v1.ContainerStatus
  41. }
  42. type PodSpec struct {
  43. NodeName string
  44. Containers []Container
  45. Volumes []v1.Volume
  46. }
  47. type Container struct {
  48. Name string
  49. Resources v1.ResourceRequirements
  50. }
  51. func transformPodContainer(input v1.Container) Container {
  52. return Container{
  53. Name: input.Name,
  54. Resources: input.Resources,
  55. }
  56. }
  57. func transformPodStatus(input v1.PodStatus) PodStatus {
  58. return PodStatus{
  59. Phase: input.Phase,
  60. ContainerStatuses: input.ContainerStatuses,
  61. }
  62. }
  63. func transformPodSpec(input v1.PodSpec) PodSpec {
  64. containers := make([]Container, len(input.Containers))
  65. for i, container := range input.Containers {
  66. containers[i] = transformPodContainer(container)
  67. }
  68. return PodSpec{
  69. NodeName: input.NodeName,
  70. Containers: containers,
  71. Volumes: input.Volumes,
  72. }
  73. }
  74. func transformPod(input *v1.Pod) *Pod {
  75. return &Pod{
  76. UID: input.UID,
  77. Name: input.Name,
  78. Namespace: input.Namespace,
  79. Labels: input.Labels,
  80. Annotations: input.Annotations,
  81. OwnerReferences: input.OwnerReferences,
  82. Spec: transformPodSpec(input.Spec),
  83. Status: transformPodStatus(input.Status),
  84. }
  85. }
  86. type Node struct {
  87. Name string
  88. Labels map[string]string
  89. Annotations map[string]string
  90. Status v1.NodeStatus
  91. SpecProviderID string
  92. }
  93. func transformNode(input *v1.Node) *Node {
  94. return &Node{
  95. Name: input.Name,
  96. Labels: input.Labels,
  97. Annotations: input.Annotations,
  98. Status: input.Status,
  99. SpecProviderID: input.Spec.ProviderID,
  100. }
  101. }
  102. type Service struct {
  103. Name string
  104. Namespace string
  105. Selector map[string]string
  106. Type v1.ServiceType
  107. Status v1.ServiceStatus
  108. }
  109. func transformService(input *v1.Service) *Service {
  110. return &Service{
  111. Name: input.Name,
  112. Namespace: input.Namespace,
  113. Selector: input.Spec.Selector,
  114. Type: input.Spec.Type,
  115. Status: input.Status,
  116. }
  117. }
  118. type DaemonSet struct {
  119. Name string
  120. Namespace string
  121. Labels map[string]string
  122. SpecContainers []v1.Container
  123. }
  124. func transformDaemonSet(input *appsv1.DaemonSet) *DaemonSet {
  125. return &DaemonSet{
  126. Name: input.Name,
  127. Namespace: input.Namespace,
  128. Labels: input.Labels,
  129. SpecContainers: input.Spec.Template.Spec.Containers,
  130. }
  131. }
  132. type Deployment struct {
  133. Name string
  134. Namespace string
  135. Labels map[string]string
  136. MatchLabels map[string]string
  137. SpecSelector *metav1.LabelSelector
  138. SpecReplicas *int32
  139. StatusAvailableReplicas int32
  140. }
  141. func transformDeployment(input *appsv1.Deployment) *Deployment {
  142. return &Deployment{
  143. Name: input.Name,
  144. Namespace: input.Namespace,
  145. Labels: input.Labels,
  146. MatchLabels: input.Spec.Selector.MatchLabels,
  147. SpecReplicas: input.Spec.Replicas,
  148. SpecSelector: input.Spec.Selector,
  149. StatusAvailableReplicas: input.Status.AvailableReplicas,
  150. }
  151. }
  152. // ClusterCache defines an contract for an object which caches components within a cluster, ensuring
  153. // up to date resources using watchers
  154. type ClusterCache interface {
  155. // Run starts the watcher processes
  156. Run()
  157. // Stops the watcher processes
  158. Stop()
  159. // GetAllNamespaces returns all the cached namespaces
  160. GetAllNamespaces() []*Namespace
  161. // GetAllNodes returns all the cached nodes
  162. GetAllNodes() []*Node
  163. // GetAllPods returns all the cached pods
  164. GetAllPods() []*Pod
  165. // GetAllServices returns all the cached services
  166. GetAllServices() []*Service
  167. // GetAllDaemonSets returns all the cached DaemonSets
  168. GetAllDaemonSets() []*DaemonSet
  169. // GetAllDeployments returns all the cached deployments
  170. GetAllDeployments() []*Deployment
  171. // GetAllStatfulSets returns all the cached StatefulSets
  172. GetAllStatefulSets() []*appsv1.StatefulSet
  173. // GetAllReplicaSets returns all the cached ReplicaSets
  174. GetAllReplicaSets() []*appsv1.ReplicaSet
  175. // GetAllPersistentVolumes returns all the cached persistent volumes
  176. GetAllPersistentVolumes() []*v1.PersistentVolume
  177. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  178. GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim
  179. // GetAllStorageClasses returns all the cached storage classes
  180. GetAllStorageClasses() []*stv1.StorageClass
  181. // GetAllJobs returns all the cached jobs
  182. GetAllJobs() []*batchv1.Job
  183. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  184. GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget
  185. // GetAllReplicationControllers returns all cached replication controllers
  186. GetAllReplicationControllers() []*v1.ReplicationController
  187. // SetConfigMapUpdateFunc sets the configmap update function
  188. SetConfigMapUpdateFunc(func(interface{}))
  189. }
  190. // KubernetesClusterCache is the implementation of ClusterCache
  191. type KubernetesClusterCache struct {
  192. client kubernetes.Interface
  193. namespaceWatch WatchController
  194. nodeWatch WatchController
  195. podWatch WatchController
  196. kubecostConfigMapWatch WatchController
  197. serviceWatch WatchController
  198. daemonsetsWatch WatchController
  199. deploymentsWatch WatchController
  200. statefulsetWatch WatchController
  201. replicasetWatch WatchController
  202. pvWatch WatchController
  203. pvcWatch WatchController
  204. storageClassWatch WatchController
  205. jobsWatch WatchController
  206. pdbWatch WatchController
  207. replicationControllerWatch WatchController
  208. stop chan struct{}
  209. }
  210. func initializeCache(wc WatchController, wg *sync.WaitGroup, cancel chan struct{}) {
  211. defer wg.Done()
  212. wc.WarmUp(cancel)
  213. }
  214. func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
  215. coreRestClient := client.CoreV1().RESTClient()
  216. appsRestClient := client.AppsV1().RESTClient()
  217. storageRestClient := client.StorageV1().RESTClient()
  218. batchClient := client.BatchV1().RESTClient()
  219. pdbClient := client.PolicyV1().RESTClient()
  220. kubecostNamespace := env.GetKubecostNamespace()
  221. log.Infof("NAMESPACE: %s", kubecostNamespace)
  222. kcc := &KubernetesClusterCache{
  223. client: client,
  224. namespaceWatch: NewCachingWatcher(coreRestClient, "namespaces", &v1.Namespace{}, "", fields.Everything()),
  225. nodeWatch: NewCachingWatcher(coreRestClient, "nodes", &v1.Node{}, "", fields.Everything()),
  226. podWatch: NewCachingWatcher(coreRestClient, "pods", &v1.Pod{}, "", fields.Everything()),
  227. kubecostConfigMapWatch: NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, kubecostNamespace, fields.Everything()),
  228. serviceWatch: NewCachingWatcher(coreRestClient, "services", &v1.Service{}, "", fields.Everything()),
  229. daemonsetsWatch: NewCachingWatcher(appsRestClient, "daemonsets", &appsv1.DaemonSet{}, "", fields.Everything()),
  230. deploymentsWatch: NewCachingWatcher(appsRestClient, "deployments", &appsv1.Deployment{}, "", fields.Everything()),
  231. statefulsetWatch: NewCachingWatcher(appsRestClient, "statefulsets", &appsv1.StatefulSet{}, "", fields.Everything()),
  232. replicasetWatch: NewCachingWatcher(appsRestClient, "replicasets", &appsv1.ReplicaSet{}, "", fields.Everything()),
  233. pvWatch: NewCachingWatcher(coreRestClient, "persistentvolumes", &v1.PersistentVolume{}, "", fields.Everything()),
  234. pvcWatch: NewCachingWatcher(coreRestClient, "persistentvolumeclaims", &v1.PersistentVolumeClaim{}, "", fields.Everything()),
  235. storageClassWatch: NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
  236. jobsWatch: NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
  237. pdbWatch: NewCachingWatcher(pdbClient, "poddisruptionbudgets", &policyv1.PodDisruptionBudget{}, "", fields.Everything()),
  238. replicationControllerWatch: NewCachingWatcher(coreRestClient, "replicationcontrollers", &v1.ReplicationController{}, "", fields.Everything()),
  239. }
  240. // Wait for each caching watcher to initialize
  241. cancel := make(chan struct{})
  242. var wg sync.WaitGroup
  243. if env.IsETLReadOnlyMode() {
  244. wg.Add(1)
  245. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  246. } else {
  247. wg.Add(15)
  248. go initializeCache(kcc.kubecostConfigMapWatch, &wg, cancel)
  249. go initializeCache(kcc.namespaceWatch, &wg, cancel)
  250. go initializeCache(kcc.nodeWatch, &wg, cancel)
  251. go initializeCache(kcc.podWatch, &wg, cancel)
  252. go initializeCache(kcc.serviceWatch, &wg, cancel)
  253. go initializeCache(kcc.daemonsetsWatch, &wg, cancel)
  254. go initializeCache(kcc.deploymentsWatch, &wg, cancel)
  255. go initializeCache(kcc.statefulsetWatch, &wg, cancel)
  256. go initializeCache(kcc.replicasetWatch, &wg, cancel)
  257. go initializeCache(kcc.pvWatch, &wg, cancel)
  258. go initializeCache(kcc.pvcWatch, &wg, cancel)
  259. go initializeCache(kcc.storageClassWatch, &wg, cancel)
  260. go initializeCache(kcc.jobsWatch, &wg, cancel)
  261. go initializeCache(kcc.pdbWatch, &wg, cancel)
  262. go initializeCache(kcc.replicationControllerWatch, &wg, cancel)
  263. }
  264. wg.Wait()
  265. log.Infof("Done waiting")
  266. return kcc
  267. }
  268. func (kcc *KubernetesClusterCache) Run() {
  269. if kcc.stop != nil {
  270. return
  271. }
  272. stopCh := make(chan struct{})
  273. go kcc.namespaceWatch.Run(1, stopCh)
  274. go kcc.nodeWatch.Run(1, stopCh)
  275. go kcc.podWatch.Run(1, stopCh)
  276. go kcc.serviceWatch.Run(1, stopCh)
  277. go kcc.kubecostConfigMapWatch.Run(1, stopCh)
  278. go kcc.daemonsetsWatch.Run(1, stopCh)
  279. go kcc.deploymentsWatch.Run(1, stopCh)
  280. go kcc.statefulsetWatch.Run(1, stopCh)
  281. go kcc.replicasetWatch.Run(1, stopCh)
  282. go kcc.pvWatch.Run(1, stopCh)
  283. go kcc.pvcWatch.Run(1, stopCh)
  284. go kcc.storageClassWatch.Run(1, stopCh)
  285. go kcc.jobsWatch.Run(1, stopCh)
  286. go kcc.pdbWatch.Run(1, stopCh)
  287. go kcc.replicationControllerWatch.Run(1, stopCh)
  288. kcc.stop = stopCh
  289. }
  290. func (kcc *KubernetesClusterCache) Stop() {
  291. if kcc.stop == nil {
  292. return
  293. }
  294. close(kcc.stop)
  295. kcc.stop = nil
  296. }
  297. func (kcc *KubernetesClusterCache) GetAllNamespaces() []*Namespace {
  298. var namespaces []*Namespace
  299. items := kcc.namespaceWatch.GetAll()
  300. for _, ns := range items {
  301. namespaces = append(namespaces, transformNamespace(ns.(*v1.Namespace)))
  302. }
  303. return namespaces
  304. }
  305. func (kcc *KubernetesClusterCache) GetAllNodes() []*Node {
  306. var nodes []*Node
  307. items := kcc.nodeWatch.GetAll()
  308. for _, node := range items {
  309. nodes = append(nodes, transformNode(node.(*v1.Node)))
  310. }
  311. return nodes
  312. }
  313. func (kcc *KubernetesClusterCache) GetAllPods() []*Pod {
  314. var pods []*Pod
  315. items := kcc.podWatch.GetAll()
  316. for _, pod := range items {
  317. pods = append(pods, transformPod(pod.(*v1.Pod)))
  318. }
  319. return pods
  320. }
  321. func (kcc *KubernetesClusterCache) GetAllServices() []*Service {
  322. var services []*Service
  323. items := kcc.serviceWatch.GetAll()
  324. for _, service := range items {
  325. services = append(services, transformService(service.(*v1.Service)))
  326. }
  327. return services
  328. }
  329. func (kcc *KubernetesClusterCache) GetAllDaemonSets() []*DaemonSet {
  330. var daemonsets []*DaemonSet
  331. items := kcc.daemonsetsWatch.GetAll()
  332. for _, daemonset := range items {
  333. daemonsets = append(daemonsets, transformDaemonSet(daemonset.(*appsv1.DaemonSet)))
  334. }
  335. return daemonsets
  336. }
  337. func (kcc *KubernetesClusterCache) GetAllDeployments() []*Deployment {
  338. var deployments []*Deployment
  339. items := kcc.deploymentsWatch.GetAll()
  340. for _, deployment := range items {
  341. deployments = append(deployments, transformDeployment(deployment.(*appsv1.Deployment)))
  342. }
  343. return deployments
  344. }
  345. func (kcc *KubernetesClusterCache) GetAllStatefulSets() []*appsv1.StatefulSet {
  346. var statefulsets []*appsv1.StatefulSet
  347. items := kcc.statefulsetWatch.GetAll()
  348. for _, statefulset := range items {
  349. statefulsets = append(statefulsets, statefulset.(*appsv1.StatefulSet))
  350. }
  351. return statefulsets
  352. }
  353. func (kcc *KubernetesClusterCache) GetAllReplicaSets() []*appsv1.ReplicaSet {
  354. var replicasets []*appsv1.ReplicaSet
  355. items := kcc.replicasetWatch.GetAll()
  356. for _, replicaset := range items {
  357. replicasets = append(replicasets, replicaset.(*appsv1.ReplicaSet))
  358. }
  359. return replicasets
  360. }
  361. func (kcc *KubernetesClusterCache) GetAllPersistentVolumes() []*v1.PersistentVolume {
  362. var pvs []*v1.PersistentVolume
  363. items := kcc.pvWatch.GetAll()
  364. for _, pv := range items {
  365. pvs = append(pvs, pv.(*v1.PersistentVolume))
  366. }
  367. return pvs
  368. }
  369. func (kcc *KubernetesClusterCache) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  370. var pvcs []*v1.PersistentVolumeClaim
  371. items := kcc.pvcWatch.GetAll()
  372. for _, pvc := range items {
  373. pvcs = append(pvcs, pvc.(*v1.PersistentVolumeClaim))
  374. }
  375. return pvcs
  376. }
  377. func (kcc *KubernetesClusterCache) GetAllStorageClasses() []*stv1.StorageClass {
  378. var storageClasses []*stv1.StorageClass
  379. items := kcc.storageClassWatch.GetAll()
  380. for _, stc := range items {
  381. storageClasses = append(storageClasses, stc.(*stv1.StorageClass))
  382. }
  383. return storageClasses
  384. }
  385. func (kcc *KubernetesClusterCache) GetAllJobs() []*batchv1.Job {
  386. var jobs []*batchv1.Job
  387. items := kcc.jobsWatch.GetAll()
  388. for _, job := range items {
  389. jobs = append(jobs, job.(*batchv1.Job))
  390. }
  391. return jobs
  392. }
  393. func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget {
  394. var pdbs []*policyv1.PodDisruptionBudget
  395. items := kcc.pdbWatch.GetAll()
  396. for _, pdb := range items {
  397. pdbs = append(pdbs, pdb.(*policyv1.PodDisruptionBudget))
  398. }
  399. return pdbs
  400. }
  401. func (kcc *KubernetesClusterCache) GetAllReplicationControllers() []*v1.ReplicationController {
  402. var rcs []*v1.ReplicationController
  403. items := kcc.replicationControllerWatch.GetAll()
  404. for _, rc := range items {
  405. rcs = append(rcs, rc.(*v1.ReplicationController))
  406. }
  407. return rcs
  408. }
  409. func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
  410. kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
  411. }