clusterimporter.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package clustercache
  2. import (
  3. "sync"
  4. cc "github.com/opencost/opencost/core/pkg/clustercache"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/core/pkg/util/json"
  7. "github.com/opencost/opencost/pkg/config"
  8. "golang.org/x/exp/slices"
  9. )
  10. // ClusterImporter is an implementation of ClusterCache which leverages a backing configuration file
  11. // as it's source of the cluster data.
  12. type ClusterImporter struct {
  13. source *config.ConfigFile
  14. sourceHandlerID config.HandlerID
  15. dataLock *sync.Mutex
  16. data *clusterEncoding
  17. }
  18. // Creates a new ClusterCache implementation which uses an import process to provide cluster data
  19. func NewClusterImporter(source *config.ConfigFile) cc.ClusterCache {
  20. return &ClusterImporter{
  21. source: source,
  22. dataLock: new(sync.Mutex),
  23. data: new(clusterEncoding),
  24. }
  25. }
  26. // onImportSourceChanged handles the source data updating
  27. func (ci *ClusterImporter) onImportSourceChanged(changeType config.ChangeType, data []byte) {
  28. if changeType == config.ChangeTypeDeleted {
  29. ci.dataLock.Lock()
  30. ci.data = new(clusterEncoding)
  31. ci.dataLock.Unlock()
  32. return
  33. }
  34. ci.update(data)
  35. }
  36. // update replaces the underlying cluster data with the provided new data if it decodes
  37. func (ci *ClusterImporter) update(data []byte) {
  38. ce := new(clusterEncoding)
  39. err := json.Unmarshal(data, ce)
  40. if err != nil {
  41. log.Warnf("Failed to unmarshal cluster during import: %s", err)
  42. return
  43. }
  44. ci.dataLock.Lock()
  45. ci.data = ce
  46. ci.dataLock.Unlock()
  47. }
  48. // Run starts the watcher processes
  49. func (ci *ClusterImporter) Run() {
  50. if ci.source == nil {
  51. log.Errorf("ClusterImporter source does not exist, not running")
  52. return
  53. }
  54. exists, err := ci.source.Exists()
  55. if err != nil {
  56. log.Errorf("Failed to import source for cluster: %s", err)
  57. return
  58. }
  59. if exists {
  60. data, err := ci.source.Read()
  61. if err != nil {
  62. log.Warnf("Failed to import cluster: %s", err)
  63. } else {
  64. ci.update(data)
  65. }
  66. }
  67. ci.sourceHandlerID = ci.source.AddChangeHandler(ci.onImportSourceChanged)
  68. }
  69. // Stops the watcher processes
  70. func (ci *ClusterImporter) Stop() {
  71. if ci.sourceHandlerID != "" {
  72. ci.source.RemoveChangeHandler(ci.sourceHandlerID)
  73. ci.sourceHandlerID = ""
  74. }
  75. }
  76. // GetAllNamespaces returns all the cached namespaces
  77. func (ci *ClusterImporter) GetAllNamespaces() []*cc.Namespace {
  78. ci.dataLock.Lock()
  79. defer ci.dataLock.Unlock()
  80. return slices.Clone(ci.data.Namespaces)
  81. }
  82. // GetAllNodes returns all the cached nodes
  83. func (ci *ClusterImporter) GetAllNodes() []*cc.Node {
  84. ci.dataLock.Lock()
  85. defer ci.dataLock.Unlock()
  86. return slices.Clone(ci.data.Nodes)
  87. }
  88. // GetAllPods returns all the cached pods
  89. func (ci *ClusterImporter) GetAllPods() []*cc.Pod {
  90. ci.dataLock.Lock()
  91. defer ci.dataLock.Unlock()
  92. return slices.Clone(ci.data.Pods)
  93. }
  94. // GetAllServices returns all the cached services
  95. func (ci *ClusterImporter) GetAllServices() []*cc.Service {
  96. ci.dataLock.Lock()
  97. defer ci.dataLock.Unlock()
  98. return slices.Clone(ci.data.Services)
  99. }
  100. // GetAllDaemonSets returns all the cached DaemonSets
  101. func (ci *ClusterImporter) GetAllDaemonSets() []*cc.DaemonSet {
  102. ci.dataLock.Lock()
  103. defer ci.dataLock.Unlock()
  104. return slices.Clone(ci.data.DaemonSets)
  105. }
  106. // GetAllDeployments returns all the cached deployments
  107. func (ci *ClusterImporter) GetAllDeployments() []*cc.Deployment {
  108. ci.dataLock.Lock()
  109. defer ci.dataLock.Unlock()
  110. return slices.Clone(ci.data.Deployments)
  111. }
  112. // GetAllStatfulSets returns all the cached StatefulSets
  113. func (ci *ClusterImporter) GetAllStatefulSets() []*cc.StatefulSet {
  114. ci.dataLock.Lock()
  115. defer ci.dataLock.Unlock()
  116. return slices.Clone(ci.data.StatefulSets)
  117. }
  118. // GetAllReplicaSets returns all the cached ReplicaSets
  119. func (ci *ClusterImporter) GetAllReplicaSets() []*cc.ReplicaSet {
  120. ci.dataLock.Lock()
  121. defer ci.dataLock.Unlock()
  122. return slices.Clone(ci.data.ReplicaSets)
  123. }
  124. // GetAllPersistentVolumes returns all the cached persistent volumes
  125. func (ci *ClusterImporter) GetAllPersistentVolumes() []*cc.PersistentVolume {
  126. ci.dataLock.Lock()
  127. defer ci.dataLock.Unlock()
  128. return slices.Clone(ci.data.PersistentVolumes)
  129. }
  130. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  131. func (ci *ClusterImporter) GetAllPersistentVolumeClaims() []*cc.PersistentVolumeClaim {
  132. ci.dataLock.Lock()
  133. defer ci.dataLock.Unlock()
  134. return slices.Clone(ci.data.PersistentVolumeClaims)
  135. }
  136. // GetAllStorageClasses returns all the cached storage classes
  137. func (ci *ClusterImporter) GetAllStorageClasses() []*cc.StorageClass {
  138. ci.dataLock.Lock()
  139. defer ci.dataLock.Unlock()
  140. return slices.Clone(ci.data.StorageClasses)
  141. }
  142. // GetAllJobs returns all the cached jobs
  143. func (ci *ClusterImporter) GetAllJobs() []*cc.Job {
  144. ci.dataLock.Lock()
  145. defer ci.dataLock.Unlock()
  146. return slices.Clone(ci.data.Jobs)
  147. }
  148. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  149. func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*cc.PodDisruptionBudget {
  150. ci.dataLock.Lock()
  151. defer ci.dataLock.Unlock()
  152. return slices.Clone(ci.data.PodDisruptionBudgets)
  153. }
  154. func (ci *ClusterImporter) GetAllReplicationControllers() []*cc.ReplicationController {
  155. ci.dataLock.Lock()
  156. defer ci.dataLock.Unlock()
  157. return slices.Clone(ci.data.ReplicationControllers)
  158. }