clusterimporter.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/core/pkg/util/json"
  6. "github.com/opencost/opencost/pkg/config"
  7. appsv1 "k8s.io/api/apps/v1"
  8. batchv1 "k8s.io/api/batch/v1"
  9. v1 "k8s.io/api/core/v1"
  10. policyv1 "k8s.io/api/policy/v1"
  11. stv1 "k8s.io/api/storage/v1"
  12. )
  13. // ClusterImporter is an implementation of ClusterCache which leverages a backing configuration file
  14. // as it's source of the cluster data.
  15. type ClusterImporter struct {
  16. source *config.ConfigFile
  17. sourceHandlerID config.HandlerID
  18. dataLock *sync.Mutex
  19. data *clusterEncoding
  20. }
  21. // Creates a new ClusterCache implementation which uses an import process to provide cluster data
  22. func NewClusterImporter(source *config.ConfigFile) ClusterCache {
  23. return &ClusterImporter{
  24. source: source,
  25. dataLock: new(sync.Mutex),
  26. data: new(clusterEncoding),
  27. }
  28. }
  29. // onImportSourceChanged handles the source data updating
  30. func (ci *ClusterImporter) onImportSourceChanged(changeType config.ChangeType, data []byte) {
  31. if changeType == config.ChangeTypeDeleted {
  32. ci.dataLock.Lock()
  33. ci.data = new(clusterEncoding)
  34. ci.dataLock.Unlock()
  35. return
  36. }
  37. ci.update(data)
  38. }
  39. // update replaces the underlying cluster data with the provided new data if it decodes
  40. func (ci *ClusterImporter) update(data []byte) {
  41. ce := new(clusterEncoding)
  42. err := json.Unmarshal(data, ce)
  43. if err != nil {
  44. log.Warnf("Failed to unmarshal cluster during import: %s", err)
  45. return
  46. }
  47. ci.dataLock.Lock()
  48. ci.data = ce
  49. ci.dataLock.Unlock()
  50. }
  51. // Run starts the watcher processes
  52. func (ci *ClusterImporter) Run() {
  53. if ci.source == nil {
  54. log.Errorf("ClusterImporter source does not exist, not running")
  55. return
  56. }
  57. exists, err := ci.source.Exists()
  58. if err != nil {
  59. log.Errorf("Failed to import source for cluster: %s", err)
  60. return
  61. }
  62. if exists {
  63. data, err := ci.source.Read()
  64. if err != nil {
  65. log.Warnf("Failed to import cluster: %s", err)
  66. } else {
  67. ci.update(data)
  68. }
  69. }
  70. ci.sourceHandlerID = ci.source.AddChangeHandler(ci.onImportSourceChanged)
  71. }
  72. // Stops the watcher processes
  73. func (ci *ClusterImporter) Stop() {
  74. if ci.sourceHandlerID != "" {
  75. ci.source.RemoveChangeHandler(ci.sourceHandlerID)
  76. ci.sourceHandlerID = ""
  77. }
  78. }
  79. // GetAllNamespaces returns all the cached namespaces
  80. func (ci *ClusterImporter) GetAllNamespaces() []*v1.Namespace {
  81. ci.dataLock.Lock()
  82. defer ci.dataLock.Unlock()
  83. // Deep copy here to avoid callers from corrupting the cache
  84. // This also mimics the behavior of the default cluster cache impl.
  85. namespaces := ci.data.Namespaces
  86. cloneList := make([]*v1.Namespace, 0, len(namespaces))
  87. for _, v := range namespaces {
  88. cloneList = append(cloneList, v.DeepCopy())
  89. }
  90. return cloneList
  91. }
  92. // GetAllNodes returns all the cached nodes
  93. func (ci *ClusterImporter) GetAllNodes() []*v1.Node {
  94. ci.dataLock.Lock()
  95. defer ci.dataLock.Unlock()
  96. // Deep copy here to avoid callers from corrupting the cache
  97. // This also mimics the behavior of the default cluster cache impl.
  98. nodes := ci.data.Nodes
  99. cloneList := make([]*v1.Node, 0, len(nodes))
  100. for _, v := range nodes {
  101. cloneList = append(cloneList, v.DeepCopy())
  102. }
  103. return cloneList
  104. }
  105. // GetAllPods returns all the cached pods
  106. func (ci *ClusterImporter) GetAllPods() []*v1.Pod {
  107. ci.dataLock.Lock()
  108. defer ci.dataLock.Unlock()
  109. // Deep copy here to avoid callers from corrupting the cache
  110. // This also mimics the behavior of the default cluster cache impl.
  111. pods := ci.data.Pods
  112. cloneList := make([]*v1.Pod, 0, len(pods))
  113. for _, v := range pods {
  114. cloneList = append(cloneList, v.DeepCopy())
  115. }
  116. return cloneList
  117. }
  118. // GetAllServices returns all the cached services
  119. func (ci *ClusterImporter) GetAllServices() []*v1.Service {
  120. ci.dataLock.Lock()
  121. defer ci.dataLock.Unlock()
  122. // Deep copy here to avoid callers from corrupting the cache
  123. // This also mimics the behavior of the default cluster cache impl.
  124. services := ci.data.Services
  125. cloneList := make([]*v1.Service, 0, len(services))
  126. for _, v := range services {
  127. cloneList = append(cloneList, v.DeepCopy())
  128. }
  129. return cloneList
  130. }
  131. // GetAllDaemonSets returns all the cached DaemonSets
  132. func (ci *ClusterImporter) GetAllDaemonSets() []*appsv1.DaemonSet {
  133. ci.dataLock.Lock()
  134. defer ci.dataLock.Unlock()
  135. // Deep copy here to avoid callers from corrupting the cache
  136. // This also mimics the behavior of the default cluster cache impl.
  137. daemonSets := ci.data.DaemonSets
  138. cloneList := make([]*appsv1.DaemonSet, 0, len(daemonSets))
  139. for _, v := range daemonSets {
  140. cloneList = append(cloneList, v.DeepCopy())
  141. }
  142. return cloneList
  143. }
  144. // GetAllDeployments returns all the cached deployments
  145. func (ci *ClusterImporter) GetAllDeployments() []*appsv1.Deployment {
  146. ci.dataLock.Lock()
  147. defer ci.dataLock.Unlock()
  148. // Deep copy here to avoid callers from corrupting the cache
  149. // This also mimics the behavior of the default cluster cache impl.
  150. deployments := ci.data.Deployments
  151. cloneList := make([]*appsv1.Deployment, 0, len(deployments))
  152. for _, v := range deployments {
  153. cloneList = append(cloneList, v.DeepCopy())
  154. }
  155. return cloneList
  156. }
  157. // GetAllStatfulSets returns all the cached StatefulSets
  158. func (ci *ClusterImporter) GetAllStatefulSets() []*appsv1.StatefulSet {
  159. ci.dataLock.Lock()
  160. defer ci.dataLock.Unlock()
  161. // Deep copy here to avoid callers from corrupting the cache
  162. // This also mimics the behavior of the default cluster cache impl.
  163. statefulSets := ci.data.StatefulSets
  164. cloneList := make([]*appsv1.StatefulSet, 0, len(statefulSets))
  165. for _, v := range statefulSets {
  166. cloneList = append(cloneList, v.DeepCopy())
  167. }
  168. return cloneList
  169. }
  170. // GetAllReplicaSets returns all the cached ReplicaSets
  171. func (ci *ClusterImporter) GetAllReplicaSets() []*appsv1.ReplicaSet {
  172. ci.dataLock.Lock()
  173. defer ci.dataLock.Unlock()
  174. // Deep copy here to avoid callers from corrupting the cache
  175. // This also mimics the behavior of the default cluster cache impl.
  176. replicaSets := ci.data.ReplicaSets
  177. cloneList := make([]*appsv1.ReplicaSet, 0, len(replicaSets))
  178. for _, v := range replicaSets {
  179. cloneList = append(cloneList, v.DeepCopy())
  180. }
  181. return cloneList
  182. }
  183. // GetAllPersistentVolumes returns all the cached persistent volumes
  184. func (ci *ClusterImporter) GetAllPersistentVolumes() []*v1.PersistentVolume {
  185. ci.dataLock.Lock()
  186. defer ci.dataLock.Unlock()
  187. // Deep copy here to avoid callers from corrupting the cache
  188. // This also mimics the behavior of the default cluster cache impl.
  189. pvs := ci.data.PersistentVolumes
  190. cloneList := make([]*v1.PersistentVolume, 0, len(pvs))
  191. for _, v := range pvs {
  192. cloneList = append(cloneList, v.DeepCopy())
  193. }
  194. return cloneList
  195. }
  196. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  197. func (ci *ClusterImporter) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  198. ci.dataLock.Lock()
  199. defer ci.dataLock.Unlock()
  200. // Deep copy here to avoid callers from corrupting the cache
  201. // This also mimics the behavior of the default cluster cache impl.
  202. pvcs := ci.data.PersistentVolumeClaims
  203. cloneList := make([]*v1.PersistentVolumeClaim, 0, len(pvcs))
  204. for _, v := range pvcs {
  205. cloneList = append(cloneList, v.DeepCopy())
  206. }
  207. return cloneList
  208. }
  209. // GetAllStorageClasses returns all the cached storage classes
  210. func (ci *ClusterImporter) GetAllStorageClasses() []*stv1.StorageClass {
  211. ci.dataLock.Lock()
  212. defer ci.dataLock.Unlock()
  213. // Deep copy here to avoid callers from corrupting the cache
  214. // This also mimics the behavior of the default cluster cache impl.
  215. storageClasses := ci.data.StorageClasses
  216. cloneList := make([]*stv1.StorageClass, 0, len(storageClasses))
  217. for _, v := range storageClasses {
  218. cloneList = append(cloneList, v.DeepCopy())
  219. }
  220. return cloneList
  221. }
  222. // GetAllJobs returns all the cached jobs
  223. func (ci *ClusterImporter) GetAllJobs() []*batchv1.Job {
  224. ci.dataLock.Lock()
  225. defer ci.dataLock.Unlock()
  226. // Deep copy here to avoid callers from corrupting the cache
  227. // This also mimics the behavior of the default cluster cache impl.
  228. jobs := ci.data.Jobs
  229. cloneList := make([]*batchv1.Job, 0, len(jobs))
  230. for _, v := range jobs {
  231. cloneList = append(cloneList, v.DeepCopy())
  232. }
  233. return cloneList
  234. }
  235. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  236. func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*policyv1.PodDisruptionBudget {
  237. ci.dataLock.Lock()
  238. defer ci.dataLock.Unlock()
  239. // Deep copy here to avoid callers from corrupting the cache
  240. // This also mimics the behavior of the default cluster cache impl.
  241. pdbs := ci.data.PodDisruptionBudgets
  242. cloneList := make([]*policyv1.PodDisruptionBudget, 0, len(pdbs))
  243. for _, v := range pdbs {
  244. cloneList = append(cloneList, v.DeepCopy())
  245. }
  246. return cloneList
  247. }
  248. func (ci *ClusterImporter) GetAllReplicationControllers() []*v1.ReplicationController {
  249. ci.dataLock.Lock()
  250. defer ci.dataLock.Unlock()
  251. // Deep copy here to avoid callers from corrupting the cache
  252. // This also mimics the behavior of the default cluster cache impl.
  253. rcs := ci.data.ReplicationControllers
  254. cloneList := make([]*v1.ReplicationController, 0, len(rcs))
  255. for _, v := range rcs {
  256. cloneList = append(cloneList, v.DeepCopy())
  257. }
  258. return cloneList
  259. }
  260. // SetConfigMapUpdateFunc sets the configmap update function
  261. func (ci *ClusterImporter) SetConfigMapUpdateFunc(_ func(interface{})) {
  262. // TODO: (bolt) This function is still a bit strange to me for the ClusterCache interface.
  263. // TODO: (bolt) no-op for now.
  264. log.Warnf("SetConfigMapUpdateFunc is disabled for imported cluster data.")
  265. }