Jelajahi Sumber

Test passing and persisting

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 2 tahun lalu
induk
melakukan
ee36ced7b3

+ 274 - 150
pkg/cloud/config/controller.go

@@ -2,9 +2,13 @@ package config
 
 import (
 	"fmt"
+	"os"
+	"path/filepath"
 	"sync"
 	"time"
 
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/json"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud/models"
@@ -12,36 +16,11 @@ import (
 	"github.com/opencost/opencost/pkg/env"
 )
 
-// configID identifies the source and the ID of a configuration to handle duplicate configs from multiple sources
-type configID struct {
-	source ConfigSource
-	key    string
-}
-
-func (cid configID) Equals(that configID) bool {
-	return cid.source == that.source && cid.key == that.key
-}
-
-func newConfigID(source, key string) configID {
-	return configID{
-		source: GetConfigSource(source),
-		key:    key,
-	}
-}
-
-type Status struct {
-	Source ConfigSource
-	Key    string
-	Active bool
-	Valid  bool
-	Config cloud.KeyedConfig
-}
-
 // Controller manages the cloud.Config using config Watcher(s) to track various configuration
 // methods. To do this it has a map of config watchers mapped on configuration source and a list Observers that it updates
 // upon any change detected from the config watchers.
 type Controller struct {
-	statuses  map[configID]*Status
+	lock      sync.RWMutex
 	observers []Observer
 	watchers  map[ConfigSource]cloud.KeyedConfigWatcher
 }
@@ -56,11 +35,9 @@ func NewController(cp models.Provider) *Controller {
 		watchers = GetCloudBillingWatchers(nil)
 	}
 	ic := &Controller{
-		statuses: make(map[configID]*Status),
 		watchers: watchers,
 	}
 
-	ic.load()
 	ic.pullWatchers()
 
 	go func() {
@@ -79,38 +56,208 @@ func NewController(cp models.Provider) *Controller {
 	return ic
 }
 
-func (c *Controller) EnableConfig(key, source string) error {
-	cID := newConfigID(source, key)
-	cs, ok := c.statuses[cID]
+// pullWatchers retrieve configs from watchers and update configs according to priority of sources
+func (c *Controller) pullWatchers() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	statuses, err := c.load()
+	if err != nil {
+		log.Errorf("failed to load statuses when pulling watchers %s", err)
+		statuses = Statuses{}
+	}
+	for source, watcher := range c.watchers {
+		watcherConfsByKey := map[string]cloud.KeyedConfig{}
+		for _, wConf := range watcher.GetConfigs() {
+			watcherConfsByKey[wConf.Key()] = wConf
+		}
+
+		// remove existing configs that are no longer present in the source
+		for _, status := range statuses.List() {
+			if status.Source == source {
+				if _, ok := watcherConfsByKey[status.Key]; !ok {
+					err := c.deleteConfig(status.Key, status.Source, statuses)
+					if err != nil {
+						log.Errorf("Conrtoller: pullWatchers: %s", err.Error())
+					}
+				}
+
+			}
+		}
+
+		for key, conf := range watcherConfsByKey {
+
+			// Check existing configs for matching key and source
+			if existingStatus, ok := statuses.Get(key, source); ok {
+				// if config has not changed continue
+				if existingStatus.Config.Equals(conf) {
+					continue
+				}
+				// remove the existing config
+				err := c.deleteConfig(key, source, statuses)
+				if err != nil {
+					log.Errorf("Conrtoller: pullWatchers: %s", err.Error())
+				}
+
+			}
+
+			err := conf.Validate()
+			valid := err == nil
+
+			configType, err := ConfigTypeFromConfig(conf)
+			if err != nil {
+				log.Errorf("failed to get config type for config with key: %s", conf.Key())
+				continue
+			}
+
+			status := Status{
+				Key:        key,
+				Source:     source,
+				Active:     valid, // if valid, then new config will be active
+				Valid:      valid,
+				ConfigType: configType,
+				Config:     conf,
+			}
+
+			// handle a config with a new unique key for a source or an update config from a source which was inactive before
+			if valid {
+				for _, matchStat := range statuses.List() {
+					//// skip matching configs
+					//if matchID.Equals(cID) {
+					//	continue
+					//}
+
+					if matchStat.Active {
+						// if source is non-multi-cloud disable all other non-multi-cloud sourced configs
+						if source == HelmSource || source == ConfigFileSource {
+							if matchStat.Source == HelmSource || matchStat.Source == ConfigFileSource {
+								matchStat.Active = false
+								c.broadcastRemoveConfig(matchStat.Key)
+							}
+						}
+
+						// check for configs with the same key that are active
+						if matchStat.Key == key {
+							// If source has higher priority disable other active configs
+							matchStat.Active = false
+							c.broadcastRemoveConfig(matchStat.Key)
+						}
+					}
+				}
+			}
+
+			// update config and put to observers if active
+			statuses.Insert(&status)
+			if status.Active {
+				c.broadcastAddConfig(conf)
+			}
+			err = c.save(statuses)
+			if err != nil {
+				fmt.Errorf("failed to save statuses %s", err.Error())
+			}
+		}
+	}
+}
+
+// CreateConfig adds a new config to status with a source of ConfigControllerSource
+// It will disable any config with the same key
+// fails if there is an existing config with the same key and source
+func (c *Controller) CreateConfig(conf cloud.KeyedConfig) error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	err := conf.Validate()
+	if err != nil {
+		return fmt.Errorf("provided configuration was invalid: %s", err.Error())
+	}
+
+	statuses, err := c.load()
+	if err != nil {
+		return fmt.Errorf("failed to load statuses")
+	}
+	source := ConfigControllerSource
+	key := conf.Key()
+
+	_, ok := statuses.Get(key, source)
+	if ok {
+		return fmt.Errorf("config with key %s from source %s already exist", key, source.String())
+	}
+
+	statuses.Insert(&Status{
+		Key:    key,
+		Source: source,
+		Valid:  true,
+		Active: true,
+		Config: conf,
+	})
+
+	// check for configurations with the same configuration key that are already active.
+	for _, confStat := range statuses.List() {
+		if confStat.Key != key || confStat.Source == source {
+			continue
+		}
+
+		// if active disable
+		if confStat.Active == true {
+			confStat.Active = false
+			c.broadcastRemoveConfig(key)
+		}
+
+	}
+
+	c.broadcastAddConfig(conf)
+	c.save(statuses)
+	return nil
+}
+
+// EnableConfig enables a config with the given key and source, and disables any config with a matching key
+func (c *Controller) EnableConfig(key, sourceStr string) error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	statuses, err := c.load()
+	if err != nil {
+		return fmt.Errorf("failed to load statuses")
+	}
+
+	source := GetConfigSource(sourceStr)
+	cs, ok := statuses.Get(key, source)
 	if !ok {
-		return fmt.Errorf("Controller: EnableConfig: config with key %s from source %s does not exist", key, source)
+		return fmt.Errorf("config with key %s from source %s does not exist", key, sourceStr)
 	}
 	if cs.Active {
-		return fmt.Errorf("Controller: EnableConfig: config with key %s from source %s is already active", key, source)
+		return fmt.Errorf("config with key %s from source %s is already active", key, sourceStr)
 	}
 
 	// check for configurations with the same configuration key that are already active.
-	for confID, confStat := range c.statuses {
-		if confID.key != key || confID.source == cID.source {
+	for _, confStat := range statuses.List() {
+		if confStat.Key != key || confStat.Source == source {
 			continue
 		}
 
 		// if active disable
 		if confStat.Active == true {
 			confStat.Active = false
+			c.broadcastRemoveConfig(key)
 		}
+
 	}
 
 	cs.Active = true
-	c.putConfig(cs.Config)
-	c.save()
+	c.broadcastAddConfig(cs.Config)
+	c.save(statuses)
 	return nil
 }
 
 // DisableConfig updates an config status if it was enabled
-func (c *Controller) DisableConfig(key, source string) error {
-	iID := newConfigID(source, key)
-	is, ok := c.statuses[iID]
+func (c *Controller) DisableConfig(key, sourceStr string) error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	statuses, err := c.load()
+	if err != nil {
+		return fmt.Errorf("failed to load statuses")
+	}
+	source := GetConfigSource(sourceStr)
+	is, ok := statuses.Get(key, source)
 	if !ok {
 		return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
 	}
@@ -119,122 +266,87 @@ func (c *Controller) DisableConfig(key, source string) error {
 	}
 
 	is.Active = false
-	c.deleteConfig(iID.key)
-	c.save()
+	c.broadcastRemoveConfig(key)
+	c.save(statuses)
 	return nil
 }
 
 // DeleteConfig removes an config from the statuses and deletes the config on all observers if it was active
 func (c *Controller) DeleteConfig(key, source string) error {
-	id := newConfigID(source, key)
-	is, ok := c.statuses[id]
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	statuses, err := c.load()
+	if err != nil {
+		return fmt.Errorf("failed to load statuses")
+	}
+
+	err = c.deleteConfig(key, GetConfigSource(source), statuses)
+	if err != nil {
+		return fmt.Errorf("Controller: DeleteConfig: %w", err)
+	}
+	return nil
+}
+
+func (c *Controller) deleteConfig(key string, source ConfigSource, statuses Statuses) error {
+	if source != ConfigControllerSource {
+		return fmt.Errorf("controller does not own config with key %s from source %s, manage this config at its source", key, source.String())
+	}
+
+	is, ok := statuses.Get(key, source)
 	if !ok {
-		return fmt.Errorf("Controller: DisableConfig: config with key %s from source %s does not exist", key, source)
+		return fmt.Errorf("config with key %s from source %s does not exist", key, source.String())
 	}
 
 	// delete config on observers if active
 	if is.Active {
-		c.deleteConfig(id.key)
+		c.broadcastRemoveConfig(key)
 	}
-	delete(c.statuses, id)
-	c.save()
+	delete(statuses[source], key)
+	c.save(statuses)
 	return nil
 }
 
-// pullWatchers retrieve configs from watchers and update configs according to priority of sources
-func (c *Controller) pullWatchers() {
-
-	for source, watcher := range c.watchers {
-		for _, conf := range watcher.GetConfigs() {
-			key := conf.Key()
-			cID := configID{
-				source: source,
-				key:    key,
-			}
-
-			err := conf.Validate()
-			valid := err == nil
+const configFile = "cloud-configurations.json"
 
-			status := Status{
-				Key:    key,
-				Source: source,
-				Active: valid, // active if valid, for now
-				Valid:  valid,
-				Config: conf,
-			}
+func (c *Controller) load() (Statuses, error) {
 
-			// Check existing configs for matching key and source
-			if existingStatus, ok := c.statuses[cID]; ok {
-				// if config has not changed continue
-				if existingStatus.Config.Equals(conf) {
-					continue
-				}
-				// if existing CS is active then it should be replaced by the updated config
-				if existingStatus.Active {
-					if status.Valid {
-						c.putConfig(conf)
-					} else {
-						// if active config is being overwritten by an invalid one, delete the config, as it will not be active
-						c.deleteConfig(key)
-					}
-					c.statuses[cID] = &status
-					continue
-				}
-			}
+	filePath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), configFile)
+	raw, err := os.ReadFile(filePath)
+	if err != nil {
+		return nil, fmt.Errorf("ConfigController: failed to load config statuses from file: %w", err)
+	}
 
-			// At this point we know that the config from this watcher has changed
+	statuses := Statuses{}
+	err = json.Unmarshal(raw, &statuses)
+	if err != nil {
+		return nil, fmt.Errorf("ConfigController: failed to marshal config statuses: %s", err.Error())
+	}
 
-			// handle an config with a new unique key for a source or an update config from a source which was inactive before
-			if valid {
-				for matchID, matchCS := range c.statuses {
-					// skip matching configs
-					if matchID.Equals(cID) {
-						continue
-					}
+	return statuses, nil
+}
 
-					if matchCS.Active {
-						// if source is non-multi-cloud disable all other non-multi-cloud sourced configs
-						if cID.source == HelmSource || cID.source == ConfigFileSource {
-							if matchID.source == HelmSource || matchID.source == ConfigFileSource {
-								matchCS.Active = false
-								c.deleteConfig(matchID.key)
-							}
-						}
+func (c *Controller) save(statuses Statuses) error {
 
-						// check for configs with the same key that are active
-						if matchID.key == key {
-							// If source has higher priority disable other active configs
-							matchCS.Active = false
-							c.deleteConfig(matchID.key)
-						}
-					}
-				}
-			}
-
-			// update config and put to observers if active
-			c.statuses[cID] = &status
-			if status.Active {
-				c.putConfig(conf)
-			}
-		}
+	filePath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), configFile)
+	raw, err := json.Marshal(statuses)
+	if err != nil {
+		return fmt.Errorf("ConfigController: failed to marshal config statuses: %s", err)
 	}
-}
 
-// todo implement when building config api and persistence is necessary
-func (c *Controller) load() {}
+	err = os.WriteFile(filePath, raw, 0644)
+	if err != nil {
+		return fmt.Errorf("ConfigController: failed to save config statuses to file: %s", err)
+	}
 
-// todo implement when building config api and persistence is necessary
-func (c *Controller) save() {}
+	return nil
+}
 
 func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
 	configs := new(Configurations)
 
-	activeConfigs := make(map[string]cloud.Config)
-	for iID, cs := range c.statuses {
-		if cs.Active {
-			activeConfigs[iID.key] = cs.Config
-		}
-	}
+	activeConfigs := c.getActiveConfigs()
 	if key != "" {
 		conf, ok := activeConfigs[key]
 		if !ok {
@@ -259,17 +371,21 @@ func (c *Controller) ExportConfigs(key string) (*Configurations, error) {
 }
 
 func (c *Controller) getActiveConfigs() map[string]cloud.KeyedConfig {
-	bi := make(map[string]cloud.KeyedConfig)
-	for iID, cs := range c.statuses {
+	activeConfigs := make(map[string]cloud.KeyedConfig)
+	statuses, err := c.load()
+	if err != nil {
+		log.Errorf("GetStatus: failed to load cloud statuses")
+	}
+	for _, cs := range statuses.List() {
 		if cs.Active {
-			bi[iID.key] = cs.Config
+			activeConfigs[cs.Key] = cs.Config
 		}
 	}
-	return bi
+	return activeConfigs
 }
 
-// deleteConfig ask observers to remove and stop all processes related to a configuration with a given key
-func (c *Controller) deleteConfig(key string) {
+// broadcastRemoveConfig ask observers to remove and stop all processes related to a configuration with a given key
+func (c *Controller) broadcastRemoveConfig(key string) {
 	var wg sync.WaitGroup
 	for _, obs := range c.observers {
 		observer := obs
@@ -282,30 +398,38 @@ func (c *Controller) deleteConfig(key string) {
 	wg.Wait()
 }
 
+// broadcastAddConfig gives observers a new config to handle
+func (c *Controller) broadcastAddConfig(conf cloud.KeyedConfig) {
+	var wg sync.WaitGroup
+	for _, obs := range c.observers {
+		observer := obs
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			observer.PutConfig(conf)
+		}()
+	}
+	wg.Wait()
+}
+
 // RegisterObserver gives out the current active list configs and adds the observer to the push list
 func (c *Controller) RegisterObserver(obs Observer) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	obs.SetConfigs(c.getActiveConfigs())
 	c.observers = append(c.observers, obs)
 }
 
 func (c *Controller) GetStatus() []Status {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
 	var status []Status
-	for _, intStat := range c.statuses {
+	statuses, err := c.load()
+	if err != nil {
+		log.Errorf("GetStatus: failed to load cloud statuses")
+	}
+	for _, intStat := range statuses.List() {
 		status = append(status, *intStat)
 	}
 	return status
 }
-
-// putConfig gives observers a new config to handle
-func (c *Controller) putConfig(conf cloud.KeyedConfig) {
-	var wg sync.WaitGroup
-	for _, obs := range c.observers {
-		observer := obs
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			observer.PutConfig(conf)
-		}()
-	}
-	wg.Wait()
-}

+ 81 - 16
pkg/cloud/config/controller_handlers.go

@@ -1,11 +1,19 @@
 package config
 
 import (
+	"bytes"
+	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
+	"strings"
 
 	"github.com/julienschmidt/httprouter"
 	proto "github.com/opencost/opencost/core/pkg/protocol"
+	"github.com/opencost/opencost/pkg/cloud"
+	"github.com/opencost/opencost/pkg/cloud/aws"
+	"github.com/opencost/opencost/pkg/cloud/azure"
+	"github.com/opencost/opencost/pkg/cloud/gcp"
 	"github.com/opencost/opencost/pkg/env"
 )
 
@@ -51,6 +59,77 @@ func (c *Controller) GetExportConfigHandler() func(w http.ResponseWriter, r *htt
 	}
 }
 
+// GetEnableConfigHandler creates a handler from a http request which enables an integration via the integrationController
+func (c *Controller) GetAddConfigHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+	// perform basic checks to ensure that the pipeline can be accessed
+	fn := c.cloudCostChecks()
+	if fn != nil {
+		return fn
+	}
+
+	// Return valid handler func
+	return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
+		w.Header().Set("Content-Type", "application/json")
+
+		configType := r.URL.Query().Get("type")
+
+		config, err := parseConfig(configType, r.Body)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+
+		err = c.CreateConfig(config)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+
+		protocol.WriteData(w, fmt.Sprintf("Successfully added integration with key %s", config.Key()))
+	}
+}
+
+func parseConfig(configType string, body io.Reader) (cloud.KeyedConfig, error) {
+	buf := new(bytes.Buffer)
+	_, err := buf.ReadFrom(body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read body: %w", err)
+	}
+	bytes := buf.Bytes()
+	switch strings.ToLower(configType) {
+	case S3ConfigType:
+		config := &aws.S3Configuration{}
+		err = json.Unmarshal(bytes, config)
+		if err != nil {
+			return nil, fmt.Errorf("error unmarshalling S3 Configuration: %w", err)
+		}
+		return config, nil
+	case AthenaConfigType:
+		config := &aws.AthenaConfiguration{}
+		err = json.Unmarshal(bytes, config)
+		if err != nil {
+			return nil, fmt.Errorf("error unmarshalling Athena Configuration: %w", err)
+		}
+		return config, nil
+	case BigQueryConfigType:
+		config := &gcp.BigQueryConfiguration{}
+		err = json.Unmarshal(bytes, config)
+		if err != nil {
+			return nil, fmt.Errorf("error unmarshalling Big Query Configuration: %w", err)
+		}
+		return config, nil
+	case AzureStorageConfigType:
+		config := &azure.StorageConfiguration{}
+		err = json.Unmarshal(bytes, config)
+		if err != nil {
+			return nil, fmt.Errorf("error unmarshalling Azure Storage Configuration: %w", err)
+		}
+		return config, nil
+
+	}
+	return nil, fmt.Errorf("provided config type was not recognised %s", configType)
+}
+
 // GetEnableConfigHandler creates a handler from a http request which enables an integration via the integrationController
 func (c *Controller) GetEnableConfigHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	// perform basic checks to ensure that the pipeline can be accessed
@@ -136,25 +215,11 @@ func (c *Controller) GetDeleteConfigHandler() func(w http.ResponseWriter, r *htt
 			return
 		}
 
-		source := r.URL.Query().Get("source")
-		if source == "" {
-			http.Error(w, "required parameter 'source' is missing", http.StatusBadRequest)
-			return
-		}
-
-		err := c.DeleteConfig(integrationKey, source)
+		err := c.DeleteConfig(integrationKey, ConfigControllerSource.String())
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 			return
 		}
-		protocol.WriteData(w, fmt.Sprintf("Successfully deleted integration with key %s from source %s", integrationKey, source))
-
-		for _, intStat := range c.GetStatus() {
-			if intStat.Key == integrationKey {
-				protocol.WriteData(w, fmt.Sprintf("Found addition integration with integration key %s from source %s. If you wish to delete this data do so manually or delete all integrations with matching keys", integrationKey, intStat.Source))
-				return
-			}
-		}
-		protocol.WriteData(w, fmt.Sprintf("Successfully deleted cloud cost data with key %s", integrationKey))
+		protocol.WriteData(w, fmt.Sprintf("Successfully deleted integration with key %s", integrationKey))
 	}
 }

+ 372 - 312
pkg/cloud/config/controller_test.go

@@ -1,11 +1,14 @@
 package config
 
 import (
+	"os"
+	"path/filepath"
 	"testing"
 
 	cloudconfig "github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud/aws"
 	"github.com/opencost/opencost/pkg/cloud/gcp"
+	"github.com/opencost/opencost/pkg/env"
 )
 
 // Baseline valid config
@@ -67,22 +70,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 		},
 		"Helm Source No Change": {
 			initialStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -94,22 +99,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 		},
 		"Helm Source Update Config": {
 			initialStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConfModifiedProperty.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConfModifiedProperty,
+					Source:     HelmSource,
+					Key:        validAthenaConfModifiedProperty.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConfModifiedProperty,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -121,22 +128,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 		},
 		"Helm Source Update Config Invalid": {
 			initialStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -148,22 +157,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    invalidAthenaConf.Key(),
-					Active: false,
-					Valid:  false,
-					Config: invalidAthenaConf,
+					Source:     HelmSource,
+					Key:        invalidAthenaConf.Key(),
+					Active:     false,
+					Valid:      false,
+					ConfigType: AthenaConfigType,
+					Config:     invalidAthenaConf,
 				},
 			},
 		},
 		"Helm Source New Config": {
 			initialStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -175,18 +186,20 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: false, // this value changed
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     false, // this value changed
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 				{
-					Source: HelmSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     HelmSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 		},
@@ -202,22 +215,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 		},
 		"Config File No Change": {
 			initialStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -229,22 +244,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 		},
 		"Config File Update Config": {
 			initialStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -256,22 +273,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 		},
 		"Config File Update Config Invalid": {
 			initialStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -283,22 +302,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    invalidAthenaConf.Key(),
-					Active: false,
-					Valid:  false,
-					Config: invalidAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        invalidAthenaConf.Key(),
+					Active:     false,
+					Valid:      false,
+					ConfigType: AthenaConfigType,
+					Config:     invalidAthenaConf,
 				},
 			},
 		},
 		"Config File New Config": {
 			initialStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -310,18 +331,20 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: false, // this value changed
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     false, // this value changed
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 				{
-					Source: ConfigFileSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     ConfigFileSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 		},
@@ -337,22 +360,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: MultiCloudSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     MultiCloudSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 		},
 		"Multi Cloud No Change": {
 			initialStatuses: []*Status{
 				{
-					Source: MultiCloudSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     MultiCloudSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -364,22 +389,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: MultiCloudSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     MultiCloudSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 		},
 		"Multi Cloud Update Config": {
 			initialStatuses: []*Status{
 				{
-					Source: MultiCloudSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     MultiCloudSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -391,22 +418,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: MultiCloudSource,
-					Key:    validAthenaConfModifiedProperty.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConfModifiedProperty,
+					Source:     MultiCloudSource,
+					Key:        validAthenaConfModifiedProperty.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConfModifiedProperty,
 				},
 			},
 		},
 		"Multi Cloud Update Config Invalid": {
 			initialStatuses: []*Status{
 				{
-					Source: MultiCloudSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     MultiCloudSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -418,22 +447,24 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: MultiCloudSource,
-					Key:    invalidAthenaConf.Key(),
-					Active: false,
-					Valid:  false,
-					Config: invalidAthenaConf,
+					Source:     MultiCloudSource,
+					Key:        invalidAthenaConf.Key(),
+					Active:     false,
+					Valid:      false,
+					ConfigType: AthenaConfigType,
+					Config:     invalidAthenaConf,
 				},
 			},
 		},
 		"Multi Cloud New Config": {
 			initialStatuses: []*Status{
 				{
-					Source: MultiCloudSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     MultiCloudSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -445,18 +476,20 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: MultiCloudSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     MultiCloudSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 				{
-					Source: MultiCloudSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     MultiCloudSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 		},
@@ -464,11 +497,12 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 		"New Helm, Existing Config File": {
 			initialStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -485,36 +519,40 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: false,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     false,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 				{
-					Source: HelmSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     HelmSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 		},
 		"Update Helm, Existing Config File": {
 			initialStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: false,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     false,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 				{
-					Source: ConfigFileSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     ConfigFileSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -531,29 +569,32 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConfModifiedProperty.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConfModifiedProperty,
+					Source:     HelmSource,
+					Key:        validAthenaConfModifiedProperty.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConfModifiedProperty,
 				},
 				{
-					Source: ConfigFileSource,
-					Key:    validBigQueryConf.Key(),
-					Active: false,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     ConfigFileSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     false,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 		},
 		"New Helm Invalid, Existing Config File": {
 			initialStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     ConfigFileSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -570,36 +611,40 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     ConfigFileSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 				{
-					Source: HelmSource,
-					Key:    invalidAthenaConf.Key(),
-					Active: false,
-					Valid:  false,
-					Config: invalidAthenaConf,
+					Source:     HelmSource,
+					Key:        invalidAthenaConf.Key(),
+					Active:     false,
+					Valid:      false,
+					ConfigType: AthenaConfigType,
+					Config:     invalidAthenaConf,
 				},
 			},
 		},
 		"Update Helm Invalid, Existing Config File": {
 			initialStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     ConfigFileSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: false,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     false,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -616,29 +661,32 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     ConfigFileSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 				{
-					Source: HelmSource,
-					Key:    invalidAthenaConf.Key(),
-					Active: false,
-					Valid:  false,
-					Config: invalidAthenaConf,
+					Source:     HelmSource,
+					Key:        invalidAthenaConf.Key(),
+					Active:     false,
+					Valid:      false,
+					ConfigType: AthenaConfigType,
+					Config:     invalidAthenaConf,
 				},
 			},
 		},
 		"New Config File, Existing Helm": {
 			initialStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -655,36 +703,40 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validAthenaConf.Key(),
-					Active: false,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     HelmSource,
+					Key:        validAthenaConf.Key(),
+					Active:     false,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 				{
-					Source: ConfigFileSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     ConfigFileSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 		},
 		"Update Config File, Existing Helm": {
 			initialStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: false,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     false,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 				{
-					Source: HelmSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     HelmSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -699,29 +751,32 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConfModifiedProperty.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validAthenaConfModifiedProperty,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConfModifiedProperty.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConfModifiedProperty,
 				},
 				{
-					Source: HelmSource,
-					Key:    validBigQueryConf.Key(),
-					Active: false,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     HelmSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     false,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 		},
 		"New Config File Invalid, Existing Helm": {
 			initialStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     HelmSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -738,36 +793,40 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     HelmSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 				{
-					Source: ConfigFileSource,
-					Key:    invalidAthenaConf.Key(),
-					Active: false,
-					Valid:  false,
-					Config: invalidAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        invalidAthenaConf.Key(),
+					Active:     false,
+					Valid:      false,
+					ConfigType: AthenaConfigType,
+					Config:     invalidAthenaConf,
 				},
 			},
 		},
 		"Update Config File Invalid, Existing Helm": {
 			initialStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     HelmSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 				{
-					Source: ConfigFileSource,
-					Key:    validAthenaConf.Key(),
-					Active: false,
-					Valid:  true,
-					Config: validAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        validAthenaConf.Key(),
+					Active:     false,
+					Valid:      true,
+					ConfigType: AthenaConfigType,
+					Config:     validAthenaConf,
 				},
 			},
 			configWatchers: map[ConfigSource]cloudconfig.KeyedConfigWatcher{
@@ -784,18 +843,20 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 			},
 			expectedStatuses: []*Status{
 				{
-					Source: HelmSource,
-					Key:    validBigQueryConf.Key(),
-					Active: true,
-					Valid:  true,
-					Config: validBigQueryConf,
+					Source:     HelmSource,
+					Key:        validBigQueryConf.Key(),
+					Active:     true,
+					Valid:      true,
+					ConfigType: BigQueryConfigType,
+					Config:     validBigQueryConf,
 				},
 				{
-					Source: ConfigFileSource,
-					Key:    invalidAthenaConf.Key(),
-					Active: false,
-					Valid:  false,
-					Config: invalidAthenaConf,
+					Source:     ConfigFileSource,
+					Key:        invalidAthenaConf.Key(),
+					Active:     false,
+					Valid:      false,
+					ConfigType: AthenaConfigType,
+					Config:     invalidAthenaConf,
 				},
 			},
 		},
@@ -804,44 +865,43 @@ func TestIntegrationController_pullWatchers(t *testing.T) {
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
 			// Test set up and validation
-			initialStatuses := make(map[configID]*Status)
+			initialStatuses := Statuses{}
 			for _, status := range tc.initialStatuses {
-				iID := configID{
-					source: status.Source,
-					key:    status.Key,
+				if _, ok := initialStatuses.Get(status.Key, status.Source); ok {
+					t.Errorf("invalid test, duplicate initial status with key: %s source: %s", status.Key, status.Source.String())
 				}
-				if _, ok := initialStatuses[iID]; ok {
-					t.Errorf("invalid test, duplicate initial status with key: %s source: %s", iID.key, iID.source.String())
-				}
-				initialStatuses[iID] = status
+				initialStatuses.Insert(status)
 			}
 
-			expectedStatuses := make(map[configID]*Status)
+			expectedStatuses := Statuses{}
 			for _, status := range tc.expectedStatuses {
-				iID := configID{
-					source: status.Source,
-					key:    status.Key,
-				}
-				if _, ok := expectedStatuses[iID]; ok {
-					t.Errorf("invalid test, duplicate expected status with key: %s source: %s", iID.key, iID.source.String())
+				if _, ok := expectedStatuses.Get(status.Key, status.Source); ok {
+					t.Errorf("invalid test, duplicate expected status with key: %s source: %s", status.Key, status.Source.String())
 				}
-				expectedStatuses[iID] = status
+				expectedStatuses.Insert(status)
 			}
 
+			tempDir := os.TempDir()
+			os.Setenv(env.ConfigPathEnvVar, tempDir)
+			defer os.Remove(filepath.Join(tempDir, configFile))
 			// Initialize controller
 			icd := &Controller{
-				statuses: initialStatuses,
 				watchers: tc.configWatchers,
 			}
+			icd.save(initialStatuses)
 			icd.pullWatchers()
-			if len(icd.statuses) != len(tc.expectedStatuses) {
-				t.Errorf("integration statueses did not have the correct length actaul: %d, expected: %d", len(icd.statuses), len(tc.expectedStatuses))
+			status, err := icd.load()
+			if err != nil {
+				t.Errorf("failed to load status file: %s", err.Error())
+			}
+			if len(status.List()) != len(expectedStatuses.List()) {
+				t.Errorf("integration statueses did not have the correct length actaul: %d, expected: %d", len(status), len(tc.expectedStatuses))
 			}
 
-			for iID, actualStatus := range icd.statuses {
-				expectedStatus, ok := expectedStatuses[iID]
+			for _, actualStatus := range status.List() {
+				expectedStatus, ok := expectedStatuses.Get(actualStatus.Key, actualStatus.Source)
 				if !ok {
-					t.Errorf("expected integration statuses is missing with integration ID: %v", iID)
+					t.Errorf("expected integration statuses is missing with integration key: %s, source: %s", actualStatus.Key, actualStatus.Source.String())
 				}
 
 				// failure here indicates an issue with the configID

+ 145 - 0
pkg/cloud/config/statuses.go

@@ -0,0 +1,145 @@
+package config
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/opencost/opencost/core/pkg/util/json"
+	"github.com/opencost/opencost/pkg/cloud"
+	"github.com/opencost/opencost/pkg/cloud/aws"
+	"github.com/opencost/opencost/pkg/cloud/azure"
+	"github.com/opencost/opencost/pkg/cloud/gcp"
+)
+
+const (
+	S3ConfigType           = "s3"
+	AthenaConfigType       = "athena"
+	BigQueryConfigType     = "bigquery"
+	AzureStorageConfigType = "azurestorage"
+)
+
+func ConfigTypeFromConfig(config cloud.KeyedConfig) (string, error) {
+	switch config.(type) {
+	case *aws.S3Configuration:
+		return S3ConfigType, nil
+	case *aws.AthenaConfiguration:
+		return AthenaConfigType, nil
+	case *gcp.BigQueryConfiguration:
+		return BigQueryConfigType, nil
+	case *azure.StorageConfiguration:
+		return AzureStorageConfigType, nil
+	}
+	return "", fmt.Errorf("failed to config type for config with key: %s", config.Key())
+}
+
+type Statuses map[ConfigSource]map[string]*Status
+
+func (s Statuses) Get(key string, source ConfigSource) (*Status, bool) {
+	if _, ok := s[source]; !ok {
+		return nil, false
+	}
+	status, ok := s[source][key]
+	return status, ok
+}
+
+func (s Statuses) Insert(status *Status) {
+	if _, ok := s[status.Source]; !ok {
+		s[status.Source] = map[string]*Status{}
+	}
+	s[status.Source][status.Key] = status
+}
+
+func (s Statuses) List() []*Status {
+	var list []*Status
+	for _, statusesByKey := range s {
+		for _, status := range statusesByKey {
+			list = append(list, status)
+		}
+	}
+	return list
+}
+
+type Status struct {
+	Source     ConfigSource      `json:"source"`
+	Key        string            `json:"key"`
+	Active     bool              `json:"active"`
+	Valid      bool              `json:"valid"`
+	ConfigType string            `json:"configType"`
+	Config     cloud.KeyedConfig `json:"config"`
+}
+
+func (s *Status) UnmarshalJSON(b []byte) error {
+	var f interface{}
+	err := json.Unmarshal(b, &f)
+	if err != nil {
+		return err
+	}
+
+	fmap := f.(map[string]interface{})
+
+	sourceFloat, err := cloud.GetInterfaceValue[float64](fmap, "source")
+	if err != nil {
+		return fmt.Errorf("Status: UnmarshalJSON: %s", err.Error())
+	}
+	source := ConfigSource(int(sourceFloat))
+
+	key, err := cloud.GetInterfaceValue[string](fmap, "key")
+	if err != nil {
+		return fmt.Errorf("Status: UnmarshalJSON: %s", err.Error())
+	}
+
+	active, err := cloud.GetInterfaceValue[bool](fmap, "active")
+	if err != nil {
+		return fmt.Errorf("Status: UnmarshalJSON: %s", err.Error())
+	}
+
+	valid, err := cloud.GetInterfaceValue[bool](fmap, "valid")
+	if err != nil {
+		return fmt.Errorf("Status: UnmarshalJSON: %s", err.Error())
+	}
+
+	configType, err := cloud.GetInterfaceValue[string](fmap, "configType")
+	if err != nil {
+		return fmt.Errorf("Status: UnmarshalJSON: %s", err.Error())
+	}
+
+	// Pick correct implementation to unmarshal into
+	var config cloud.KeyedConfig
+	switch strings.ToLower(configType) {
+	case S3ConfigType:
+		config = &aws.S3Configuration{}
+	case AthenaConfigType:
+		config = &aws.AthenaConfiguration{}
+	case BigQueryConfigType:
+		config = &gcp.BigQueryConfiguration{}
+	case AzureStorageConfigType:
+		config = &azure.StorageConfiguration{}
+	default:
+		return fmt.Errorf("Status: UnmarshalJSON: config type '%s' is not recognized", configType)
+	}
+
+	configAny, err := cloud.GetInterfaceValue[any](fmap, "config")
+	if err != nil {
+		return fmt.Errorf("Status: UnmarshalJSON: %s", err.Error())
+	}
+
+	// convert the interface back to a []Byte so that it can be unmarshalled into the correct type
+	fBin, err := json.Marshal(configAny)
+	if err != nil {
+		return fmt.Errorf("Status: UnmarshalJSON: could not marshal value %v: %w", f, err)
+	}
+
+	err = json.Unmarshal(fBin, config)
+	if err != nil {
+		return fmt.Errorf("Status: UnmarshalJSON: failed to unmarshal into Configuration type %T from value %v: %w", config, f, err)
+	}
+
+	// Set Values
+	s.Source = source
+	s.Key = key
+	s.Active = active
+	s.Valid = valid
+	s.ConfigType = configType
+	s.Config = config
+	return nil
+}