| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- package config
- import (
- "fmt"
- "sync"
- "time"
- "github.com/opencost/opencost/pkg/cloud"
- "github.com/opencost/opencost/pkg/cloud/models"
- "github.com/opencost/opencost/pkg/cloud/provider"
- "github.com/opencost/opencost/pkg/util/timeutil"
- )
- // configID identifies the source and the ID of a configuration to handle duplicate configs from multiple sources
- type configID struct {
- source ConfigSource
- key string
- }
- func (cid configID) Equals(that configID) bool {
- return cid.source == that.source && cid.key == that.key
- }
- func newConfigID(source, key string) configID {
- return configID{
- source: GetConfigSource(source),
- key: key,
- }
- }
- type Status struct {
- Source ConfigSource
- Key string
- Active bool
- Valid bool
- Config cloud.KeyedConfig
- }
- // Controller manages the cloud.Config using config Watcher(s) to track various configuration
- // methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
- // upon any change detected from the config watchers.
- type Controller struct {
- statuses map[configID]*Status
- observers []Observer
- watchers map[ConfigSource]cloud.KeyedConfigWatcher
- }
- // NewController initializes an Config Controller
- func NewController(cp models.Provider) *Controller {
- providerConfig := provider.ExtractConfigFromProviders(cp)
- watchers := GetCloudBillingWatchers(providerConfig)
- ic := &Controller{
- statuses: make(map[configID]*Status),
- watchers: watchers,
- }
- ic.load()
- ic.pullWatchers()
- go func() {
- ticker := timeutil.NewJobTicker()
- defer ticker.Close()
- for {
- ticker.TickIn(10 * time.Second)
- <-ticker.Ch
- ic.pullWatchers()
- }
- }()
- return ic
- }
- func (c *Controller) EnableConfig(key, source string) error {
- cID := newConfigID(source, key)
- cs, ok := c.statuses[cID]
- if !ok {
- return fmt.Errorf("Controller: EnableConfig: config with key %s from source %s does not exist", key, source)
- }
- if cs.Active {
- return fmt.Errorf("Controller: EnableConfig: config with key %s from source %s is already active", key, source)
- }
- // check for configurations with the same configuration key that are already active.
- for confID, confStat := range c.statuses {
- if confID.key != key || confID.source == cID.source {
- continue
- }
- // if active disable
- if confStat.Active == true {
- confStat.Active = false
- }
- }
- cs.Active = true
- c.putConfig(cs.Config)
- c.save()
- return nil
- }
- // DisableConfig updates an config status if it was enabled
- func (c *Controller) DisableConfig(key, source string) error {
- iID := newConfigID(source, key)
- is, ok := c.statuses[iID]
- if !ok {
- return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
- }
- if !is.Active {
- return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s is already disabled", key, source)
- }
- is.Active = false
- c.deleteConfig(iID.key)
- c.save()
- return nil
- }
- // DeleteConfig removes an config from the statuses and deletes the config on all observers if it was active
- func (c *Controller) DeleteConfig(key, source string) error {
- id := newConfigID(source, key)
- is, ok := c.statuses[id]
- if !ok {
- return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
- }
- // delete config on observers if active
- if is.Active {
- c.deleteConfig(id.key)
- }
- delete(c.statuses, id)
- c.save()
- return nil
- }
- // pullWatchers retrieve configs from watchers and update configs according to priority of sources
- func (c *Controller) pullWatchers() {
- for source, watcher := range c.watchers {
- for _, conf := range watcher.GetConfigs() {
- key := conf.Key()
- cID := configID{
- source: source,
- key: key,
- }
- err := conf.Validate()
- valid := err == nil
- status := Status{
- Key: key,
- Source: source,
- Active: valid, // active if valid, for now
- Valid: valid,
- Config: conf,
- }
- // Check existing configs for matching key and source
- if existingStatus, ok := c.statuses[cID]; ok {
- // if config has not changed continue
- if existingStatus.Config.Equals(conf) {
- continue
- }
- // if existing CS is active then it should be replaced by the updated config
- if existingStatus.Active {
- if status.Valid {
- c.putConfig(conf)
- } else {
- // if active config is being overwritten by an invalid one, delete the config, as it will not be active
- c.deleteConfig(key)
- }
- c.statuses[cID] = &status
- continue
- }
- }
- // At this point we know that the config from this watcher has changed
- // handle an config with a new unique key for a source or an update config from a source which was inactive before
- if valid {
- for matchID, matchCS := range c.statuses {
- // skip matching configs
- if matchID.Equals(cID) {
- continue
- }
- if matchCS.Active {
- // if source is non-multi-cloud disable all other non-multi-cloud sourced configs
- if cID.source == HelmSource || cID.source == ConfigFileSource {
- if matchID.source == HelmSource || matchID.source == ConfigFileSource {
- matchCS.Active = false
- c.deleteConfig(matchID.key)
- }
- }
- // check for configs with the same key that are active
- if matchID.key == key {
- // If source has higher priority disable other active configs
- matchCS.Active = false
- c.deleteConfig(matchID.key)
- }
- }
- }
- }
- // update config and put to observers if active
- c.statuses[cID] = &status
- if status.Active {
- c.putConfig(conf)
- }
- }
- }
- }
- // todo implement when building config api and persistence is necessary
- func (c *Controller) load() {}
- // todo implement when building config api and persistence is necessary
- func (c *Controller) save() {}
- func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
- configs := new(Configurations)
- activeConfigs := make(map[string]cloud.Config)
- for iID, cs := range c.statuses {
- if cs.Active {
- activeConfigs[iID.key] = cs.Config
- }
- }
- if key != "" {
- conf, ok := activeConfigs[key]
- if !ok {
- return nil, fmt.Errorf("Config with key %s does not exist or is inactive", key)
- }
- sanitizedConfig := conf.Sanitize()
- err := configs.Insert(sanitizedConfig)
- if err != nil {
- return nil, fmt.Errorf("failed to insert config: %w", err)
- }
- return configs, nil
- }
- for _, conf := range activeConfigs {
- sanitizedConfig := conf.Sanitize()
- err := configs.Insert(sanitizedConfig)
- if err != nil {
- return nil, fmt.Errorf("failed to insert config: %w", err)
- }
- }
- return configs, nil
- }
- func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
- bi := make(map[string]cloud.KeyedConfig)
- for iID, cs := range c.statuses {
- if cs.Active {
- bi[iID.key] = cs.Config
- }
- }
- return bi
- }
- // deleteConfig ask observers to remove and stop all processes related to a configuration with a given key
- func (c *Controller) deleteConfig(key string) {
- var wg sync.WaitGroup
- for _, obs := range c.observers {
- observer := obs
- wg.Add(1)
- go func() {
- defer wg.Done()
- observer.DeleteConfig(key)
- }()
- }
- wg.Wait()
- }
- // RegisterObserver gives out the current active list configs and adds the observer to the push list
- func (c *Controller) RegisterObserver(obs Observer) {
- obs.SetConfigs(c.getActiveConfigs())
- c.observers = append(c.observers, obs)
- }
- func (c *Controller) GetStatus() []Status {
- var status []Status
- for _, intStat := range c.statuses {
- status = append(status, *intStat)
- }
- return status
- }
- // putConfig gives observers a new config to handle
- func (c *Controller) putConfig(conf cloud.KeyedConfig) {
- var wg sync.WaitGroup
- for _, obs := range c.observers {
- observer := obs
- wg.Add(1)
- go func() {
- defer wg.Done()
- observer.PutConfig(conf)
- }()
- }
- wg.Wait()
- }
|