2
0

controller.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. package config
  2. import (
  3. "fmt"
  4. "os"
  5. "sync"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/util/json"
  9. "github.com/opencost/opencost/core/pkg/util/timeutil"
  10. "github.com/opencost/opencost/pkg/cloud"
  11. "github.com/opencost/opencost/pkg/cloud/models"
  12. "github.com/opencost/opencost/pkg/env"
  13. )
  14. // Controller manages the cloud.Config using config Watcher(s) to track various configuration
  15. // methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
  16. // upon any change detected from the config watchers.
  17. type Controller struct {
  18. storage controllerStorage
  19. lock sync.RWMutex
  20. observers []Observer
  21. watchers map[ConfigSource]cloud.KeyedConfigWatcher
  22. }
  23. // NewController initializes an Config Controller
  24. func NewController(providerConfig models.ProviderConfig) *Controller {
  25. watchers := GetCloudBillingWatchers(providerConfig)
  26. storage := &FileControllerStorage{
  27. path: env.GetCloudCostConfigControllerStateFile(),
  28. }
  29. ic := &Controller{
  30. storage: storage,
  31. watchers: watchers,
  32. }
  33. ic.start()
  34. return ic
  35. }
  36. // NewMemoryController initializes a Config Controller backed in memory
  37. func NewMemoryController(providerConfig models.ProviderConfig) *Controller {
  38. watchers := GetCloudBillingWatchers(providerConfig)
  39. ic := &Controller{
  40. storage: &MemoryControllerStorage{},
  41. watchers: watchers,
  42. }
  43. ic.start()
  44. return ic
  45. }
  46. func (c *Controller) start() {
  47. c.pullWatchers()
  48. go func() {
  49. ticker := timeutil.NewJobTicker()
  50. defer ticker.Close()
  51. for {
  52. ticker.TickIn(10 * time.Second)
  53. <-ticker.Ch
  54. c.pullWatchers()
  55. }
  56. }()
  57. }
  58. // pullWatchers retrieve configs from watchers and update configs according to priority of sources
  59. func (c *Controller) pullWatchers() {
  60. c.lock.Lock()
  61. defer c.lock.Unlock()
  62. statuses, err := c.storage.load()
  63. if err != nil {
  64. log.Warnf("Controller: pullWatchers: %s. Proceeding to create the file", err.Error())
  65. statuses = Statuses{}
  66. err = c.storage.save(statuses)
  67. if err != nil {
  68. log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
  69. }
  70. }
  71. for source, watcher := range c.watchers {
  72. watcherConfsByKey := map[string]cloud.KeyedConfig{}
  73. for _, wConf := range watcher.GetConfigs() {
  74. watcherConfsByKey[wConf.Key()] = wConf
  75. }
  76. // remove existing configs that are no longer present in the source
  77. for _, status := range statuses.List() {
  78. if status.Source == source {
  79. if _, ok := watcherConfsByKey[status.Key]; !ok {
  80. err := c.deleteConfig(status.Key, status.Source, statuses)
  81. if err != nil {
  82. log.Errorf("Controller: pullWatchers: %s", err.Error())
  83. }
  84. }
  85. }
  86. }
  87. for key, conf := range watcherConfsByKey {
  88. // Check existing configs for matching key and source
  89. if existingStatus, ok := statuses.Get(key, source); ok {
  90. // if config has not changed continue
  91. if existingStatus.Config.Equals(conf) {
  92. continue
  93. }
  94. // remove the existing config
  95. err := c.deleteConfig(key, source, statuses)
  96. if err != nil {
  97. log.Errorf("Controller: pullWatchers: %s", err.Error())
  98. }
  99. }
  100. err := conf.Validate()
  101. valid := err == nil
  102. configType, err := ConfigTypeFromConfig(conf)
  103. if err != nil {
  104. log.Errorf("Controller: pullWatchers: failed to get config type for config with key: %s", conf.Key())
  105. continue
  106. }
  107. status := Status{
  108. Key: key,
  109. Source: source,
  110. Active: valid, // if valid, then new config will be active
  111. Valid: valid,
  112. ConfigType: configType,
  113. Config: conf,
  114. }
  115. // handle a config with a new unique key for a source or an update config from a source which was inactive before
  116. if valid {
  117. for _, matchStat := range statuses.List() {
  118. //// skip matching configs
  119. //if matchID.Equals(cID) {
  120. // continue
  121. //}
  122. if matchStat.Active {
  123. // if source is non-multi-cloud disable all other non-multi-cloud sourced configs
  124. if source == HelmSource || source == ConfigFileSource {
  125. if matchStat.Source == HelmSource || matchStat.Source == ConfigFileSource {
  126. matchStat.Active = false
  127. c.broadcastRemoveConfig(matchStat.Key)
  128. }
  129. }
  130. // check for configs with the same key that are active
  131. if matchStat.Key == key {
  132. // If source has higher priority disable other active configs
  133. matchStat.Active = false
  134. c.broadcastRemoveConfig(matchStat.Key)
  135. }
  136. }
  137. }
  138. }
  139. // update config and put to observers if active
  140. statuses.Insert(&status)
  141. if status.Active {
  142. c.broadcastAddConfig(conf)
  143. }
  144. err = c.storage.save(statuses)
  145. if err != nil {
  146. log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
  147. }
  148. }
  149. }
  150. }
  151. // CreateConfig adds a new config to status with a source of ConfigControllerSource
  152. // It will disable any config with the same key
  153. // fails if there is an existing config with the same key and source
  154. func (c *Controller) CreateConfig(conf cloud.KeyedConfig) error {
  155. c.lock.Lock()
  156. defer c.lock.Unlock()
  157. err := conf.Validate()
  158. if err != nil {
  159. return fmt.Errorf("provided configuration was invalid: %w", err)
  160. }
  161. statuses, err := c.storage.load()
  162. if err != nil {
  163. return fmt.Errorf("failed to load statuses: %w", err)
  164. }
  165. source := ConfigControllerSource
  166. key := conf.Key()
  167. _, ok := statuses.Get(key, source)
  168. if ok {
  169. return fmt.Errorf("config with key %s from source %s already exists", key, source.String())
  170. }
  171. configType, err := ConfigTypeFromConfig(conf)
  172. if err != nil {
  173. return fmt.Errorf("provided config did not have recoginzed config type: %w", err)
  174. }
  175. statuses.Insert(&Status{
  176. Key: key,
  177. Source: source,
  178. Valid: true,
  179. Active: true,
  180. ConfigType: configType,
  181. Config: conf,
  182. })
  183. // check for configurations with the same configuration key that are already active.
  184. for _, confStat := range statuses.List() {
  185. if confStat.Key != key || confStat.Source == source {
  186. continue
  187. }
  188. // if active, disable and remove from observers
  189. if confStat.Active {
  190. confStat.Active = false
  191. c.broadcastRemoveConfig(key)
  192. }
  193. }
  194. c.broadcastAddConfig(conf)
  195. err = c.storage.save(statuses)
  196. if err != nil {
  197. return fmt.Errorf("failed to save statues: %w", err)
  198. }
  199. return nil
  200. }
  201. // UpdateConfig updates an existing config in status with a source of ConfigControllerSource
  202. // It fails if the provided config does not already exist and if the config to be updated was not created by the config controller
  203. func (c *Controller) UpdateConfig(conf cloud.KeyedConfig) error {
  204. c.lock.Lock()
  205. defer c.lock.Unlock()
  206. err := conf.Validate()
  207. if err != nil {
  208. return fmt.Errorf("provided configuration was invalid: %w", err)
  209. }
  210. statuses, err := c.storage.load()
  211. if err != nil {
  212. return fmt.Errorf("failed to load statuses: %w", err)
  213. }
  214. source := ConfigControllerSource
  215. key := conf.Key()
  216. _, ok := statuses.Get(key, source)
  217. // we fail here if we're attempting to update any config that isn't created by the config controller
  218. if !ok {
  219. return fmt.Errorf("unable to find existing config with key %s and source %s", key, source.String())
  220. }
  221. configType, err := ConfigTypeFromConfig(conf)
  222. if err != nil {
  223. return fmt.Errorf("config did not have recoginzed config: %w", err)
  224. }
  225. // check for configurations with the same configuration key that are already active
  226. // and disable them before inserting the updated config
  227. for _, confStat := range statuses.List() {
  228. if confStat.Key != key {
  229. continue
  230. }
  231. // disable previous active configurations
  232. // note that this will not disable configs with the same key that are not created by config controller
  233. if confStat.Source == ConfigControllerSource && confStat.Active {
  234. confStat.Active = false
  235. c.broadcastRemoveConfig(key)
  236. }
  237. if confStat.Source != ConfigControllerSource && confStat.Active {
  238. log.Debugf("Detected active config with key %s that was not created by the config controller", key)
  239. }
  240. }
  241. statuses.Insert(&Status{
  242. Key: key,
  243. Source: source,
  244. Valid: true,
  245. Active: true,
  246. ConfigType: configType,
  247. Config: conf,
  248. })
  249. c.broadcastAddConfig(conf)
  250. err = c.storage.save(statuses)
  251. if err != nil {
  252. return fmt.Errorf("failed to save statues: %w", err)
  253. }
  254. return nil
  255. }
  256. // EnableConfig enables a config with the given key and source, and disables any config with a matching key
  257. func (c *Controller) EnableConfig(key, sourceStr string) error {
  258. c.lock.Lock()
  259. defer c.lock.Unlock()
  260. statuses, err := c.storage.load()
  261. if err != nil {
  262. return fmt.Errorf("failed to load statuses")
  263. }
  264. source := GetConfigSource(sourceStr)
  265. cs, ok := statuses.Get(key, source)
  266. if !ok {
  267. return fmt.Errorf("config with key %s from source %s does not exist", key, sourceStr)
  268. }
  269. if cs.Active {
  270. return fmt.Errorf("config with key %s from source %s is already active", key, sourceStr)
  271. }
  272. // check for configurations with the same configuration key that are already active.
  273. for _, confStat := range statuses.List() {
  274. if confStat.Key != key || confStat.Source == source {
  275. continue
  276. }
  277. // if active disable
  278. if confStat.Active == true {
  279. confStat.Active = false
  280. c.broadcastRemoveConfig(key)
  281. }
  282. }
  283. cs.Active = true
  284. c.broadcastAddConfig(cs.Config)
  285. c.storage.save(statuses)
  286. return nil
  287. }
  288. // DisableConfig updates an config status if it was enabled
  289. func (c *Controller) DisableConfig(key, sourceStr string) error {
  290. c.lock.Lock()
  291. defer c.lock.Unlock()
  292. statuses, err := c.storage.load()
  293. if err != nil {
  294. return fmt.Errorf("failed to load statuses")
  295. }
  296. source := GetConfigSource(sourceStr)
  297. is, ok := statuses.Get(key, source)
  298. if !ok {
  299. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
  300. }
  301. if !is.Active {
  302. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s is already disabled", key, source)
  303. }
  304. is.Active = false
  305. c.broadcastRemoveConfig(key)
  306. c.storage.save(statuses)
  307. return nil
  308. }
  309. // DeleteConfig removes a config from the statuses and deletes the config on all observers if it was active
  310. // This can only be used on configs with ConfigControllerSource
  311. func (c *Controller) DeleteConfig(key, sourceStr string) error {
  312. c.lock.Lock()
  313. defer c.lock.Unlock()
  314. source := GetConfigSource(sourceStr)
  315. if source != ConfigControllerSource {
  316. return fmt.Errorf("controller does not own config with key %s from source %s, manage this config at its source", key, source.String())
  317. }
  318. statuses, err := c.storage.load()
  319. if err != nil {
  320. return fmt.Errorf("failed to load statuses")
  321. }
  322. err = c.deleteConfig(key, source, statuses)
  323. if err != nil {
  324. return fmt.Errorf("Controller: DeleteConfig: %w", err)
  325. }
  326. return nil
  327. }
  328. func (c *Controller) deleteConfig(key string, source ConfigSource, statuses Statuses) error {
  329. is, ok := statuses.Get(key, source)
  330. if !ok {
  331. return fmt.Errorf("config with key %s from source %s does not exist", key, source.String())
  332. }
  333. // delete config on observers if active
  334. if is.Active {
  335. c.broadcastRemoveConfig(key)
  336. }
  337. delete(statuses[source], key)
  338. c.storage.save(statuses)
  339. return nil
  340. }
  341. func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
  342. c.lock.RLock()
  343. defer c.lock.RUnlock()
  344. configs := new(Configurations)
  345. activeConfigs := c.getActiveConfigs()
  346. if key != "" {
  347. conf, ok := activeConfigs[key]
  348. if !ok {
  349. return nil, fmt.Errorf("Config with key %s does not exist or is inactive", key)
  350. }
  351. sanitizedConfig := conf.Sanitize()
  352. err := configs.Insert(sanitizedConfig)
  353. if err != nil {
  354. return nil, fmt.Errorf("failed to insert config: %w", err)
  355. }
  356. return configs, nil
  357. }
  358. for _, conf := range activeConfigs {
  359. sanitizedConfig := conf.Sanitize()
  360. err := configs.Insert(sanitizedConfig)
  361. if err != nil {
  362. return nil, fmt.Errorf("failed to insert config: %w", err)
  363. }
  364. }
  365. return configs, nil
  366. }
  367. func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
  368. activeConfigs := make(map[string]cloud.KeyedConfig)
  369. statuses, err := c.storage.load()
  370. if err != nil {
  371. log.Errorf("GetStatus: failed to load cloud statuses")
  372. }
  373. for _, cs := range statuses.List() {
  374. if cs.Active {
  375. activeConfigs[cs.Key] = cs.Config
  376. }
  377. }
  378. return activeConfigs
  379. }
  380. // broadcastRemoveConfig ask observers to remove and stop all processes related to a configuration with a given key
  381. func (c *Controller) broadcastRemoveConfig(key string) {
  382. var wg sync.WaitGroup
  383. for _, obs := range c.observers {
  384. observer := obs
  385. wg.Add(1)
  386. go func() {
  387. defer wg.Done()
  388. observer.DeleteConfig(key)
  389. }()
  390. }
  391. wg.Wait()
  392. }
  393. // broadcastAddConfig gives observers a new config to handle
  394. func (c *Controller) broadcastAddConfig(conf cloud.KeyedConfig) {
  395. var wg sync.WaitGroup
  396. for _, obs := range c.observers {
  397. observer := obs
  398. wg.Add(1)
  399. go func() {
  400. defer wg.Done()
  401. observer.PutConfig(conf)
  402. }()
  403. }
  404. wg.Wait()
  405. }
  406. // RegisterObserver gives out the current active list configs and adds the observer to the push list
  407. func (c *Controller) RegisterObserver(obs Observer) {
  408. c.lock.Lock()
  409. defer c.lock.Unlock()
  410. obs.SetConfigs(c.getActiveConfigs())
  411. c.observers = append(c.observers, obs)
  412. }
  413. func (c *Controller) GetStatus() []Status {
  414. c.lock.RLock()
  415. defer c.lock.RUnlock()
  416. var status []Status
  417. statuses, err := c.storage.load()
  418. if err != nil {
  419. log.Errorf("GetStatus: failed to load cloud statuses")
  420. }
  421. for _, intStat := range statuses.List() {
  422. status = append(status, *intStat)
  423. }
  424. return status
  425. }
  426. type controllerStorage interface {
  427. load() (Statuses, error)
  428. save(statuses Statuses) error
  429. }
  430. type FileControllerStorage struct {
  431. path string
  432. }
  433. func (fcs *FileControllerStorage) load() (Statuses, error) {
  434. raw, err := os.ReadFile(fcs.path)
  435. if err != nil {
  436. return nil, fmt.Errorf("failed to load config statuses from file: %w", err)
  437. }
  438. statuses := Statuses{}
  439. err = json.Unmarshal(raw, &statuses)
  440. if err != nil {
  441. return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
  442. }
  443. return statuses, nil
  444. }
  445. func (fcs *FileControllerStorage) save(statuses Statuses) error {
  446. raw, err := json.Marshal(statuses)
  447. if err != nil {
  448. return fmt.Errorf("failed to marshal config statuses: %s", err)
  449. }
  450. err = os.WriteFile(fcs.path, raw, 0644)
  451. if err != nil {
  452. return fmt.Errorf("failed to save config statuses to file: %s", err)
  453. }
  454. return nil
  455. }
  456. // MemoryControllerStorage is a ControllerStorage implementation that is backed by a byte array that
  457. // is marshalled in and out of to ensure that behaviours is same as the file backed version
  458. type MemoryControllerStorage struct {
  459. bytes []byte
  460. }
  461. func (mcs *MemoryControllerStorage) load() (Statuses, error) {
  462. if mcs.bytes == nil {
  463. return Statuses{}, nil
  464. }
  465. statuses := Statuses{}
  466. err := json.Unmarshal(mcs.bytes, &statuses)
  467. if err != nil {
  468. return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
  469. }
  470. return statuses, nil
  471. }
  472. func (mcs *MemoryControllerStorage) save(statuses Statuses) error {
  473. raw, err := json.Marshal(statuses)
  474. if err != nil {
  475. return fmt.Errorf("failed to marshal config statuses: %s", err)
  476. }
  477. mcs.bytes = raw
  478. return nil
  479. }