| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- package clustercache
- import (
- "sync"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/core/pkg/util/json"
- "github.com/opencost/opencost/pkg/config"
- "golang.org/x/exp/slices"
- )
// ClusterImporter is an implementation of ClusterCache which leverages a backing configuration file
// as its source of the cluster data.
type ClusterImporter struct {
	// source is the configuration file that provides the imported cluster data.
	source *config.ConfigFile
	// sourceHandlerID identifies the change handler registered on source by Run;
	// empty when no handler is registered.
	sourceHandlerID config.HandlerID
	// dataLock guards all reads and writes of data.
	dataLock *sync.Mutex
	// data is the most recently decoded cluster snapshot.
	data *clusterEncoding
}
- // Creates a new ClusterCache implementation which uses an import process to provide cluster data
- func NewClusterImporter(source *config.ConfigFile) ClusterCache {
- return &ClusterImporter{
- source: source,
- dataLock: new(sync.Mutex),
- data: new(clusterEncoding),
- }
- }
- // onImportSourceChanged handles the source data updating
- func (ci *ClusterImporter) onImportSourceChanged(changeType config.ChangeType, data []byte) {
- if changeType == config.ChangeTypeDeleted {
- ci.dataLock.Lock()
- ci.data = new(clusterEncoding)
- ci.dataLock.Unlock()
- return
- }
- ci.update(data)
- }
- // update replaces the underlying cluster data with the provided new data if it decodes
- func (ci *ClusterImporter) update(data []byte) {
- ce := new(clusterEncoding)
- err := json.Unmarshal(data, ce)
- if err != nil {
- log.Warnf("Failed to unmarshal cluster during import: %s", err)
- return
- }
- ci.dataLock.Lock()
- ci.data = ce
- ci.dataLock.Unlock()
- }
- // Run starts the watcher processes
- func (ci *ClusterImporter) Run() {
- if ci.source == nil {
- log.Errorf("ClusterImporter source does not exist, not running")
- return
- }
- exists, err := ci.source.Exists()
- if err != nil {
- log.Errorf("Failed to import source for cluster: %s", err)
- return
- }
- if exists {
- data, err := ci.source.Read()
- if err != nil {
- log.Warnf("Failed to import cluster: %s", err)
- } else {
- ci.update(data)
- }
- }
- ci.sourceHandlerID = ci.source.AddChangeHandler(ci.onImportSourceChanged)
- }
- // Stops the watcher processes
- func (ci *ClusterImporter) Stop() {
- if ci.sourceHandlerID != "" {
- ci.source.RemoveChangeHandler(ci.sourceHandlerID)
- ci.sourceHandlerID = ""
- }
- }
- // GetAllNamespaces returns all the cached namespaces
- func (ci *ClusterImporter) GetAllNamespaces() []*Namespace {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.Namespaces)
- }
- // GetAllNodes returns all the cached nodes
- func (ci *ClusterImporter) GetAllNodes() []*Node {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.Nodes)
- }
- // GetAllPods returns all the cached pods
- func (ci *ClusterImporter) GetAllPods() []*Pod {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.Pods)
- }
- // GetAllServices returns all the cached services
- func (ci *ClusterImporter) GetAllServices() []*Service {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.Services)
- }
- // GetAllDaemonSets returns all the cached DaemonSets
- func (ci *ClusterImporter) GetAllDaemonSets() []*DaemonSet {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.DaemonSets)
- }
- // GetAllDeployments returns all the cached deployments
- func (ci *ClusterImporter) GetAllDeployments() []*Deployment {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.Deployments)
- }
- // GetAllStatfulSets returns all the cached StatefulSets
- func (ci *ClusterImporter) GetAllStatefulSets() []*StatefulSet {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.StatefulSets)
- }
- // GetAllReplicaSets returns all the cached ReplicaSets
- func (ci *ClusterImporter) GetAllReplicaSets() []*ReplicaSet {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.ReplicaSets)
- }
- // GetAllPersistentVolumes returns all the cached persistent volumes
- func (ci *ClusterImporter) GetAllPersistentVolumes() []*PersistentVolume {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.PersistentVolumes)
- }
- // GetAllPersistentVolumeClaims returns all the cached persistent volume claims
- func (ci *ClusterImporter) GetAllPersistentVolumeClaims() []*PersistentVolumeClaim {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.PersistentVolumeClaims)
- }
- // GetAllStorageClasses returns all the cached storage classes
- func (ci *ClusterImporter) GetAllStorageClasses() []*StorageClass {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.StorageClasses)
- }
- // GetAllJobs returns all the cached jobs
- func (ci *ClusterImporter) GetAllJobs() []*Job {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.Jobs)
- }
- // GetAllPodDisruptionBudgets returns all cached pod disruption budgets
- func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*PodDisruptionBudget {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.PodDisruptionBudgets)
- }
- func (ci *ClusterImporter) GetAllReplicationControllers() []*ReplicationController {
- ci.dataLock.Lock()
- defer ci.dataLock.Unlock()
- return slices.Clone(ci.data.ReplicationControllers)
- }
|