configwatchers.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package watcher
  2. import (
  3. "context"
  4. "sync/atomic"
  5. "github.com/opencost/opencost/core/pkg/log"
  6. "github.com/opencost/opencost/pkg/clustercache"
  7. v1 "k8s.io/api/core/v1"
  8. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  9. "k8s.io/apimachinery/pkg/fields"
  10. "k8s.io/client-go/kubernetes"
  11. )
  12. // ConfigMapWatcher represents a single configmap watcher
  13. type ConfigMapWatcher struct {
  14. ConfigMapName string
  15. WatchFunc func(string, map[string]string) error
  16. }
  17. type ConfigMapWatchers struct {
  18. kubeClientset kubernetes.Interface
  19. namespace string
  20. watchers map[string][]*ConfigMapWatcher
  21. watchController clustercache.WatchController
  22. started atomic.Bool
  23. stop chan struct{}
  24. }
  25. func NewConfigMapWatchers(kubeClientset kubernetes.Interface, namespace string, watchers ...*ConfigMapWatcher) *ConfigMapWatchers {
  26. var stopCh chan struct{}
  27. var watchController clustercache.WatchController
  28. if kubeClientset != nil {
  29. coreRestClient := kubeClientset.CoreV1().RESTClient()
  30. watchController = clustercache.NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, namespace, fields.Everything())
  31. stopCh = make(chan struct{})
  32. // a bit awkward here, but since we'll mostly be deferring adding a watcher after initializing k8s,
  33. // we'll warmup and start the actual watcher here
  34. watchController.WarmUp(stopCh)
  35. go watchController.Run(1, stopCh)
  36. }
  37. cmw := &ConfigMapWatchers{
  38. kubeClientset: kubeClientset,
  39. namespace: namespace,
  40. watchController: watchController,
  41. watchers: make(map[string][]*ConfigMapWatcher),
  42. stop: stopCh,
  43. }
  44. for _, w := range watchers {
  45. cmw.AddWatcher(w)
  46. }
  47. return cmw
  48. }
  49. func (cmw *ConfigMapWatchers) AddWatcher(watcher *ConfigMapWatcher) {
  50. if cmw.started.Load() {
  51. log.Warnf("Cannot add watcher %s after starting", watcher.ConfigMapName)
  52. return
  53. }
  54. if watcher == nil {
  55. return
  56. }
  57. name := watcher.ConfigMapName
  58. cmw.watchers[name] = append(cmw.watchers[name], watcher)
  59. }
  60. func (cmw *ConfigMapWatchers) Add(configMapName string, watchFunc func(string, map[string]string) error) {
  61. cmw.AddWatcher(&ConfigMapWatcher{
  62. ConfigMapName: configMapName,
  63. WatchFunc: watchFunc,
  64. })
  65. }
  66. func (cmw *ConfigMapWatchers) Watch() {
  67. if cmw.kubeClientset == nil {
  68. return
  69. }
  70. if !cmw.started.CompareAndSwap(false, true) {
  71. log.Warnf("Already started")
  72. return
  73. }
  74. watchConfigFunc := cmw.toWatchFunc()
  75. // We need an initial invocation because the init of the cache has happened before we had access to the provider.
  76. for cw := range cmw.watchers {
  77. configs, err := cmw.kubeClientset.CoreV1().ConfigMaps(cmw.namespace).Get(context.Background(), cw, metav1.GetOptions{})
  78. if err != nil {
  79. log.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
  80. } else {
  81. log.Infof("Found configmap %s, watching...", configs.Name)
  82. watchConfigFunc(configs)
  83. }
  84. }
  85. cmw.watchController.SetUpdateHandler(watchConfigFunc)
  86. }
  87. func (cmw *ConfigMapWatchers) Stop() {
  88. if cmw.stop == nil {
  89. return
  90. }
  91. close(cmw.stop)
  92. cmw.stop = nil
  93. }
  94. func (cmw *ConfigMapWatchers) toWatchFunc() func(any) {
  95. return func(c any) {
  96. conf, ok := c.(*v1.ConfigMap)
  97. if !ok {
  98. return
  99. }
  100. name := conf.GetName()
  101. data := conf.Data
  102. if watchers, ok := cmw.watchers[name]; ok {
  103. for _, cw := range watchers {
  104. err := cw.WatchFunc(name, data)
  105. if err != nil {
  106. log.Infof("ERROR UPDATING %s CONFIG: %s", name, err.Error())
  107. }
  108. }
  109. }
  110. }
  111. }