Просмотр исходного кода

Sth/memory config controller (#2785)

* Handle errors from pricing api

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>

* memory config controller

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>

---------

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 1 год назад
Родитель
Сommit
e72d8f9579
3 измененных файлов с 123 добавлено и 60 удалено
  1. 115 57
      pkg/cloud/config/controller.go
  2. 6 1
      pkg/cmd/costmodel/costmodel.go
  3. 2 2
      pkg/costmodel/router.go

+ 115 - 57
pkg/cloud/config/controller.go

@@ -12,7 +12,6 @@ import (
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud/models"
-	"github.com/opencost/opencost/pkg/cloud/provider"
 	"github.com/opencost/opencost/pkg/env"
 )
 
@@ -22,27 +21,47 @@ const configFile = "cloud-configurations.json"
 // methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
 // upon any change detected from the config watchers.
 type Controller struct {
-	path      string
+	storage   controllerStorage
 	lock      sync.RWMutex
 	observers []Observer
 	watchers  map[ConfigSource]cloud.KeyedConfigWatcher
 }
 
 // NewController initializes an Config Controller
-func NewController(cp models.Provider) *Controller {
-	var watchers map[ConfigSource]cloud.KeyedConfigWatcher
-	if env.IsKubernetesEnabled() && cp != nil {
-		providerConfig := provider.ExtractConfigFromProviders(cp)
-		watchers = GetCloudBillingWatchers(providerConfig)
-	} else {
-		watchers = GetCloudBillingWatchers(nil)
+func NewController(providerConfig models.ProviderConfig) *Controller {
+
+	watchers := GetCloudBillingWatchers(providerConfig)
+
+	storage := &FileControllerStorage{
+		path: filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), configFile),
+	}
+
+	ic := &Controller{
+		storage:  storage,
+		watchers: watchers,
 	}
+
+	ic.start()
+
+	return ic
+}
+
+// NewMemoryController initializes a Config Controller backed in memory
+func NewMemoryController(providerConfig models.ProviderConfig) *Controller {
+	watchers := GetCloudBillingWatchers(providerConfig)
+
 	ic := &Controller{
-		path:     filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), configFile),
+		storage:  &MemoryControllerStorage{},
 		watchers: watchers,
 	}
 
-	ic.pullWatchers()
+	ic.start()
+
+	return ic
+}
+
+func (c *Controller) start() {
+	c.pullWatchers()
 
 	go func() {
 		ticker := timeutil.NewJobTicker()
@@ -53,22 +72,20 @@ func NewController(cp models.Provider) *Controller {
 
 			<-ticker.Ch
 
-			ic.pullWatchers()
+			c.pullWatchers()
 		}
 	}()
-
-	return ic
 }
 
 // pullWatchers retrieve configs from watchers and update configs according to priority of sources
 func (c *Controller) pullWatchers() {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	statuses, err := c.load()
+	statuses, err := c.storage.load()
 	if err != nil {
 		log.Warnf("Controller: pullWatchers: %s. Proceeding to create the file", err.Error())
 		statuses = Statuses{}
-		err = c.save(statuses)
+		err = c.storage.save(statuses)
 		if err != nil {
 			log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
 		}
@@ -158,7 +175,7 @@ func (c *Controller) pullWatchers() {
 			if status.Active {
 				c.broadcastAddConfig(conf)
 			}
-			err = c.save(statuses)
+			err = c.storage.save(statuses)
 			if err != nil {
 				log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
 			}
@@ -178,7 +195,7 @@ func (c *Controller) CreateConfig(conf cloud.KeyedConfig) error {
 		return fmt.Errorf("provided configuration was invalid: %w", err)
 	}
 
-	statuses, err := c.load()
+	statuses, err := c.storage.load()
 	if err != nil {
 		return fmt.Errorf("failed to load statuses")
 	}
@@ -219,7 +236,7 @@ func (c *Controller) CreateConfig(conf cloud.KeyedConfig) error {
 	}
 
 	c.broadcastAddConfig(conf)
-	err = c.save(statuses)
+	err = c.storage.save(statuses)
 	if err != nil {
 		return fmt.Errorf("failed to save statues: %w", err)
 	}
@@ -231,7 +248,7 @@ func (c *Controller) EnableConfig(key, sourceStr string) error {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
-	statuses, err := c.load()
+	statuses, err := c.storage.load()
 	if err != nil {
 		return fmt.Errorf("failed to load statuses")
 	}
@@ -261,7 +278,7 @@ func (c *Controller) EnableConfig(key, sourceStr string) error {
 
 	cs.Active = true
 	c.broadcastAddConfig(cs.Config)
-	c.save(statuses)
+	c.storage.save(statuses)
 	return nil
 }
 
@@ -269,7 +286,7 @@ func (c *Controller) EnableConfig(key, sourceStr string) error {
 func (c *Controller) DisableConfig(key, sourceStr string) error {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	statuses, err := c.load()
+	statuses, err := c.storage.load()
 	if err != nil {
 		return fmt.Errorf("failed to load statuses")
 	}
@@ -284,7 +301,7 @@ func (c *Controller) DisableConfig(key, sourceStr string) error {
 
 	is.Active = false
 	c.broadcastRemoveConfig(key)
-	c.save(statuses)
+	c.storage.save(statuses)
 	return nil
 }
 
@@ -298,7 +315,7 @@ func (c *Controller) DeleteConfig(key, sourceStr string) error {
 		return fmt.Errorf("controller does not own config with key %s from source %s, manage this config at its source", key, source.String())
 	}
 
-	statuses, err := c.load()
+	statuses, err := c.storage.load()
 	if err != nil {
 		return fmt.Errorf("failed to load statuses")
 	}
@@ -321,37 +338,7 @@ func (c *Controller) deleteConfig(key string, source ConfigSource, statuses Stat
 		c.broadcastRemoveConfig(key)
 	}
 	delete(statuses[source], key)
-	c.save(statuses)
-	return nil
-}
-
-func (c *Controller) load() (Statuses, error) {
-	raw, err := os.ReadFile(c.path)
-	if err != nil {
-		return nil, fmt.Errorf("failed to load config statuses from file: %w", err)
-	}
-
-	statuses := Statuses{}
-	err = json.Unmarshal(raw, &statuses)
-	if err != nil {
-		return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
-	}
-
-	return statuses, nil
-}
-
-func (c *Controller) save(statuses Statuses) error {
-
-	raw, err := json.Marshal(statuses)
-	if err != nil {
-		return fmt.Errorf("failed to marshal config statuses: %s", err)
-	}
-
-	err = os.WriteFile(c.path, raw, 0644)
-	if err != nil {
-		return fmt.Errorf("failed to save config statuses to file: %s", err)
-	}
-
+	c.storage.save(statuses)
 	return nil
 }
 
@@ -386,7 +373,7 @@ func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
 
 func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
 	activeConfigs := make(map[string]cloud.KeyedConfig)
-	statuses, err := c.load()
+	statuses, err := c.storage.load()
 	if err != nil {
 		log.Errorf("GetStatus: failed to load cloud statuses")
 	}
@@ -438,7 +425,7 @@ func (c *Controller) GetStatus() []Status {
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 	var status []Status
-	statuses, err := c.load()
+	statuses, err := c.storage.load()
 	if err != nil {
 		log.Errorf("GetStatus: failed to load cloud statuses")
 	}
@@ -447,3 +434,74 @@ func (c *Controller) GetStatus() []Status {
 	}
 	return status
 }
+
+type controllerStorage interface {
+	load() (Statuses, error)
+	save(statuses Statuses) error
+}
+
+type FileControllerStorage struct {
+	path string
+}
+
+func (fcs *FileControllerStorage) load() (Statuses, error) {
+	raw, err := os.ReadFile(fcs.path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to load config statuses from file: %w", err)
+	}
+
+	statuses := Statuses{}
+	err = json.Unmarshal(raw, &statuses)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
+	}
+
+	return statuses, nil
+}
+
+func (fcs *FileControllerStorage) save(statuses Statuses) error {
+
+	raw, err := json.Marshal(statuses)
+	if err != nil {
+		return fmt.Errorf("failed to marshal config statuses: %s", err)
+	}
+
+	err = os.WriteFile(fcs.path, raw, 0644)
+	if err != nil {
+		return fmt.Errorf("failed to save config statuses to file: %s", err)
+	}
+
+	return nil
+}
+
+// MemoryControllerStorage is a ControllerStorage implementation that is backed by a byte array that
+// is marshalled in and out of to ensure that behaviours is same as the file backed version
+type MemoryControllerStorage struct {
+	bytes []byte
+}
+
+func (mcs *MemoryControllerStorage) load() (Statuses, error) {
+	if mcs.bytes == nil {
+		return Statuses{}, nil
+	}
+
+	statuses := Statuses{}
+	err := json.Unmarshal(mcs.bytes, &statuses)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
+	}
+
+	return statuses, nil
+}
+
+func (mcs *MemoryControllerStorage) save(statuses Statuses) error {
+
+	raw, err := json.Marshal(statuses)
+	if err != nil {
+		return fmt.Errorf("failed to marshal config statuses: %s", err)
+	}
+
+	mcs.bytes = raw
+
+	return nil
+}

+ 6 - 1
pkg/cmd/costmodel/costmodel.go

@@ -10,6 +10,7 @@ import (
 	"github.com/julienschmidt/httprouter"
 	"github.com/opencost/opencost/core/pkg/util/json"
 	"github.com/opencost/opencost/pkg/cloud/models"
+	"github.com/opencost/opencost/pkg/cloud/provider"
 	"github.com/opencost/opencost/pkg/customcost"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/rs/cors"
@@ -62,7 +63,11 @@ func Execute(opts *CostModelOpts) error {
 
 	log.Infof("Cloud Costs enabled: %t", env.IsCloudCostEnabled())
 	if env.IsCloudCostEnabled() {
-		costmodel.InitializeCloudCost(router, cp)
+		var providerConfig models.ProviderConfig
+		if cp != nil {
+			providerConfig = provider.ExtractConfigFromProviders(cp)
+		}
+		costmodel.InitializeCloudCost(router, providerConfig)
 	}
 
 	log.Infof("Custom Costs enabled: %t", env.IsCustomCostEnabled())

+ 2 - 2
pkg/costmodel/router.go

@@ -1786,9 +1786,9 @@ func Initialize(router *httprouter.Router, additionalConfigWatchers ...*watcher.
 }
 
 // InitializeCloudCost Initializes Cloud Cost pipeline and querier and registers endpoints
-func InitializeCloudCost(router *httprouter.Router, cp models.Provider) {
+func InitializeCloudCost(router *httprouter.Router, providerConfig models.ProviderConfig) {
 	log.Debugf("Cloud Cost config path: %s", env.GetCloudCostConfigPath())
-	cloudConfigController := cloudconfig.NewController(cp)
+	cloudConfigController := cloudconfig.NewMemoryController(providerConfig)
 
 	repo := cloudcost.NewMemoryRepository()
 	cloudCostPipelineService := cloudcost.NewPipelineService(repo, cloudConfigController, cloudcost.DefaultIngestorConfiguration())