2
0

controller.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package config
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/opencost/opencost/core/pkg/util/timeutil"
  7. "github.com/opencost/opencost/pkg/cloud"
  8. "github.com/opencost/opencost/pkg/cloud/models"
  9. "github.com/opencost/opencost/pkg/cloud/provider"
  10. "github.com/opencost/opencost/pkg/env"
  11. )
  12. // configID identifies the source and the ID of a configuration to handle duplicate configs from multiple sources
  13. type configID struct {
  14. source ConfigSource
  15. key string
  16. }
  17. func (cid configID) Equals(that configID) bool {
  18. return cid.source == that.source && cid.key == that.key
  19. }
  20. func newConfigID(source, key string) configID {
  21. return configID{
  22. source: GetConfigSource(source),
  23. key: key,
  24. }
  25. }
  26. type Status struct {
  27. Source ConfigSource
  28. Key string
  29. Active bool
  30. Valid bool
  31. Config cloud.KeyedConfig
  32. }
  33. // Controller manages the cloud.Config using config Watcher(s) to track various configuration
  34. // methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
  35. // upon any change detected from the config watchers.
  36. type Controller struct {
  37. statuses map[configID]*Status
  38. observers []Observer
  39. watchers map[ConfigSource]cloud.KeyedConfigWatcher
  40. }
  41. // NewController initializes an Config Controller
  42. func NewController(cp models.Provider) *Controller {
  43. var watchers map[ConfigSource]cloud.KeyedConfigWatcher
  44. if env.IsKubernetesEnabled() {
  45. providerConfig := provider.ExtractConfigFromProviders(cp)
  46. watchers = GetCloudBillingWatchers(providerConfig)
  47. } else {
  48. watchers = GetCloudBillingWatchers(nil)
  49. }
  50. ic := &Controller{
  51. statuses: make(map[configID]*Status),
  52. watchers: watchers,
  53. }
  54. ic.load()
  55. ic.pullWatchers()
  56. go func() {
  57. ticker := timeutil.NewJobTicker()
  58. defer ticker.Close()
  59. for {
  60. ticker.TickIn(10 * time.Second)
  61. <-ticker.Ch
  62. ic.pullWatchers()
  63. }
  64. }()
  65. return ic
  66. }
  67. func (c *Controller) EnableConfig(key, source string) error {
  68. cID := newConfigID(source, key)
  69. cs, ok := c.statuses[cID]
  70. if !ok {
  71. return fmt.Errorf("Controller: EnableConfig: config with key %s from source %s does not exist", key, source)
  72. }
  73. if cs.Active {
  74. return fmt.Errorf("Controller: EnableConfig: config with key %s from source %s is already active", key, source)
  75. }
  76. // check for configurations with the same configuration key that are already active.
  77. for confID, confStat := range c.statuses {
  78. if confID.key != key || confID.source == cID.source {
  79. continue
  80. }
  81. // if active disable
  82. if confStat.Active == true {
  83. confStat.Active = false
  84. }
  85. }
  86. cs.Active = true
  87. c.putConfig(cs.Config)
  88. c.save()
  89. return nil
  90. }
  91. // DisableConfig updates an config status if it was enabled
  92. func (c *Controller) DisableConfig(key, source string) error {
  93. iID := newConfigID(source, key)
  94. is, ok := c.statuses[iID]
  95. if !ok {
  96. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
  97. }
  98. if !is.Active {
  99. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s is already disabled", key, source)
  100. }
  101. is.Active = false
  102. c.deleteConfig(iID.key)
  103. c.save()
  104. return nil
  105. }
  106. // DeleteConfig removes an config from the statuses and deletes the config on all observers if it was active
  107. func (c *Controller) DeleteConfig(key, source string) error {
  108. id := newConfigID(source, key)
  109. is, ok := c.statuses[id]
  110. if !ok {
  111. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
  112. }
  113. // delete config on observers if active
  114. if is.Active {
  115. c.deleteConfig(id.key)
  116. }
  117. delete(c.statuses, id)
  118. c.save()
  119. return nil
  120. }
  121. // pullWatchers retrieve configs from watchers and update configs according to priority of sources
  122. func (c *Controller) pullWatchers() {
  123. for source, watcher := range c.watchers {
  124. for _, conf := range watcher.GetConfigs() {
  125. key := conf.Key()
  126. cID := configID{
  127. source: source,
  128. key: key,
  129. }
  130. err := conf.Validate()
  131. valid := err == nil
  132. status := Status{
  133. Key: key,
  134. Source: source,
  135. Active: valid, // active if valid, for now
  136. Valid: valid,
  137. Config: conf,
  138. }
  139. // Check existing configs for matching key and source
  140. if existingStatus, ok := c.statuses[cID]; ok {
  141. // if config has not changed continue
  142. if existingStatus.Config.Equals(conf) {
  143. continue
  144. }
  145. // if existing CS is active then it should be replaced by the updated config
  146. if existingStatus.Active {
  147. if status.Valid {
  148. c.putConfig(conf)
  149. } else {
  150. // if active config is being overwritten by an invalid one, delete the config, as it will not be active
  151. c.deleteConfig(key)
  152. }
  153. c.statuses[cID] = &status
  154. continue
  155. }
  156. }
  157. // At this point we know that the config from this watcher has changed
  158. // handle an config with a new unique key for a source or an update config from a source which was inactive before
  159. if valid {
  160. for matchID, matchCS := range c.statuses {
  161. // skip matching configs
  162. if matchID.Equals(cID) {
  163. continue
  164. }
  165. if matchCS.Active {
  166. // if source is non-multi-cloud disable all other non-multi-cloud sourced configs
  167. if cID.source == HelmSource || cID.source == ConfigFileSource {
  168. if matchID.source == HelmSource || matchID.source == ConfigFileSource {
  169. matchCS.Active = false
  170. c.deleteConfig(matchID.key)
  171. }
  172. }
  173. // check for configs with the same key that are active
  174. if matchID.key == key {
  175. // If source has higher priority disable other active configs
  176. matchCS.Active = false
  177. c.deleteConfig(matchID.key)
  178. }
  179. }
  180. }
  181. }
  182. // update config and put to observers if active
  183. c.statuses[cID] = &status
  184. if status.Active {
  185. c.putConfig(conf)
  186. }
  187. }
  188. }
  189. }
  190. // todo implement when building config api and persistence is necessary
  191. func (c *Controller) load() {}
  192. // todo implement when building config api and persistence is necessary
  193. func (c *Controller) save() {}
  194. func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
  195. configs := new(Configurations)
  196. activeConfigs := make(map[string]cloud.Config)
  197. for iID, cs := range c.statuses {
  198. if cs.Active {
  199. activeConfigs[iID.key] = cs.Config
  200. }
  201. }
  202. if key != "" {
  203. conf, ok := activeConfigs[key]
  204. if !ok {
  205. return nil, fmt.Errorf("Config with key %s does not exist or is inactive", key)
  206. }
  207. sanitizedConfig := conf.Sanitize()
  208. err := configs.Insert(sanitizedConfig)
  209. if err != nil {
  210. return nil, fmt.Errorf("failed to insert config: %w", err)
  211. }
  212. return configs, nil
  213. }
  214. for _, conf := range activeConfigs {
  215. sanitizedConfig := conf.Sanitize()
  216. err := configs.Insert(sanitizedConfig)
  217. if err != nil {
  218. return nil, fmt.Errorf("failed to insert config: %w", err)
  219. }
  220. }
  221. return configs, nil
  222. }
  223. func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
  224. bi := make(map[string]cloud.KeyedConfig)
  225. for iID, cs := range c.statuses {
  226. if cs.Active {
  227. bi[iID.key] = cs.Config
  228. }
  229. }
  230. return bi
  231. }
  232. // deleteConfig ask observers to remove and stop all processes related to a configuration with a given key
  233. func (c *Controller) deleteConfig(key string) {
  234. var wg sync.WaitGroup
  235. for _, obs := range c.observers {
  236. observer := obs
  237. wg.Add(1)
  238. go func() {
  239. defer wg.Done()
  240. observer.DeleteConfig(key)
  241. }()
  242. }
  243. wg.Wait()
  244. }
  245. // RegisterObserver gives out the current active list configs and adds the observer to the push list
  246. func (c *Controller) RegisterObserver(obs Observer) {
  247. obs.SetConfigs(c.getActiveConfigs())
  248. c.observers = append(c.observers, obs)
  249. }
  250. func (c *Controller) GetStatus() []Status {
  251. var status []Status
  252. for _, intStat := range c.statuses {
  253. status = append(status, *intStat)
  254. }
  255. return status
  256. }
  257. // putConfig gives observers a new config to handle
  258. func (c *Controller) putConfig(conf cloud.KeyedConfig) {
  259. var wg sync.WaitGroup
  260. for _, obs := range c.observers {
  261. observer := obs
  262. wg.Add(1)
  263. go func() {
  264. defer wg.Done()
  265. observer.PutConfig(conf)
  266. }()
  267. }
  268. wg.Wait()
  269. }