controller.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. package config
  2. import (
  3. "fmt"
  4. "os"
  5. "sync"
  6. "time"
  7. "github.com/opencost/opencost/core/pkg/log"
  8. "github.com/opencost/opencost/core/pkg/util/json"
  9. "github.com/opencost/opencost/core/pkg/util/timeutil"
  10. "github.com/opencost/opencost/pkg/cloud"
  11. "github.com/opencost/opencost/pkg/cloud/models"
  12. "github.com/opencost/opencost/pkg/env"
  13. )
  14. // Controller manages the cloud.Config using config Watcher(s) to track various configuration
  15. // methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
  16. // upon any change detected from the config watchers.
  17. type Controller struct {
  18. storage controllerStorage
  19. lock sync.RWMutex
  20. observers []Observer
  21. watchers map[ConfigSource]cloud.KeyedConfigWatcher
  22. }
  23. // NewController initializes an Config Controller
  24. func NewController(providerConfig models.ProviderConfig) *Controller {
  25. watchers := GetCloudBillingWatchers(providerConfig)
  26. storage := &FileControllerStorage{
  27. path: env.GetCloudCostConfigControllerStateFile(),
  28. }
  29. ic := &Controller{
  30. storage: storage,
  31. watchers: watchers,
  32. }
  33. ic.start()
  34. return ic
  35. }
  36. // NewMemoryController initializes a Config Controller backed in memory
  37. func NewMemoryController(providerConfig models.ProviderConfig) *Controller {
  38. watchers := GetCloudBillingWatchers(providerConfig)
  39. ic := &Controller{
  40. storage: &MemoryControllerStorage{},
  41. watchers: watchers,
  42. }
  43. ic.start()
  44. return ic
  45. }
  46. func (c *Controller) start() {
  47. c.pullWatchers()
  48. go func() {
  49. ticker := timeutil.NewJobTicker()
  50. defer ticker.Close()
  51. for {
  52. ticker.TickIn(10 * time.Second)
  53. <-ticker.Ch
  54. c.pullWatchers()
  55. }
  56. }()
  57. }
  58. // pullWatchers retrieve configs from watchers and update configs according to priority of sources
  59. func (c *Controller) pullWatchers() {
  60. c.lock.Lock()
  61. defer c.lock.Unlock()
  62. statuses, err := c.storage.load()
  63. if err != nil {
  64. log.Warnf("Controller: pullWatchers: %s. Proceeding to create the file", err.Error())
  65. statuses = Statuses{}
  66. err = c.storage.save(statuses)
  67. if err != nil {
  68. log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
  69. }
  70. }
  71. for source, watcher := range c.watchers {
  72. watcherConfsByKey := map[string]cloud.KeyedConfig{}
  73. for _, wConf := range watcher.GetConfigs() {
  74. watcherConfsByKey[wConf.Key()] = wConf
  75. }
  76. // remove existing configs that are no longer present in the source
  77. for _, status := range statuses.List() {
  78. if status.Source == source {
  79. if _, ok := watcherConfsByKey[status.Key]; !ok {
  80. err := c.deleteConfig(status.Key, status.Source, statuses)
  81. if err != nil {
  82. log.Errorf("Controller: pullWatchers: %s", err.Error())
  83. }
  84. }
  85. }
  86. }
  87. for key, conf := range watcherConfsByKey {
  88. // Check existing configs for matching key and source
  89. if existingStatus, ok := statuses.Get(key, source); ok {
  90. // if config has not changed continue
  91. if existingStatus.Config.Equals(conf) {
  92. continue
  93. }
  94. // remove the existing config
  95. err := c.deleteConfig(key, source, statuses)
  96. if err != nil {
  97. log.Errorf("Controller: pullWatchers: %s", err.Error())
  98. }
  99. }
  100. err := conf.Validate()
  101. valid := err == nil
  102. configType, err := ConfigTypeFromConfig(conf)
  103. if err != nil {
  104. log.Errorf("Controller: pullWatchers: failed to get config type for config with key: %s", conf.Key())
  105. continue
  106. }
  107. status := Status{
  108. Key: key,
  109. Source: source,
  110. Active: valid, // if valid, then new config will be active
  111. Valid: valid,
  112. ConfigType: configType,
  113. Config: conf,
  114. }
  115. // handle a config with a new unique key for a source or an update config from a source which was inactive before
  116. if valid {
  117. for _, matchStat := range statuses.List() {
  118. //// skip matching configs
  119. //if matchID.Equals(cID) {
  120. // continue
  121. //}
  122. if matchStat.Active {
  123. // if source is non-multi-cloud disable all other non-multi-cloud sourced configs
  124. if source == HelmSource || source == ConfigFileSource {
  125. if matchStat.Source == HelmSource || matchStat.Source == ConfigFileSource {
  126. matchStat.Active = false
  127. c.broadcastRemoveConfig(matchStat.Key)
  128. }
  129. }
  130. // check for configs with the same key that are active
  131. if matchStat.Key == key {
  132. // If source has higher priority disable other active configs
  133. matchStat.Active = false
  134. c.broadcastRemoveConfig(matchStat.Key)
  135. }
  136. }
  137. }
  138. }
  139. // update config and put to observers if active
  140. statuses.Insert(&status)
  141. if status.Active {
  142. c.broadcastAddConfig(conf)
  143. }
  144. err = c.storage.save(statuses)
  145. if err != nil {
  146. log.Errorf("Controller: pullWatchers: failed to save statuses %s", err.Error())
  147. }
  148. }
  149. }
  150. }
  151. // CreateConfig adds a new config to status with a source of ConfigControllerSource
  152. // It will disable any config with the same key
  153. // fails if there is an existing config with the same key and source
  154. func (c *Controller) CreateConfig(conf cloud.KeyedConfig) error {
  155. c.lock.Lock()
  156. defer c.lock.Unlock()
  157. err := conf.Validate()
  158. if err != nil {
  159. return fmt.Errorf("provided configuration was invalid: %w", err)
  160. }
  161. statuses, err := c.storage.load()
  162. if err != nil {
  163. return fmt.Errorf("failed to load statuses")
  164. }
  165. source := ConfigControllerSource
  166. key := conf.Key()
  167. _, ok := statuses.Get(key, source)
  168. if ok {
  169. return fmt.Errorf("config with key %s from source %s already exist", key, source.String())
  170. }
  171. configType, err := ConfigTypeFromConfig(conf)
  172. if err != nil {
  173. return fmt.Errorf("config did not have recoginzed config: %w", err)
  174. }
  175. statuses.Insert(&Status{
  176. Key: key,
  177. Source: source,
  178. Valid: true,
  179. Active: true,
  180. ConfigType: configType,
  181. Config: conf,
  182. })
  183. // check for configurations with the same configuration key that are already active.
  184. for _, confStat := range statuses.List() {
  185. if confStat.Key != key || confStat.Source == source {
  186. continue
  187. }
  188. // if active disable
  189. if confStat.Active == true {
  190. confStat.Active = false
  191. c.broadcastRemoveConfig(key)
  192. }
  193. }
  194. c.broadcastAddConfig(conf)
  195. err = c.storage.save(statuses)
  196. if err != nil {
  197. return fmt.Errorf("failed to save statues: %w", err)
  198. }
  199. return nil
  200. }
  201. // EnableConfig enables a config with the given key and source, and disables any config with a matching key
  202. func (c *Controller) EnableConfig(key, sourceStr string) error {
  203. c.lock.Lock()
  204. defer c.lock.Unlock()
  205. statuses, err := c.storage.load()
  206. if err != nil {
  207. return fmt.Errorf("failed to load statuses")
  208. }
  209. source := GetConfigSource(sourceStr)
  210. cs, ok := statuses.Get(key, source)
  211. if !ok {
  212. return fmt.Errorf("config with key %s from source %s does not exist", key, sourceStr)
  213. }
  214. if cs.Active {
  215. return fmt.Errorf("config with key %s from source %s is already active", key, sourceStr)
  216. }
  217. // check for configurations with the same configuration key that are already active.
  218. for _, confStat := range statuses.List() {
  219. if confStat.Key != key || confStat.Source == source {
  220. continue
  221. }
  222. // if active disable
  223. if confStat.Active == true {
  224. confStat.Active = false
  225. c.broadcastRemoveConfig(key)
  226. }
  227. }
  228. cs.Active = true
  229. c.broadcastAddConfig(cs.Config)
  230. c.storage.save(statuses)
  231. return nil
  232. }
  233. // DisableConfig updates an config status if it was enabled
  234. func (c *Controller) DisableConfig(key, sourceStr string) error {
  235. c.lock.Lock()
  236. defer c.lock.Unlock()
  237. statuses, err := c.storage.load()
  238. if err != nil {
  239. return fmt.Errorf("failed to load statuses")
  240. }
  241. source := GetConfigSource(sourceStr)
  242. is, ok := statuses.Get(key, source)
  243. if !ok {
  244. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
  245. }
  246. if !is.Active {
  247. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s is already disabled", key, source)
  248. }
  249. is.Active = false
  250. c.broadcastRemoveConfig(key)
  251. c.storage.save(statuses)
  252. return nil
  253. }
  254. // DeleteConfig removes a config from the statuses and deletes the config on all observers if it was active
  255. // This can only be used on configs with ConfigControllerSource
  256. func (c *Controller) DeleteConfig(key, sourceStr string) error {
  257. c.lock.Lock()
  258. defer c.lock.Unlock()
  259. source := GetConfigSource(sourceStr)
  260. if source != ConfigControllerSource {
  261. return fmt.Errorf("controller does not own config with key %s from source %s, manage this config at its source", key, source.String())
  262. }
  263. statuses, err := c.storage.load()
  264. if err != nil {
  265. return fmt.Errorf("failed to load statuses")
  266. }
  267. err = c.deleteConfig(key, source, statuses)
  268. if err != nil {
  269. return fmt.Errorf("Controller: DeleteConfig: %w", err)
  270. }
  271. return nil
  272. }
  273. func (c *Controller) deleteConfig(key string, source ConfigSource, statuses Statuses) error {
  274. is, ok := statuses.Get(key, source)
  275. if !ok {
  276. return fmt.Errorf("config with key %s from source %s does not exist", key, source.String())
  277. }
  278. // delete config on observers if active
  279. if is.Active {
  280. c.broadcastRemoveConfig(key)
  281. }
  282. delete(statuses[source], key)
  283. c.storage.save(statuses)
  284. return nil
  285. }
  286. func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
  287. c.lock.RLock()
  288. defer c.lock.RUnlock()
  289. configs := new(Configurations)
  290. activeConfigs := c.getActiveConfigs()
  291. if key != "" {
  292. conf, ok := activeConfigs[key]
  293. if !ok {
  294. return nil, fmt.Errorf("Config with key %s does not exist or is inactive", key)
  295. }
  296. sanitizedConfig := conf.Sanitize()
  297. err := configs.Insert(sanitizedConfig)
  298. if err != nil {
  299. return nil, fmt.Errorf("failed to insert config: %w", err)
  300. }
  301. return configs, nil
  302. }
  303. for _, conf := range activeConfigs {
  304. sanitizedConfig := conf.Sanitize()
  305. err := configs.Insert(sanitizedConfig)
  306. if err != nil {
  307. return nil, fmt.Errorf("failed to insert config: %w", err)
  308. }
  309. }
  310. return configs, nil
  311. }
  312. func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
  313. activeConfigs := make(map[string]cloud.KeyedConfig)
  314. statuses, err := c.storage.load()
  315. if err != nil {
  316. log.Errorf("GetStatus: failed to load cloud statuses")
  317. }
  318. for _, cs := range statuses.List() {
  319. if cs.Active {
  320. activeConfigs[cs.Key] = cs.Config
  321. }
  322. }
  323. return activeConfigs
  324. }
  325. // broadcastRemoveConfig ask observers to remove and stop all processes related to a configuration with a given key
  326. func (c *Controller) broadcastRemoveConfig(key string) {
  327. var wg sync.WaitGroup
  328. for _, obs := range c.observers {
  329. observer := obs
  330. wg.Add(1)
  331. go func() {
  332. defer wg.Done()
  333. observer.DeleteConfig(key)
  334. }()
  335. }
  336. wg.Wait()
  337. }
  338. // broadcastAddConfig gives observers a new config to handle
  339. func (c *Controller) broadcastAddConfig(conf cloud.KeyedConfig) {
  340. var wg sync.WaitGroup
  341. for _, obs := range c.observers {
  342. observer := obs
  343. wg.Add(1)
  344. go func() {
  345. defer wg.Done()
  346. observer.PutConfig(conf)
  347. }()
  348. }
  349. wg.Wait()
  350. }
  351. // RegisterObserver gives out the current active list configs and adds the observer to the push list
  352. func (c *Controller) RegisterObserver(obs Observer) {
  353. c.lock.Lock()
  354. defer c.lock.Unlock()
  355. obs.SetConfigs(c.getActiveConfigs())
  356. c.observers = append(c.observers, obs)
  357. }
  358. func (c *Controller) GetStatus() []Status {
  359. c.lock.RLock()
  360. defer c.lock.RUnlock()
  361. var status []Status
  362. statuses, err := c.storage.load()
  363. if err != nil {
  364. log.Errorf("GetStatus: failed to load cloud statuses")
  365. }
  366. for _, intStat := range statuses.List() {
  367. status = append(status, *intStat)
  368. }
  369. return status
  370. }
  371. type controllerStorage interface {
  372. load() (Statuses, error)
  373. save(statuses Statuses) error
  374. }
  375. type FileControllerStorage struct {
  376. path string
  377. }
  378. func (fcs *FileControllerStorage) load() (Statuses, error) {
  379. raw, err := os.ReadFile(fcs.path)
  380. if err != nil {
  381. return nil, fmt.Errorf("failed to load config statuses from file: %w", err)
  382. }
  383. statuses := Statuses{}
  384. err = json.Unmarshal(raw, &statuses)
  385. if err != nil {
  386. return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
  387. }
  388. return statuses, nil
  389. }
  390. func (fcs *FileControllerStorage) save(statuses Statuses) error {
  391. raw, err := json.Marshal(statuses)
  392. if err != nil {
  393. return fmt.Errorf("failed to marshal config statuses: %s", err)
  394. }
  395. err = os.WriteFile(fcs.path, raw, 0644)
  396. if err != nil {
  397. return fmt.Errorf("failed to save config statuses to file: %s", err)
  398. }
  399. return nil
  400. }
  401. // MemoryControllerStorage is a ControllerStorage implementation that is backed by a byte array that
  402. // is marshalled in and out of to ensure that behaviours is same as the file backed version
  403. type MemoryControllerStorage struct {
  404. bytes []byte
  405. }
  406. func (mcs *MemoryControllerStorage) load() (Statuses, error) {
  407. if mcs.bytes == nil {
  408. return Statuses{}, nil
  409. }
  410. statuses := Statuses{}
  411. err := json.Unmarshal(mcs.bytes, &statuses)
  412. if err != nil {
  413. return nil, fmt.Errorf("failed to marshal config statuses: %s", err.Error())
  414. }
  415. return statuses, nil
  416. }
  417. func (mcs *MemoryControllerStorage) save(statuses Statuses) error {
  418. raw, err := json.Marshal(statuses)
  419. if err != nil {
  420. return fmt.Errorf("failed to marshal config statuses: %s", err)
  421. }
  422. mcs.bytes = raw
  423. return nil
  424. }