controller.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. package config
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "sync"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/util/json"
  10. "github.com/opencost/opencost/core/pkg/util/timeutil"
  11. "github.com/opencost/opencost/pkg/cloud"
  12. "github.com/opencost/opencost/pkg/cloud/models"
  13. "github.com/opencost/opencost/pkg/env"
  14. )
  15. const configFile = "cloud-configurations.json"
  16. // Controller manages the cloud.Config using config Watcher(s) to track various configuration
  17. // methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
  18. // upon any change detected from the config watchers.
  19. type Controller struct {
  20. storage controllerStorage
  21. lock sync.RWMutex
  22. observers []Observer
  23. watchers map[ConfigSource]cloud.KeyedConfigWatcher
  24. }
  25. // NewController initializes an Config Controller
  26. func NewController(providerConfig models.ProviderConfig) *Controller {
  27. watchers := GetCloudBillingWatchers(providerConfig)
  28. storage := &FileControllerStorage{
  29. path: filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), configFile),
  30. }
  31. ic := &Controller{
  32. storage: storage,
  33. watchers: watchers,
  34. }
  35. ic.start()
  36. return ic
  37. }
  38. // NewMemoryController initializes a Config Controller backed in memory
  39. func NewMemoryController(providerConfig models.ProviderConfig) *Controller {
  40. watchers := GetCloudBillingWatchers(providerConfig)
  41. ic := &Controller{
  42. storage: &MemoryControllerStorage{},
  43. watchers: watchers,
  44. }
  45. ic.start()
  46. return ic
  47. }
  48. func (c *Controller) start() {
  49. c.pullWatchers()
  50. go func() {
  51. ticker := timeutil.NewJobTicker()
  52. defer ticker.Close()
  53. for {
  54. ticker.TickIn(10 * time.Second)
  55. <-ticker.Ch
  56. c.pullWatchers()
  57. }
  58. }()
  59. }
  60. // pullWatchers retrieve configs from watchers and update configs according to priority of sources
  61. func (c *Controller) pullWatchers() {
  62. c.lock.Lock()
  63. defer c.lock.Unlock()
  64. statuses, err := c.storage.load()
  65. if err != nil {
  66. log.Warnf("Controller: pullWatchers: %s. Proceeding to create the file", err.Error())
  67. statuses = Statuses{}
  68. err = c.storage.save(statuses)
  69. if err != nil {
  70. log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
  71. }
  72. }
  73. for source, watcher := range c.watchers {
  74. watcherConfsByKey := map[string]cloud.KeyedConfig{}
  75. for _, wConf := range watcher.GetConfigs() {
  76. watcherConfsByKey[wConf.Key()] = wConf
  77. }
  78. // remove existing configs that are no longer present in the source
  79. for _, status := range statuses.List() {
  80. if status.Source == source {
  81. if _, ok := watcherConfsByKey[status.Key]; !ok {
  82. err := c.deleteConfig(status.Key, status.Source, statuses)
  83. if err != nil {
  84. log.Errorf("Controller: pullWatchers: %s", err.Error())
  85. }
  86. }
  87. }
  88. }
  89. for key, conf := range watcherConfsByKey {
  90. // Check existing configs for matching key and source
  91. if existingStatus, ok := statuses.Get(key, source); ok {
  92. // if config has not changed continue
  93. if existingStatus.Config.Equals(conf) {
  94. continue
  95. }
  96. // remove the existing config
  97. err := c.deleteConfig(key, source, statuses)
  98. if err != nil {
  99. log.Errorf("Controller: pullWatchers: %s", err.Error())
  100. }
  101. }
  102. err := conf.Validate()
  103. valid := err == nil
  104. configType, err := ConfigTypeFromConfig(conf)
  105. if err != nil {
  106. log.Errorf("Controller: pullWatchers: failed to get config type for config with key: %s", conf.Key())
  107. continue
  108. }
  109. status := Status{
  110. Key: key,
  111. Source: source,
  112. Active: valid, // if valid, then new config will be active
  113. Valid: valid,
  114. ConfigType: configType,
  115. Config: conf,
  116. }
  117. // handle a config with a new unique key for a source or an update config from a source which was inactive before
  118. if valid {
  119. for _, matchStat := range statuses.List() {
  120. //// skip matching configs
  121. //if matchID.Equals(cID) {
  122. // continue
  123. //}
  124. if matchStat.Active {
  125. // if source is non-multi-cloud disable all other non-multi-cloud sourced configs
  126. if source == HelmSource || source == ConfigFileSource {
  127. if matchStat.Source == HelmSource || matchStat.Source == ConfigFileSource {
  128. matchStat.Active = false
  129. c.broadcastRemoveConfig(matchStat.Key)
  130. }
  131. }
  132. // check for configs with the same key that are active
  133. if matchStat.Key == key {
  134. // If source has higher priority disable other active configs
  135. matchStat.Active = false
  136. c.broadcastRemoveConfig(matchStat.Key)
  137. }
  138. }
  139. }
  140. }
  141. // update config and put to observers if active
  142. statuses.Insert(&status)
  143. if status.Active {
  144. c.broadcastAddConfig(conf)
  145. }
  146. err = c.storage.save(statuses)
  147. if err != nil {
  148. log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
  149. }
  150. }
  151. }
  152. }
  153. // CreateConfig adds a new config to status with a source of ConfigControllerSource
  154. // It will disable any config with the same key
  155. // fails if there is an existing config with the same key and source
  156. func (c *Controller) CreateConfig(conf cloud.KeyedConfig) error {
  157. c.lock.Lock()
  158. defer c.lock.Unlock()
  159. err := conf.Validate()
  160. if err != nil {
  161. return fmt.Errorf("provided configuration was invalid: %w", err)
  162. }
  163. statuses, err := c.storage.load()
  164. if err != nil {
  165. return fmt.Errorf("failed to load statuses")
  166. }
  167. source := ConfigControllerSource
  168. key := conf.Key()
  169. _, ok := statuses.Get(key, source)
  170. if ok {
  171. return fmt.Errorf("config with key %s from source %s already exist", key, source.String())
  172. }
  173. configType, err := ConfigTypeFromConfig(conf)
  174. if err != nil {
  175. return fmt.Errorf("config did not have recoginzed config: %w", err)
  176. }
  177. statuses.Insert(&Status{
  178. Key: key,
  179. Source: source,
  180. Valid: true,
  181. Active: true,
  182. ConfigType: configType,
  183. Config: conf,
  184. })
  185. // check for configurations with the same configuration key that are already active.
  186. for _, confStat := range statuses.List() {
  187. if confStat.Key != key || confStat.Source == source {
  188. continue
  189. }
  190. // if active disable
  191. if confStat.Active == true {
  192. confStat.Active = false
  193. c.broadcastRemoveConfig(key)
  194. }
  195. }
  196. c.broadcastAddConfig(conf)
  197. err = c.storage.save(statuses)
  198. if err != nil {
  199. return fmt.Errorf("failed to save statues: %w", err)
  200. }
  201. return nil
  202. }
  203. // EnableConfig enables a config with the given key and source, and disables any config with a matching key
  204. func (c *Controller) EnableConfig(key, sourceStr string) error {
  205. c.lock.Lock()
  206. defer c.lock.Unlock()
  207. statuses, err := c.storage.load()
  208. if err != nil {
  209. return fmt.Errorf("failed to load statuses")
  210. }
  211. source := GetConfigSource(sourceStr)
  212. cs, ok := statuses.Get(key, source)
  213. if !ok {
  214. return fmt.Errorf("config with key %s from source %s does not exist", key, sourceStr)
  215. }
  216. if cs.Active {
  217. return fmt.Errorf("config with key %s from source %s is already active", key, sourceStr)
  218. }
  219. // check for configurations with the same configuration key that are already active.
  220. for _, confStat := range statuses.List() {
  221. if confStat.Key != key || confStat.Source == source {
  222. continue
  223. }
  224. // if active disable
  225. if confStat.Active == true {
  226. confStat.Active = false
  227. c.broadcastRemoveConfig(key)
  228. }
  229. }
  230. cs.Active = true
  231. c.broadcastAddConfig(cs.Config)
  232. c.storage.save(statuses)
  233. return nil
  234. }
  235. // DisableConfig updates an config status if it was enabled
  236. func (c *Controller) DisableConfig(key, sourceStr string) error {
  237. c.lock.Lock()
  238. defer c.lock.Unlock()
  239. statuses, err := c.storage.load()
  240. if err != nil {
  241. return fmt.Errorf("failed to load statuses")
  242. }
  243. source := GetConfigSource(sourceStr)
  244. is, ok := statuses.Get(key, source)
  245. if !ok {
  246. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
  247. }
  248. if !is.Active {
  249. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s is already disabled", key, source)
  250. }
  251. is.Active = false
  252. c.broadcastRemoveConfig(key)
  253. c.storage.save(statuses)
  254. return nil
  255. }
  256. // DeleteConfig removes a config from the statuses and deletes the config on all observers if it was active
  257. // This can only be used on configs with ConfigControllerSource
  258. func (c *Controller) DeleteConfig(key, sourceStr string) error {
  259. c.lock.Lock()
  260. defer c.lock.Unlock()
  261. source := GetConfigSource(sourceStr)
  262. if source != ConfigControllerSource {
  263. return fmt.Errorf("controller does not own config with key %s from source %s, manage this config at its source", key, source.String())
  264. }
  265. statuses, err := c.storage.load()
  266. if err != nil {
  267. return fmt.Errorf("failed to load statuses")
  268. }
  269. err = c.deleteConfig(key, source, statuses)
  270. if err != nil {
  271. return fmt.Errorf("Controller: DeleteConfig: %w", err)
  272. }
  273. return nil
  274. }
  275. func (c *Controller) deleteConfig(key string, source ConfigSource, statuses Statuses) error {
  276. is, ok := statuses.Get(key, source)
  277. if !ok {
  278. return fmt.Errorf("config with key %s from source %s does not exist", key, source.String())
  279. }
  280. // delete config on observers if active
  281. if is.Active {
  282. c.broadcastRemoveConfig(key)
  283. }
  284. delete(statuses[source], key)
  285. c.storage.save(statuses)
  286. return nil
  287. }
  288. func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
  289. c.lock.RLock()
  290. defer c.lock.RUnlock()
  291. configs := new(Configurations)
  292. activeConfigs := c.getActiveConfigs()
  293. if key != "" {
  294. conf, ok := activeConfigs[key]
  295. if !ok {
  296. return nil, fmt.Errorf("Config with key %s does not exist or is inactive", key)
  297. }
  298. sanitizedConfig := conf.Sanitize()
  299. err := configs.Insert(sanitizedConfig)
  300. if err != nil {
  301. return nil, fmt.Errorf("failed to insert config: %w", err)
  302. }
  303. return configs, nil
  304. }
  305. for _, conf := range activeConfigs {
  306. sanitizedConfig := conf.Sanitize()
  307. err := configs.Insert(sanitizedConfig)
  308. if err != nil {
  309. return nil, fmt.Errorf("failed to insert config: %w", err)
  310. }
  311. }
  312. return configs, nil
  313. }
  314. func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
  315. activeConfigs := make(map[string]cloud.KeyedConfig)
  316. statuses, err := c.storage.load()
  317. if err != nil {
  318. log.Errorf("GetStatus: failed to load cloud statuses")
  319. }
  320. for _, cs := range statuses.List() {
  321. if cs.Active {
  322. activeConfigs[cs.Key] = cs.Config
  323. }
  324. }
  325. return activeConfigs
  326. }
  327. // broadcastRemoveConfig ask observers to remove and stop all processes related to a configuration with a given key
  328. func (c *Controller) broadcastRemoveConfig(key string) {
  329. var wg sync.WaitGroup
  330. for _, obs := range c.observers {
  331. observer := obs
  332. wg.Add(1)
  333. go func() {
  334. defer wg.Done()
  335. observer.DeleteConfig(key)
  336. }()
  337. }
  338. wg.Wait()
  339. }
  340. // broadcastAddConfig gives observers a new config to handle
  341. func (c *Controller) broadcastAddConfig(conf cloud.KeyedConfig) {
  342. var wg sync.WaitGroup
  343. for _, obs := range c.observers {
  344. observer := obs
  345. wg.Add(1)
  346. go func() {
  347. defer wg.Done()
  348. observer.PutConfig(conf)
  349. }()
  350. }
  351. wg.Wait()
  352. }
  353. // RegisterObserver gives out the current active list configs and adds the observer to the push list
  354. func (c *Controller) RegisterObserver(obs Observer) {
  355. c.lock.Lock()
  356. defer c.lock.Unlock()
  357. obs.SetConfigs(c.getActiveConfigs())
  358. c.observers = append(c.observers, obs)
  359. }
  360. func (c *Controller) GetStatus() []Status {
  361. c.lock.RLock()
  362. defer c.lock.RUnlock()
  363. var status []Status
  364. statuses, err := c.storage.load()
  365. if err != nil {
  366. log.Errorf("GetStatus: failed to load cloud statuses")
  367. }
  368. for _, intStat := range statuses.List() {
  369. status = append(status, *intStat)
  370. }
  371. return status
  372. }
  373. type controllerStorage interface {
  374. load() (Statuses, error)
  375. save(statuses Statuses) error
  376. }
  377. type FileControllerStorage struct {
  378. path string
  379. }
  380. func (fcs *FileControllerStorage) load() (Statuses, error) {
  381. raw, err := os.ReadFile(fcs.path)
  382. if err != nil {
  383. return nil, fmt.Errorf("failed to load config statuses from file: %w", err)
  384. }
  385. statuses := Statuses{}
  386. err = json.Unmarshal(raw, &statuses)
  387. if err != nil {
  388. return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
  389. }
  390. return statuses, nil
  391. }
  392. func (fcs *FileControllerStorage) save(statuses Statuses) error {
  393. raw, err := json.Marshal(statuses)
  394. if err != nil {
  395. return fmt.Errorf("failed to marshal config statuses: %s", err)
  396. }
  397. err = os.WriteFile(fcs.path, raw, 0644)
  398. if err != nil {
  399. return fmt.Errorf("failed to save config statuses to file: %s", err)
  400. }
  401. return nil
  402. }
  403. // MemoryControllerStorage is a ControllerStorage implementation that is backed by a byte array that
  404. // is marshalled in and out of to ensure that behaviours is same as the file backed version
  405. type MemoryControllerStorage struct {
  406. bytes []byte
  407. }
  408. func (mcs *MemoryControllerStorage) load() (Statuses, error) {
  409. if mcs.bytes == nil {
  410. return Statuses{}, nil
  411. }
  412. statuses := Statuses{}
  413. err := json.Unmarshal(mcs.bytes, &statuses)
  414. if err != nil {
  415. return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
  416. }
  417. return statuses, nil
  418. }
  419. func (mcs *MemoryControllerStorage) save(statuses Statuses) error {
  420. raw, err := json.Marshal(statuses)
  421. if err != nil {
  422. return fmt.Errorf("failed to marshal config statuses: %s", err)
  423. }
  424. mcs.bytes = raw
  425. return nil
  426. }