controller.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. package config
  2. import (
  3. "fmt"
  4. "os"
  5. "path/filepath"
  6. "sync"
  7. "time"
  8. "github.com/opencost/opencost/core/pkg/log"
  9. "github.com/opencost/opencost/core/pkg/util/json"
  10. "github.com/opencost/opencost/core/pkg/util/timeutil"
  11. "github.com/opencost/opencost/pkg/cloud"
  12. "github.com/opencost/opencost/pkg/cloud/models"
  13. "github.com/opencost/opencost/pkg/cloud/provider"
  14. "github.com/opencost/opencost/pkg/env"
  15. )
  16. // Controller manages the cloud.Config using config Watcher(s) to track various configuration
  17. // methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
  18. // upon any change detected from the config watchers.
  19. type Controller struct {
  20. lock sync.RWMutex
  21. observers []Observer
  22. watchers map[ConfigSource]cloud.KeyedConfigWatcher
  23. }
  24. // NewController initializes an Config Controller
  25. func NewController(cp models.Provider) *Controller {
  26. var watchers map[ConfigSource]cloud.KeyedConfigWatcher
  27. if env.IsKubernetesEnabled() {
  28. providerConfig := provider.ExtractConfigFromProviders(cp)
  29. watchers = GetCloudBillingWatchers(providerConfig)
  30. } else {
  31. watchers = GetCloudBillingWatchers(nil)
  32. }
  33. ic := &Controller{
  34. watchers: watchers,
  35. }
  36. ic.pullWatchers()
  37. go func() {
  38. ticker := timeutil.NewJobTicker()
  39. defer ticker.Close()
  40. for {
  41. ticker.TickIn(10 * time.Second)
  42. <-ticker.Ch
  43. ic.pullWatchers()
  44. }
  45. }()
  46. return ic
  47. }
  48. // pullWatchers retrieve configs from watchers and update configs according to priority of sources
  49. func (c *Controller) pullWatchers() {
  50. c.lock.Lock()
  51. defer c.lock.Unlock()
  52. statuses, err := c.load()
  53. if err != nil {
  54. log.Errorf("failed to load statuses when pulling watchers %s", err)
  55. statuses = Statuses{}
  56. }
  57. for source, watcher := range c.watchers {
  58. watcherConfsByKey := map[string]cloud.KeyedConfig{}
  59. for _, wConf := range watcher.GetConfigs() {
  60. watcherConfsByKey[wConf.Key()] = wConf
  61. }
  62. // remove existing configs that are no longer present in the source
  63. for _, status := range statuses.List() {
  64. if status.Source == source {
  65. if _, ok := watcherConfsByKey[status.Key]; !ok {
  66. err := c.deleteConfig(status.Key, status.Source, statuses)
  67. if err != nil {
  68. log.Errorf("Conrtoller: pullWatchers: %s", err.Error())
  69. }
  70. }
  71. }
  72. }
  73. for key, conf := range watcherConfsByKey {
  74. // Check existing configs for matching key and source
  75. if existingStatus, ok := statuses.Get(key, source); ok {
  76. // if config has not changed continue
  77. if existingStatus.Config.Equals(conf) {
  78. continue
  79. }
  80. // remove the existing config
  81. err := c.deleteConfig(key, source, statuses)
  82. if err != nil {
  83. log.Errorf("Conrtoller: pullWatchers: %s", err.Error())
  84. }
  85. }
  86. err := conf.Validate()
  87. valid := err == nil
  88. configType, err := ConfigTypeFromConfig(conf)
  89. if err != nil {
  90. log.Errorf("failed to get config type for config with key: %s", conf.Key())
  91. continue
  92. }
  93. status := Status{
  94. Key: key,
  95. Source: source,
  96. Active: valid, // if valid, then new config will be active
  97. Valid: valid,
  98. ConfigType: configType,
  99. Config: conf,
  100. }
  101. // handle a config with a new unique key for a source or an update config from a source which was inactive before
  102. if valid {
  103. for _, matchStat := range statuses.List() {
  104. //// skip matching configs
  105. //if matchID.Equals(cID) {
  106. // continue
  107. //}
  108. if matchStat.Active {
  109. // if source is non-multi-cloud disable all other non-multi-cloud sourced configs
  110. if source == HelmSource || source == ConfigFileSource {
  111. if matchStat.Source == HelmSource || matchStat.Source == ConfigFileSource {
  112. matchStat.Active = false
  113. c.broadcastRemoveConfig(matchStat.Key)
  114. }
  115. }
  116. // check for configs with the same key that are active
  117. if matchStat.Key == key {
  118. // If source has higher priority disable other active configs
  119. matchStat.Active = false
  120. c.broadcastRemoveConfig(matchStat.Key)
  121. }
  122. }
  123. }
  124. }
  125. // update config and put to observers if active
  126. statuses.Insert(&status)
  127. if status.Active {
  128. c.broadcastAddConfig(conf)
  129. }
  130. err = c.save(statuses)
  131. if err != nil {
  132. fmt.Errorf("failed to save statuses %s", err.Error())
  133. }
  134. }
  135. }
  136. }
  137. // CreateConfig adds a new config to status with a source of ConfigControllerSource
  138. // It will disable any config with the same key
  139. // fails if there is an existing config with the same key and source
  140. func (c *Controller) CreateConfig(conf cloud.KeyedConfig) error {
  141. c.lock.Lock()
  142. defer c.lock.Unlock()
  143. err := conf.Validate()
  144. if err != nil {
  145. return fmt.Errorf("provided configuration was invalid: %s", err.Error())
  146. }
  147. statuses, err := c.load()
  148. if err != nil {
  149. return fmt.Errorf("failed to load statuses")
  150. }
  151. source := ConfigControllerSource
  152. key := conf.Key()
  153. _, ok := statuses.Get(key, source)
  154. if ok {
  155. return fmt.Errorf("config with key %s from source %s already exist", key, source.String())
  156. }
  157. statuses.Insert(&Status{
  158. Key: key,
  159. Source: source,
  160. Valid: true,
  161. Active: true,
  162. Config: conf,
  163. })
  164. // check for configurations with the same configuration key that are already active.
  165. for _, confStat := range statuses.List() {
  166. if confStat.Key != key || confStat.Source == source {
  167. continue
  168. }
  169. // if active disable
  170. if confStat.Active == true {
  171. confStat.Active = false
  172. c.broadcastRemoveConfig(key)
  173. }
  174. }
  175. c.broadcastAddConfig(conf)
  176. c.save(statuses)
  177. return nil
  178. }
  179. // EnableConfig enables a config with the given key and source, and disables any config with a matching key
  180. func (c *Controller) EnableConfig(key, sourceStr string) error {
  181. c.lock.Lock()
  182. defer c.lock.Unlock()
  183. statuses, err := c.load()
  184. if err != nil {
  185. return fmt.Errorf("failed to load statuses")
  186. }
  187. source := GetConfigSource(sourceStr)
  188. cs, ok := statuses.Get(key, source)
  189. if !ok {
  190. return fmt.Errorf("config with key %s from source %s does not exist", key, sourceStr)
  191. }
  192. if cs.Active {
  193. return fmt.Errorf("config with key %s from source %s is already active", key, sourceStr)
  194. }
  195. // check for configurations with the same configuration key that are already active.
  196. for _, confStat := range statuses.List() {
  197. if confStat.Key != key || confStat.Source == source {
  198. continue
  199. }
  200. // if active disable
  201. if confStat.Active == true {
  202. confStat.Active = false
  203. c.broadcastRemoveConfig(key)
  204. }
  205. }
  206. cs.Active = true
  207. c.broadcastAddConfig(cs.Config)
  208. c.save(statuses)
  209. return nil
  210. }
  211. // DisableConfig updates an config status if it was enabled
  212. func (c *Controller) DisableConfig(key, sourceStr string) error {
  213. c.lock.Lock()
  214. defer c.lock.Unlock()
  215. statuses, err := c.load()
  216. if err != nil {
  217. return fmt.Errorf("failed to load statuses")
  218. }
  219. source := GetConfigSource(sourceStr)
  220. is, ok := statuses.Get(key, source)
  221. if !ok {
  222. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
  223. }
  224. if !is.Active {
  225. return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s is already disabled", key, source)
  226. }
  227. is.Active = false
  228. c.broadcastRemoveConfig(key)
  229. c.save(statuses)
  230. return nil
  231. }
  232. // DeleteConfig removes an config from the statuses and deletes the config on all observers if it was active
  233. func (c *Controller) DeleteConfig(key, source string) error {
  234. c.lock.Lock()
  235. defer c.lock.Unlock()
  236. statuses, err := c.load()
  237. if err != nil {
  238. return fmt.Errorf("failed to load statuses")
  239. }
  240. err = c.deleteConfig(key, GetConfigSource(source), statuses)
  241. if err != nil {
  242. return fmt.Errorf("Controller: DeleteConfig: %w", err)
  243. }
  244. return nil
  245. }
  246. func (c *Controller) deleteConfig(key string, source ConfigSource, statuses Statuses) error {
  247. if source != ConfigControllerSource {
  248. return fmt.Errorf("controller does not own config with key %s from source %s, manage this config at its source", key, source.String())
  249. }
  250. is, ok := statuses.Get(key, source)
  251. if !ok {
  252. return fmt.Errorf("config with key %s from source %s does not exist", key, source.String())
  253. }
  254. // delete config on observers if active
  255. if is.Active {
  256. c.broadcastRemoveConfig(key)
  257. }
  258. delete(statuses[source], key)
  259. c.save(statuses)
  260. return nil
  261. }
  262. const configFile = "cloud-configurations.json"
  263. func (c *Controller) load() (Statuses, error) {
  264. filePath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), configFile)
  265. raw, err := os.ReadFile(filePath)
  266. if err != nil {
  267. return nil, fmt.Errorf("ConfigController: failed to load config statuses from file: %w", err)
  268. }
  269. statuses := Statuses{}
  270. err = json.Unmarshal(raw, &statuses)
  271. if err != nil {
  272. return nil, fmt.Errorf("ConfigController: failed to marshal config statuses: %s", err.Error())
  273. }
  274. return statuses, nil
  275. }
  276. func (c *Controller) save(statuses Statuses) error {
  277. filePath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), configFile)
  278. raw, err := json.Marshal(statuses)
  279. if err != nil {
  280. return fmt.Errorf("ConfigController: failed to marshal config statuses: %s", err)
  281. }
  282. err = os.WriteFile(filePath, raw, 0644)
  283. if err != nil {
  284. return fmt.Errorf("ConfigController: failed to save config statuses to file: %s", err)
  285. }
  286. return nil
  287. }
  288. func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
  289. c.lock.RLock()
  290. defer c.lock.RUnlock()
  291. configs := new(Configurations)
  292. activeConfigs := c.getActiveConfigs()
  293. if key != "" {
  294. conf, ok := activeConfigs[key]
  295. if !ok {
  296. return nil, fmt.Errorf("Config with key %s does not exist or is inactive", key)
  297. }
  298. sanitizedConfig := conf.Sanitize()
  299. err := configs.Insert(sanitizedConfig)
  300. if err != nil {
  301. return nil, fmt.Errorf("failed to insert config: %w", err)
  302. }
  303. return configs, nil
  304. }
  305. for _, conf := range activeConfigs {
  306. sanitizedConfig := conf.Sanitize()
  307. err := configs.Insert(sanitizedConfig)
  308. if err != nil {
  309. return nil, fmt.Errorf("failed to insert config: %w", err)
  310. }
  311. }
  312. return configs, nil
  313. }
  314. func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
  315. activeConfigs := make(map[string]cloud.KeyedConfig)
  316. statuses, err := c.load()
  317. if err != nil {
  318. log.Errorf("GetStatus: failed to load cloud statuses")
  319. }
  320. for _, cs := range statuses.List() {
  321. if cs.Active {
  322. activeConfigs[cs.Key] = cs.Config
  323. }
  324. }
  325. return activeConfigs
  326. }
  327. // broadcastRemoveConfig ask observers to remove and stop all processes related to a configuration with a given key
  328. func (c *Controller) broadcastRemoveConfig(key string) {
  329. var wg sync.WaitGroup
  330. for _, obs := range c.observers {
  331. observer := obs
  332. wg.Add(1)
  333. go func() {
  334. defer wg.Done()
  335. observer.DeleteConfig(key)
  336. }()
  337. }
  338. wg.Wait()
  339. }
  340. // broadcastAddConfig gives observers a new config to handle
  341. func (c *Controller) broadcastAddConfig(conf cloud.KeyedConfig) {
  342. var wg sync.WaitGroup
  343. for _, obs := range c.observers {
  344. observer := obs
  345. wg.Add(1)
  346. go func() {
  347. defer wg.Done()
  348. observer.PutConfig(conf)
  349. }()
  350. }
  351. wg.Wait()
  352. }
  353. // RegisterObserver gives out the current active list configs and adds the observer to the push list
  354. func (c *Controller) RegisterObserver(obs Observer) {
  355. c.lock.Lock()
  356. defer c.lock.Unlock()
  357. obs.SetConfigs(c.getActiveConfigs())
  358. c.observers = append(c.observers, obs)
  359. }
  360. func (c *Controller) GetStatus() []Status {
  361. c.lock.RLock()
  362. defer c.lock.RUnlock()
  363. var status []Status
  364. statuses, err := c.load()
  365. if err != nil {
  366. log.Errorf("GetStatus: failed to load cloud statuses")
  367. }
  368. for _, intStat := range statuses.List() {
  369. status = append(status, *intStat)
  370. }
  371. return status
  372. }