Parcourir la source

SWII cloud costs

Signed-off-by: Niko Kovacevic <nikovacevic@gmail.com>
Niko Kovacevic il y a 2 ans
Parent
commit
4f4cfe46fe

+ 1 - 50
pkg/cloud/config/controller_handlers.go

@@ -1,19 +1,11 @@
 package config
 
 import (
-	"bytes"
-	"encoding/json"
 	"fmt"
-	"io"
 	"net/http"
-	"strings"
 
 	"github.com/julienschmidt/httprouter"
 	proto "github.com/opencost/opencost/core/pkg/protocol"
-	"github.com/opencost/opencost/pkg/cloud"
-	"github.com/opencost/opencost/pkg/cloud/aws"
-	"github.com/opencost/opencost/pkg/cloud/azure"
-	"github.com/opencost/opencost/pkg/cloud/gcp"
 )
 
 var protocol = proto.HTTP()
@@ -66,7 +58,7 @@ func (c *Controller) GetAddConfigHandler() func(w http.ResponseWriter, r *http.R
 
 		configType := r.URL.Query().Get("type")
 
-		config, err := parseConfig(configType, r.Body)
+		config, err := ParseConfig(configType, r.Body)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 			return
@@ -82,47 +74,6 @@ func (c *Controller) GetAddConfigHandler() func(w http.ResponseWriter, r *http.R
 	}
 }
 
-func parseConfig(configType string, body io.Reader) (cloud.KeyedConfig, error) {
-	buf := new(bytes.Buffer)
-	_, err := buf.ReadFrom(body)
-	if err != nil {
-		return nil, fmt.Errorf("failed to read body: %w", err)
-	}
-	bytes := buf.Bytes()
-	switch strings.ToLower(configType) {
-	case S3ConfigType:
-		config := &aws.S3Configuration{}
-		err = json.Unmarshal(bytes, config)
-		if err != nil {
-			return nil, fmt.Errorf("error unmarshalling S3 Configuration: %w", err)
-		}
-		return config, nil
-	case AthenaConfigType:
-		config := &aws.AthenaConfiguration{}
-		err = json.Unmarshal(bytes, config)
-		if err != nil {
-			return nil, fmt.Errorf("error unmarshalling Athena Configuration: %w", err)
-		}
-		return config, nil
-	case BigQueryConfigType:
-		config := &gcp.BigQueryConfiguration{}
-		err = json.Unmarshal(bytes, config)
-		if err != nil {
-			return nil, fmt.Errorf("error unmarshalling Big Query Configuration: %w", err)
-		}
-		return config, nil
-	case AzureStorageConfigType:
-		config := &azure.StorageConfiguration{}
-		err = json.Unmarshal(bytes, config)
-		if err != nil {
-			return nil, fmt.Errorf("error unmarshalling Azure Storage Configuration: %w", err)
-		}
-		return config, nil
-
-	}
-	return nil, fmt.Errorf("provided config type was not recognised %s", configType)
-}
-
 // GetEnableConfigHandler creates a handler from a http request which enables an integration via the integrationController
 func (c *Controller) GetEnableConfigHandler() func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	// perform basic checks to ensure that the pipeline can be accessed

+ 53 - 0
pkg/cloud/config/utils.go

@@ -0,0 +1,53 @@
+package config
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"strings"
+
+	"github.com/opencost/opencost/pkg/cloud"
+	"github.com/opencost/opencost/pkg/cloud/aws"
+	"github.com/opencost/opencost/pkg/cloud/azure"
+	"github.com/opencost/opencost/pkg/cloud/gcp"
+)
+
+func ParseConfig(configType string, body io.Reader) (cloud.KeyedConfig, error) {
+	buf := new(bytes.Buffer)
+	_, err := buf.ReadFrom(body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read body: %w", err)
+	}
+
+	return ParseConfigBytes(configType, buf.Bytes())
+}
+
+func ParseConfigBytes(configType string, configBytes []byte) (cloud.KeyedConfig, error) {
+	var config cloud.KeyedConfig
+	var err error
+
+	switch strings.ToLower(configType) {
+	case S3ConfigType:
+		config = &aws.S3Configuration{}
+	case AthenaConfigType:
+		config = &aws.AthenaConfiguration{}
+	case BigQueryConfigType:
+		config = &gcp.BigQueryConfiguration{}
+	case AzureStorageConfigType:
+		config = &azure.StorageConfiguration{}
+	default:
+		return nil, fmt.Errorf("provided config type was not recognized %s", configType)
+	}
+
+	err = json.Unmarshal(configBytes, config)
+	if err != nil {
+		return nil, fmt.Errorf("error unmarshalling configuration of type %s: %w", configType, err)
+	}
+
+	return config, nil
+}
+
+func ParseConfigString(configType string, configStr string) (cloud.KeyedConfig, error) {
+	return ParseConfigBytes(configType, []byte(configStr))
+}

+ 9 - 4
pkg/cloud/config/watcher.go

@@ -324,18 +324,21 @@ const (
 	MultiCloudSource
 	ConfigFileSource
 	HelmSource
+	DBSource
 )
 
 func GetConfigSource(str string) ConfigSource {
 	switch str {
-	case "configController":
+	case ConfigControllerSource.String():
 		return ConfigControllerSource
-	case "configfile":
+	case ConfigFileSource.String():
 		return ConfigFileSource
-	case "helm":
+	case HelmSource.String():
 		return HelmSource
-	case "multicloud":
+	case MultiCloudSource.String():
 		return MultiCloudSource
+	case DBSource.String():
+		return DBSource
 	default:
 		return UnknownSource
 	}
@@ -353,6 +356,8 @@ func (cs ConfigSource) String() string {
 		return "multicloud"
 	case UnknownSource:
 		return "unknown"
+	case DBSource:
+		return "db"
 	default:
 		return "unknown"
 	}

+ 6 - 6
pkg/cloudcost/ingestionmanager.go

@@ -15,7 +15,7 @@ import (
 // config.Controller
 type IngestionManager struct {
 	lock      sync.Mutex
-	ingestors map[string]*ingestor
+	ingestors map[string]*Ingestor
 	config    IngestorConfig
 	repo      Repository
 }
@@ -25,12 +25,12 @@ func NewIngestionManager(controller *config.Controller, repo Repository, ingConf
 	// return empty ingestion manager if store or integration controller are nil
 	if controller == nil || repo == nil {
 		return &IngestionManager{
-			ingestors: map[string]*ingestor{},
+			ingestors: map[string]*Ingestor{},
 		}
 	}
 
 	im := &IngestionManager{
-		ingestors: map[string]*ingestor{},
+		ingestors: map[string]*Ingestor{},
 		repo:      repo,
 		config:    ingConf,
 	}
@@ -110,7 +110,7 @@ func (im *IngestionManager) RebuildAll() {
 	var wg sync.WaitGroup
 	wg.Add(len(im.ingestors))
 	for key := range im.ingestors {
-		go func(ing *ingestor) {
+		go func(ing *Ingestor) {
 			defer wg.Done()
 			ing.Stop()
 			ing.Start(true)
@@ -143,7 +143,7 @@ func (im *IngestionManager) RepairAll(start, end time.Time) error {
 	}
 
 	for key := range im.ingestors {
-		go func(ing *ingestor) {
+		go func(ing *Ingestor) {
 			for _, window := range windows {
 				ing.BuildWindow(*window.Start(), *window.End())
 			}
@@ -166,7 +166,7 @@ func (im *IngestionManager) Repair(integrationKey string, start, end time.Time)
 	if !ok {
 		return fmt.Errorf("CloudCost: IngestionManager: Repair: failed to rebuild, integration with key does not exist: %s", integrationKey)
 	}
-	go func(ing *ingestor) {
+	go func(ing *Ingestor) {
 		for _, window := range windows {
 			ing.BuildWindow(*window.Start(), *window.End())
 		}

+ 12 - 12
pkg/cloudcost/ingestor.go

@@ -47,8 +47,8 @@ func DefaultIngestorConfiguration() IngestorConfig {
 	}
 }
 
-// ingestor runs the process for ingesting CloudCost from its CloudCostIntegration and store it in a Repository
-type ingestor struct {
+// Ingestor runs the process for ingesting CloudCost from its CloudCostIntegration and store it in a Repository
+type Ingestor struct {
 	key          string
 	integration  CloudCostIntegration
 	config       IngestorConfig
@@ -66,7 +66,7 @@ type ingestor struct {
 }
 
 // NewIngestor is an initializer for ingestor
-func NewIngestor(ingestorConfig IngestorConfig, repo Repository, config cloud.KeyedConfig) (*ingestor, error) {
+func NewIngestor(ingestorConfig IngestorConfig, repo Repository, config cloud.KeyedConfig) (*Ingestor, error) {
 	if repo == nil {
 		return nil, fmt.Errorf("CloudCost: NewIngestor: repository connot be nil")
 	}
@@ -79,7 +79,7 @@ func NewIngestor(ingestorConfig IngestorConfig, repo Repository, config cloud.Ke
 	}
 	now := time.Now().UTC()
 	midnight := opencost.RoundForward(now, timeutil.Day)
-	return &ingestor{
+	return &Ingestor{
 		config:       ingestorConfig,
 		repo:         repo,
 		key:          config.Key(),
@@ -90,7 +90,7 @@ func NewIngestor(ingestorConfig IngestorConfig, repo Repository, config cloud.Ke
 	}, nil
 }
 
-func (ing *ingestor) LoadWindow(start, end time.Time) {
+func (ing *Ingestor) LoadWindow(start, end time.Time) {
 	windows, err := opencost.GetWindows(start, end, timeutil.Day)
 	if err != nil {
 		log.Errorf("CloudCost[%s]: ingestor: invalid window %s", ing.key, opencost.NewWindow(&start, &end))
@@ -112,7 +112,7 @@ func (ing *ingestor) LoadWindow(start, end time.Time) {
 
 }
 
-func (ing *ingestor) BuildWindow(start, end time.Time) {
+func (ing *Ingestor) BuildWindow(start, end time.Time) {
 	log.Infof("CloudCost[%s]: ingestor: building window %s", ing.key, opencost.NewWindow(&start, &end))
 	ccsr, err := ing.integration.GetCloudCost(start, end)
 	if err != nil {
@@ -129,7 +129,7 @@ func (ing *ingestor) BuildWindow(start, end time.Time) {
 	}
 }
 
-func (ing *ingestor) Start(rebuild bool) {
+func (ing *Ingestor) Start(rebuild bool) {
 
 	// If already running, log that and return.
 	if !ing.isRunning.CompareAndSwap(false, true) {
@@ -149,7 +149,7 @@ func (ing *ingestor) Start(rebuild bool) {
 	go ing.run()
 }
 
-func (ing *ingestor) Stop() {
+func (ing *Ingestor) Stop() {
 	// If already stopping, log that and return.
 	if !ing.isStopping.CompareAndSwap(false, true) {
 		log.Infof("CloudCost: ingestor: is already stopping")
@@ -187,7 +187,7 @@ func (ing *ingestor) Stop() {
 }
 
 // Status returns an IngestorStatus that describes the current state of the ingestor
-func (ing *ingestor) Status() IngestorStatus {
+func (ing *Ingestor) Status() IngestorStatus {
 	return IngestorStatus{
 		Created:          ing.creationTime,
 		LastRun:          ing.lastRun,
@@ -198,7 +198,7 @@ func (ing *ingestor) Status() IngestorStatus {
 	}
 }
 
-func (ing *ingestor) build(rebuild bool) {
+func (ing *Ingestor) build(rebuild bool) {
 	defer errors.HandlePanic()
 
 	// Profile the full Duration of the build time
@@ -257,7 +257,7 @@ func (ing *ingestor) build(rebuild bool) {
 
 }
 
-func (ing *ingestor) run() {
+func (ing *Ingestor) run() {
 	defer errors.HandlePanic()
 
 	ticker := timeutil.NewJobTicker()
@@ -328,7 +328,7 @@ func (ing *ingestor) run() {
 	}
 }
 
-func (ing *ingestor) expandCoverage(window opencost.Window) {
+func (ing *Ingestor) expandCoverage(window opencost.Window) {
 	if window.IsOpen() {
 		return
 	}

+ 5 - 0
pkg/cloudcost/repositoryfactory.go

@@ -0,0 +1,5 @@
+package cloudcost
+
+type RepositoryFactory interface {
+	GetRepository(string) Repository
+}