|
|
@@ -12,7 +12,6 @@ import (
|
|
|
"github.com/opencost/opencost/core/pkg/util/timeutil"
|
|
|
"github.com/opencost/opencost/pkg/cloud"
|
|
|
"github.com/opencost/opencost/pkg/cloud/models"
|
|
|
- "github.com/opencost/opencost/pkg/cloud/provider"
|
|
|
"github.com/opencost/opencost/pkg/env"
|
|
|
)
|
|
|
|
|
|
@@ -22,27 +21,47 @@ const configFile = "cloud-configurations.json"
|
|
|
// methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
|
|
|
// upon any change detected from the config watchers.
|
|
|
type Controller struct {
|
|
|
- path string
|
|
|
+ storage controllerStorage
|
|
|
lock sync.RWMutex
|
|
|
observers []Observer
|
|
|
watchers map[ConfigSource]cloud.KeyedConfigWatcher
|
|
|
}
|
|
|
|
|
|
// NewController initializes an Config Controller
|
|
|
-func NewController(cp models.Provider) *Controller {
|
|
|
- var watchers map[ConfigSource]cloud.KeyedConfigWatcher
|
|
|
- if env.IsKubernetesEnabled() && cp != nil {
|
|
|
- providerConfig := provider.ExtractConfigFromProviders(cp)
|
|
|
- watchers = GetCloudBillingWatchers(providerConfig)
|
|
|
- } else {
|
|
|
- watchers = GetCloudBillingWatchers(nil)
|
|
|
+func NewController(providerConfig models.ProviderConfig) *Controller {
|
|
|
+
|
|
|
+ watchers := GetCloudBillingWatchers(providerConfig)
|
|
|
+
|
|
|
+ storage := &FileControllerStorage{
|
|
|
+ path: filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), configFile),
|
|
|
+ }
|
|
|
+
|
|
|
+ ic := &Controller{
|
|
|
+ storage: storage,
|
|
|
+ watchers: watchers,
|
|
|
}
|
|
|
+
|
|
|
+ ic.start()
|
|
|
+
|
|
|
+ return ic
|
|
|
+}
|
|
|
+
|
|
|
+// NewMemoryController initializes a Config Controller backed in memory
|
|
|
+func NewMemoryController(providerConfig models.ProviderConfig) *Controller {
|
|
|
+ watchers := GetCloudBillingWatchers(providerConfig)
|
|
|
+
|
|
|
ic := &Controller{
|
|
|
- path: filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), configFile),
|
|
|
+ storage: &MemoryControllerStorage{},
|
|
|
watchers: watchers,
|
|
|
}
|
|
|
|
|
|
- ic.pullWatchers()
|
|
|
+ ic.start()
|
|
|
+
|
|
|
+ return ic
|
|
|
+}
|
|
|
+
|
|
|
+func (c *Controller) start() {
|
|
|
+ c.pullWatchers()
|
|
|
|
|
|
go func() {
|
|
|
ticker := timeutil.NewJobTicker()
|
|
|
@@ -53,22 +72,20 @@ func NewController(cp models.Provider) *Controller {
|
|
|
|
|
|
<-ticker.Ch
|
|
|
|
|
|
- ic.pullWatchers()
|
|
|
+ c.pullWatchers()
|
|
|
}
|
|
|
}()
|
|
|
-
|
|
|
- return ic
|
|
|
}
|
|
|
|
|
|
// pullWatchers retrieve configs from watchers and update configs according to priority of sources
|
|
|
func (c *Controller) pullWatchers() {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
- statuses, err := c.load()
|
|
|
+ statuses, err := c.storage.load()
|
|
|
if err != nil {
|
|
|
log.Warnf("Controller: pullWatchers: %s. Proceeding to create the file", err.Error())
|
|
|
statuses = Statuses{}
|
|
|
- err = c.save(statuses)
|
|
|
+ err = c.storage.save(statuses)
|
|
|
if err != nil {
|
|
|
log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
|
|
|
}
|
|
|
@@ -158,7 +175,7 @@ func (c *Controller) pullWatchers() {
|
|
|
if status.Active {
|
|
|
c.broadcastAddConfig(conf)
|
|
|
}
|
|
|
- err = c.save(statuses)
|
|
|
+ err = c.storage.save(statuses)
|
|
|
if err != nil {
|
|
|
log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
|
|
|
}
|
|
|
@@ -178,7 +195,7 @@ func (c *Controller) CreateConfig(conf cloud.KeyedConfig) error {
|
|
|
return fmt.Errorf("provided configuration was invalid: %w", err)
|
|
|
}
|
|
|
|
|
|
- statuses, err := c.load()
|
|
|
+ statuses, err := c.storage.load()
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to load statuses")
|
|
|
}
|
|
|
@@ -219,7 +236,7 @@ func (c *Controller) CreateConfig(conf cloud.KeyedConfig) error {
|
|
|
}
|
|
|
|
|
|
c.broadcastAddConfig(conf)
|
|
|
- err = c.save(statuses)
|
|
|
+ err = c.storage.save(statuses)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to save statues: %w", err)
|
|
|
}
|
|
|
@@ -231,7 +248,7 @@ func (c *Controller) EnableConfig(key, sourceStr string) error {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
|
- statuses, err := c.load()
|
|
|
+ statuses, err := c.storage.load()
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to load statuses")
|
|
|
}
|
|
|
@@ -261,7 +278,7 @@ func (c *Controller) EnableConfig(key, sourceStr string) error {
|
|
|
|
|
|
cs.Active = true
|
|
|
c.broadcastAddConfig(cs.Config)
|
|
|
- c.save(statuses)
|
|
|
+ c.storage.save(statuses)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -269,7 +286,7 @@ func (c *Controller) EnableConfig(key, sourceStr string) error {
|
|
|
func (c *Controller) DisableConfig(key, sourceStr string) error {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
- statuses, err := c.load()
|
|
|
+ statuses, err := c.storage.load()
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to load statuses")
|
|
|
}
|
|
|
@@ -284,7 +301,7 @@ func (c *Controller) DisableConfig(key, sourceStr string) error {
|
|
|
|
|
|
is.Active = false
|
|
|
c.broadcastRemoveConfig(key)
|
|
|
- c.save(statuses)
|
|
|
+ c.storage.save(statuses)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -298,7 +315,7 @@ func (c *Controller) DeleteConfig(key, sourceStr string) error {
|
|
|
return fmt.Errorf("controller does not own config with key %s from source %s, manage this config at its source", key, source.String())
|
|
|
}
|
|
|
|
|
|
- statuses, err := c.load()
|
|
|
+ statuses, err := c.storage.load()
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to load statuses")
|
|
|
}
|
|
|
@@ -321,37 +338,7 @@ func (c *Controller) deleteConfig(key string, source ConfigSource, statuses Stat
|
|
|
c.broadcastRemoveConfig(key)
|
|
|
}
|
|
|
delete(statuses[source], key)
|
|
|
- c.save(statuses)
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func (c *Controller) load() (Statuses, error) {
|
|
|
- raw, err := os.ReadFile(c.path)
|
|
|
- if err != nil {
|
|
|
- return nil, fmt.Errorf("failed to load config statuses from file: %w", err)
|
|
|
- }
|
|
|
-
|
|
|
- statuses := Statuses{}
|
|
|
- err = json.Unmarshal(raw, &statuses)
|
|
|
- if err != nil {
|
|
|
- return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
|
|
|
- }
|
|
|
-
|
|
|
- return statuses, nil
|
|
|
-}
|
|
|
-
|
|
|
-func (c *Controller) save(statuses Statuses) error {
|
|
|
-
|
|
|
- raw, err := json.Marshal(statuses)
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("failed to marshal config statuses: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
- err = os.WriteFile(c.path, raw, 0644)
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("failed to save config statuses to file: %s", err)
|
|
|
- }
|
|
|
-
|
|
|
+ c.storage.save(statuses)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -386,7 +373,7 @@ func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
|
|
|
|
|
|
func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
|
|
|
activeConfigs := make(map[string]cloud.KeyedConfig)
|
|
|
- statuses, err := c.load()
|
|
|
+ statuses, err := c.storage.load()
|
|
|
if err != nil {
|
|
|
log.Errorf("GetStatus: failed to load cloud statuses")
|
|
|
}
|
|
|
@@ -438,7 +425,7 @@ func (c *Controller) GetStatus() []Status {
|
|
|
c.lock.RLock()
|
|
|
defer c.lock.RUnlock()
|
|
|
var status []Status
|
|
|
- statuses, err := c.load()
|
|
|
+ statuses, err := c.storage.load()
|
|
|
if err != nil {
|
|
|
log.Errorf("GetStatus: failed to load cloud statuses")
|
|
|
}
|
|
|
@@ -447,3 +434,74 @@ func (c *Controller) GetStatus() []Status {
|
|
|
}
|
|
|
return status
|
|
|
}
|
|
|
+
|
|
|
+type controllerStorage interface {
|
|
|
+ load() (Statuses, error)
|
|
|
+ save(statuses Statuses) error
|
|
|
+}
|
|
|
+
|
|
|
+type FileControllerStorage struct {
|
|
|
+ path string
|
|
|
+}
|
|
|
+
|
|
|
+func (fcs *FileControllerStorage) load() (Statuses, error) {
|
|
|
+ raw, err := os.ReadFile(fcs.path)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("failed to load config statuses from file: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ statuses := Statuses{}
|
|
|
+ err = json.Unmarshal(raw, &statuses)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ return statuses, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (fcs *FileControllerStorage) save(statuses Statuses) error {
|
|
|
+
|
|
|
+ raw, err := json.Marshal(statuses)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to marshal config statuses: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = os.WriteFile(fcs.path, raw, 0644)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to save config statuses to file: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// MemoryControllerStorage is a ControllerStorage implementation that is backed by a byte array that
|
|
|
+// is marshalled in and out of to ensure that behaviours is same as the file backed version
|
|
|
+type MemoryControllerStorage struct {
|
|
|
+ bytes []byte
|
|
|
+}
|
|
|
+
|
|
|
+func (mcs *MemoryControllerStorage) load() (Statuses, error) {
|
|
|
+ if mcs.bytes == nil {
|
|
|
+ return Statuses{}, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ statuses := Statuses{}
|
|
|
+ err := json.Unmarshal(mcs.bytes, &statuses)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
|
|
|
+ }
|
|
|
+
|
|
|
+ return statuses, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (mcs *MemoryControllerStorage) save(statuses Statuses) error {
|
|
|
+
|
|
|
+ raw, err := json.Marshal(statuses)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("failed to marshal config statuses: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ mcs.bytes = raw
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|