| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- package watcher
- import (
- "context"
- "sync/atomic"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/opencost/opencost/pkg/clustercache"
- v1 "k8s.io/api/core/v1"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/fields"
- "k8s.io/client-go/kubernetes"
- )
- // ConfigMapWatcher represents a single configmap watcher
- type ConfigMapWatcher struct {
- ConfigMapName string
- WatchFunc func(string, map[string]string) error
- }
- type ConfigMapWatchers struct {
- kubeClientset kubernetes.Interface
- namespace string
- watchers map[string][]*ConfigMapWatcher
- watchController clustercache.WatchController
- started atomic.Bool
- stop chan struct{}
- }
- func NewConfigMapWatchers(kubeClientset kubernetes.Interface, namespace string, watchers ...*ConfigMapWatcher) *ConfigMapWatchers {
- var stopCh chan struct{}
- var watchController clustercache.WatchController
- if kubeClientset != nil {
- coreRestClient := kubeClientset.CoreV1().RESTClient()
- watchController = clustercache.NewCachingWatcher(coreRestClient, "configmaps", &v1.ConfigMap{}, namespace, fields.Everything())
- stopCh = make(chan struct{})
- // a bit awkward here, but since we'll mostly be deferring adding a watcher after initializing k8s,
- // we'll warmup and start the actual watcher here
- watchController.WarmUp(stopCh)
- go watchController.Run(1, stopCh)
- }
- cmw := &ConfigMapWatchers{
- kubeClientset: kubeClientset,
- namespace: namespace,
- watchController: watchController,
- watchers: make(map[string][]*ConfigMapWatcher),
- stop: stopCh,
- }
- for _, w := range watchers {
- cmw.AddWatcher(w)
- }
- return cmw
- }
- func (cmw *ConfigMapWatchers) AddWatcher(watcher *ConfigMapWatcher) {
- if cmw.started.Load() {
- log.Warnf("Cannot add watcher %s after starting", watcher.ConfigMapName)
- return
- }
- if watcher == nil {
- return
- }
- name := watcher.ConfigMapName
- cmw.watchers[name] = append(cmw.watchers[name], watcher)
- }
- func (cmw *ConfigMapWatchers) Add(configMapName string, watchFunc func(string, map[string]string) error) {
- cmw.AddWatcher(&ConfigMapWatcher{
- ConfigMapName: configMapName,
- WatchFunc: watchFunc,
- })
- }
- func (cmw *ConfigMapWatchers) Watch() {
- if cmw.kubeClientset == nil {
- return
- }
- if !cmw.started.CompareAndSwap(false, true) {
- log.Warnf("Already started")
- return
- }
- watchConfigFunc := cmw.toWatchFunc()
- // We need an initial invocation because the init of the cache has happened before we had access to the provider.
- for cw := range cmw.watchers {
- configs, err := cmw.kubeClientset.CoreV1().ConfigMaps(cmw.namespace).Get(context.Background(), cw, metav1.GetOptions{})
- if err != nil {
- log.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
- } else {
- log.Infof("Found configmap %s, watching...", configs.Name)
- watchConfigFunc(configs)
- }
- }
- cmw.watchController.SetUpdateHandler(watchConfigFunc)
- }
- func (cmw *ConfigMapWatchers) Stop() {
- if cmw.stop == nil {
- return
- }
- close(cmw.stop)
- cmw.stop = nil
- }
- func (cmw *ConfigMapWatchers) toWatchFunc() func(any) {
- return func(c any) {
- conf, ok := c.(*v1.ConfigMap)
- if !ok {
- return
- }
- name := conf.GetName()
- data := conf.Data
- if watchers, ok := cmw.watchers[name]; ok {
- for _, cw := range watchers {
- err := cw.WatchFunc(name, data)
- if err != nil {
- log.Infof("ERROR UPDATING %s CONFIG: %s", name, err.Error())
- }
- }
- }
- }
- }
|