clusterimporter.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/kubecost/cost-model/pkg/config"
  5. "github.com/kubecost/cost-model/pkg/log"
  6. "github.com/kubecost/cost-model/pkg/util/json"
  7. appsv1 "k8s.io/api/apps/v1"
  8. autoscaling "k8s.io/api/autoscaling/v2beta1"
  9. batchv1 "k8s.io/api/batch/v1"
  10. v1 "k8s.io/api/core/v1"
  11. "k8s.io/api/policy/v1beta1"
  12. stv1 "k8s.io/api/storage/v1"
  13. )
  14. // ClusterImporter is an implementation of ClusterCache which leverages a backing configuration file
  15. // as it's source of the cluster data.
  16. type ClusterImporter struct {
  17. source *config.ConfigFile
  18. sourceHandlerID config.HandlerID
  19. dataLock *sync.Mutex
  20. data *clusterEncoding
  21. }
  22. // Creates a new ClusterCache implementation which uses an import process to provide cluster data
  23. func NewClusterImporter(source *config.ConfigFile) ClusterCache {
  24. return &ClusterImporter{
  25. source: source,
  26. dataLock: new(sync.Mutex),
  27. data: new(clusterEncoding),
  28. }
  29. }
  30. // onImportSourceChanged handles the source data updating
  31. func (ci *ClusterImporter) onImportSourceChanged(changeType config.ChangeType, data []byte) {
  32. if changeType == config.ChangeTypeDeleted {
  33. ci.dataLock.Lock()
  34. ci.data = new(clusterEncoding)
  35. ci.dataLock.Unlock()
  36. return
  37. }
  38. ci.update(data)
  39. }
  40. // update replaces the underlying cluster data with the provided new data if it decodes
  41. func (ci *ClusterImporter) update(data []byte) {
  42. ce := new(clusterEncoding)
  43. err := json.Unmarshal(data, ce)
  44. if err != nil {
  45. log.Warningf("Failed to unmarshal cluster during import: %s", err)
  46. return
  47. }
  48. ci.dataLock.Lock()
  49. ci.data = ce
  50. ci.dataLock.Unlock()
  51. }
  52. // Run starts the watcher processes
  53. func (ci *ClusterImporter) Run() {
  54. exists, err := ci.source.Exists()
  55. if err != nil {
  56. log.Errorf("Failed to import source for cluster: %s", err)
  57. return
  58. }
  59. if exists {
  60. data, err := ci.source.Read()
  61. if err != nil {
  62. log.Warningf("Failed to import cluster: %s", err)
  63. } else {
  64. ci.update(data)
  65. }
  66. }
  67. ci.sourceHandlerID = ci.source.AddChangeHandler(ci.onImportSourceChanged)
  68. }
  69. // Stops the watcher processes
  70. func (ci *ClusterImporter) Stop() {
  71. if ci.sourceHandlerID != "" {
  72. ci.source.RemoveChangeHandler(ci.sourceHandlerID)
  73. ci.sourceHandlerID = ""
  74. }
  75. }
  76. // GetAllNamespaces returns all the cached namespaces
  77. func (ci *ClusterImporter) GetAllNamespaces() []*v1.Namespace {
  78. ci.dataLock.Lock()
  79. defer ci.dataLock.Unlock()
  80. // Deep copy here to avoid callers from corrupting the cache
  81. // This also mimic's the behavior of the default cluster cache impl.
  82. namespaces := ci.data.Namespaces
  83. cloneList := make([]*v1.Namespace, 0, len(namespaces))
  84. for _, v := range namespaces {
  85. cloneList = append(cloneList, v.DeepCopy())
  86. }
  87. return cloneList
  88. }
  89. // GetAllNodes returns all the cached nodes
  90. func (ci *ClusterImporter) GetAllNodes() []*v1.Node {
  91. ci.dataLock.Lock()
  92. defer ci.dataLock.Unlock()
  93. // Deep copy here to avoid callers from corrupting the cache
  94. // This also mimic's the behavior of the default cluster cache impl.
  95. nodes := ci.data.Nodes
  96. cloneList := make([]*v1.Node, 0, len(nodes))
  97. for _, v := range nodes {
  98. cloneList = append(cloneList, v.DeepCopy())
  99. }
  100. return cloneList
  101. }
  102. // GetAllPods returns all the cached pods
  103. func (ci *ClusterImporter) GetAllPods() []*v1.Pod {
  104. ci.dataLock.Lock()
  105. defer ci.dataLock.Unlock()
  106. // Deep copy here to avoid callers from corrupting the cache
  107. // This also mimic's the behavior of the default cluster cache impl.
  108. pods := ci.data.Pods
  109. cloneList := make([]*v1.Pod, 0, len(pods))
  110. for _, v := range pods {
  111. cloneList = append(cloneList, v.DeepCopy())
  112. }
  113. return cloneList
  114. }
  115. // GetAllServices returns all the cached services
  116. func (ci *ClusterImporter) GetAllServices() []*v1.Service {
  117. ci.dataLock.Lock()
  118. defer ci.dataLock.Unlock()
  119. // Deep copy here to avoid callers from corrupting the cache
  120. // This also mimic's the behavior of the default cluster cache impl.
  121. services := ci.data.Services
  122. cloneList := make([]*v1.Service, 0, len(services))
  123. for _, v := range services {
  124. cloneList = append(cloneList, v.DeepCopy())
  125. }
  126. return cloneList
  127. }
  128. // GetAllDaemonSets returns all the cached DaemonSets
  129. func (ci *ClusterImporter) GetAllDaemonSets() []*appsv1.DaemonSet {
  130. ci.dataLock.Lock()
  131. defer ci.dataLock.Unlock()
  132. // Deep copy here to avoid callers from corrupting the cache
  133. // This also mimic's the behavior of the default cluster cache impl.
  134. daemonSets := ci.data.DaemonSets
  135. cloneList := make([]*appsv1.DaemonSet, 0, len(daemonSets))
  136. for _, v := range daemonSets {
  137. cloneList = append(cloneList, v.DeepCopy())
  138. }
  139. return cloneList
  140. }
  141. // GetAllDeployments returns all the cached deployments
  142. func (ci *ClusterImporter) GetAllDeployments() []*appsv1.Deployment {
  143. ci.dataLock.Lock()
  144. defer ci.dataLock.Unlock()
  145. // Deep copy here to avoid callers from corrupting the cache
  146. // This also mimic's the behavior of the default cluster cache impl.
  147. deployments := ci.data.Deployments
  148. cloneList := make([]*appsv1.Deployment, 0, len(deployments))
  149. for _, v := range deployments {
  150. cloneList = append(cloneList, v.DeepCopy())
  151. }
  152. return cloneList
  153. }
  154. // GetAllStatfulSets returns all the cached StatefulSets
  155. func (ci *ClusterImporter) GetAllStatefulSets() []*appsv1.StatefulSet {
  156. ci.dataLock.Lock()
  157. defer ci.dataLock.Unlock()
  158. // Deep copy here to avoid callers from corrupting the cache
  159. // This also mimic's the behavior of the default cluster cache impl.
  160. statefulSets := ci.data.StatefulSets
  161. cloneList := make([]*appsv1.StatefulSet, 0, len(statefulSets))
  162. for _, v := range statefulSets {
  163. cloneList = append(cloneList, v.DeepCopy())
  164. }
  165. return cloneList
  166. }
  167. // GetAllReplicaSets returns all the cached ReplicaSets
  168. func (ci *ClusterImporter) GetAllReplicaSets() []*appsv1.ReplicaSet {
  169. ci.dataLock.Lock()
  170. defer ci.dataLock.Unlock()
  171. // Deep copy here to avoid callers from corrupting the cache
  172. // This also mimic's the behavior of the default cluster cache impl.
  173. replicaSets := ci.data.ReplicaSets
  174. cloneList := make([]*appsv1.ReplicaSet, 0, len(replicaSets))
  175. for _, v := range replicaSets {
  176. cloneList = append(cloneList, v.DeepCopy())
  177. }
  178. return cloneList
  179. }
  180. // GetAllPersistentVolumes returns all the cached persistent volumes
  181. func (ci *ClusterImporter) GetAllPersistentVolumes() []*v1.PersistentVolume {
  182. ci.dataLock.Lock()
  183. defer ci.dataLock.Unlock()
  184. // Deep copy here to avoid callers from corrupting the cache
  185. // This also mimic's the behavior of the default cluster cache impl.
  186. pvs := ci.data.PersistentVolumes
  187. cloneList := make([]*v1.PersistentVolume, 0, len(pvs))
  188. for _, v := range pvs {
  189. cloneList = append(cloneList, v.DeepCopy())
  190. }
  191. return cloneList
  192. }
  193. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  194. func (ci *ClusterImporter) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
  195. ci.dataLock.Lock()
  196. defer ci.dataLock.Unlock()
  197. // Deep copy here to avoid callers from corrupting the cache
  198. // This also mimic's the behavior of the default cluster cache impl.
  199. pvcs := ci.data.PersistentVolumeClaims
  200. cloneList := make([]*v1.PersistentVolumeClaim, 0, len(pvcs))
  201. for _, v := range pvcs {
  202. cloneList = append(cloneList, v.DeepCopy())
  203. }
  204. return cloneList
  205. }
  206. // GetAllStorageClasses returns all the cached storage classes
  207. func (ci *ClusterImporter) GetAllStorageClasses() []*stv1.StorageClass {
  208. ci.dataLock.Lock()
  209. defer ci.dataLock.Unlock()
  210. // Deep copy here to avoid callers from corrupting the cache
  211. // This also mimic's the behavior of the default cluster cache impl.
  212. storageClasses := ci.data.StorageClasses
  213. cloneList := make([]*stv1.StorageClass, 0, len(storageClasses))
  214. for _, v := range storageClasses {
  215. cloneList = append(cloneList, v.DeepCopy())
  216. }
  217. return cloneList
  218. }
  219. // GetAllJobs returns all the cached jobs
  220. func (ci *ClusterImporter) GetAllJobs() []*batchv1.Job {
  221. ci.dataLock.Lock()
  222. defer ci.dataLock.Unlock()
  223. // Deep copy here to avoid callers from corrupting the cache
  224. // This also mimic's the behavior of the default cluster cache impl.
  225. jobs := ci.data.Jobs
  226. cloneList := make([]*batchv1.Job, 0, len(jobs))
  227. for _, v := range jobs {
  228. cloneList = append(cloneList, v.DeepCopy())
  229. }
  230. return cloneList
  231. }
  232. // GetAllHorizontalPodAutoscalers() returns all cached horizontal pod autoscalers
  233. func (ci *ClusterImporter) GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler {
  234. ci.dataLock.Lock()
  235. defer ci.dataLock.Unlock()
  236. // Deep copy here to avoid callers from corrupting the cache
  237. // This also mimic's the behavior of the default cluster cache impl.
  238. hpas := ci.data.HorizontalPodAutoscalers
  239. cloneList := make([]*autoscaling.HorizontalPodAutoscaler, 0, len(hpas))
  240. for _, v := range hpas {
  241. cloneList = append(cloneList, v.DeepCopy())
  242. }
  243. return cloneList
  244. }
  245. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  246. func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget {
  247. ci.dataLock.Lock()
  248. defer ci.dataLock.Unlock()
  249. // Deep copy here to avoid callers from corrupting the cache
  250. // This also mimic's the behavior of the default cluster cache impl.
  251. pdbs := ci.data.PodDisruptionBudgets
  252. cloneList := make([]*v1beta1.PodDisruptionBudget, 0, len(pdbs))
  253. for _, v := range pdbs {
  254. cloneList = append(cloneList, v.DeepCopy())
  255. }
  256. return cloneList
  257. }
  258. // SetConfigMapUpdateFunc sets the configmap update function
  259. func (ci *ClusterImporter) SetConfigMapUpdateFunc(_ func(interface{})) {
  260. // TODO: (bolt) This function is still a bit strange to me for the ClusterCache interface.
  261. // TODO: (bolt) no-op for now.
  262. log.Warningf("SetConfigMapUpdateFunc is disabled for imported cluster data.")
  263. }