Просмотр исходного кода

* Configuration Management
* Cluster Info Synchronization on ConfigFile
* Cluster Cache Synchronization on ConfigFile

Matt Bolt 4 лет назад
Родитель
Commit
2488ef976f

+ 34 - 12
cmd/kubemetrics/main.go

@@ -9,6 +9,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/config"
 	"github.com/kubecost/cost-model/pkg/costmodel"
 	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
@@ -27,6 +28,13 @@ import (
 	"k8s.io/klog"
 )
 
+// ClusterExportInterval is the interval used to export the cluster if env.IsExportClusterCacheEnabled() is true
+const ClusterExportInterval = 5 * time.Minute
+
+// clusterExporter is used if env.IsExportClusterCacheEnabled() is set to true
+// it will export the kubernetes cluster data to a file on a specific interval
+var clusterExporter *clustercache.ClusterExporter
+
 func Healthz(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(200)
 	w.Header().Set("Content-Length", "0")
@@ -34,7 +42,7 @@ func Healthz(w http.ResponseWriter, _ *http.Request) {
 }
 
 // initializes the kubernetes client cache
-func newKubernetesClusterCache() (clustercache.ClusterCache, error) {
+func newKubernetesClusterCache() (kubernetes.Interface, clustercache.ClusterCache, error) {
 	var err error
 
 	// Kubernetes API setup
@@ -46,19 +54,19 @@ func newKubernetesClusterCache() (clustercache.ClusterCache, error) {
 	}
 
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	kubeClientset, err := kubernetes.NewForConfig(kc)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	// Create Kubernetes Cluster Cache + Watchers
 	k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
 	k8sCache.Run()
 
-	return k8sCache, nil
+	return kubeClientset, k8sCache, nil
 }
 
 func newPrometheusClient() (prometheus.Client, error) {
@@ -124,13 +132,19 @@ func main() {
 	klog.Infof("Using scrape interval of %f", scrapeInterval.Seconds())
 
 	// initialize kubernetes client and cluster cache
-	clusterCache, err := newKubernetesClusterCache()
+	k8sClient, clusterCache, err := newKubernetesClusterCache()
 	if err != nil {
 		panic(err.Error())
 	}
 
+	// Create ConfigFileManager for synchronization of shared configuration
+	confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
+		BucketStoreConfig: env.GetKubecostConfigBucket(),
+		LocalConfigPath:   "/",
+	})
+
 	cloudProviderKey := env.GetCloudProviderAPIKey()
-	cloudProvider, err := cloud.NewProvider(clusterCache, cloudProviderKey)
+	cloudProvider, err := cloud.NewProvider(clusterCache, cloudProviderKey, confManager)
 	if err != nil {
 		panic(err.Error())
 	}
@@ -140,7 +154,6 @@ func main() {
 	watchConfigFunc := configWatchers.ToWatchFunc()
 	watchedConfigs := configWatchers.GetWatchedConfigs()
 
-	k8sClient := clusterCache.GetClient()
 	kubecostNamespace := env.GetKubecostNamespace()
 
 	// We need an initial invocation because the init of the cache has happened before we had access to the provider.
@@ -155,16 +168,25 @@ func main() {
 
 	clusterCache.SetConfigMapUpdateFunc(watchConfigFunc)
 
+	// Initialize cluster exporting if it's enabled
+	if env.IsExportClusterCacheEnabled() {
+		cacheLocation := confManager.ConfigFileAt("/var/configs/cluster-cache.json")
+		clusterExporter = clustercache.NewClusterExporter(clusterCache, cacheLocation, ClusterExportInterval)
+		clusterExporter.Run()
+	}
+
+	// ClusterInfo Provider to provide the cluster map with local and remote cluster data
+	clusterInfoConf := confManager.ConfigFileAt("/var/configs/cluster-info.json")
+	localClusterInfo := costmodel.NewLocalClusterInfoProvider(k8sClient, cloudProvider)
+	clusterInfoProvider := costmodel.NewClusterInfoWriteOnRequest(localClusterInfo, clusterInfoConf)
+
 	// Initialize ClusterMap for maintaining ClusterInfo by ClusterID
-	clusterMap := clusters.NewClusterMap(
-		promCli,
-		costmodel.NewLocalClusterInfoProvider(k8sClient, cloudProvider),
-		5*time.Minute)
+	clusterMap := clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
 
 	costModel := costmodel.NewCostModel(promCli, cloudProvider, clusterCache, clusterMap, scrapeInterval)
 
 	// initialize Kubernetes Metrics Emitter
-	metricsEmitter := costmodel.NewCostModelMetricsEmitter(promCli, clusterCache, cloudProvider, costModel)
+	metricsEmitter := costmodel.NewCostModelMetricsEmitter(promCli, clusterCache, cloudProvider, clusterInfoProvider, costModel)
 
 	// download pricing data
 	err = cloudProvider.DownloadPricingData()

+ 1 - 1
go.mod

@@ -15,7 +15,7 @@ require (
 	github.com/aws/aws-sdk-go-v2 v1.9.0
 	github.com/davecgh/go-spew v1.1.1
 	github.com/getsentry/sentry-go v0.6.1
-	github.com/google/uuid v1.1.2
+	github.com/google/uuid v1.3.0
 	github.com/json-iterator/go v1.1.10
 	github.com/jszwec/csvutil v1.2.1
 	github.com/julienschmidt/httprouter v1.3.0

+ 2 - 1
go.sum

@@ -208,8 +208,9 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf
 github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=

+ 1 - 1
pkg/cloud/awsprovider.go

@@ -2020,7 +2020,7 @@ func (a *AWS) ExternalAllocations(start string, end string, aggregators []string
 		}
 
 		if customPricing.BillingDataDataset != "" && !crossCluster { // There is GCP data, meaning someone has tried to configure a GCP out-of-cluster allocation.
-			gcp, err := NewCrossClusterProvider("gcp", "aws.json", a.Clientset)
+			gcp, err := NewCrossClusterProvider("gcp", a.Config.ConfigFileManager(), "aws.json", a.Clientset)
 			if err != nil {
 				klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
 			}

+ 1 - 1
pkg/cloud/gcpprovider.go

@@ -352,7 +352,7 @@ func (gcp *GCP) ExternalAllocations(start string, end string, aggregators []stri
 
 	var s []*OutOfClusterAllocation
 	if c.ServiceKeyName != "" && c.ServiceKeySecret != "" && !crossCluster {
-		aws, err := NewCrossClusterProvider("aws", "gcp.json", gcp.Clientset)
+		aws, err := NewCrossClusterProvider("aws", gcp.Config.ConfigFileManager(), "gcp.json", gcp.Clientset)
 		if err != nil {
 			klog.Infof("Could not instantiate cross-cluster provider %s", err.Error())
 		}

+ 19 - 17
pkg/cloud/provider.go

@@ -4,18 +4,20 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
-	"github.com/kubecost/cost-model/pkg/util"
 	"io"
 	"regexp"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/kubecost/cost-model/pkg/util"
+
 	"k8s.io/klog"
 
 	"cloud.google.com/go/compute/metadata"
 
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/config"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util/watcher"
@@ -383,31 +385,31 @@ func ShareTenancyCosts(p Provider) bool {
 	return config.ShareTenancyCosts == "true"
 }
 
-func NewCrossClusterProvider(ctype string, overrideConfigPath string, cache clustercache.ClusterCache) (Provider, error) {
+func NewCrossClusterProvider(ctype string, config *config.ConfigFileManager, overrideConfigPath string, cache clustercache.ClusterCache) (Provider, error) {
 	if ctype == "aws" {
 		return &AWS{
 			Clientset: cache,
-			Config:    NewProviderConfig(overrideConfigPath),
+			Config:    NewProviderConfig(config, overrideConfigPath),
 		}, nil
 	} else if ctype == "gcp" {
 		return &GCP{
 			Clientset: cache,
-			Config:    NewProviderConfig(overrideConfigPath),
+			Config:    NewProviderConfig(config, overrideConfigPath),
 		}, nil
 	} else if ctype == "azure" {
 		return &Azure{
 			Clientset: cache,
-			Config:    NewProviderConfig(overrideConfigPath),
+			Config:    NewProviderConfig(config, overrideConfigPath),
 		}, nil
 	}
 	return &CustomProvider{
 		Clientset: cache,
-		Config:    NewProviderConfig(overrideConfigPath),
+		Config:    NewProviderConfig(config, overrideConfigPath),
 	}, nil
 }
 
 // NewProvider looks at the nodespec or provider metadata server to decide which provider to instantiate.
-func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, error) {
+func NewProvider(cache clustercache.ClusterCache, apiKey string, config *config.ConfigFileManager) (Provider, error) {
 	nodes := cache.GetAllNodes()
 	if len(nodes) == 0 {
 		return nil, fmt.Errorf("Could not locate any nodes for cluster.")
@@ -422,7 +424,7 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 			CSVLocation: env.GetCSVPath(),
 			CustomProvider: &CustomProvider{
 				Clientset: cache,
-				Config:    NewProviderConfig(cp.configFileName),
+				Config:    NewProviderConfig(config, cp.configFileName),
 			},
 		}, nil
 	case "GCP":
@@ -433,7 +435,7 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 		return &GCP{
 			Clientset:        cache,
 			APIKey:           apiKey,
-			Config:           NewProviderConfig(cp.configFileName),
+			Config:           NewProviderConfig(config, cp.configFileName),
 			clusterRegion:    cp.region,
 			clusterProjectId: cp.projectID,
 		}, nil
@@ -441,7 +443,7 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 		klog.V(2).Info("Found ProviderID starting with \"aws\", using AWS Provider")
 		return &AWS{
 			Clientset:        cache,
-			Config:           NewProviderConfig(cp.configFileName),
+			Config:           NewProviderConfig(config, cp.configFileName),
 			clusterRegion:    cp.region,
 			clusterAccountId: cp.accountID,
 		}, nil
@@ -449,7 +451,7 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 		klog.V(2).Info("Found ProviderID starting with \"azure\", using Azure Provider")
 		return &Azure{
 			Clientset:        cache,
-			Config:           NewProviderConfig(cp.configFileName),
+			Config:           NewProviderConfig(config, cp.configFileName),
 			clusterRegion:    cp.region,
 			clusterAccountId: cp.accountID,
 		}, nil
@@ -457,7 +459,7 @@ func NewProvider(cache clustercache.ClusterCache, apiKey string) (Provider, erro
 		klog.V(2).Info("Unsupported provider, falling back to default")
 		return &CustomProvider{
 			Clientset: cache,
-			Config:    NewProviderConfig(cp.configFileName),
+			Config:    NewProviderConfig(config, cp.configFileName),
 		}, nil
 	}
 }
@@ -470,15 +472,15 @@ type clusterProperties struct {
 	projectID      string
 }
 
-func getClusterProperties(node *v1.Node) (clusterProperties) {
+func getClusterProperties(node *v1.Node) clusterProperties {
 	providerID := strings.ToLower(node.Spec.ProviderID)
 	region, _ := util.GetRegion(node.Labels)
 	cp := clusterProperties{
-		provider: "DEFAULT",
+		provider:       "DEFAULT",
 		configFileName: "default.json",
-		region: region,
-		accountID: "",
-		projectID: "",
+		region:         region,
+		accountID:      "",
+		projectID:      "",
 	}
 	if metadata.OnGCE() {
 		cp.provider = "GCP"

+ 69 - 34
pkg/cloud/providerconfig.go

@@ -2,14 +2,15 @@ package cloud
 
 import (
 	"fmt"
-	"io/ioutil"
+	gopath "path"
 	"reflect"
 	"strconv"
 	"strings"
 	"sync"
 
+	"github.com/kubecost/cost-model/pkg/config"
 	"github.com/kubecost/cost-model/pkg/env"
-	"github.com/kubecost/cost-model/pkg/util/fileutil"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/util/json"
 	"github.com/microcosm-cc/bluemonday"
 
@@ -18,23 +19,61 @@ import (
 
 var sanitizePolicy = bluemonday.UGCPolicy()
 
-// ProviderConfig is a utility class that provides a thread-safe configuration
-// storage/cache for all Provider implementations
+// ProviderConfig is a utility class that provides a thread-safe configuration storage/cache for all Provider
+// implementations, and provides access to the ConfigFileManager used to synchronize the backing config file.
 type ProviderConfig struct {
-	lock          *sync.Mutex
-	fileName      string
-	configPath    string
-	customPricing *CustomPricing
+	lock            *sync.Mutex
+	configManager   *config.ConfigFileManager
+	configFile      *config.ConfigFile
+	customPricing   *CustomPricing
+	watcherHandleID config.HandlerID
 }
 
-// Creates a new ProviderConfig instance
-func NewProviderConfig(file string) *ProviderConfig {
-	return &ProviderConfig{
+// NewProviderConfig creates a new ConfigFile and returns the ProviderConfig
+func NewProviderConfig(configManager *config.ConfigFileManager, fileName string) *ProviderConfig {
+	configFile := configManager.ConfigFileAt(configPathFor(fileName))
+	pc := &ProviderConfig{
 		lock:          new(sync.Mutex),
-		fileName:      file,
-		configPath:    configPathFor(file),
+		configManager: configManager,
+		configFile:    configFile,
 		customPricing: nil,
 	}
+
+	// add the provider config func as handler for the config file changes
+	pc.watcherHandleID = configFile.AddChangeHandler(pc.onConfigFileUpdated)
+	return pc
+}
+
+// onConfigFileUpdated handles any time the config file contents are updated, created, or deleted
+func (pc *ProviderConfig) onConfigFileUpdated(changeType config.ChangeType, data []byte) {
+	// TODO: (bolt) Currently this has the side-effect of setting pc.customPricing twice when the update
+	// TODO: (bolt) is made from this ProviderConfig instance. We'll need to implement a way of identifying
+	// TODO: (bolt) when to ignore updates when the change and handler are the same source
+	log.Infof("CustomPricing Config Updated: %s", changeType)
+
+	switch changeType {
+	case config.ChangeTypeCreated:
+		fallthrough
+	case config.ChangeTypeModified:
+		pc.lock.Lock()
+		defer pc.lock.Unlock()
+
+		customPricing := new(CustomPricing)
+		err := json.Unmarshal(data, customPricing)
+		if err != nil {
+			klog.Infof("Could not decode Custom Pricing file at path %s", pc.configFile.Path())
+			customPricing = DefaultPricing()
+		}
+
+		pc.customPricing = customPricing
+		if pc.customPricing.SpotGPU == "" {
+			pc.customPricing.SpotGPU = DefaultPricing().SpotGPU // Migration for users without this value set by default.
+		}
+
+		if pc.customPricing.ShareTenancyCosts == "" {
+			pc.customPricing.ShareTenancyCosts = defaultShareTenancyCost
+		}
+	}
 }
 
 // Non-ThreadSafe logic to load the config file if a cache does not exist. Flag to write
@@ -44,16 +83,16 @@ func (pc *ProviderConfig) loadConfig(writeIfNotExists bool) (*CustomPricing, err
 		return pc.customPricing, nil
 	}
 
-	exists, err := fileExists(pc.configPath)
+	exists, err := pc.configFile.Exists()
 	// File Error other than NotExists
 	if err != nil {
-		klog.Infof("Custom Pricing file at path '%s' read error: '%s'", pc.configPath, err.Error())
+		klog.Infof("Custom Pricing file at path '%s' read error: '%s'", pc.configFile.Path(), err.Error())
 		return DefaultPricing(), err
 	}
 
 	// File Doesn't Exist
 	if !exists {
-		klog.Infof("Could not find Custom Pricing file at path '%s'", pc.configPath)
+		klog.Infof("Could not find Custom Pricing file at path '%s'", pc.configFile.Path())
 		pc.customPricing = DefaultPricing()
 
 		// Only write the file if flag enabled
@@ -63,9 +102,9 @@ func (pc *ProviderConfig) loadConfig(writeIfNotExists bool) (*CustomPricing, err
 				return pc.customPricing, err
 			}
 
-			err = ioutil.WriteFile(pc.configPath, cj, 0644)
+			err = pc.configFile.Write(cj)
 			if err != nil {
-				klog.Infof("Could not write Custom Pricing file to path '%s'", pc.configPath)
+				klog.Infof("Could not write Custom Pricing file to path '%s'", pc.configFile.Path())
 				return pc.customPricing, err
 			}
 		}
@@ -74,9 +113,9 @@ func (pc *ProviderConfig) loadConfig(writeIfNotExists bool) (*CustomPricing, err
 	}
 
 	// File Exists - Read all contents of file, unmarshal json
-	byteValue, err := ioutil.ReadFile(pc.configPath)
+	byteValue, err := pc.configFile.Read()
 	if err != nil {
-		klog.Infof("Could not read Custom Pricing file at path %s", pc.configPath)
+		klog.Infof("Could not read Custom Pricing file at path %s", pc.configFile.Path())
 		// If read fails, we don't want to cache default, assuming that the file is valid
 		return DefaultPricing(), err
 	}
@@ -84,7 +123,7 @@ func (pc *ProviderConfig) loadConfig(writeIfNotExists bool) (*CustomPricing, err
 	var customPricing CustomPricing
 	err = json.Unmarshal(byteValue, &customPricing)
 	if err != nil {
-		klog.Infof("Could not decode Custom Pricing file at path %s", pc.configPath)
+		klog.Infof("Could not decode Custom Pricing file at path %s", pc.configFile.Path())
 		return DefaultPricing(), err
 	}
 
@@ -108,6 +147,13 @@ func (pc *ProviderConfig) GetCustomPricingData() (*CustomPricing, error) {
 	return pc.loadConfig(true)
 }
 
+// ConfigFileManager returns the ConfigFileManager instance used to manage the CustomPricing
+// configuration. In the event of a multi-provider setup, this instance should be used to
+// configure any other configuration providers.
+func (pc *ProviderConfig) ConfigFileManager() *config.ConfigFileManager {
+	return pc.configManager
+}
+
 // Allows a call to manually update the configuration while maintaining proper thread-safety
 // for read/write methods.
 func (pc *ProviderConfig) Update(updateFunc func(*CustomPricing) error) (*CustomPricing, error) {
@@ -132,7 +178,7 @@ func (pc *ProviderConfig) Update(updateFunc func(*CustomPricing) error) (*Custom
 	if err != nil {
 		return c, err
 	}
-	err = ioutil.WriteFile(pc.configPath, cj, 0644)
+	err = pc.configFile.Write(cj)
 
 	if err != nil {
 		return c, err
@@ -210,19 +256,8 @@ func SetCustomPricingField(obj *CustomPricing, name string, value string) error
 	return nil
 }
 
-// File exists has three different return cases that should be handled:
-//   1. File exists and is not a directory (true, nil)
-//   2. File does not exist (false, nil)
-//   3. File may or may not exist. Error occurred during stat (false, error)
-// The third case represents the scenario where the stat returns an error,
-// but the error isn't relevant to the path. This can happen when the current
-// user doesn't have permission to access the file.
-func fileExists(filename string) (bool, error) {
-	return fileutil.FileExists(filename) // delegate to utility method
-}
-
 // Returns the configuration directory concatenated with a specific config file name
 func configPathFor(filename string) string {
 	path := env.GetConfigPathWithDefault("/models/")
-	return path + filename
+	return gopath.Join(path, filename)
 }

+ 20 - 10
pkg/clustercache/clustercache.go

@@ -10,6 +10,7 @@ import (
 	autoscaling "k8s.io/api/autoscaling/v2beta1"
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
+	"k8s.io/api/policy/v1beta1"
 	stv1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/client-go/kubernetes"
@@ -24,10 +25,6 @@ type ClusterCache interface {
 	// Stops the watcher processes
 	Stop()
 
-	// Gets the underlying clientset
-	// TODO: Remove once we support all cached cluster components
-	GetClient() kubernetes.Interface
-
 	// GetAllNamespaces returns all the cached namespaces
 	GetAllNamespaces() []*v1.Namespace
 
@@ -64,9 +61,12 @@ type ClusterCache interface {
 	// GetAllJobs returns all the cached jobs
 	GetAllJobs() []*batchv1.Job
 
-	// GetAllHorizontalPodAutoscalers() returns all cached horizontal pod autoscalers
+	// GetAllHorizontalPodAutoscalers returns all cached horizontal pod autoscalers
 	GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler
 
+	// GetAllPodDisruptionBudgets returns all cached pod disruption budgets
+	GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget
+
 	// SetConfigMapUpdateFunc sets the configmap update function
 	SetConfigMapUpdateFunc(func(interface{}))
 }
@@ -89,6 +89,7 @@ type KubernetesClusterCache struct {
 	storageClassWatch      WatchController
 	jobsWatch              WatchController
 	hpaWatch               WatchController
+	pdbWatch               WatchController
 	stop                   chan struct{}
 }
 
@@ -103,6 +104,7 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	storageRestClient := client.StorageV1().RESTClient()
 	batchClient := client.BatchV1().RESTClient()
 	autoscalingClient := client.AutoscalingV2beta1().RESTClient()
+	pdbClient := client.PolicyV1beta1().RESTClient()
 
 	kubecostNamespace := env.GetKubecostNamespace()
 	klog.Infof("NAMESPACE: %s", kubecostNamespace)
@@ -123,11 +125,12 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 		storageClassWatch:      NewCachingWatcher(storageRestClient, "storageclasses", &stv1.StorageClass{}, "", fields.Everything()),
 		jobsWatch:              NewCachingWatcher(batchClient, "jobs", &batchv1.Job{}, "", fields.Everything()),
 		hpaWatch:               NewCachingWatcher(autoscalingClient, "horizontalpodautoscalers", &autoscaling.HorizontalPodAutoscaler{}, "", fields.Everything()),
+		pdbWatch:               NewCachingWatcher(pdbClient, "poddisruptionbudgets", &v1beta1.PodDisruptionBudget{}, "", fields.Everything()),
 	}
 
 	// Wait for each caching watcher to initialize
 	var wg sync.WaitGroup
-	wg.Add(14)
+	wg.Add(15)
 
 	cancel := make(chan struct{})
 
@@ -145,6 +148,7 @@ func NewKubernetesClusterCache(client kubernetes.Interface) ClusterCache {
 	go initializeCache(kcc.storageClassWatch, &wg, cancel)
 	go initializeCache(kcc.jobsWatch, &wg, cancel)
 	go initializeCache(kcc.hpaWatch, &wg, cancel)
+	go initializeCache(kcc.pdbWatch, &wg, cancel)
 
 	wg.Wait()
 
@@ -171,6 +175,7 @@ func (kcc *KubernetesClusterCache) Run() {
 	go kcc.storageClassWatch.Run(1, stopCh)
 	go kcc.jobsWatch.Run(1, stopCh)
 	go kcc.hpaWatch.Run(1, stopCh)
+	go kcc.pdbWatch.Run(1, stopCh)
 
 	kcc.stop = stopCh
 }
@@ -184,10 +189,6 @@ func (kcc *KubernetesClusterCache) Stop() {
 	kcc.stop = nil
 }
 
-func (kcc *KubernetesClusterCache) GetClient() kubernetes.Interface {
-	return kcc.client
-}
-
 func (kcc *KubernetesClusterCache) GetAllNamespaces() []*v1.Namespace {
 	var namespaces []*v1.Namespace
 	items := kcc.namespaceWatch.GetAll()
@@ -305,6 +306,15 @@ func (kcc *KubernetesClusterCache) GetAllHorizontalPodAutoscalers() []*autoscali
 	return hpas
 }
 
+func (kcc *KubernetesClusterCache) GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget {
+	var pdbs []*v1beta1.PodDisruptionBudget
+	items := kcc.pdbWatch.GetAll()
+	for _, pdb := range items {
+		pdbs = append(pdbs, pdb.(*v1beta1.PodDisruptionBudget))
+	}
+	return pdbs
+}
+
 func (kcc *KubernetesClusterCache) SetConfigMapUpdateFunc(f func(interface{})) {
 	kcc.kubecostConfigMapWatch.SetUpdateHandler(f)
 }

+ 114 - 0
pkg/clustercache/clusterexporter.go

@@ -0,0 +1,114 @@
+package clustercache
+
+import (
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/config"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/util/json"
+
+	appsv1 "k8s.io/api/apps/v1"
+	autoscaling "k8s.io/api/autoscaling/v2beta1"
+	batchv1 "k8s.io/api/batch/v1"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/api/policy/v1beta1"
+	stv1 "k8s.io/api/storage/v1"
+)
+
+// clusterEncoding is used to represent the cluster objects in the encoded states.
+type clusterEncoding struct {
+	Namespaces               []*v1.Namespace                        `json:"namespaces,omitempty"`
+	Nodes                    []*v1.Node                             `json:"nodes,omitempty"`
+	Pods                     []*v1.Pod                              `json:"pods,omitempty"`
+	Services                 []*v1.Service                          `json:"services,omitempty"`
+	DaemonSets               []*appsv1.DaemonSet                    `json:"daemonSets,omitempty"`
+	Deployments              []*appsv1.Deployment                   `json:"deployments,omitempty"`
+	StatefulSets             []*appsv1.StatefulSet                  `json:"statefulSets,omitempty"`
+	ReplicaSets              []*appsv1.ReplicaSet                   `json:"replicaSets,omitempty"`
+	PersistentVolumes        []*v1.PersistentVolume                 `json:"persistentVolumes,omitempty"`
+	PersistentVolumeClaims   []*v1.PersistentVolumeClaim            `json:"persistentVolumeClaims,omitempty"`
+	StorageClasses           []*stv1.StorageClass                   `json:"storageClasses,omitempty"`
+	Jobs                     []*batchv1.Job                         `json:"jobs,omitempty"`
+	HorizontalPodAutoscalers []*autoscaling.HorizontalPodAutoscaler `json:"horizontalPodAutoscalers,omitempty"`
+	PodDisruptionBudgets     []*v1beta1.PodDisruptionBudget         `json:"podDisruptionBudgets,omitempty"`
+}
+
+// ClusterExporter manages and runs an file export process which dumps the local kubernetes cluster to a target location.
+type ClusterExporter struct {
+	cluster  ClusterCache
+	target   *config.ConfigFile
+	interval time.Duration
+	stop     chan struct{}
+}
+
+// NewClusterExporter creates a new ClusterExporter instance for exporting the kubernetes cluster.
+func NewClusterExporter(cluster ClusterCache, target *config.ConfigFile, interval time.Duration) *ClusterExporter {
+	return &ClusterExporter{
+		cluster:  cluster,
+		target:   target,
+		interval: interval,
+	}
+}
+
+// Run starts the automated process of running Export on a specific interval.
+func (ce *ClusterExporter) Run() {
+	if ce.stop != nil {
+		return
+	}
+
+	ce.stop = make(chan struct{})
+	go func() {
+		for {
+			err := ce.Export()
+			if err != nil {
+				log.Warningf("Failed to export cluster: %s", err)
+			}
+
+			select {
+			case <-time.After(ce.interval):
+			case <-ce.stop:
+				return
+			}
+		}
+	}()
+}
+
+// Stop halts the Cluster export on an interval
+func (ce *ClusterExporter) Stop() {
+	if ce.stop == nil {
+		log.Warningf("Cluster exporter is already stopped.")
+		return
+	}
+
+	close(ce.stop)
+	ce.stop = nil
+}
+
+// Export stores the cluster cache data into a PODO, marshals as JSON, and saves it to the
+// target location.
+func (ce *ClusterExporter) Export() error {
+	c := ce.cluster
+	encoding := &clusterEncoding{
+		Namespaces:               c.GetAllNamespaces(),
+		Nodes:                    c.GetAllNodes(),
+		Pods:                     c.GetAllPods(),
+		Services:                 c.GetAllServices(),
+		DaemonSets:               c.GetAllDaemonSets(),
+		Deployments:              c.GetAllDeployments(),
+		StatefulSets:             c.GetAllStatefulSets(),
+		ReplicaSets:              c.GetAllReplicaSets(),
+		PersistentVolumes:        c.GetAllPersistentVolumes(),
+		PersistentVolumeClaims:   c.GetAllPersistentVolumeClaims(),
+		StorageClasses:           c.GetAllStorageClasses(),
+		Jobs:                     c.GetAllJobs(),
+		HorizontalPodAutoscalers: c.GetAllHorizontalPodAutoscalers(),
+		PodDisruptionBudgets:     c.GetAllPodDisruptionBudgets(),
+	}
+
+	data, err := json.Marshal(encoding)
+	if err != nil {
+		return err
+	}
+
+	return ce.target.Write(data)
+}

+ 304 - 0
pkg/clustercache/clusterimporter.go

@@ -0,0 +1,304 @@
+package clustercache
+
+import (
+	"sync"
+
+	"github.com/kubecost/cost-model/pkg/config"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/util/json"
+	appsv1 "k8s.io/api/apps/v1"
+	autoscaling "k8s.io/api/autoscaling/v2beta1"
+	batchv1 "k8s.io/api/batch/v1"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/api/policy/v1beta1"
+	stv1 "k8s.io/api/storage/v1"
+)
+
+// ClusterImporter is an implementation of ClusterCache which leverages a backing configuration file
+// as it's source of the cluster data.
+type ClusterImporter struct {
+	source          *config.ConfigFile
+	sourceHandlerID config.HandlerID
+	dataLock        *sync.Mutex
+	data            *clusterEncoding
+}
+
+// Creates a new ClusterCache implementation which uses an import process to provide cluster data
+func NewClusterImporter(source *config.ConfigFile) ClusterCache {
+	return &ClusterImporter{
+		source:   source,
+		dataLock: new(sync.Mutex),
+		data:     new(clusterEncoding),
+	}
+}
+
+// onImportSourceChanged handles the source data updating
+func (ci *ClusterImporter) onImportSourceChanged(changeType config.ChangeType, data []byte) {
+	if changeType == config.ChangeTypeDeleted {
+		ci.dataLock.Lock()
+		ci.data = new(clusterEncoding)
+		ci.dataLock.Unlock()
+		return
+	}
+
+	ci.update(data)
+}
+
+// update replaces the underlying cluster data with the provided new data if it decodes
+func (ci *ClusterImporter) update(data []byte) {
+	ce := new(clusterEncoding)
+	err := json.Unmarshal(data, ce)
+	if err != nil {
+		log.Warningf("Failed to unmarshal cluster during import: %s", err)
+		return
+	}
+
+	ci.dataLock.Lock()
+	ci.data = ce
+	ci.dataLock.Unlock()
+}
+
+// Run starts the watcher processes
+func (ci *ClusterImporter) Run() {
+	exists, err := ci.source.Exists()
+	if err != nil {
+		log.Errorf("Failed to import source for cluster: %s", err)
+		return
+	}
+
+	if exists {
+		data, err := ci.source.Read()
+		if err != nil {
+			log.Warningf("Failed to import cluster: %s", err)
+		} else {
+			ci.update(data)
+		}
+	}
+
+	ci.sourceHandlerID = ci.source.AddChangeHandler(ci.onImportSourceChanged)
+}
+
+// Stops the watcher processes
+func (ci *ClusterImporter) Stop() {
+	if ci.sourceHandlerID != "" {
+		ci.source.RemoveChangeHandler(ci.sourceHandlerID)
+		ci.sourceHandlerID = ""
+	}
+}
+
+// GetAllNamespaces returns all the cached namespaces
+func (ci *ClusterImporter) GetAllNamespaces() []*v1.Namespace {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimic's the behavior of the default cluster cache impl.
+	namespaces := ci.data.Namespaces
+	cloneList := make([]*v1.Namespace, 0, len(namespaces))
+	for _, v := range namespaces {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllNodes returns all the cached nodes
+func (ci *ClusterImporter) GetAllNodes() []*v1.Node {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimic's the behavior of the default cluster cache impl.
+	nodes := ci.data.Nodes
+	cloneList := make([]*v1.Node, 0, len(nodes))
+	for _, v := range nodes {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllPods returns all the cached pods
+func (ci *ClusterImporter) GetAllPods() []*v1.Pod {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimic's the behavior of the default cluster cache impl.
+	pods := ci.data.Pods
+	cloneList := make([]*v1.Pod, 0, len(pods))
+	for _, v := range pods {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllServices returns all the cached services
+func (ci *ClusterImporter) GetAllServices() []*v1.Service {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimic's the behavior of the default cluster cache impl.
+	services := ci.data.Services
+	cloneList := make([]*v1.Service, 0, len(services))
+	for _, v := range services {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllDaemonSets returns all the cached DaemonSets
+func (ci *ClusterImporter) GetAllDaemonSets() []*appsv1.DaemonSet {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimic's the behavior of the default cluster cache impl.
+	daemonSets := ci.data.DaemonSets
+	cloneList := make([]*appsv1.DaemonSet, 0, len(daemonSets))
+	for _, v := range daemonSets {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllDeployments returns all the cached deployments
+func (ci *ClusterImporter) GetAllDeployments() []*appsv1.Deployment {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	deployments := ci.data.Deployments
+	cloneList := make([]*appsv1.Deployment, 0, len(deployments))
+	for _, v := range deployments {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllStatefulSets returns all the cached StatefulSets
+func (ci *ClusterImporter) GetAllStatefulSets() []*appsv1.StatefulSet {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	statefulSets := ci.data.StatefulSets
+	cloneList := make([]*appsv1.StatefulSet, 0, len(statefulSets))
+	for _, v := range statefulSets {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllReplicaSets returns all the cached ReplicaSets
+func (ci *ClusterImporter) GetAllReplicaSets() []*appsv1.ReplicaSet {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	replicaSets := ci.data.ReplicaSets
+	cloneList := make([]*appsv1.ReplicaSet, 0, len(replicaSets))
+	for _, v := range replicaSets {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllPersistentVolumes returns all the cached persistent volumes
+func (ci *ClusterImporter) GetAllPersistentVolumes() []*v1.PersistentVolume {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	pvs := ci.data.PersistentVolumes
+	cloneList := make([]*v1.PersistentVolume, 0, len(pvs))
+	for _, v := range pvs {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllPersistentVolumeClaims returns all the cached persistent volume claims
+func (ci *ClusterImporter) GetAllPersistentVolumeClaims() []*v1.PersistentVolumeClaim {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	pvcs := ci.data.PersistentVolumeClaims
+	cloneList := make([]*v1.PersistentVolumeClaim, 0, len(pvcs))
+	for _, v := range pvcs {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllStorageClasses returns all the cached storage classes
+func (ci *ClusterImporter) GetAllStorageClasses() []*stv1.StorageClass {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	storageClasses := ci.data.StorageClasses
+	cloneList := make([]*stv1.StorageClass, 0, len(storageClasses))
+	for _, v := range storageClasses {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllJobs returns all the cached jobs
+func (ci *ClusterImporter) GetAllJobs() []*batchv1.Job {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	jobs := ci.data.Jobs
+	cloneList := make([]*batchv1.Job, 0, len(jobs))
+	for _, v := range jobs {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllHorizontalPodAutoscalers returns all cached horizontal pod autoscalers
+func (ci *ClusterImporter) GetAllHorizontalPodAutoscalers() []*autoscaling.HorizontalPodAutoscaler {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	hpas := ci.data.HorizontalPodAutoscalers
+	cloneList := make([]*autoscaling.HorizontalPodAutoscaler, 0, len(hpas))
+	for _, v := range hpas {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// GetAllPodDisruptionBudgets returns all cached pod disruption budgets
+func (ci *ClusterImporter) GetAllPodDisruptionBudgets() []*v1beta1.PodDisruptionBudget {
+	ci.dataLock.Lock()
+	defer ci.dataLock.Unlock()
+
+	// Deep copy here to avoid callers from corrupting the cache
+	// This also mimics the behavior of the default cluster cache impl.
+	pdbs := ci.data.PodDisruptionBudgets
+	cloneList := make([]*v1beta1.PodDisruptionBudget, 0, len(pdbs))
+	for _, v := range pdbs {
+		cloneList = append(cloneList, v.DeepCopy())
+	}
+	return cloneList
+}
+
+// SetConfigMapUpdateFunc sets the configmap update function
+func (ci *ClusterImporter) SetConfigMapUpdateFunc(_ func(interface{})) {
+	// TODO: (bolt) This function is still a bit strange to me for the ClusterCache interface.
+	// TODO: (bolt) no-op for now.
+	log.Warningf("SetConfigMapUpdateFunc is disabled for imported cluster data.")
+}

+ 335 - 9
pkg/config/configfile.go

@@ -1,15 +1,341 @@
 package config
 
-import "github.com/kubecost/cost-model/pkg/storage"
+import (
+	"os"
+	"sort"
+	"sync"
+	"time"
 
-// Configuration
-type Configuration interface {
-	Get(result interface{}) error
-	Set(new interface{}) error
+	"github.com/google/uuid"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/storage"
+)
+
+// HandlerID is a unique identifier assigned to a provided ConfigChangedHandler. This is used to remove a handler
+// from the ConfigFile when it is no longer needed.
+type HandlerID string
+
+//--------------------------------------------------------------------------
+//  ChangeType
+//--------------------------------------------------------------------------
+
+// ChangeType is used to specifically categorize the change that was made on a ConfigFile
+type ChangeType string
+
+// ChangeType constants contain the different types of updates passed through the ConfigChangedHandler
+const (
+	ChangeTypeCreated  ChangeType = "created"
+	ChangeTypeModified ChangeType = "modified"
+	ChangeTypeDeleted  ChangeType = "deleted"
+)
+
+//--------------------------------------------------------------------------
+//  ConfigChangedHandler
+//--------------------------------------------------------------------------
+
+// ConfigChangedHandler is the func handler used to receive change updates about the
+// config file. Both ChangeTypeCreated and ChangeTypeModified yield a valid []byte, while
+// ChangeTypeDeleted yields a nil []byte.
+type ConfigChangedHandler func(ChangeType, []byte)
+
+//--------------------------------------------------------------------------
+//  ConfigFile
+//--------------------------------------------------------------------------
+
+// DefaultHandlerPriority is used as the priority for any handlers added via AddChangeHandler
+const DefaultHandlerPriority int = 1000
+
+// ConfigFile is representation of a configuration file that can be written to, read, and watched
+// for updates
+type ConfigFile struct {
+	store      storage.Storage
+	file       string
+	dataLock   *sync.Mutex
+	data       []byte
+	watchLock  *sync.Mutex
+	watchers   []*pHandler
+	watchStop  chan struct{}
+	lastChange time.Time
+}
+
+// NewConfigFile creates a new ConfigFile instance using a specific storage.Storage and path relative
+// to the storage.
+func NewConfigFile(store storage.Storage, file string) *ConfigFile {
+	return &ConfigFile{
+		store:     store,
+		file:      file,
+		dataLock:  new(sync.Mutex),
+		data:      nil,
+		watchLock: new(sync.Mutex),
+		watchStop: nil,
+	}
+}
+
+// Path returns the fully qualified path of the config file.
+func (cf *ConfigFile) Path() string {
+	return cf.store.FullPath(cf.file)
+}
+
+// Write will write the binary data to the file.
+func (cf *ConfigFile) Write(data []byte) error {
+	e := cf.store.Write(cf.file, data)
+	// update cache on successful write
+	if e == nil {
+		cf.dataLock.Lock()
+		cf.data = data
+		cf.dataLock.Unlock()
+	}
+	return e
+}
+
+// Read will read the binary data from the file and return it. If an error is returned,
+// the byte array will be nil.
+func (cf *ConfigFile) Read() ([]byte, error) {
+	return cf.internalRead(false)
+}
+
+// internalRead is used to allow a forced override of data cache to refresh data
+func (cf *ConfigFile) internalRead(force bool) ([]byte, error) {
+	cf.dataLock.Lock()
+	defer cf.dataLock.Unlock()
+	if !force {
+		if cf.data != nil {
+			return cf.data, nil
+		}
+	}
+
+	d, e := cf.store.Read(cf.file)
+	if e != nil {
+		return nil, e
+	}
+	cf.data = d
+	return cf.data, nil
+}
+
+// Stat returns the StorageStats for the file.
+func (cf *ConfigFile) Stat() (*storage.StorageInfo, error) {
+	return cf.store.Stat(cf.file)
+}
+
+// Exists returns true if the file exists. If an error other than a NotExist error is returned,
+// the result will be false with the provided error.
+func (cf *ConfigFile) Exists() (bool, error) {
+	return cf.store.Exists(cf.file)
+}
+
+// Delete removes the file from storage permanently.
+func (cf *ConfigFile) Delete() error {
+	e := cf.store.Remove(cf.file)
+
+	// on removal, clear data cache
+	if e == nil {
+		cf.dataLock.Lock()
+		cf.data = nil
+		cf.dataLock.Unlock()
+	}
+	return e
+}
+
+// Refresh allows external callers to force reload the config file from internal storage. This is
+// particularly useful when there exist no change listeners on the config, which would prevent the
+// data cache from automatically updating on change
+func (cf *ConfigFile) Refresh() ([]byte, error) {
+	return cf.internalRead(true)
+}
+
+// AddChangeHandler accepts a ConfigChangedHandler function which will be called whenever the implementation
+// detects that a change has been made. A unique HandlerID is returned that can be used to remove the handler
+// if necessary.
+func (cf *ConfigFile) AddChangeHandler(handler ConfigChangedHandler) HandlerID {
+	return cf.AddPriorityChangeHandler(handler, DefaultHandlerPriority)
+}
+
+// AddPriorityChangeHandler allows adding a config change handler with a specific priority. By default,
+// any handlers added via AddChangeHandler have a default priority of 1000. The lower the priority, the
+// sooner in the handler execution it will be called.
+func (cf *ConfigFile) AddPriorityChangeHandler(handler ConfigChangedHandler, priority int) HandlerID {
+	cf.watchLock.Lock()
+	defer cf.watchLock.Unlock()
+
+	h := &pHandler{
+		id:       HandlerID(uuid.NewString()),
+		handler:  handler,
+		priority: priority,
+	}
+
+	cf.watchers = append(cf.watchers, h)
+
+	// create the actual file watcher once we have at least one active watcher func registered
+	if len(cf.watchers) == 1 {
+		cf.runWatcher()
+	}
+
+	return h.id
+}
+
+// RemoveChangeHandler removes the change handler with the provided identifier if it exists. True
+// is returned if the handler was removed (it existed), false otherwise.
+func (cf *ConfigFile) RemoveChangeHandler(id HandlerID) bool {
+	cf.watchLock.Lock()
+	defer cf.watchLock.Unlock()
+
+	for i := range cf.watchers {
+		if cf.watchers[i] != nil && cf.watchers[i].id == id {
+			copy(cf.watchers[i:], cf.watchers[i+1:])
+			cf.watchers[len(cf.watchers)-1] = nil
+			cf.watchers = cf.watchers[:len(cf.watchers)-1]
+
+			// stop watching the file for changes if there are no more external watchers
+			if len(cf.watchers) == 0 {
+				cf.stopWatcher()
+			}
+
+			return true
+		}
+	}
+	return false
+}
+
+// RemoveAllHandlers removes all added handlers
+func (cf *ConfigFile) RemoveAllHandlers() {
+	cf.watchLock.Lock()
+	defer cf.watchLock.Unlock()
+
+	cf.watchers = nil
+
+	cf.stopWatcher()
+}
+
+// runWatcher creates a go routine which will poll the stat of a storage target on a specific
+// interval and dispatch created, modified, and deleted events for that file.
+func (cf *ConfigFile) runWatcher() {
+	if cf.watchStop != nil {
+		log.Warningf("Run watcher already running for file: %s", cf.file)
+		return
+	}
+
+	cf.watchStop = make(chan struct{})
+
+	go func() {
+		first := true
+
+		var last time.Time
+		var exists bool
+
+		for {
+			// Each iteration, check for the stop trigger, or wait 10 seconds
+			select {
+			case <-cf.watchStop:
+				return
+			case <-time.After(10 * time.Second):
+			}
+
+			// Query stat on the file, on errors other than exists,
+			// we'll need to log the error, and perhaps limit the retries
+			st, err := cf.Stat()
+			if err != nil && !os.IsNotExist(err) {
+				log.Errorf("Storage Stat Error: %s", err)
+				continue
+			}
+
+			// On first iteration, set exists and last modification time (if applicable)
+			// and flip flag
+			if first {
+				exists = !os.IsNotExist(err)
+				if exists {
+					last = st.ModTime
+				}
+				first = false
+				continue
+			}
+
+			// File does not exist in storage, need to check to see if that is different
+			// from last state check
+			if os.IsNotExist(err) {
+				// check to see if the file has gone from exists to !exists
+				if exists {
+					exists = false
+					cf.onFileChange(ChangeTypeDeleted, nil)
+				}
+				continue
+			}
+
+			// check to see if the file has gone from !exists to exists
+			if !exists {
+				data, err := cf.internalRead(true)
+				if err != nil {
+					log.Warningf("Read() Error: %s\n", err)
+					continue
+				}
+				exists = true
+				last = st.ModTime
+				cf.onFileChange(ChangeTypeCreated, data)
+				continue
+			}
+
+			mtime := st.ModTime
+			if mtime != last {
+				last = mtime
+				data, err := cf.internalRead(true)
+				if err != nil {
+					log.Errorf("Read() Error: %s\n", err)
+					continue
+				}
+				cf.onFileChange(ChangeTypeModified, data)
+			}
+		}
+	}()
 }
 
-type BinaryConfigStore interface {
-	Read() ([]byte, error)
-	Write([]byte) error
-	Stat() (*storage.StorageInfo, error)
+// stopWatcher closes the stop channel, returning from the runWatcher go routine. Allows us
+// to remove any polling stat checks on files when there are no change handlers.
+func (cf *ConfigFile) stopWatcher() {
+	if cf.watchStop == nil {
+		return
+	}
+
+	close(cf.watchStop)
+	cf.watchStop = nil
 }
+
+// onFileChange is internally called when the core watcher recognizes a change in the ConfigFile. This
+// method dispatches that change to all added watchers
+func (cf *ConfigFile) onFileChange(changeType ChangeType, newData []byte) {
+	// On change, we copy out the handlers to a separate slice for processing for a few reasons:
+	// 1. We don't want to lock while executing the handlers
+	// 2. Handlers may want to operate on the ConfigFile instance, which would result in a deadlock
+	// 3. Allows us to implement priority sorting outside of the lock as well
+	cf.watchLock.Lock()
+	if len(cf.watchers) == 0 {
+		cf.watchLock.Unlock()
+		return
+	}
+
+	toNotify := make([]*pHandler, len(cf.watchers))
+	copy(toNotify, cf.watchers)
+	cf.watchLock.Unlock()
+
+	sort.Sort(byPriority(toNotify))
+
+	for _, handler := range toNotify {
+		handler.handler(changeType, newData)
+	}
+}
+
+//--------------------------------------------------------------------------
+//  pHandler
+//--------------------------------------------------------------------------
+
+// pHandler is a wrapper type used to assign a ConfigChangedHandler a unique identifier and priority.
+type pHandler struct {
+	id       HandlerID
+	handler  ConfigChangedHandler
+	priority int
+}
+
+// byPriority is an implementation of sort.Interface to allow sorting a slice of pHandlers by priority
+type byPriority []*pHandler
+
+func (b byPriority) Len() int           { return len(b) }
+func (b byPriority) Less(i, j int) bool { return b[i].priority < b[j].priority }
+func (b byPriority) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }

+ 98 - 0
pkg/config/configmanager.go

@@ -0,0 +1,98 @@
+package config
+
+import (
+	"io/ioutil"
+	"sync"
+
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/storage"
+)
+
+//--------------------------------------------------------------------------
+//  ConfigFileManagerOpts
+//--------------------------------------------------------------------------
+
+// ConfigFileManagerOpts describes how to configure the ConfigFileManager for
+// serving configuration files
+type ConfigFileManagerOpts struct {
+	// BucketStoreConfig is the local file location for the configuration used to
+	// write and read configuration data to/from the bucket. The format of this
+	// configuration file should be compatible with storage.NewBucketStorage
+	BucketStoreConfig string
+
+	// LocalConfigPath provides a backup location for storing the configuration
+	// files
+	LocalConfigPath string
+}
+
+// IsBucketStorageEnabled returns true if bucket storage is enabled.
+func (cfmo *ConfigFileManagerOpts) IsBucketStorageEnabled() bool {
+	return cfmo.BucketStoreConfig != ""
+}
+
+// DefaultConfigFileManagerOpts returns the default configuration options for the
+// config file manager
+func DefaultConfigFileManagerOpts() *ConfigFileManagerOpts {
+	return &ConfigFileManagerOpts{
+		BucketStoreConfig: "",
+		LocalConfigPath:   "/",
+	}
+}
+
+//--------------------------------------------------------------------------
+//  ConfigFileManager
+//--------------------------------------------------------------------------
+
+// ConfigFileManager is a facade for a central API used to create and watch
+// config files.
+type ConfigFileManager struct {
+	lock  *sync.Mutex
+	store storage.Storage
+	files map[string]*ConfigFile
+}
+
+// NewConfigFileManager creates a new backing storage and configuration file manager
+func NewConfigFileManager(opts *ConfigFileManagerOpts) *ConfigFileManager {
+	if opts == nil {
+		opts = DefaultConfigFileManagerOpts()
+	}
+
+	var configStore storage.Storage
+	if opts.IsBucketStorageEnabled() {
+		bucketConfig, err := ioutil.ReadFile(opts.BucketStoreConfig)
+		if err != nil {
+			log.Warningf("Failed to initialize config bucket storage: %s", err)
+		} else {
+			bucketStore, err := storage.NewBucketStorage(bucketConfig)
+			if err != nil {
+				log.Warningf("Failed to create config bucket storage: %s", err)
+			} else {
+				configStore = bucketStore
+			}
+		}
+	} else {
+		configStore = storage.NewFileStorage(opts.LocalConfigPath)
+	}
+
+	return &ConfigFileManager{
+		lock:  new(sync.Mutex),
+		store: configStore,
+		files: make(map[string]*ConfigFile),
+	}
+}
+
+// ConfigFileAt returns an existing configuration file for the provided path if it exists. Otherwise,
+// a new instance is created and returned. Note that the path does not have to exist in order for the
+// instance to be created. It can exist as a potential file path on the storage, and be written to
+// later
+func (cfm *ConfigFileManager) ConfigFileAt(path string) *ConfigFile {
+	cfm.lock.Lock()
+	defer cfm.lock.Unlock()
+	if cf, ok := cfm.files[path]; ok {
+		return cf
+	}
+
+	cf := NewConfigFile(cfm.store, path)
+	cfm.files[path] = cf
+	return cf
+}

+ 82 - 21
pkg/costmodel/clusterinfo.go

@@ -4,9 +4,12 @@ import (
 	"fmt"
 
 	cloudProvider "github.com/kubecost/cost-model/pkg/cloud"
+	"github.com/kubecost/cost-model/pkg/config"
 	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
+	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/thanos"
+	"github.com/kubecost/cost-model/pkg/util/json"
 
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog"
@@ -33,6 +36,7 @@ func writeClusterProfile(clusterInfo map[string]string) {
 	clusterInfo["clusterProfile"] = clusterProfile
 }
 
+// writeThanosFlags includes the configured thanos flags on the cluster info
 func writeThanosFlags(clusterInfo map[string]string) {
 	// Include Thanos Offset Duration if Applicable
 	clusterInfo["thanosEnabled"] = fmt.Sprintf("%t", thanos.IsEnabled())
@@ -41,38 +45,22 @@ func writeThanosFlags(clusterInfo map[string]string) {
 	}
 }
 
-// default local cluster info provider implementation which provides an instanced object for
-// getting the local cluster info
-type defaultLocalClusterInfoProvider struct {
+// localClusterInfoProvider gets the local cluster info from the cloud provider and kubernetes
+type localClusterInfoProvider struct {
 	k8s      kubernetes.Interface
 	provider cloudProvider.Provider
 }
 
 // GetClusterInfo returns a string map containing the local cluster info
-func (dlcip *defaultLocalClusterInfoProvider) GetClusterInfo() map[string]string {
-	return GetClusterInfo(dlcip.k8s, dlcip.provider)
-}
-
-// NewLocalClusterInfoProvider creates a new clusters.LocalClusterInfoProvider implementation for providing local
-// cluster information
-func NewLocalClusterInfoProvider(k8s kubernetes.Interface, cloud cloudProvider.Provider) clusters.LocalClusterInfoProvider {
-	return &defaultLocalClusterInfoProvider{
-		k8s:      k8s,
-		provider: cloud,
-	}
-}
-
-// GetClusterInfo provides specific information about the cluster cloud provider as well as
-// generic configuration values.
-func GetClusterInfo(kubeClient kubernetes.Interface, cloud cloudProvider.Provider) map[string]string {
-	data, err := cloud.ClusterInfo()
+func (dlcip *localClusterInfoProvider) GetClusterInfo() map[string]string {
+	data, err := dlcip.provider.ClusterInfo()
 
 	// Ensure we create the info object if it doesn't exist
 	if data == nil {
 		data = make(map[string]string)
 	}
 
-	kc, ok := kubeClient.(*kubernetes.Clientset)
+	kc, ok := dlcip.k8s.(*kubernetes.Clientset)
 	if ok && data != nil {
 		v, err := kc.ServerVersion()
 		if err != nil {
@@ -90,3 +78,76 @@ func GetClusterInfo(kubeClient kubernetes.Interface, cloud cloudProvider.Provide
 
 	return data
 }
+
+// NewLocalClusterInfoProvider creates a new clusters.LocalClusterInfoProvider implementation for providing local
+// cluster information
+func NewLocalClusterInfoProvider(k8s kubernetes.Interface, cloud cloudProvider.Provider) clusters.ClusterInfoProvider {
+	return &localClusterInfoProvider{
+		k8s:      k8s,
+		provider: cloud,
+	}
+}
+
+// configuredClusterInfoProvider just provides the cluster info directly from the config file source.
+type configuredClusterInfoProvider struct {
+	config *config.ConfigFile
+}
+
+// GetClusterInfo returns a string map containing the local cluster info
+func (ccip *configuredClusterInfoProvider) GetClusterInfo() map[string]string {
+	clusterInfo := map[string]string{}
+
+	data, err := ccip.config.Refresh()
+	if err != nil {
+		return clusterInfo
+	}
+
+	err = json.Unmarshal(data, &clusterInfo)
+	if err != nil {
+		log.Warningf("ClusterInfo failed to load from configuration: %s", err)
+		return clusterInfo
+	}
+
+	return clusterInfo
+}
+
+// NewConfiguredClusterInfoProvider instantiates and returns a cluster info provider which loads cluster info from
+// a config file.
+func NewConfiguredClusterInfoProvider(config *config.ConfigFile) clusters.ClusterInfoProvider {
+	return &configuredClusterInfoProvider{
+		config: config,
+	}
+}
+
+// clusterInfoWriteOnRequest writes the cluster info result to a config whenever it's requested
+type clusterInfoWriteOnRequest struct {
+	clusterInfo clusters.ClusterInfoProvider
+	config      *config.ConfigFile
+}
+
+// GetClusterInfo returns a string map containing the local cluster info
+func (ciw *clusterInfoWriteOnRequest) GetClusterInfo() map[string]string {
+	cInfo := ciw.clusterInfo.GetClusterInfo()
+
+	result, err := json.Marshal(cInfo)
+	if err != nil {
+		log.Warningf("Failed to write the cluster info: %s", err)
+		return cInfo
+	}
+
+	err = ciw.config.Write(result)
+	if err != nil {
+		log.Warningf("Failed to write the cluster info to config: %s", err)
+	}
+
+	return cInfo
+}
+
+// NewClusterInfoWriteOnRequest instantiates and returns a cluster info provider which writes the cluster info to a configuration
+// before each request.
+func NewClusterInfoWriteOnRequest(clusterInfo clusters.ClusterInfoProvider, config *config.ConfigFile) clusters.ClusterInfoProvider {
+	return &clusterInfoWriteOnRequest{
+		clusterInfo: clusterInfo,
+		config:      config,
+	}
+}

+ 23 - 24
pkg/costmodel/clusters/clustermap.go

@@ -7,7 +7,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/prom"
 	"github.com/kubecost/cost-model/pkg/thanos"
@@ -76,31 +75,31 @@ type ClusterMap interface {
 	StopRefresh()
 }
 
-// LocalClusterInfoProvider is a contract which is capable of performing local cluster info lookups.
-type LocalClusterInfoProvider interface {
-	// GetClusterInfo returns a string map containing the local cluster info
+// ClusterInfoProvider is a contract which is capable of performing cluster info lookups.
+type ClusterInfoProvider interface {
+	// GetClusterInfo returns a string map containing the local/remote connected cluster info
 	GetClusterInfo() map[string]string
 }
 
 // ClusterMap keeps records of all known cost-model clusters.
 type PrometheusClusterMap struct {
-	lock         *sync.RWMutex
-	client       prometheus.Client
-	clusters     map[string]*ClusterInfo
-	localCluster LocalClusterInfoProvider
-	stop         chan struct{}
+	lock        *sync.RWMutex
+	client      prometheus.Client
+	clusters    map[string]*ClusterInfo
+	clusterInfo ClusterInfoProvider
+	stop        chan struct{}
 }
 
 // NewClusterMap creates a new ClusterMap implementation using a prometheus or thanos client
-func NewClusterMap(client prometheus.Client, lcip LocalClusterInfoProvider, refresh time.Duration) ClusterMap {
+func NewClusterMap(client prometheus.Client, cip ClusterInfoProvider, refresh time.Duration) ClusterMap {
 	stop := make(chan struct{})
 
 	cm := &PrometheusClusterMap{
-		lock:         new(sync.RWMutex),
-		client:       client,
-		clusters:     make(map[string]*ClusterInfo),
-		localCluster: lcip,
-		stop:         stop,
+		lock:        new(sync.RWMutex),
+		client:      client,
+		clusters:    make(map[string]*ClusterInfo),
+		clusterInfo: cip,
+		stop:        stop,
 	}
 
 	// Run an updater to ensure cluster data stays relevant over time
@@ -210,14 +209,14 @@ func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error)
 	}
 
 	// populate the local cluster if it doesn't exist
-	localID := env.GetClusterID()
-	if _, ok := clusters[localID]; !ok {
-		localInfo, err := pcm.getLocalClusterInfo()
-		if err != nil {
-			log.Warningf("Failed to load local cluster info: %s", err)
-		} else {
-			clusters[localInfo.ID] = localInfo
-		}
+	localInfo, err := pcm.getLocalClusterInfo()
+	if err != nil {
+		return clusters, nil
+	}
+
+	// Check to see if the local cluster's id is part of our loaded clusters, and include if not
+	if _, ok := clusters[localInfo.ID]; !ok {
+		clusters[localInfo.ID] = localInfo
 	}
 
 	return clusters, nil
@@ -225,7 +224,7 @@ func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error)
 
 // getLocalClusterInfo returns the local cluster info in the event there does not exist a metric available.
 func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
-	info := pcm.localCluster.GetClusterInfo()
+	info := pcm.clusterInfo.GetClusterInfo()
 
 	var id string
 	var name string

+ 7 - 10
pkg/costmodel/metrics.go

@@ -9,6 +9,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 	"github.com/kubecost/cost-model/pkg/env"
 	"github.com/kubecost/cost-model/pkg/errors"
 	"github.com/kubecost/cost-model/pkg/log"
@@ -21,8 +22,6 @@ import (
 	dto "github.com/prometheus/client_model/go"
 	v1 "k8s.io/api/core/v1"
 
-	"k8s.io/client-go/kubernetes"
-
 	"k8s.io/klog"
 )
 
@@ -32,8 +31,7 @@ import (
 
 // ClusterInfoCollector is a prometheus collector that generates ClusterInfoMetrics
 type ClusterInfoCollector struct {
-	Cloud         cloud.Provider
-	KubeClientSet kubernetes.Interface
+	ClusterInfo clusters.ClusterInfoProvider
 }
 
 // Describe sends the super-set of all possible descriptors of metrics
@@ -44,7 +42,7 @@ func (cic ClusterInfoCollector) Describe(ch chan<- *prometheus.Desc) {
 
 // Collect is called by the Prometheus registry when collecting metrics.
 func (cic ClusterInfoCollector) Collect(ch chan<- prometheus.Metric) {
-	clusterInfo := GetClusterInfo(cic.KubeClientSet, cic.Cloud)
+	clusterInfo := cic.ClusterInfo.GetClusterInfo()
 	labels := prom.MapToLabels(clusterInfo)
 
 	m := newClusterInfoMetric("kubecost_cluster_info", labels)
@@ -126,7 +124,7 @@ var (
 )
 
 // initCostModelMetrics uses a sync.Once to ensure that these metrics are only created once
-func initCostModelMetrics(clusterCache clustercache.ClusterCache, provider cloud.Provider) {
+func initCostModelMetrics(clusterCache clustercache.ClusterCache, provider cloud.Provider, clusterInfo clusters.ClusterInfoProvider) {
 	metricsInit.Do(func() {
 		cpuGv = prometheus.NewGaugeVec(prometheus.GaugeOpts{
 			Name: "node_cpu_hourly_cost",
@@ -216,8 +214,7 @@ func initCostModelMetrics(clusterCache clustercache.ClusterCache, provider cloud
 
 		// General Metric Collectors
 		prometheus.MustRegister(ClusterInfoCollector{
-			KubeClientSet: clusterCache.GetClient(),
-			Cloud:         provider,
+			ClusterInfo: clusterInfo,
 		})
 	})
 }
@@ -259,9 +256,9 @@ type CostModelMetricsEmitter struct {
 }
 
 // NewCostModelMetricsEmitter creates a new cost-model metrics emitter. Use Start() to begin metric emission.
-func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clustercache.ClusterCache, provider cloud.Provider, model *CostModel) *CostModelMetricsEmitter {
+func NewCostModelMetricsEmitter(promClient promclient.Client, clusterCache clustercache.ClusterCache, provider cloud.Provider, clusterInfo clusters.ClusterInfoProvider, model *CostModel) *CostModelMetricsEmitter {
 	// init will only actually execute once to register the custom gauges
-	initCostModelMetrics(clusterCache, provider)
+	initCostModelMetrics(clusterCache, provider, clusterInfo)
 
 	metrics.InitKubeMetrics(clusterCache, &metrics.KubeMetricsOpts{
 		EmitKubecostControllerMetrics: true,

+ 61 - 36
pkg/costmodel/router.go

@@ -14,6 +14,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/kubecost/cost-model/pkg/config"
 	"github.com/kubecost/cost-model/pkg/services"
 	"github.com/kubecost/cost-model/pkg/util/httputil"
 	"github.com/kubecost/cost-model/pkg/util/timeutil"
@@ -72,20 +73,22 @@ var (
 // Accesses defines a singleton application instance, providing access to
 // Prometheus, Kubernetes, the cloud provider, and caches.
 type Accesses struct {
-	Router            *httprouter.Router
-	PrometheusClient  prometheus.Client
-	ThanosClient      prometheus.Client
-	KubeClientSet     kubernetes.Interface
-	ClusterMap        clusters.ClusterMap
-	CloudProvider     cloud.Provider
-	Model             *CostModel
-	MetricsEmitter    *CostModelMetricsEmitter
-	OutOfClusterCache *cache.Cache
-	AggregateCache    *cache.Cache
-	CostDataCache     *cache.Cache
-	ClusterCostsCache *cache.Cache
-	CacheExpiration   map[time.Duration]time.Duration
-	AggAPI            Aggregator
+	Router              *httprouter.Router
+	PrometheusClient    prometheus.Client
+	ThanosClient        prometheus.Client
+	KubeClientSet       kubernetes.Interface
+	ClusterMap          clusters.ClusterMap
+	CloudProvider       cloud.Provider
+	ConfigFileManager   *config.ConfigFileManager
+	ClusterInfoProvider clusters.ClusterInfoProvider
+	Model               *CostModel
+	MetricsEmitter      *CostModelMetricsEmitter
+	OutOfClusterCache   *cache.Cache
+	AggregateCache      *cache.Cache
+	CostDataCache       *cache.Cache
+	ClusterCostsCache   *cache.Cache
+	CacheExpiration     map[time.Duration]time.Duration
+	AggAPI              Aggregator
 	// SettingsCache stores current state of app settings
 	SettingsCache *cache.Cache
 	// settingsSubscribers tracks channels through which changes to different
@@ -691,7 +694,7 @@ func (a *Accesses) ClusterInfo(w http.ResponseWriter, r *http.Request, ps httpro
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Access-Control-Allow-Origin", "*")
 
-	data := GetClusterInfo(a.KubeClientSet, a.CloudProvider)
+	data := a.ClusterInfoProvider.GetClusterInfo()
 
 	w.Write(WrapData(data, nil))
 }
@@ -1438,12 +1441,24 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		panic(err.Error())
 	}
 
+	// Create ConfigFileManager for synchronization of shared configuration
+	confManager := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
+		BucketStoreConfig: env.GetKubecostConfigBucket(),
+		LocalConfigPath:   "/",
+	})
+
 	// Create Kubernetes Cluster Cache + Watchers
-	k8sCache := clustercache.NewKubernetesClusterCache(kubeClientset)
+	var k8sCache clustercache.ClusterCache
+	if env.IsClusterCacheFileEnabled() {
+		importLocation := confManager.ConfigFileAt("/var/configs/cluster-cache.json")
+		k8sCache = clustercache.NewClusterImporter(importLocation)
+	} else {
+		k8sCache = clustercache.NewKubernetesClusterCache(kubeClientset)
+	}
 	k8sCache.Run()
 
 	cloudProviderKey := env.GetCloudProviderAPIKey()
-	cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey)
+	cloudProvider, err := cloud.NewProvider(k8sCache, cloudProviderKey, confManager)
 	if err != nil {
 		panic(err.Error())
 	}
@@ -1503,13 +1518,21 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		}
 	}
 
+	// ClusterInfo Provider to provide the cluster map with local and remote cluster data
+	var clusterInfoProvider clusters.ClusterInfoProvider
+	if env.IsClusterInfoFileEnabled() {
+		clusterInfoFile := confManager.ConfigFileAt("/var/configs/cluster-info.json")
+		clusterInfoProvider = NewConfiguredClusterInfoProvider(clusterInfoFile)
+	} else {
+		clusterInfoProvider = NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
+	}
+
 	// Initialize ClusterMap for maintaining ClusterInfo by ClusterID
 	var clusterMap clusters.ClusterMap
-	localCIProvider := NewLocalClusterInfoProvider(kubeClientset, cloudProvider)
 	if thanosClient != nil {
-		clusterMap = clusters.NewClusterMap(thanosClient, localCIProvider, 10*time.Minute)
+		clusterMap = clusters.NewClusterMap(thanosClient, clusterInfoProvider, 10*time.Minute)
 	} else {
-		clusterMap = clusters.NewClusterMap(promCli, localCIProvider, 5*time.Minute)
+		clusterMap = clusters.NewClusterMap(promCli, clusterInfoProvider, 5*time.Minute)
 	}
 
 	// cache responses from model and aggregation for a default of 10 minutes;
@@ -1538,24 +1561,26 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 		pc = promCli
 	}
 	costModel := NewCostModel(pc, cloudProvider, k8sCache, clusterMap, scrapeInterval)
-	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, costModel)
+	metricsEmitter := NewCostModelMetricsEmitter(promCli, k8sCache, cloudProvider, clusterInfoProvider, costModel)
 
 	a := &Accesses{
-		Router:            httprouter.New(),
-		PrometheusClient:  promCli,
-		ThanosClient:      thanosClient,
-		KubeClientSet:     kubeClientset,
-		ClusterMap:        clusterMap,
-		CloudProvider:     cloudProvider,
-		Model:             costModel,
-		MetricsEmitter:    metricsEmitter,
-		AggregateCache:    aggregateCache,
-		CostDataCache:     costDataCache,
-		ClusterCostsCache: clusterCostsCache,
-		OutOfClusterCache: outOfClusterCache,
-		SettingsCache:     settingsCache,
-		CacheExpiration:   cacheExpiration,
-		httpServices:      services.NewCostModelServices(),
+		Router:              httprouter.New(),
+		PrometheusClient:    promCli,
+		ThanosClient:        thanosClient,
+		KubeClientSet:       kubeClientset,
+		ClusterMap:          clusterMap,
+		CloudProvider:       cloudProvider,
+		ConfigFileManager:   confManager,
+		ClusterInfoProvider: clusterInfoProvider,
+		Model:               costModel,
+		MetricsEmitter:      metricsEmitter,
+		AggregateCache:      aggregateCache,
+		CostDataCache:       costDataCache,
+		ClusterCostsCache:   clusterCostsCache,
+		OutOfClusterCache:   outOfClusterCache,
+		SettingsCache:       settingsCache,
+		CacheExpiration:     cacheExpiration,
+		httpServices:        services.NewCostModelServices(),
 	}
 	// Use the Accesses instance, itself, as the CostModelAggregator. This is
 	// confusing and unconventional, but necessary so that we can swap it

+ 22 - 0
pkg/env/costmodelenv.go

@@ -75,8 +75,30 @@ const (
 
 	PricingConfigmapName  = "PRICING_CONFIGMAP_NAME"
 	KubecostJobNameEnvVar = "KUBECOST_JOB_NAME"
+
+	KubecostConfigBucketEnvVar    = "KUBECOST_CONFIG_BUCKET"
+	ClusterInfoFileEnabledEnvVar  = "CLUSTER_INFO_FILE_ENABLED"
+	ClusterCacheFileEnabledEnvVar = "CLUSTER_CACHE_FILE_ENABLED"
 )
 
+// GetKubecostConfigBucket returns a file location for a mounted bucket configuration which is used to store
+// a subset of kubecost configurations that require sharing via remote storage.
+func GetKubecostConfigBucket() string {
+	return Get(KubecostConfigBucketEnvVar, "")
+}
+
+// IsClusterInfoFileEnabled returns true if the cluster info should be read from a file rather than pulled
+// from the local cloud provider and kubernetes.
+func IsClusterInfoFileEnabled() bool {
+	return GetBool(ClusterInfoFileEnabledEnvVar, false)
+}
+
+// IsClusterCacheFileEnabled returns true if the kubernetes cluster data should be read from a file rather
+// than pulled from the local kubernetes API.
+func IsClusterCacheFileEnabled() bool {
+	return GetBool(ClusterCacheFileEnabledEnvVar, false)
+}
+
 func GetPricingConfigmapName() string {
 	return Get(PricingConfigmapName, "pricing-configs")
 }

+ 7 - 0
pkg/env/kubemetricsenv.go

@@ -3,6 +3,7 @@ package env
 const (
 	KubecostMetricsPodEnabledEnvVar = "KUBECOST_METRICS_POD_ENABLED"
 	KubecostMetricsPodPortEnvVar    = "KUBECOST_METRICS_PORT"
+	ExportClusterCacheEnabledEnvVar = "EXPORT_CLUSTER_CACHE_ENABLED"
 )
 
 func GetKubecostMetricsPort() int {
@@ -13,3 +14,9 @@ func GetKubecostMetricsPort() int {
 func IsKubecostMetricsPodEnabled() bool {
 	return GetBool(KubecostMetricsPodEnabledEnvVar, false)
 }
+
+// IsExportClusterCacheEnabled returns true if the metrics pod should export the cluster cache
+// data to a target file location
+func IsExportClusterCacheEnabled() bool {
+	return GetBool(ExportClusterCacheEnabledEnvVar, false)
+}

+ 1 - 0
pkg/prom/query.go

@@ -171,6 +171,7 @@ func (ctx *Context) RawQuery(query string) ([]byte, error) {
 	u := ctx.Client.URL(epQuery, nil)
 	q := u.Query()
 	q.Set("query", query)
+	q.Set("time", time.Now().UTC().Add(-3*time.Hour).Format(time.RFC3339))
 	u.RawQuery = q.Encode()
 
 	req, err := http.NewRequest(http.MethodPost, u.String(), nil)

+ 5 - 0
pkg/storage/filestorage.go

@@ -21,6 +21,11 @@ func NewFileStorage(baseDir string) Storage {
 	return &FileStorage{baseDir}
 }
 
+// FullPath returns the storage working path combined with the path provided
+func (fs *FileStorage) FullPath(path string) string {
+	return gopath.Join(fs.baseDir, path)
+}
+
 // Stat returns the StorageStats for the specific path.
 func (fs *FileStorage) Stat(path string) (*StorageInfo, error) {
 	f := gopath.Join(fs.baseDir, path)

+ 10 - 2
pkg/storage/s3storage.go

@@ -303,6 +303,11 @@ func validate(conf S3Config) error {
 	return nil
 }
 
+// FullPath returns the provided path unchanged; S3 object names are already relative to the bucket root.
+func (s3 *S3Storage) FullPath(name string) string {
+	return name
+}
+
 // Get returns a reader for the given object name.
 func (s3 *S3Storage) Read(name string) ([]byte, error) {
 	log.Infof("S3Storage::Read(%s)", name)
@@ -314,7 +319,7 @@ func (s3 *S3Storage) Read(name string) ([]byte, error) {
 
 // Exists checks if the given object exists.
 func (s3 *S3Storage) Exists(name string) (bool, error) {
-	log.Infof("S3Storage::Exists(%s)", name)
+	//log.Infof("S3Storage::Exists(%s)", name)
 
 	ctx := context.Background()
 
@@ -358,11 +363,14 @@ func (s3 *S3Storage) Write(name string, data []byte) error {
 
 // Attributes returns information about the specified object.
 func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
-	log.Infof("S3Storage::Stat(%s)", name)
+	//log.Infof("S3Storage::Stat(%s)", name)
 	ctx := context.Background()
 
 	objInfo, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
 	if err != nil {
+		if s3.isDoesNotExist(err) {
+			return nil, DoesNotExistError
+		}
 		return nil, err
 	}
 

+ 6 - 3
pkg/storage/storage.go

@@ -1,13 +1,13 @@
 package storage
 
 import (
-	"errors"
+	"os"
 	"time"
 )
 
 // DoesNotExistError is used as a generic error to return when a target path does not
-// exist in storage.
-var DoesNotExistError = errors.New("DoesNotExist")
+// exist in storage. Equivalent to os.ErrNotExist such that it will work with os.IsNotExist(err)
+var DoesNotExistError = os.ErrNotExist
 
 // StorageInfo is a data object containing basic information about the path in storage.
 type StorageInfo struct {
@@ -18,6 +18,9 @@ type StorageInfo struct {
 
 // Storage provides an API for storing binary data
 type Storage interface {
+	// FullPath returns the storage working path combined with the path provided
+	FullPath(path string) string
+
 	// Stat returns the StorageStats for the specific path.
 	Stat(path string) (*StorageInfo, error)
 

+ 34 - 8
test/cloud_test.go

@@ -10,6 +10,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/cloud"
 	"github.com/kubecost/cost-model/pkg/clustercache"
+	"github.com/kubecost/cost-model/pkg/config"
 	"github.com/kubecost/cost-model/pkg/costmodel"
 	"github.com/kubecost/cost-model/pkg/costmodel/clusters"
 
@@ -97,6 +98,10 @@ func TestNodePriceFromCSV(t *testing.T) {
 	nameWant := "gke-standard-cluster-1-pool-1-91dc432d-cg69"
 	labelFooWant := "labelfoo"
 
+	confMan := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
+		LocalConfigPath: "./",
+	})
+
 	n := &v1.Node{}
 	n.Spec.ProviderID = providerIDWant
 	n.Name = nameWant
@@ -108,7 +113,7 @@ func TestNodePriceFromCSV(t *testing.T) {
 	c := &cloud.CSVProvider{
 		CSVLocation: "../configs/pricing_schema.csv",
 		CustomProvider: &cloud.CustomProvider{
-			Config: cloud.NewProviderConfig("../configs/default.json"),
+			Config: cloud.NewProviderConfig(confMan, "../configs/default.json"),
 		},
 	}
 	c.DownloadPricingData()
@@ -138,7 +143,7 @@ func TestNodePriceFromCSV(t *testing.T) {
 	c2 := &cloud.CSVProvider{
 		CSVLocation: "../configs/fake.csv",
 		CustomProvider: &cloud.CustomProvider{
-			Config: cloud.NewProviderConfig("../configs/default.json"),
+			Config: cloud.NewProviderConfig(confMan, "../configs/default.json"),
 		},
 	}
 	k3 := c.GetKey(n.Labels, n)
@@ -153,6 +158,10 @@ func TestNodePriceFromCSVWithRegion(t *testing.T) {
 	nameWant := "foo"
 	labelFooWant := "labelfoo"
 
+	confMan := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
+		LocalConfigPath: "./",
+	})
+
 	n := &v1.Node{}
 	n.Spec.ProviderID = providerIDWant
 	n.Name = nameWant
@@ -180,7 +189,7 @@ func TestNodePriceFromCSVWithRegion(t *testing.T) {
 	c := &cloud.CSVProvider{
 		CSVLocation: "../configs/pricing_schema_region.csv",
 		CustomProvider: &cloud.CustomProvider{
-			Config: cloud.NewProviderConfig("../configs/default.json"),
+			Config: cloud.NewProviderConfig(confMan, "../configs/default.json"),
 		},
 	}
 	c.DownloadPricingData()
@@ -230,7 +239,7 @@ func TestNodePriceFromCSVWithRegion(t *testing.T) {
 	c2 := &cloud.CSVProvider{
 		CSVLocation: "../configs/fake.csv",
 		CustomProvider: &cloud.CustomProvider{
-			Config: cloud.NewProviderConfig("../configs/default.json"),
+			Config: cloud.NewProviderConfig(confMan, "../configs/default.json"),
 		},
 	}
 	k5 := c.GetKey(n.Labels, n)
@@ -265,10 +274,14 @@ type FakeClusterMap struct {
 
 func TestNodePriceFromCSVWithBadConfig(t *testing.T) {
 	os.Setenv("CONFIG_PATH", "../config")
+	confMan := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
+		LocalConfigPath: "./",
+	})
+
 	c := &cloud.CSVProvider{
 		CSVLocation: "../configs/pricing_schema_case.csv",
 		CustomProvider: &cloud.CustomProvider{
-			Config: cloud.NewProviderConfig("invalid.json"),
+			Config: cloud.NewProviderConfig(confMan, "invalid.json"),
 		},
 	}
 	c.DownloadPricingData()
@@ -294,10 +307,15 @@ func TestNodePriceFromCSVWithBadConfig(t *testing.T) {
 
 func TestSourceMatchesFromCSV(t *testing.T) {
 	os.Setenv("CONFIG_PATH", "../configs")
+
+	confMan := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
+		LocalConfigPath: "./",
+	})
+
 	c := &cloud.CSVProvider{
 		CSVLocation: "../configs/pricing_schema_case.csv",
 		CustomProvider: &cloud.CustomProvider{
-			Config: cloud.NewProviderConfig("/default.json"),
+			Config: cloud.NewProviderConfig(confMan, "/default.json"),
 		},
 	}
 	c.DownloadPricingData()
@@ -369,10 +387,14 @@ func TestNodePriceFromCSVWithCase(t *testing.T) {
 	n.Labels[v1.LabelZoneRegion] = "eastus2"
 	wantPrice := "0.13370357"
 
+	confMan := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
+		LocalConfigPath: "./",
+	})
+
 	c := &cloud.CSVProvider{
 		CSVLocation: "../configs/pricing_schema_case.csv",
 		CustomProvider: &cloud.CustomProvider{
-			Config: cloud.NewProviderConfig("../configs/default.json"),
+			Config: cloud.NewProviderConfig(confMan, "../configs/default.json"),
 		},
 	}
 
@@ -399,10 +421,14 @@ func TestNodePriceFromCSVByClass(t *testing.T) {
 	wantpricefloat := 0.13370357
 	wantPrice := fmt.Sprintf("%f", (math.Round(wantpricefloat*1000000) / 1000000))
 
+	confMan := config.NewConfigFileManager(&config.ConfigFileManagerOpts{
+		LocalConfigPath: "./",
+	})
+
 	c := &cloud.CSVProvider{
 		CSVLocation: "../configs/pricing_schema_case.csv",
 		CustomProvider: &cloud.CustomProvider{
-			Config: cloud.NewProviderConfig("../configs/default.json"),
+			Config: cloud.NewProviderConfig(confMan, "../configs/default.json"),
 		},
 	}