Merge pull request #946 from kubecost/bolt/metrics-exporter

Metrics Exporter
Matt Bolt committed 4 years ago
commit cd5559ef77

Dockerfile.metrics (+4 -0)

@@ -18,6 +18,10 @@ RUN set -e ;\
 FROM alpine:latest
 RUN apk add --update --no-cache ca-certificates
 COPY --from=build-env /go/bin/app /go/bin/app
+ADD ./configs/default.json /models/default.json
+ADD ./configs/azure.json /models/azure.json
+ADD ./configs/aws.json /models/aws.json
+ADD ./configs/gcp.json /models/gcp.json
 
 USER 1001
 ENTRYPOINT ["/go/bin/app"]

cmd/kubemetrics/main.go (+111 -8)

@@ -1,15 +1,25 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"net/http"
+	"time"
 
+	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/costmodel"
+	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
-	"github.com/kubecost/cost-model/pkg/metrics"
+	"github.com/kubecost/cost-model/pkg/prom"
+	"github.com/kubecost/cost-model/pkg/util/watcher"
 
+	prometheus "github.com/prometheus/client_golang/api"
+	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
 	"github.com/rs/cors"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
@@ -51,6 +61,45 @@ func newKubernetesClusterCache() (clustercache.ClusterCache, error) {
 	return k8sCache, nil
 }
 
+func newPrometheusClient() (prometheus.Client, error) {
+	address := env.GetPrometheusServerEndpoint()
+	if address == "" {
+		return nil, fmt.Errorf("No address for prometheus set in $%s. Aborting.", env.PrometheusServerEndpointEnvVar)
+	}
+
+	queryConcurrency := env.GetMaxQueryConcurrency()
+	klog.Infof("Prometheus Client Max Concurrency set to %d", queryConcurrency)
+
+	timeout := 120 * time.Second
+	keepAlive := 120 * time.Second
+
+	promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
+	if err != nil {
+		return nil, fmt.Errorf("Failed to create prometheus client, Error: %v", err)
+	}
+
+	m, err := prom.Validate(promCli)
+	if err != nil || !m.Running {
+		if err != nil {
+			klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
+		} else if !m.Running {
+			klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
+		}
+	} else {
+		klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
+	}
+
+	api := prometheusAPI.NewAPI(promCli)
+	_, err = api.Config(context.Background())
+	if err != nil {
+		klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
+	} else {
+		klog.Infof("Retrieved a prometheus config file from: %s", address)
+	}
+
+	return promCli, nil
+}
+
 func main() {
 	klog.InitFlags(nil)
 	flag.Set("v", "3")
@@ -58,19 +107,73 @@ func main() {
 
 	klog.V(1).Infof("Starting kubecost-metrics...")
 
+	configWatchers := watcher.NewConfigMapWatchers()
+
+	scrapeInterval := time.Minute
+	promCli, err := newPrometheusClient()
+	if err != nil {
+		panic(err.Error())
+	}
+
+	// Lookup scrape interval for kubecost job, update if found
+	si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
+	if err == nil {
+		scrapeInterval = si
+	}
+
+	klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
+
 	// initialize kubernetes client and cluster cache
 	clusterCache, err := newKubernetesClusterCache()
 	if err != nil {
 		panic(err.Error())
 	}
 
-	// initialize Kubernetes Metrics
-	metrics.InitKubeMetrics(clusterCache, &metrics.KubeMetricsOpts{
-		EmitKubecostControllerMetrics: true,
-		EmitNamespaceAnnotations:      env.IsEmitNamespaceAnnotationsMetric(),
-		EmitPodAnnotations:            env.IsEmitPodAnnotationsMetric(),
-		EmitKubeStateMetrics:          true,
-	})
+	cloudProviderKey := env.GetCloudProviderAPIKey()
+	cloudProvider, err := cloud.NewProvider(clusterCache, cloudProviderKey)
+	if err != nil {
+		panic(err.Error())
+	}
+
+	// Append the pricing config watcher
+	configWatchers.AddWatcher(cloud.ConfigWatcherFor(cloudProvider))
+	watchConfigFunc := configWatchers.ToWatchFunc()
+	watchedConfigs := configWatchers.GetWatchedConfigs()
+
+	k8sClient := clusterCache.GetClient()
+	kubecostNamespace := env.GetKubecostNamespace()
+
+	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
+	for _, cw := range watchedConfigs {
+		configs, err := k8sClient.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
+		if err != nil {
+			klog.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
+		} else {
+			watchConfigFunc(configs)
+		}
+	}
+
+	clusterCache.SetConfigMapUpdateFunc(watchConfigFunc)
+
+	// Initialize ClusterMap for maintaining ClusterInfo by ClusterID
+	clusterMap := clusters.NewClusterMap(
+		promCli,
+		costmodel.NewLocalClusterInfoProvider(k8sClient, cloudProvider),
+		5*time.Minute)
+
+	costModel := costmodel.NewCostModel(promCli, cloudProvider, clusterCache, clusterMap, scrapeInterval)
+
+	// initialize Kubernetes Metrics Emitter
+	metricsEmitter := costmodel.NewCostModelMetricsEmitter(promCli, clusterCache, cloudProvider, costModel)
+
+	// download pricing data
+	err = cloudProvider.DownloadPricingData()
+	if err != nil {
+		klog.Errorf("Error downloading pricing data: %s", err)
+	}
+
+	// start emitting metrics
+	metricsEmitter.Start()
 
 	rootMux := http.NewServeMux()
 	rootMux.HandleFunc("/healthz", Healthz)
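
Aside (not part of the diff): once the exporter is up, the /healthz route registered above doubles as a liveness probe. A minimal client-side check (the port is an assumption for illustration; the real one comes from the service's listen configuration):

    resp, err := http.Get("http://localhost:9003/healthz") // port is illustrative, not from this PR
    if err != nil || resp.StatusCode != http.StatusOK {
        klog.Fatalf("kubecost-metrics health check failed: %v", err)
    }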

pkg/cloud/provider.go (+13 -0)

@@ -17,6 +17,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/clustercache"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/util/watcher"
 
 	v1 "k8s.io/api/core/v1"
 )
@@ -298,6 +299,18 @@ func CustomPricesEnabled(p Provider) bool {
 	return config.CustomPricesEnabled == "true"
 }
 
+// ConfigWatcherFor returns a new ConfigWatcher instance which watches changes to the "pricing-configs"
+// configmap
+func ConfigWatcherFor(p Provider) *watcher.ConfigMapWatcher {
+	return &watcher.ConfigMapWatcher{
+		ConfigMapName: env.GetPricingConfigmapName(),
+		WatchFunc: func(name string, data map[string]string) error {
+			_, err := p.UpdateConfigFromConfigMap(data)
+			return err
+		},
+	}
+}
+
 // AllocateIdleByDefault returns true if the application settings specify to allocate idle by default
 func AllocateIdleByDefault(p Provider) bool {
 	config, err := p.GetConfig()
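
Aside (not part of the diff): a minimal sketch of how the new ConfigWatcherFor helper is consumed, mirroring the wiring in cmd/kubemetrics/main.go above (assumes cloudProvider and clusterCache are constructed as shown there):

    // Collect the pricing watcher alongside any others, then hand the
    // collapsed watch func to the cluster cache's configmap update hook.
    configWatchers := watcher.NewConfigMapWatchers()
    configWatchers.AddWatcher(cloud.ConfigWatcherFor(cloudProvider))
    clusterCache.SetConfigMapUpdateFunc(configWatchers.ToWatchFunc())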

pkg/costmodel/metrics.go (+6 -11)

@@ -263,17 +263,12 @@ func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clust
 	// init will only actually execute once to register the custom gauges
 	initCostModelMetrics(clusterCache, provider)
 
-	// if the metrics pod is not enabled, we want to emit those metrics from this pod.
-	// NOTE: This is not optimal, as we calculate costs based on run times for other containers.
-	// NOTE: The metrics for run times should be emitted separate from cost-model
-	if !env.IsKubecostMetricsPodEnabled() {
-		metrics.InitKubeMetrics(clusterCache, &metrics.KubeMetricsOpts{
-			EmitKubecostControllerMetrics: true,
-			EmitNamespaceAnnotations:      env.IsEmitNamespaceAnnotationsMetric(),
-			EmitPodAnnotations:            env.IsEmitPodAnnotationsMetric(),
-			EmitKubeStateMetrics:          env.IsEmitKsmV1Metrics(),
-		})
-	}
+	metrics.InitKubeMetrics(clusterCache, &metrics.KubeMetricsOpts{
+		EmitKubecostControllerMetrics: true,
+		EmitNamespaceAnnotations:      env.IsEmitNamespaceAnnotationsMetric(),
+		EmitPodAnnotations:            env.IsEmitPodAnnotationsMetric(),
+		EmitKubeStateMetrics:          env.IsEmitKsmV1Metrics(),
+	})
 
 	return &CostModelMetricsEmitter{
 		PrometheusClient:              promClient,

pkg/costmodel/promparsers.go (+0 -22)

@@ -11,30 +11,8 @@ import (
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/util"
-	"gopkg.in/yaml.v2"
 )
 
-const DEFAULT_KUBECOST_JOB_NAME = "kubecost"
-
-type ScrapeConfig struct {
-	JobName        string `yaml:"job_name,omitempty"`
-	ScrapeInterval string `yaml:"scrape_interval,omitempty"`
-}
-
-type PromCfg struct {
-	ScrapeConfigs []ScrapeConfig `yaml:"scrape_configs,omitempty"`
-}
-
-func GetPrometheusConfig(pcfg string) (PromCfg, error) {
-	var promCfg PromCfg
-	err := yaml.Unmarshal([]byte(pcfg), &promCfg)
-	return promCfg, err
-}
-
-func GetKubecostJobName() string {
-	return DEFAULT_KUBECOST_JOB_NAME // TODO: look this up from a prometheus variable?
-}
-
 func GetPVInfoLocal(cache clustercache.ClusterCache, defaultClusterID string) (map[string]*PersistentVolumeClaimData, error) {
 	toReturn := make(map[string]*PersistentVolumeClaimData)
 

pkg/costmodel/router.go (+54 -137)

@@ -11,8 +11,10 @@ import (
 	"sync"
 	"time"
 
+	"github.com/kubecost/cost-model/pkg/services"
 	"github.com/kubecost/cost-model/pkg/util/httputil"
 	"github.com/kubecost/cost-model/pkg/util/timeutil"
+	"github.com/kubecost/cost-model/pkg/util/watcher"
 
 	"k8s.io/klog"
 
@@ -22,7 +24,6 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
-	cm "github.com/kubecost/cost-model/pkg/clustermanager"
 	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
@@ -32,9 +33,7 @@ import (
 	"github.com/kubecost/cost-model/pkg/thanos"
 	"github.com/kubecost/cost-model/pkg/util/json"
 	prometheus "github.com/prometheus/client_golang/api"
-	prometheusClient "github.com/prometheus/client_golang/api"
 	prometheusAPI "github.com/prometheus/client_golang/api/prometheus/v1"
-	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	"github.com/patrickmn/go-cache"
@@ -45,14 +44,13 @@ import (
 )
 
 const (
-	prometheusTroubleshootingEp = "http://docs.kubecost.com/custom-prom#troubleshoot"
-	RFC3339Milli                = "2006-01-02T15:04:05.000Z"
-	maxCacheMinutes1d           = 11
-	maxCacheMinutes2d           = 17
-	maxCacheMinutes7d           = 37
-	maxCacheMinutes30d          = 137
-	CustomPricingSetting        = "CustomPricing"
-	DiscountSetting             = "Discount"
+	RFC3339Milli         = "2006-01-02T15:04:05.000Z"
+	maxCacheMinutes1d    = 11
+	maxCacheMinutes2d    = 17
+	maxCacheMinutes7d    = 37
+	maxCacheMinutes30d   = 137
+	CustomPricingSetting = "CustomPricing"
+	DiscountSetting      = "Discount"
 )
 
 var (
@@ -64,10 +62,9 @@ var (
 // Prometheus, Kubernetes, the cloud provider, and caches.
 type Accesses struct {
 	Router            *httprouter.Router
-	PrometheusClient  prometheusClient.Client
-	ThanosClient      prometheusClient.Client
+	PrometheusClient  prometheus.Client
+	ThanosClient      prometheus.Client
 	KubeClientSet     kubernetes.Interface
-	ClusterManager    *cm.ClusterManager
 	ClusterMap        clusters.ClusterMap
 	CloudProvider     cloud.Provider
 	Model             *CostModel
@@ -84,13 +81,15 @@ type Accesses struct {
 	// settings will be published in a pub/sub model
 	settingsSubscribers map[string][]chan string
 	settingsMutex       sync.Mutex
+	// registered http service instances
+	httpServices services.HTTPServices
 }
 
 // GetPrometheusClient decides whether the default Prometheus client or the Thanos client
 // should be used.
-func (a *Accesses) GetPrometheusClient(remote bool) prometheusClient.Client {
+func (a *Accesses) GetPrometheusClient(remote bool) prometheus.Client {
 	// Use Thanos Client if it exists (enabled) and remote flag set
-	var pc prometheusClient.Client
+	var pc prometheus.Client
 
 	if remote && a.ThanosClient != nil {
 		pc = a.ThanosClient
@@ -410,7 +409,7 @@ func (a *Accesses) ClusterCosts(w http.ResponseWriter, r *http.Request, ps httpr
 		return
 	}
 
-	var client prometheusClient.Client
+	var client prometheus.Client
 	if useThanos {
 		client = a.ThanosClient
 		offsetDur = thanos.OffsetDuration()
@@ -493,7 +492,7 @@ func (a *Accesses) CostDataModelRange(w http.ResponseWriter, r *http.Request, ps
 	}
 
 	// Use Thanos Client if it exists (enabled) and remote flag set
-	var pClient prometheusClient.Client
+	var pClient prometheus.Client
 	if remote != "false" && a.ThanosClient != nil {
 		pClient = a.ThanosClient
 	} else {
@@ -911,40 +910,6 @@ func (a *Accesses) GetPrometheusMetrics(w http.ResponseWriter, _ *http.Request,
 	w.Write(WrapData(result, nil))
 }
 
-// Creates a new ClusterManager instance using a boltdb storage. If that fails,
-// then we fall back to a memory-only storage.
-func newClusterManager() *cm.ClusterManager {
-	clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
-
-	// Return a memory-backed cluster manager populated by configmap
-	return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
-
-	// NOTE: The following should be used with a persistent disk store. Since the
-	// NOTE: configmap approach is currently the "persistent" source (entries are read-only
-	// NOTE: on the backend), we don't currently need to store on disk.
-	/*
-		path := env.GetConfigPath()
-		db, err := bolt.Open(path+"costmodel.db", 0600, nil)
-		if err != nil {
-			klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
-			return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
-		}
-
-		store, err := cm.NewBoltDBClusterStorage("clusters", db)
-		if err != nil {
-			klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
-			return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
-		}
-
-		return cm.NewConfiguredClusterManager(store, clustersConfigFile)
-	*/
-}
-
-type ConfigWatchers struct {
-	ConfigmapName string
-	WatchFunc     func(string, map[string]string) error
-}
-
 // captures the panic event in sentry
 func capturePanicEvent(err string, stack string) {
 	msg := fmt.Sprintf("Panic: %s\nStackTrace: %s\n", err, stack)
@@ -975,12 +940,14 @@ func handlePanic(p errors.Panic) bool {
 	return p.Type == errors.PanicTypeHTTP
 }
 
-func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
+func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses {
 	klog.InitFlags(nil)
 	flag.Set("v", "3")
 	flag.Parse()
 	klog.V(1).Infof("Starting cost-model (git commit \"%s\")", env.GetAppVersion())
 
+	configWatchers := watcher.NewConfigMapWatchers(additionalConfigWatchers...)
+
 	var err error
 	if errorReportingEnabled {
 		err = sentry.Init(sentry.ClientOptions{Release: env.GetAppVersion()})
@@ -1004,59 +971,40 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 
 	timeout := 120 * time.Second
 	keepAlive := 120 * time.Second
-	scrapeInterval, _ := time.ParseDuration("1m")
+	scrapeInterval := time.Minute
 
 	promCli, err := prom.NewPrometheusClient(address, timeout, keepAlive, queryConcurrency, "")
 	if err != nil {
 		klog.Fatalf("Failed to create prometheus client, Error: %v", err)
 	}
 
-	api := prometheusAPI.NewAPI(promCli)
-	pcfg, err := api.Config(context.Background())
-	if err != nil {
-		klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
-	} else {
-		klog.V(1).Info("Retrieved a prometheus config file from: " + address)
-		sc, err := GetPrometheusConfig(pcfg.YAML)
-		if err != nil {
-			klog.Infof("Fix YAML error %s", err)
-		}
-		for _, scrapeconfig := range sc.ScrapeConfigs {
-			if scrapeconfig.JobName == GetKubecostJobName() {
-				if scrapeconfig.ScrapeInterval != "" {
-					si := scrapeconfig.ScrapeInterval
-					sid, err := time.ParseDuration(si)
-					if err != nil {
-						klog.Infof("error parseing scrapeConfig for %s", scrapeconfig.JobName)
-					} else {
-						klog.Infof("Found Kubecost job scrape interval of: %s", si)
-						scrapeInterval = sid
-					}
-				}
-			}
-		}
-	}
-	klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
-
 	m, err := prom.Validate(promCli)
-	if err != nil || m.Running == false {
-		if err != nil {
-			klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prometheusTroubleshootingEp)
-		} else if m.Running == false {
-			klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prometheusTroubleshootingEp)
-		}
-
-		api := prometheusAPI.NewAPI(promCli)
-		_, err = api.Config(context.Background())
+	if err != nil || !m.Running {
 		if err != nil {
-			klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prometheusTroubleshootingEp)
-		} else {
-			klog.V(1).Info("Retrieved a prometheus config file from: " + address)
+			klog.Errorf("Failed to query prometheus at %s. Error: %s . Troubleshooting help available at: %s", address, err.Error(), prom.PrometheusTroubleshootingURL)
+		} else if !m.Running {
+			klog.Errorf("Prometheus at %s is not running. Troubleshooting help available at: %s", address, prom.PrometheusTroubleshootingURL)
 		}
 	} else {
 		klog.V(1).Info("Success: retrieved the 'up' query against prometheus at: " + address)
 	}
 
+	api := prometheusAPI.NewAPI(promCli)
+	_, err = api.Config(context.Background())
+	if err != nil {
+		klog.Infof("No valid prometheus config file at %s. Error: %s . Troubleshooting help available at: %s. Ignore if using cortex/thanos here.", address, err.Error(), prom.PrometheusTroubleshootingURL)
+	} else {
+		klog.Infof("Retrieved a prometheus config file from: %s", address)
+	}
+
+	// Lookup scrape interval for kubecost job, update if found
+	si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName())
+	if err == nil {
+		scrapeInterval = si
+	}
+
+	klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
+
 	// Kubernetes API setup
 	var kc *rest.Config
 	if kubeconfig := env.GetKubeConfigPath(); kubeconfig != "" {
@@ -1083,37 +1031,17 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		panic(err.Error())
 	}
 
-	watchConfigFunc := func(c interface{}) {
-		conf := c.(*v1.ConfigMap)
-		if conf.GetName() == env.GetPricingConfigmapName() {
-			_, err := cloudProvider.UpdateConfigFromConfigMap(conf.Data)
-			if err != nil {
-				klog.Infof("ERROR UPDATING %s CONFIG: %s", "pricing-configs", err.Error())
-			}
-		}
-		for _, cw := range additionalConfigWatchers {
-			if conf.GetName() == cw.ConfigmapName {
-				err := cw.WatchFunc(conf.GetName(), conf.Data)
-				if err != nil {
-					klog.Infof("ERROR UPDATING %s CONFIG: %s", cw.ConfigmapName, err.Error())
-				}
-			}
-		}
-	}
+	// Append the pricing config watcher
+	configWatchers.AddWatcher(cloud.ConfigWatcherFor(cloudProvider))
+	watchConfigFunc := configWatchers.ToWatchFunc()
+	watchedConfigs := configWatchers.GetWatchedConfigs()
 
 	kubecostNamespace := env.GetKubecostNamespace()
 	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
-	configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), env.GetPricingConfigmapName(), metav1.GetOptions{})
-	if err != nil {
-		klog.Infof("No %s configmap found at installtime, using existing configs: %s", env.GetPricingConfigmapName(), err.Error())
-	} else {
-		watchConfigFunc(configs)
-	}
-
-	for _, cw := range additionalConfigWatchers {
-		configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw.ConfigmapName, metav1.GetOptions{})
+	for _, cw := range watchedConfigs {
+		configs, err := kubeClientset.CoreV1().ConfigMaps(kubecostNamespace).Get(context.Background(), cw, metav1.GetOptions{})
 		if err != nil {
-			klog.Infof("No %s configmap found at installtime, using existing configs: %s", cw.ConfigmapName, err.Error())
+			klog.Infof("No %s configmap found at install time, using existing configs: %s", cw, err.Error())
 		} else {
 			klog.Infof("Found configmap %s, watching...", configs.Name)
 			watchConfigFunc(configs)
@@ -1122,14 +1050,6 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 
 	k8sCache.SetConfigMapUpdateFunc(watchConfigFunc)
 
-	// TODO: General Architecture Note: Several passes have been made to modularize a lot of
-	// TODO: our code, but the router still continues to be the obvious entry point for new \
-	// TODO: features. We should look to split out the actual "router" functionality and
-	// TODO: implement a builder -> controller for stitching new features and other dependencies.
-	clusterManager := newClusterManager()
-
-	// Initialize metrics here
-
 	remoteEnabled := env.IsRemoteEnabled()
 	if remoteEnabled {
 		info, err := cloudProvider.ClusterInfo()
@@ -1144,7 +1064,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	}
 
 	// Thanos Client
-	var thanosClient prometheusClient.Client
+	var thanosClient prometheus.Client
 	if thanos.IsEnabled() {
 		thanosAddress := thanos.QueryURL()
 
@@ -1208,7 +1128,6 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		PrometheusClient:  promCli,
 		ThanosClient:      thanosClient,
 		KubeClientSet:     kubeClientset,
-		ClusterManager:    clusterManager,
 		ClusterMap:        clusterMap,
 		CloudProvider:     cloudProvider,
 		Model:             costModel,
@@ -1219,6 +1138,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		OutOfClusterCache: outOfClusterCache,
 		SettingsCache:     settingsCache,
 		CacheExpiration:   cacheExpiration,
+		httpServices:      services.NewCostModelServices(),
 	}
 	// Use the Accesses instance, itself, as the CostModelAggregator. This is
 	// confusing and unconventional, but necessary so that we can swap it
@@ -1242,9 +1162,9 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 		log.Infof("Init: AggregateCostModel cache warming disabled")
 	}
 
-	a.MetricsEmitter.Start()
-
-	managerEndpoints := cm.NewClusterManagerEndpoints(a.ClusterManager)
+	if !env.IsKubecostMetricsPodEnabled() {
+		a.MetricsEmitter.Start()
+	}
 
 	a.Router.GET("/costDataModel", a.CostDataModel)
 	a.Router.GET("/costDataModelRange", a.CostDataModelRange)
@@ -1274,10 +1194,7 @@ func Initialize(additionalConfigWatchers ...ConfigWatchers) *Accesses {
 	a.Router.GET("/diagnostics/requestQueue", a.GetPrometheusQueueState)
 	a.Router.GET("/diagnostics/prometheusMetrics", a.GetPrometheusMetrics)
 
-	// cluster manager endpoints
-	a.Router.GET("/clusters", managerEndpoints.GetAllClusters)
-	a.Router.PUT("/clusters", managerEndpoints.PutCluster)
-	a.Router.DELETE("/clusters/:id", managerEndpoints.DeleteCluster)
+	a.httpServices.RegisterAll(a.Router)
 
 	return a
 }

pkg/env/costmodelenv.go (+7 -1)

@@ -72,7 +72,8 @@ const (
 
 	PromClusterIDLabelEnvVar = "PROM_CLUSTER_ID_LABEL"
 
-	PricingConfigmapName = "PRICING_CONFIGMAP_NAME"
+	PricingConfigmapName  = "PRICING_CONFIGMAP_NAME"
+	KubecostJobNameEnvVar = "KUBECOST_JOB_NAME"
 )
 
 func GetPricingConfigmapName() string {
@@ -355,6 +356,11 @@ func GetParsedUTCOffset() time.Duration {
 	return offset
 }
 
+// GetKubecostJobName returns the environment variable value for KubecostJobNameEnvVar
+func GetKubecostJobName() string {
+	return Get(KubecostJobNameEnvVar, "kubecost")
+}
+
 func IsCacheWarmingEnabled() bool {
 	return GetBool(CacheWarmingEnabledEnvVar, true)
 }
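
Aside (not part of the diff): GetKubecostJobName falls back to "kubecost" when the variable is unset, assuming Get performs a live environment lookup:

    os.Setenv("KUBECOST_JOB_NAME", "kubecost-custom")
    jobName := env.GetKubecostJobName() // "kubecost-custom"; would be "kubecost" if unset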

pkg/prom/helpers.go (+62 -0)

@@ -0,0 +1,62 @@
+package prom
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	prometheus "github.com/prometheus/client_golang/api"
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
+
+	"gopkg.in/yaml.v2"
+)
+
+const PrometheusTroubleshootingURL = "http://docs.kubecost.com/custom-prom#troubleshoot"
+
+// ScrapeConfig is the minimal view of a prometheus scrape configuration
+type ScrapeConfig struct {
+	JobName        string `yaml:"job_name,omitempty"`
+	ScrapeInterval string `yaml:"scrape_interval,omitempty"`
+}
+
+// PrometheusConfig is the minimal view of a prometheus configuration
+type PrometheusConfig struct {
+	ScrapeConfigs []ScrapeConfig `yaml:"scrape_configs,omitempty"`
+}
+
+// GetPrometheusConfig uses the provided yaml string to parse the minimal prometheus config
+func GetPrometheusConfig(pcfg string) (PrometheusConfig, error) {
+	var promCfg PrometheusConfig
+	err := yaml.Unmarshal([]byte(pcfg), &promCfg)
+	return promCfg, err
+}
+
+// ScrapeIntervalFor uses the provided prometheus client to locate a scrape interval for a specific job name
+func ScrapeIntervalFor(client prometheus.Client, jobName string) (time.Duration, error) {
+	api := v1.NewAPI(client)
+	promConfig, err := api.Config(context.Background())
+	if err != nil {
+		return 0, err
+	}
+
+	cfg, err := GetPrometheusConfig(promConfig.YAML)
+	if err != nil {
+		return 0, err
+	}
+
+	for _, sc := range cfg.ScrapeConfigs {
+		if sc.JobName == jobName {
+			if sc.ScrapeInterval != "" {
+				si := sc.ScrapeInterval
+				sid, err := time.ParseDuration(si)
+				if err != nil {
+					return 0, fmt.Errorf("Error parsing scrape config for %s", sc.JobName)
+				} else {
+					return sid, nil
+				}
+			}
+		}
+	}
+
+	return 0, fmt.Errorf("Failed to locate scrape config for %s", jobName)
+}
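
Aside (not part of the diff): the intended fallback pattern for ScrapeIntervalFor, matching both call sites above (assumes a constructed prometheus client promCli):

    // Default to one minute; tighten to the job's configured interval only
    // when the prometheus config is reachable and contains a matching job.
    scrapeInterval := time.Minute
    if si, err := prom.ScrapeIntervalFor(promCli, env.GetKubecostJobName()); err == nil {
        scrapeInterval = si
    }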

pkg/clustermanager/boltdbstorage.go → pkg/services/clusters/boltdbstorage.go (+8 -7)

@@ -1,15 +1,16 @@
-package clustermanager
+package clusters
 
 import (
 	bolt "go.etcd.io/bbolt"
-	_ "k8s.io/klog"
 )
 
+// BoltDBClusterStorage is a boltdb implementation of a database used to store cluster definitions
 type BoltDBClusterStorage struct {
 	bucket []byte
 	db     *bolt.DB
 }
 
+// NewBoltDBClusterStorage creates a new boltdb backed ClusterStorage implementation
 func NewBoltDBClusterStorage(bucket string, db *bolt.DB) (ClusterStorage, error) {
 	bucketKey := []byte(bucket)
 
@@ -32,7 +33,7 @@ func NewBoltDBClusterStorage(bucket string, db *bolt.DB) (ClusterStorage, error)
 	}, nil
 }
 
-// Adds the entry if the key does not exist
+// AddIfNotExists adds the entry if the key does not exist
 func (cs *BoltDBClusterStorage) AddIfNotExists(key string, cluster []byte) error {
 	return cs.db.Update(func(tx *bolt.Tx) error {
 		k := []byte(key)
@@ -45,7 +46,7 @@ func (cs *BoltDBClusterStorage) AddIfNotExists(key string, cluster []byte) error
 	})
 }
 
-// Adds the encoded cluster to storage if it doesn't exist. Otherwise, update the existing
+// AddOrUpdate adds the encoded cluster to storage if it doesn't exist; otherwise, updates the existing
 // value with the provided.
 func (cs *BoltDBClusterStorage) AddOrUpdate(key string, cluster []byte) error {
 	return cs.db.Update(func(tx *bolt.Tx) error {
@@ -55,7 +56,7 @@ func (cs *BoltDBClusterStorage) AddOrUpdate(key string, cluster []byte) error {
 	})
 }
 
-// Removes a key from the cluster storage
+// Remove removes a key from the cluster storage
 func (cs *BoltDBClusterStorage) Remove(key string) error {
 	return cs.db.Update(func(tx *bolt.Tx) error {
 		bucket := tx.Bucket(cs.bucket)
@@ -64,7 +65,7 @@ func (cs *BoltDBClusterStorage) Remove(key string) error {
 	})
 }
 
-// Iterates through all key/values for the storage and calls the handler func. If a handler returns
+// Each iterates through all key/values for the storage and calls the handler func. If a handler returns
 // an error, the iteration stops.
 func (cs *BoltDBClusterStorage) Each(handler func(string, []byte) error) error {
 	return cs.db.View(func(tx *bolt.Tx) error {
@@ -87,7 +88,7 @@ func (cs *BoltDBClusterStorage) Each(handler func(string, []byte) error) error {
 	})
 }
 
-// Closes the backing storage
+// Close closes the backing storage
 func (cs *BoltDBClusterStorage) Close() error {
 	return cs.db.Close()
 }

pkg/clustermanager/clustermanager.go → pkg/services/clusters/clustermanager.go (+8 -2)

@@ -1,4 +1,4 @@
-package clustermanager
+package clusters
 
 import (
 	"encoding/base64"
@@ -71,6 +71,7 @@ type ClusterStorage interface {
 	Close() error
 }
 
+// ClusterManager manages the storage and retrieval of cluster definitions
 type ClusterManager struct {
 	storage ClusterStorage
 	// cache   map[string]*ClusterDefinition
@@ -133,7 +134,7 @@ func NewConfiguredClusterManager(storage ClusterStorage, config string) *Cluster
 	return clusterManager
 }
 
-// Adds, but will not update an existing entry.
+// Add adds a cluster definition, but will not update an existing entry.
 func (cm *ClusterManager) Add(cluster ClusterDefinition) (*ClusterDefinition, error) {
 	// First time add
 	if cluster.ID == "" {
@@ -153,6 +154,8 @@ func (cm *ClusterManager) Add(cluster ClusterDefinition) (*ClusterDefinition, er
 	return &cluster, nil
 }
 
+// AddOrUpdate will add the cluster definition if it doesn't exist, or update the existing definition
+// if it does exist.
 func (cm *ClusterManager) AddOrUpdate(cluster ClusterDefinition) (*ClusterDefinition, error) {
 	// First time add
 	if cluster.ID == "" {
@@ -172,10 +175,12 @@ func (cm *ClusterManager) AddOrUpdate(cluster ClusterDefinition) (*ClusterDefini
 	return &cluster, nil
 }
 
+// Remove will remove a cluster definition by id.
 func (cm *ClusterManager) Remove(id string) error {
 	return cm.storage.Remove(id)
 }
 
+// GetAll will return all of the cluster definitions
 func (cm *ClusterManager) GetAll() []*ClusterDefinition {
 	clusters := []*ClusterDefinition{}
 
@@ -198,6 +203,7 @@ func (cm *ClusterManager) GetAll() []*ClusterDefinition {
 	return clusters
 }
 
+// Close will close the backing database
 func (cm *ClusterManager) Close() error {
 	return cm.storage.Close()
 }

pkg/clustermanager/clustersendpoints.go → pkg/services/clusters/clustersendpoints.go (+19 -10)

@@ -1,4 +1,4 @@
-package clustermanager
+package clusters
 
 import (
 	"errors"
@@ -18,27 +18,37 @@ type DataEnvelope struct {
 	Data   interface{} `json:"data"`
 }
 
-type ClusterManagerEndpoints struct {
+// ClusterManagerHTTPService is an implementation of HTTPService which provides
+// the frontend with the ability to manage stored cluster definitions.
+type ClusterManagerHTTPService struct {
 	manager *ClusterManager
 }
 
-func NewClusterManagerEndpoints(manager *ClusterManager) *ClusterManagerEndpoints {
-	return &ClusterManagerEndpoints{
+// NewClusterManagerHTTPService creates a new cluster management http service
+func NewClusterManagerHTTPService(manager *ClusterManager) *ClusterManagerHTTPService {
+	return &ClusterManagerHTTPService{
 		manager: manager,
 	}
 }
 
-func (cme *ClusterManagerEndpoints) GetAllClusters(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+// Register assigns the endpoints and returns an error on failure.
+func (cme *ClusterManagerHTTPService) Register(router *httprouter.Router) error {
+	router.GET("/clusters", cme.GetAllClusters)
+	router.PUT("/clusters", cme.PutCluster)
+	router.DELETE("/clusters/:id", cme.DeleteCluster)
+
+	return nil
+}
+
+func (cme *ClusterManagerHTTPService) GetAllClusters(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
-	w.Header().Set("Access-Control-Allow-Origin", "*")
 
 	clusters := cme.manager.GetAll()
 	w.Write(wrapData(clusters, nil))
 }
 
-func (cme *ClusterManagerEndpoints) PutCluster(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+func (cme *ClusterManagerHTTPService) PutCluster(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
-	w.Header().Set("Access-Control-Allow-Origin", "*")
 
 	data, err := ioutil.ReadAll(r.Body)
 	if err != nil {
@@ -62,9 +72,8 @@ func (cme *ClusterManagerEndpoints) PutCluster(w http.ResponseWriter, r *http.Re
 	w.Write(wrapData(cd, nil))
 }
 
-func (cme *ClusterManagerEndpoints) DeleteCluster(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+func (cme *ClusterManagerHTTPService) DeleteCluster(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	w.Header().Set("Content-Type", "application/json")
-	w.Header().Set("Access-Control-Allow-Origin", "*")
 
 	clusterID := ps.ByName("id")
 	if clusterID == "" {
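
Aside (not part of the diff): with routing moved behind Register, standing up the cluster endpoints outside pkg/services takes a few lines (the yaml path matches the one hard-coded in clusterservice.go below):

    manager := clusters.NewConfiguredClusterManager(
        clusters.NewMapDBClusterStorage(),
        "/var/configs/clusters/default-clusters.yaml")

    router := httprouter.New()
    if err := clusters.NewClusterManagerHTTPService(manager).Register(router); err != nil {
        klog.Fatalf("failed to register cluster endpoints: %v", err)
    }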

pkg/clustermanager/mapdbstorage.go → pkg/services/clusters/mapdbstorage.go (+8 -10)

@@ -1,20 +1,18 @@
-package clustermanager
-
-import (
-	_ "k8s.io/klog"
-)
+package clusters
 
+// MapDBClusterStorage is a map implementation of a database used to store cluster definitions
 type MapDBClusterStorage struct {
 	store map[string][]byte
 }
 
+// NewMapDBClusterStorage creates a new map backed ClusterStorage implementation
 func NewMapDBClusterStorage() ClusterStorage {
 	return &MapDBClusterStorage{
 		store: make(map[string][]byte),
 	}
 }
 
-// Adds the entry if the key does not exist
+// AddIfNotExists adds the entry if the key does not exist
 func (cs *MapDBClusterStorage) AddIfNotExists(key string, cluster []byte) error {
 	if _, ok := cs.store[key]; !ok {
 		cs.store[key] = cluster
@@ -22,20 +20,20 @@ func (cs *MapDBClusterStorage) AddIfNotExists(key string, cluster []byte) error
 	return nil
 }
 
-// Adds the encoded cluster to storage if it doesn't exist. Otherwise, update the existing
+// AddOrUpdate adds the encoded cluster to storage if it doesn't exist; otherwise, updates the existing
 // value with the provided.
 func (cs *MapDBClusterStorage) AddOrUpdate(key string, cluster []byte) error {
 	cs.store[key] = cluster
 	return nil
 }
 
-// Removes a key from the cluster storage
+// Remove removes a key from the cluster storage
 func (cs *MapDBClusterStorage) Remove(key string) error {
 	delete(cs.store, key)
 	return nil
 }
 
-// Iterates through all key/values for the storage and calls the handler func. If a handler returns
+// Each iterates through all key/values for the storage and calls the handler func. If a handler returns
 // an error, the iteration stops.
 func (cs *MapDBClusterStorage) Each(handler func(string, []byte) error) error {
 	for k, v := range cs.store {
@@ -49,7 +47,7 @@ func (cs *MapDBClusterStorage) Each(handler func(string, []byte) error) error {
 	return nil
 }
 
-// Closes the backing storage
+// Close closes the backing storage
 func (cs *MapDBClusterStorage) Close() error {
 	return nil
 }

pkg/services/clusterservice.go (+37 -0)

@@ -0,0 +1,37 @@
+package services
+
+import "github.com/kubecost/cost-model/pkg/services/clusters"
+
+// NewClusterManagerService creates a new HTTPService implementation driving cluster definition management
+// for the frontend
+func NewClusterManagerService() HTTPService {
+	return clusters.NewClusterManagerHTTPService(newClusterManager())
+}
+
+// newClusterManager creates a new cluster manager instance for use in the service
+func newClusterManager() *clusters.ClusterManager {
+	clustersConfigFile := "/var/configs/clusters/default-clusters.yaml"
+
+	// Return a memory-backed cluster manager populated by configmap
+	return clusters.NewConfiguredClusterManager(clusters.NewMapDBClusterStorage(), clustersConfigFile)
+
+	// NOTE: The following should be used with a persistent disk store. Since the
+	// NOTE: configmap approach is currently the "persistent" source (entries are read-only
+	// NOTE: on the backend), we don't currently need to store on disk.
+	/*
+		path := env.GetConfigPath()
+		db, err := bolt.Open(path+"costmodel.db", 0600, nil)
+		if err != nil {
+			klog.V(1).Infof("[Error] Failed to create costmodel.db: %s", err.Error())
+			return cm.NewConfiguredClusterManager(cm.NewMapDBClusterStorage(), clustersConfigFile)
+		}
+
+		store, err := clusters.NewBoltDBClusterStorage("clusters", db)
+		if err != nil {
+			klog.V(1).Infof("[Error] Failed to Create Cluster Storage: %s", err.Error())
+			return clusters.NewConfiguredClusterManager(clusters.NewMapDBClusterStorage(), clustersConfigFile)
+		}
+
+		return clusters.NewConfiguredClusterManager(store, clustersConfigFile)
+	*/
+}

pkg/services/services.go (+65 -0)

@@ -0,0 +1,65 @@
+package services
+
+import (
+	"sync"
+
+	"github.com/julienschmidt/httprouter"
+	"github.com/kubecost/cost-model/pkg/log"
+)
+
+// HTTPService defines an implementation prototype for an object capable of registering
+// endpoints on an http router which provide a service relevant to the cost-model.
+type HTTPService interface {
+	// Register assigns the endpoints and returns an error on failure.
+	Register(*httprouter.Router) error
+}
+
+// HTTPServices defines an implementation prototype for an object capable of managing and registering
+// predefined HTTPService routes
+type HTTPServices interface {
+	// Add a HTTPService implementation for registration
+	Add(service HTTPService)
+
+	// RegisterAll registers all the services added with the provided router
+	RegisterAll(*httprouter.Router) error
+}
+
+type defaultHTTPServices struct {
+	sync.Mutex
+	services []HTTPService
+}
+
+// Add adds a HTTPService implementation for registration
+func (dhs *defaultHTTPServices) Add(service HTTPService) {
+	if service == nil {
+		log.Warningf("Attempting to Add nil HTTPService")
+		return
+	}
+
+	dhs.Lock()
+	defer dhs.Unlock()
+
+	dhs.services = append(dhs.services, service)
+}
+
+// RegisterAll registers all the services added with the provided router
+func (dhs *defaultHTTPServices) RegisterAll(router *httprouter.Router) error {
+	dhs.Lock()
+	defer dhs.Unlock()
+
+	for _, svc := range dhs.services {
+		svc.Register(router)
+	}
+
+	return nil
+}
+
+// NewCostModelServices creates an HTTPServices implementation containing any predefined
+// http services used with the cost-model
+func NewCostModelServices() HTTPServices {
+	return &defaultHTTPServices{
+		services: []HTTPService{
+			NewClusterManagerService(),
+		},
+	}
+}
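
Aside (not part of the diff): any new endpoint group can now plug into the router through HTTPService instead of being stitched into router.go by hand. A hypothetical implementation (PingService and its /ping route are illustrative, not part of this PR):

    type PingService struct{}

    func (p *PingService) Register(router *httprouter.Router) error {
        router.GET("/ping", func(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
            w.Write([]byte("pong"))
        })
        return nil
    }

    // Wiring: add to the default service set before RegisterAll runs.
    svcs := services.NewCostModelServices()
    svcs.Add(&PingService{})
    svcs.RegisterAll(router)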

pkg/util/watcher/configwatcher_test.go (+142 -0)

@@ -0,0 +1,142 @@
+package watcher
+
+import (
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+const (
+	TestConfigMapName          = "test-config"
+	AlternateTestConfigMapName = "alternate-test-config"
+	TestDataProperty           = "test-prop"
+)
+
+func newTestWatcher(t *testing.T, configMapName string, instanceName string, didRun *bool) *ConfigMapWatcher {
+	return &ConfigMapWatcher{
+		ConfigMapName: configMapName,
+		WatchFunc: func(cmn string, data map[string]string) error {
+			t.Logf("ConfigMapWatcher[%s] triggered for ConfigMap: %s, data[\"test\"] = %s\n", instanceName, cmn, data[TestDataProperty])
+			*didRun = true
+			return nil
+		},
+	}
+}
+
+func newConfigMap(configMapName string, dataValue string) *v1.ConfigMap {
+	return &v1.ConfigMap{
+		Data: map[string]string{
+			TestDataProperty: dataValue,
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: configMapName,
+		},
+	}
+}
+
+func TestConfigWatcherSingleHandler(t *testing.T) {
+	// Test that a single watcher added for the configmap test-config is executed when
+	// triggered
+	var didRun bool = false
+
+	w := NewConfigMapWatchers(newTestWatcher(t, TestConfigMapName, "single", &didRun))
+	f := w.ToWatchFunc()
+
+	// Execute watch func with a 'test-config' configmap
+	f(newConfigMap(TestConfigMapName, "testing 1 2 3"))
+
+	if !didRun {
+		t.Errorf("Failed to run configmap handler for 'single'\n")
+	}
+}
+
+func TestConfigWatcherMultipleHandlers(t *testing.T) {
+	// Test that watchers for two different configmaps aren't both triggered by a single configmap update
+	var firstDidRun bool = false
+	var secondDidRun bool = false
+
+	w := NewConfigMapWatchers(
+		newTestWatcher(t, TestConfigMapName, "single", &firstDidRun),
+		newTestWatcher(t, AlternateTestConfigMapName, "alternate", &secondDidRun))
+
+	f := w.ToWatchFunc()
+
+	// Execute watch func with an 'alternate-test-config' configmap
+	f(newConfigMap(AlternateTestConfigMapName, "oof!"))
+
+	// Assert that first did not run
+	if firstDidRun {
+		t.Errorf("Executed alternate-test-config map change, but test-config handler, 'single' executed!\n")
+	}
+
+	if !secondDidRun {
+		t.Errorf("Failed to run configmap handler for 'alternate'\n")
+	}
+}
+
+func TestConfigWatcherMultipleHandlersForSameConfig(t *testing.T) {
+	// Test that two watchers registered for the same configmap are both triggered
+	var firstDidRun bool = false
+	var secondDidRun bool = false
+	var thirdDidRun bool = false
+
+	w := NewConfigMapWatchers(
+		newTestWatcher(t, TestConfigMapName, "first", &firstDidRun),
+		newTestWatcher(t, AlternateTestConfigMapName, "alternate", &secondDidRun),
+		// third watcher watches for the same configmap as "first"
+		newTestWatcher(t, TestConfigMapName, "third", &thirdDidRun),
+	)
+
+	f := w.ToWatchFunc()
+
+	// Execute watch func with a 'test-config' configmap
+	f(newConfigMap(TestConfigMapName, "double trouble"))
+
+	// Assert that second did not run
+	if secondDidRun {
+		t.Errorf("Executed test-config map change, first handler, 'single', executed!\n")
+	}
+
+	if !firstDidRun {
+		t.Errorf("Failed to run configmap handler for 'first'\n")
+	}
+	if !thirdDidRun {
+		t.Errorf("Failed to run configmap handler for 'third'\n")
+	}
+}
+
+func TestConfigMapWatcherWithAdd(t *testing.T) {
+	// Test that two watchers registered for the same configmap are both triggered
+	// when using Add() and AddWatcher()
+	var firstDidRun bool = false
+	var secondDidRun bool = false
+	var thirdDidRun bool = false
+
+	a, b, c := newTestWatcher(t, TestConfigMapName, "first", &firstDidRun),
+		newTestWatcher(t, AlternateTestConfigMapName, "alternate", &secondDidRun),
+		// third watcher watches for the same configmap as "first"
+		newTestWatcher(t, TestConfigMapName, "third", &thirdDidRun)
+
+	w := NewConfigMapWatchers()
+	w.AddWatcher(a)
+	w.AddWatcher(b)
+	w.Add(c.ConfigMapName, c.WatchFunc)
+
+	f := w.ToWatchFunc()
+
+	// Execute watch func with a 'test-config' configmap
+	f(newConfigMap(TestConfigMapName, "double trouble"))
+
+	// Assert that second did not run
+	if secondDidRun {
+		t.Errorf("Executed test-config map change, first handler, 'single', executed!\n")
+	}
+
+	if !firstDidRun {
+		t.Errorf("Failed to run configmap handler for 'first'\n")
+	}
+	if !thirdDidRun {
+		t.Errorf("Failed to run configmap handler for 'third'\n")
+	}
+}

pkg/util/watcher/configwatchers.go (+74 -0)

@@ -0,0 +1,74 @@
+package watcher
+
+import (
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/klog"
+)
+
+// ConfigMapWatcher represents a single configmap watcher
+type ConfigMapWatcher struct {
+	ConfigMapName string
+	WatchFunc     func(string, map[string]string) error
+}
+
+// ConfigMapWatchers manages a set of ConfigMapWatcher instances, grouped by configmap name
+type ConfigMapWatchers struct {
+	watchers map[string][]*ConfigMapWatcher
+}
+
+// NewConfigMapWatchers creates a ConfigMapWatchers instance containing any provided watchers
+func NewConfigMapWatchers(watchers ...*ConfigMapWatcher) *ConfigMapWatchers {
+	cmw := &ConfigMapWatchers{
+		watchers: make(map[string][]*ConfigMapWatcher),
+	}
+
+	for _, w := range watchers {
+		cmw.AddWatcher(w)
+	}
+
+	return cmw
+}
+
+// AddWatcher appends a ConfigMapWatcher to the set; nil watchers are ignored
+func (cmw *ConfigMapWatchers) AddWatcher(watcher *ConfigMapWatcher) {
+	if watcher == nil {
+		return
+	}
+
+	name := watcher.ConfigMapName
+	cmw.watchers[name] = append(cmw.watchers[name], watcher)
+}
+
+// Add creates and appends a ConfigMapWatcher from a configmap name and watch func
+func (cmw *ConfigMapWatchers) Add(configMapName string, watchFunc func(string, map[string]string) error) {
+	cmw.AddWatcher(&ConfigMapWatcher{
+		ConfigMapName: configMapName,
+		WatchFunc:     watchFunc,
+	})
+}
+
+// GetWatchedConfigs returns the names of all configmaps currently being watched
+func (cmw *ConfigMapWatchers) GetWatchedConfigs() []string {
+	configNames := []string{}
+
+	for k := range cmw.watchers {
+		configNames = append(configNames, k)
+	}
+
+	return configNames
+}
+
+// ToWatchFunc collapses all registered watchers into a single update func suitable for
+// the cluster cache's configmap update hook
+func (cmw *ConfigMapWatchers) ToWatchFunc() func(interface{}) {
+	return func(c interface{}) {
+		conf, ok := c.(*v1.ConfigMap)
+		if !ok {
+			return
+		}
+
+		name := conf.GetName()
+		data := conf.Data
+		if watchers, ok := cmw.watchers[name]; ok {
+			for _, cw := range watchers {
+				err := cw.WatchFunc(name, data)
+				if err != nil {
+					klog.Infof("ERROR UPDATING %s CONFIG: %s", name, err.Error())
+				}
+			}
+		}
+	}
+}
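
Aside (not part of the diff): end to end, this watcher set replaces the hand-rolled closure that router.go previously built inline. A sketch of the dispatch flow (applyPricing is a hypothetical handler; cm stands in for a *v1.ConfigMap delivered by the cluster cache):

    watchers := watcher.NewConfigMapWatchers()
    watchers.Add("pricing-configs", func(name string, data map[string]string) error {
        return applyPricing(data) // hypothetical: re-apply pricing settings on change
    })

    onUpdate := watchers.ToWatchFunc()

    // Non-ConfigMap values and unwatched names are silently ignored.
    onUpdate(cm)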