clusterimporter.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/pkg/config"
  5. "github.com/opencost/opencost/pkg/log"
  6. "github.com/opencost/opencost/pkg/util/json"
  7. appsv1 "k8s.io/api/apps/v1"
  8. autoscaling "k8s.io/api/autoscaling/v2beta1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. "k8s.io/api/policy/v1beta1"
  12. stv1 "k8s.io/api/storage/v1"
  13. )
  14. // ClusterImporter is an implementation of ClusterCache which leverages a backing configuration file
  15. // as it's source of the cluster data.
  16. type ClusterImporter struct {
  17. source *config.ConfigFile
  18. sourceHandlerID config.HandlerID
  19. dataLock *sync.Mutex
  20. data *clusterEncoding
  21. }
  22. // Creates a new ClusterCache implementation which uses an import process to provide cluster data
  23. func NewClusterImporter(source *config.ConfigFile) ClusterCache {
  24. return &ClusterImporter{
  25. source: source,
  26. dataLock: new(sync.Mutex),
  27. data: new(clusterEncoding),
  28. }
  29. }
  30. // onImportSourceChanged handles the source data updating
  31. func (ci *ClusterImporter) onImportSourceChanged(changeType config.ChangeType, data []byte) {
  32. if changeType == config.ChangeTypeDeleted {
  33. ci.dataLock.Lock()
  34. ci.data = new(clusterEncoding)
  35. ci.dataLock.Unlock()
  36. return
  37. }
  38. ci.update(data)
  39. }
  40. // update replaces the underlying cluster data with the provided new data if it decodes
  41. func (ci *ClusterImporter) update(data []byte) {
  42. ce := new(clusterEncoding)
  43. err := json.Unmarshal(data, ce)
  44. if err != nil {
  45. log.Warnf("Failed to unmarshal cluster during import: %s", err)
  46. return
  47. }
  48. ci.dataLock.Lock()
  49. ci.data = ce
  50. ci.dataLock.Unlock()
  51. }
  52. // Run starts the watcher processes
  53. func (ci *ClusterImporter) Run() {
  54. if ci.source == nil {
  55. log.Errorf("ClusterImporter source does not exist, not running")
  56. return
  57. }
  58. exists, err := ci.source.Exists()
  59. if err != nil {
  60. log.Errorf("Failed to import source for cluster: %s", err)
  61. return
  62. }
  63. if exists {
  64. data, err := ci.source.Read()
  65. if err != nil {
  66. log.Warnf("Failed to import cluster: %s", err)
  67. } else {
  68. ci.update(data)
  69. }
  70. }
  71. ci.sourceHandlerID = ci.source.AddChangeHandler(ci.onImportSourceChanged)
  72. }
  73. // Stops the watcher processes
  74. func (ci *ClusterImporter) Stop() {
  75. if ci.sourceHandlerID != "" {
  76. ci.source.RemoveChangeHandler(ci.sourceHandlerID)
  77. ci.sourceHandlerID = ""
  78. }
  79. }
  80. // GetAllNamespaces returns all the cached namespaces
  81. func (ci *ClusterImporter) GetAllNamespaces() []*v1.Namespace {
  82. ci.dataLock.Lock()
  83. defer ci.dataLock.Unlock()
  84. // Deep copy here to avoid callers from corrupting the cache
  85. // This also mimics the behavior of the default cluster cache impl.
  86. namespaces := ci.data.Namespaces
  87. cloneList := make([]*v1.Namespace, 0, len(namespaces))
  88. for _, v := range namespaces {
  89. cloneList = append(cloneList, v.DeepCopy())
  90. }
  91. return cloneList
  92. }
  93. // GetAllNodes returns all the cached nodes
  94. func (ci *ClusterImporter) GetAllNodes() []*v1.Node {
  95. ci.dataLock.Lock()
  96. defer ci.dataLock.Unlock()
  97. // Deep copy here to avoid callers from corrupting the cache
  98. // This also mimics the behavior of the default cluster cache impl.
  99. nodes := ci.data.Nodes
  100. cloneList := make([]*v1.Node, 0, len(nodes))
  101. for _, v := range nodes {
  102. cloneList = append(cloneList, v.DeepCopy())
  103. }
  104. return cloneList
  105. }
  106. // GetAllPods returns all the cached pods
  107. func (ci *ClusterImporter) GetAllPods() []*v1.Pod {
  108. ci.dataLock.Lock()
  109. defer ci.dataLock.Unlock()
  110. // Deep copy here to avoid callers from corrupting the cache
  111. // This also mimics the behavior of the default cluster cache impl.
  112. pods := ci.data.Pods
  113. cloneList := make([]*v1.Pod, 0, len(pods))
  114. for _, v := range pods {
  115. cloneList = append(cloneList, v.DeepCopy())
  116. }
  117. return cloneList
  118. }
  119. // GetAllServices returns all the cached services
  120. func (ci *ClusterImporter) GetAllServices() []*v1.Service {
  121. ci.dataLock.Lock()
  122. defer ci.dataLock.Unlock()
  123. // Deep copy here to avoid callers from corrupting the cache
  124. // This also mimics the behavior of the default cluster cache impl.
  125. services := ci.data.Services
  126. cloneList := make([]*v1.Service, 0, len(services))
  127. for _, v := range services {
  128. cloneList = append(cloneList, v.DeepCopy())
  129. }
  130. return cloneList
  131. }
  132. // GetAllDaemonSets returns all the cached DaemonSets
  133. func (ci *ClusterImporter) GetAllDaemonSets() []*appsv1.DaemonSet {
  134. ci.dataLock.Lock()
  135. defer ci.dataLock.Unlock()
  136. // Deep copy here to avoid callers from corrupting the cache
  137. // This also mimics the behavior of the default cluster cache impl.
  138. daemonSets := ci.data.DaemonSets
  139. cloneList := make([]*appsv1.DaemonSet, 0, len(daemonSets))
  140. for _, v := range daemonSets {
  141. cloneList = append(cloneList, v.DeepCopy())
  142. }
  143. return cloneList
  144. }
  145. // GetAllDeployments returns all the cached deployments
  146. func (ci *ClusterImporter) GetAllDeployments() []*appsv1.Deployment {
  147. ci.dataLock.Lock()
  148. defer ci.dataLock.Unlock()
  149. // Deep copy here to avoid callers from corrupting the cache
  150. // This also mimics the behavior of the default cluster cache impl.
  151. deployments := ci.data.Deployments
  152. cloneList := make([]*appsv1.Deployment, 0, len(deployments))
  153. for _, v := range deployments {
  154. cloneList = append(cloneList, v.DeepCopy())
  155. }
  156. return cloneList
  157. }
  158. // GetAllStatfulSets returns all the cached StatefulSets
  159. func (ci *ClusterImporter) GetAllStatefulSets() []*appsv1.StatefulSet {
  160. ci.dataLock.Lock()
  161. defer ci.dataLock.Unlock()
  162. // Deep copy here to avoid callers from corrupting the cache
  163. // This also mimics the behavior of the default cluster cache impl.
  164. statefulSets := ci.data.StatefulSets
  165. cloneList := make([]*appsv1.StatefulSet, 0, len(statefulSets))
  166. for _, v := range statefulSets {
  167. cloneList = append(cloneList, v.DeepCopy())
  168. }
  169. return cloneList
  170. }
  171. // GetAllReplicaSets returns all the cached ReplicaSets
  172. func (ci *ClusterImporter) GetAllReplicaSets() []*appsv1.ReplicaSet {
  173. ci.dataLock.Lock()
  174. defer ci.dataLock.Unlock()
  175. // Deep copy here to avoid callers from corrupting the cache
  176. // This also mimics the behavior of the default cluster cache impl.
  177. replicaSets := ci.data.ReplicaSets
  178. cloneList := make([]*appsv1.ReplicaSet, 0, len(replicaSets))
  179. for _, v := range replicaSets {
  180. cloneList = append(cloneList, v.DeepCopy())
  181. }
  182. return cloneList
  183. }
  184. // GetAllPersistentVolumes returns all the cached persistent volumes
  185. func (ci *ClusterImporter) GetAllPersistentVolumes() []*v1.PersistentVolume {
  186. ci.dataLock.Lock()
  187. defer ci.dataLock.Unlock()
  188. // Deep copy here to avoid callers from corrupting the cache
  189. // This also mimics the behavior of the default cluster cache impl.
  190. pvs := ci.data.PersistentVolumes
  191. cloneList := make([]*v1.PersistentVolume, 0, len(pvs))
  192. for _, v := range pvs {
  193. cloneList = append(cloneList, v.DeepCopy())
  194. }
  195. return cloneList
  196. }
  197. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  198. func (ci *ClusterImporter) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  199. ci.dataLock.Lock()
  200. defer ci.dataLock.Unlock()
  201. // Deep copy here to avoid callers from corrupting the cache
  202. // This also mimics the behavior of the default cluster cache impl.
  203. pvcs := ci.data.PersistentVolumeClaims
  204. cloneList := make([]*v1.PersistentVolumeClaim, 0, len(pvcs))
  205. for _, v := range pvcs {
  206. cloneList = append(cloneList, v.DeepCopy())
  207. }
  208. return cloneList
  209. }
  210. // GetAllStorageClasses returns all the cached storage classes
  211. func (ci *ClusterImporter) GetAllStorageClasses() []*stv1.StorageClass {
  212. ci.dataLock.Lock()
  213. defer ci.dataLock.Unlock()
  214. // Deep copy here to avoid callers from corrupting the cache
  215. // This also mimics the behavior of the default cluster cache impl.
  216. storageClasses := ci.data.StorageClasses
  217. cloneList := make([]*stv1.StorageClass, 0, len(storageClasses))
  218. for _, v := range storageClasses {
  219. cloneList = append(cloneList, v.DeepCopy())
  220. }
  221. return cloneList
  222. }
  223. // GetAllJobs returns all the cached jobs
  224. func (ci *ClusterImporter) GetAllJobs() []*batchv1.Job {
  225. ci.dataLock.Lock()
  226. defer ci.dataLock.Unlock()
  227. // Deep copy here to avoid callers from corrupting the cache
  228. // This also mimics the behavior of the default cluster cache impl.
  229. jobs := ci.data.Jobs
  230. cloneList := make([]*batchv1.Job, 0, len(jobs))
  231. for _, v := range jobs {
  232. cloneList = append(cloneList, v.DeepCopy())
  233. }
  234. return cloneList
  235. }
  236. // GetAllHorizontalPodAutoscalers() returns all cached horizontal pod autoscalers
  237. func (ci *ClusterImporter) GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler {
  238. ci.dataLock.Lock()
  239. defer ci.dataLock.Unlock()
  240. // Deep copy here to avoid callers from corrupting the cache
  241. // This also mimics the behavior of the default cluster cache impl.
  242. hpas := ci.data.HorizontalPodAutoscalers
  243. cloneList := make([]*autoscaling.HorizontalPodAutoscaler, 0, len(hpas))
  244. for _, v := range hpas {
  245. cloneList = append(cloneList, v.DeepCopy())
  246. }
  247. return cloneList
  248. }
  249. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  250. func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget {
  251. ci.dataLock.Lock()
  252. defer ci.dataLock.Unlock()
  253. // Deep copy here to avoid callers from corrupting the cache
  254. // This also mimics the behavior of the default cluster cache impl.
  255. pdbs := ci.data.PodDisruptionBudgets
  256. cloneList := make([]*v1beta1.PodDisruptionBudget, 0, len(pdbs))
  257. for _, v := range pdbs {
  258. cloneList = append(cloneList, v.DeepCopy())
  259. }
  260. return cloneList
  261. }
  262. func (ci *ClusterImporter) GetAllReplicationControllers() []*v1.ReplicationController {
  263. ci.dataLock.Lock()
  264. defer ci.dataLock.Unlock()
  265. // Deep copy here to avoid callers from corrupting the cache
  266. // This also mimics the behavior of the default cluster cache impl.
  267. rcs := ci.data.ReplicationControllers
  268. cloneList := make([]*v1.ReplicationController, 0, len(rcs))
  269. for _, v := range rcs {
  270. cloneList = append(cloneList, v.DeepCopy())
  271. }
  272. return cloneList
  273. }
  274. // SetConfigMapUpdateFunc sets the configmap update function
  275. func (ci *ClusterImporter) SetConfigMapUpdateFunc(_ func(interface{})) {
  276. // TODO: (bolt) This function is still a bit strange to me for the ClusterCache interface.
  277. // TODO: (bolt) no-op for now.
  278. log.Warnf("SetConfigMapUpdateFunc is disabled for imported cluster data.")
  279. }