controller.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. package config
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/util/timeutil"
  7. "github.com/opencost/opencost/pkg/cloud"
  8. "github.com/opencost/opencost/pkg/cloud/models"
  9. "github.com/opencost/opencost/pkg/cloud/provider"
  10. )
  11. // configID identifies the source and the ID of a configuration to handle duplicate configs from multiple sources
  12. type configID struct {
  13. source ConfigSource
  14. key string
  15. }
  16. func (cid configID) Equals(that configID) bool {
  17. return cid.source == that.source && cid.key == that.key
  18. }
  19. func newConfigID(source, key string) configID {
  20. return configID{
  21. source: GetConfigSource(source),
  22. key: key,
  23. }
  24. }
  25. type Status struct {
  26. Source ConfigSource
  27. Key string
  28. Active bool
  29. Valid bool
  30. Config cloud.KeyedConfig
  31. }
  32. // Controller manages the cloud.Config using config Watcher(s) to track various configuration
  33. // methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
  34. // upon any change detected from the config watchers.
  35. type Controller struct {
  36. statuses map[configID]*Status
  37. observers []Observer
  38. watchers map[ConfigSource]cloud.KeyedConfigWatcher
  39. }
  40. // NewController initializes an Config Controller
  41. func NewController(cp models.Provider) *Controller {
  42. providerConfig := provider.ExtractConfigFromProviders(cp)
  43. watchers := GetCloudBillingWatchers(providerConfig)
  44. ic := &Controller{
  45. statuses: make(map[configID]*Status),
  46. watchers: watchers,
  47. }
  48. ic.load()
  49. ic.pullWatchers()
  50. go func() {
  51. ticker := timeutil.NewJobTicker()
  52. defer ticker.Close()
  53. for {
  54. ticker.TickIn(10 * time.Second)
  55. <-ticker.Ch
  56. ic.pullWatchers()
  57. }
  58. }()
  59. return ic
  60. }
  61. func (c *Controller) EnableConfig(key, source string) error {
  62. cID := newConfigID(source, key)
  63. cs, ok := c.statuses[cID]
  64. if !ok {
  65. return fmt.Errorf("Controller: EnableConfig: config with key %s from source %s does not exist", key, source)
  66. }
  67. if cs.Active {
  68. return fmt.Errorf("Controller: EnableConfig: config with key %s from source %s is already active", key, source)
  69. }
  70. // check for configurations with the same configuration key that are already active.
  71. for confID, confStat := range c.statuses {
  72. if confID.key != key || confID.source == cID.source {
  73. continue
  74. }
  75. // if active disable
  76. if confStat.Active == true {
  77. confStat.Active = false
  78. }
  79. }
  80. cs.Active = true
  81. c.putConfig(cs.Config)
  82. c.save()
  83. return nil
  84. }
  85. // DisableConfig updates an config status if it was enabled
  86. func (c *Controller) DisableConfig(key, source string) error {
  87. iID := newConfigID(source, key)
  88. is, ok := c.statuses[iID]
  89. if !ok {
  90. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
  91. }
  92. if !is.Active {
  93. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s is already disabled", key, source)
  94. }
  95. is.Active = false
  96. c.deleteConfig(iID.key)
  97. c.save()
  98. return nil
  99. }
  100. // DeleteConfig removes an config from the statuses and deletes the config on all observers if it was active
  101. func (c *Controller) DeleteConfig(key, source string) error {
  102. id := newConfigID(source, key)
  103. is, ok := c.statuses[id]
  104. if !ok {
  105. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
  106. }
  107. // delete config on observers if active
  108. if is.Active {
  109. c.deleteConfig(id.key)
  110. }
  111. delete(c.statuses, id)
  112. c.save()
  113. return nil
  114. }
  115. // pullWatchers retrieve configs from watchers and update configs according to priority of sources
  116. func (c *Controller) pullWatchers() {
  117. for source, watcher := range c.watchers {
  118. for _, conf := range watcher.GetConfigs() {
  119. key := conf.Key()
  120. cID := configID{
  121. source: source,
  122. key: key,
  123. }
  124. err := conf.Validate()
  125. valid := err == nil
  126. status := Status{
  127. Key: key,
  128. Source: source,
  129. Active: valid, // active if valid, for now
  130. Valid: valid,
  131. Config: conf,
  132. }
  133. // Check existing configs for matching key and source
  134. if existingStatus, ok := c.statuses[cID]; ok {
  135. // if config has not changed continue
  136. if existingStatus.Config.Equals(conf) {
  137. continue
  138. }
  139. // if existing CS is active then it should be replaced by the updated config
  140. if existingStatus.Active {
  141. if status.Valid {
  142. c.putConfig(conf)
  143. } else {
  144. // if active config is being overwritten by an invalid one, delete the config, as it will not be active
  145. c.deleteConfig(key)
  146. }
  147. c.statuses[cID] = &status
  148. continue
  149. }
  150. }
  151. // At this point we know that the config from this watcher has changed
  152. // handle an config with a new unique key for a source or an update config from a source which was inactive before
  153. if valid {
  154. for matchID, matchCS := range c.statuses {
  155. // skip matching configs
  156. if matchID.Equals(cID) {
  157. continue
  158. }
  159. if matchCS.Active {
  160. // if source is non-multi-cloud disable all other non-multi-cloud sourced configs
  161. if cID.source == HelmSource || cID.source == ConfigFileSource {
  162. if matchID.source == HelmSource || matchID.source == ConfigFileSource {
  163. matchCS.Active = false
  164. c.deleteConfig(matchID.key)
  165. }
  166. }
  167. // check for configs with the same key that are active
  168. if matchID.key == key {
  169. // If source has higher priority disable other active configs
  170. matchCS.Active = false
  171. c.deleteConfig(matchID.key)
  172. }
  173. }
  174. }
  175. }
  176. // update config and put to observers if active
  177. c.statuses[cID] = &status
  178. if status.Active {
  179. c.putConfig(conf)
  180. }
  181. }
  182. }
  183. }
  184. // todo implement when building config api and persistence is necessary
  185. func (c *Controller) load() {}
  186. // todo implement when building config api and persistence is necessary
  187. func (c *Controller) save() {}
  188. func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
  189. configs := new(Configurations)
  190. activeConfigs := make(map[string]cloud.Config)
  191. for iID, cs := range c.statuses {
  192. if cs.Active {
  193. activeConfigs[iID.key] = cs.Config
  194. }
  195. }
  196. if key != "" {
  197. conf, ok := activeConfigs[key]
  198. if !ok {
  199. return nil, fmt.Errorf("Config with key %s does not exist or is inactive", key)
  200. }
  201. sanitizedConfig := conf.Sanitize()
  202. err := configs.Insert(sanitizedConfig)
  203. if err != nil {
  204. return nil, fmt.Errorf("failed to insert config: %w", err)
  205. }
  206. return configs, nil
  207. }
  208. for _, conf := range activeConfigs {
  209. sanitizedConfig := conf.Sanitize()
  210. err := configs.Insert(sanitizedConfig)
  211. if err != nil {
  212. return nil, fmt.Errorf("failed to insert config: %w", err)
  213. }
  214. }
  215. return configs, nil
  216. }
  217. func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
  218. bi := make(map[string]cloud.KeyedConfig)
  219. for iID, cs := range c.statuses {
  220. if cs.Active {
  221. bi[iID.key] = cs.Config
  222. }
  223. }
  224. return bi
  225. }
  226. // deleteConfig ask observers to remove and stop all processes related to a configuration with a given key
  227. func (c *Controller) deleteConfig(key string) {
  228. var wg sync.WaitGroup
  229. for _, obs := range c.observers {
  230. observer := obs
  231. wg.Add(1)
  232. go func() {
  233. defer wg.Done()
  234. observer.DeleteConfig(key)
  235. }()
  236. }
  237. wg.Wait()
  238. }
  239. // RegisterObserver gives out the current active list configs and adds the observer to the push list
  240. func (c *Controller) RegisterObserver(obs Observer) {
  241. obs.SetConfigs(c.getActiveConfigs())
  242. c.observers = append(c.observers, obs)
  243. }
  244. func (c *Controller) GetStatus() []Status {
  245. var status []Status
  246. for _, intStat := range c.statuses {
  247. status = append(status, *intStat)
  248. }
  249. return status
  250. }
  251. // putConfig gives observers a new config to handle
  252. func (c *Controller) putConfig(conf cloud.KeyedConfig) {
  253. var wg sync.WaitGroup
  254. for _, obs := range c.observers {
  255. observer := obs
  256. wg.Add(1)
  257. go func() {
  258. defer wg.Done()
  259. observer.PutConfig(conf)
  260. }()
  261. }
  262. wg.Wait()
  263. }