clusterimporter.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package clustercache
  2. import (
  3. "sync"
  4. "github.com/opencost/opencost/core/pkg/log"
  5. "github.com/opencost/opencost/core/pkg/util/json"
  6. "github.com/opencost/opencost/pkg/config"
  7. "golang.org/x/exp/slices"
  8. )
  9. // ClusterImporter is an implementation of ClusterCache which leverages a backing configuration file
  10. // as it's source of the cluster data.
  11. type ClusterImporter struct {
  12. source *config.ConfigFile
  13. sourceHandlerID config.HandlerID
  14. dataLock *sync.Mutex
  15. data *clusterEncoding
  16. }
  17. // Creates a new ClusterCache implementation which uses an import process to provide cluster data
  18. func NewClusterImporter(source *config.ConfigFile) ClusterCache {
  19. return &ClusterImporter{
  20. source: source,
  21. dataLock: new(sync.Mutex),
  22. data: new(clusterEncoding),
  23. }
  24. }
  25. // onImportSourceChanged handles the source data updating
  26. func (ci *ClusterImporter) onImportSourceChanged(changeType config.ChangeType, data []byte) {
  27. if changeType == config.ChangeTypeDeleted {
  28. ci.dataLock.Lock()
  29. ci.data = new(clusterEncoding)
  30. ci.dataLock.Unlock()
  31. return
  32. }
  33. ci.update(data)
  34. }
  35. // update replaces the underlying cluster data with the provided new data if it decodes
  36. func (ci *ClusterImporter) update(data []byte) {
  37. ce := new(clusterEncoding)
  38. err := json.Unmarshal(data, ce)
  39. if err != nil {
  40. log.Warnf("Failed to unmarshal cluster during import: %s", err)
  41. return
  42. }
  43. ci.dataLock.Lock()
  44. ci.data = ce
  45. ci.dataLock.Unlock()
  46. }
  47. // Run starts the watcher processes
  48. func (ci *ClusterImporter) Run() {
  49. if ci.source == nil {
  50. log.Errorf("ClusterImporter source does not exist, not running")
  51. return
  52. }
  53. exists, err := ci.source.Exists()
  54. if err != nil {
  55. log.Errorf("Failed to import source for cluster: %s", err)
  56. return
  57. }
  58. if exists {
  59. data, err := ci.source.Read()
  60. if err != nil {
  61. log.Warnf("Failed to import cluster: %s", err)
  62. } else {
  63. ci.update(data)
  64. }
  65. }
  66. ci.sourceHandlerID = ci.source.AddChangeHandler(ci.onImportSourceChanged)
  67. }
  68. // Stops the watcher processes
  69. func (ci *ClusterImporter) Stop() {
  70. if ci.sourceHandlerID != "" {
  71. ci.source.RemoveChangeHandler(ci.sourceHandlerID)
  72. ci.sourceHandlerID = ""
  73. }
  74. }
  75. // GetAllNamespaces returns all the cached namespaces
  76. func (ci *ClusterImporter) GetAllNamespaces() []*Namespace {
  77. ci.dataLock.Lock()
  78. defer ci.dataLock.Unlock()
  79. return slices.Clone(ci.data.Namespaces)
  80. }
  81. // GetAllNodes returns all the cached nodes
  82. func (ci *ClusterImporter) GetAllNodes() []*Node {
  83. ci.dataLock.Lock()
  84. defer ci.dataLock.Unlock()
  85. return slices.Clone(ci.data.Nodes)
  86. }
  87. // GetAllPods returns all the cached pods
  88. func (ci *ClusterImporter) GetAllPods() []*Pod {
  89. ci.dataLock.Lock()
  90. defer ci.dataLock.Unlock()
  91. return slices.Clone(ci.data.Pods)
  92. }
  93. // GetAllServices returns all the cached services
  94. func (ci *ClusterImporter) GetAllServices() []*Service {
  95. ci.dataLock.Lock()
  96. defer ci.dataLock.Unlock()
  97. return slices.Clone(ci.data.Services)
  98. }
  99. // GetAllDaemonSets returns all the cached DaemonSets
  100. func (ci *ClusterImporter) GetAllDaemonSets() []*DaemonSet {
  101. ci.dataLock.Lock()
  102. defer ci.dataLock.Unlock()
  103. return slices.Clone(ci.data.DaemonSets)
  104. }
  105. // GetAllDeployments returns all the cached deployments
  106. func (ci *ClusterImporter) GetAllDeployments() []*Deployment {
  107. ci.dataLock.Lock()
  108. defer ci.dataLock.Unlock()
  109. return slices.Clone(ci.data.Deployments)
  110. }
  111. // GetAllStatfulSets returns all the cached StatefulSets
  112. func (ci *ClusterImporter) GetAllStatefulSets() []*StatefulSet {
  113. ci.dataLock.Lock()
  114. defer ci.dataLock.Unlock()
  115. return slices.Clone(ci.data.StatefulSets)
  116. }
  117. // GetAllReplicaSets returns all the cached ReplicaSets
  118. func (ci *ClusterImporter) GetAllReplicaSets() []*ReplicaSet {
  119. ci.dataLock.Lock()
  120. defer ci.dataLock.Unlock()
  121. return slices.Clone(ci.data.ReplicaSets)
  122. }
  123. // GetAllPersistentVolumes returns all the cached persistent volumes
  124. func (ci *ClusterImporter) GetAllPersistentVolumes() []*PersistentVolume {
  125. ci.dataLock.Lock()
  126. defer ci.dataLock.Unlock()
  127. return slices.Clone(ci.data.PersistentVolumes)
  128. }
  129. // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
  130. func (ci *ClusterImporter) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
  131. ci.dataLock.Lock()
  132. defer ci.dataLock.Unlock()
  133. return slices.Clone(ci.data.PersistentVolumeClaims)
  134. }
  135. // GetAllStorageClasses returns all the cached storage classes
  136. func (ci *ClusterImporter) GetAllStorageClasses() []*StorageClass {
  137. ci.dataLock.Lock()
  138. defer ci.dataLock.Unlock()
  139. return slices.Clone(ci.data.StorageClasses)
  140. }
  141. // GetAllJobs returns all the cached jobs
  142. func (ci *ClusterImporter) GetAllJobs() []*Job {
  143. ci.dataLock.Lock()
  144. defer ci.dataLock.Unlock()
  145. return slices.Clone(ci.data.Jobs)
  146. }
  147. // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
  148. func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*PodDisruptionBudget {
  149. ci.dataLock.Lock()
  150. defer ci.dataLock.Unlock()
  151. return slices.Clone(ci.data.PodDisruptionBudgets)
  152. }
  153. func (ci *ClusterImporter) GetAllReplicationControllers() []*ReplicationController {
  154. ci.dataLock.Lock()
  155. defer ci.dataLock.Unlock()
  156. return slices.Clone(ci.data.ReplicationControllers)
  157. }