فهرست منبع

Merge branch 'develop' into oracle-cloud-infrastructure-provider

Ajay Tripathy 2 سال پیش
والد
کامیت
84b095432c

+ 59 - 8
pkg/cloud/azure/storagebillingparser.go

@@ -1,11 +1,12 @@
 package azure
 
 import (
-	"bytes"
 	"context"
 	"encoding/csv"
 	"fmt"
 	"io"
+	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
@@ -13,6 +14,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/pkg/cloud"
+	"github.com/opencost/opencost/pkg/env"
 )
 
 // AzureStorageBillingParser accesses billing data stored in CSV files in Azure Storage
@@ -44,6 +46,7 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 		return err
 	}
 	ctx := context.Background()
+	// Example blobNames: [ export/myExport/20240101-20240131/myExport_758a42af-0731-4edb-b498-1e523bb40f12.csv ]
 	blobNames, err := asbp.getMostRecentBlobs(start, end, client, ctx)
 	if err != nil {
 		asbp.ConnectionStatus = cloud.FailedConnection
@@ -56,17 +59,30 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 	}
 
 	for _, blobName := range blobNames {
-		blobBytes, err2 := asbp.DownloadBlob(blobName, client, ctx)
-		if err2 != nil {
+		localPath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "db", "cloudcost")
+		localFilePath := filepath.Join(localPath, filepath.Base(blobName))
+
+		if _, err := asbp.deleteFilesOlderThan7d(localPath); err != nil {
+			log.Warnf("CloudCost: Azure: ParseBillingData: failed to remove the following stale files: %v", err)
+		}
+
+		err := asbp.DownloadBlobToFile(localFilePath, blobName, client, ctx)
+		if err != nil {
+			asbp.ConnectionStatus = cloud.FailedConnection
+			return err
+		}
+
+		fp, err := os.Open(localFilePath)
+		if err != nil {
 			asbp.ConnectionStatus = cloud.FailedConnection
-			return err2
+			return err
 		}
-		err2 = asbp.parseCSV(start, end, csv.NewReader(bytes.NewReader(blobBytes)), resultFn)
-		if err2 != nil {
+		defer fp.Close()
+		err = asbp.parseCSV(start, end, csv.NewReader(fp), resultFn)
+		if err != nil {
 			asbp.ConnectionStatus = cloud.ParseError
-			return err2
+			return err
 		}
-
 	}
 	asbp.ConnectionStatus = cloud.SuccessfulConnection
 	return nil
@@ -184,3 +200,38 @@ func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string
 	endOfMonth := input.AddDate(0, 1, -input.Day())
 	return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
 }
+
+// deleteFilesOlderThan7d recursively walks the directory specified and deletes
+// files which have not been modified in the last 7 days. Returns a list of
+// files deleted.
+func (asbp *AzureStorageBillingParser) deleteFilesOlderThan7d(localPath string) ([]string, error) {
+	duration := 7 * 24 * time.Hour
+	cleaned := []string{}
+	errs := []string{}
+
+	if _, err := os.Stat(localPath); err != nil {
+		return cleaned, nil // localPath does not exist
+	}
+
+	filepath.Walk(localPath, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			errs = append(errs, err.Error())
+			return err
+		}
+
+		if time.Since(info.ModTime()) > duration {
+			err := os.Remove(path)
+			if err != nil {
+				errs = append(errs, err.Error())
+			}
+			cleaned = append(cleaned, path)
+		}
+		return nil
+	})
+
+	if len(errs) == 0 {
+		return cleaned, nil
+	} else {
+		return cleaned, fmt.Errorf("deleteFilesOlderThan7d: %v", errs)
+	}
+}

+ 23 - 14
pkg/cloud/azure/storageconnection.go

@@ -1,9 +1,10 @@
 package azure
 
 import (
-	"bytes"
 	"context"
 	"fmt"
+	"os"
+	"path/filepath"
 	"strings"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
@@ -45,24 +46,32 @@ func (sc *StorageConnection) getBlobURLTemplate() string {
 	return "https://%s.blob.core.windows.net/%s"
 }
 
-func (sc *StorageConnection) DownloadBlob(blobName string, client *azblob.Client, ctx context.Context) ([]byte, error) {
-	log.Infof("Azure Storage: retrieving blob: %v", blobName)
+// DownloadBlobToFile downloads the Azure Billing CSV to a local file
+func (sc *StorageConnection) DownloadBlobToFile(localFilePath string, blobName string, client *azblob.Client, ctx context.Context) error {
+	// If file exists, don't download it again
+	if _, err := os.Stat(localFilePath); err == nil {
+		log.DedupedInfof(3, "CloudCost: Azure: DownloadBlobToFile: file %v already exists, not downloading %v", localFilePath, blobName)
+		return nil
+	}
 
-	downloadResponse, err := client.DownloadStream(ctx, sc.Container, blobName, nil)
+	// Create filepath
+	dir := filepath.Dir(localFilePath)
+	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+		return fmt.Errorf("CloudCost: Azure: DownloadBlobToFile: failed to create directory %w", err)
+	}
+	fp, err := os.Create(localFilePath)
 	if err != nil {
-		return nil, fmt.Errorf("Azure: DownloadBlob: failed to download %w", err)
+		return fmt.Errorf("CloudCost: Azure: DownloadBlobToFile: failed to create file %w", err)
 	}
-	// NOTE: automatically retries are performed if the connection fails
-	retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{})
-	defer retryReader.Close()
-
-	// read the body into a buffer
-	downloadedData := bytes.Buffer{}
+	defer fp.Close()
 
-	_, err = downloadedData.ReadFrom(retryReader)
+	// Download newest Azure Billing CSV to disk
+	log.Infof("CloudCost: Azure: DownloadBlobToFile: retrieving blob: %v", blobName)
+	filesize, err := client.DownloadFile(ctx, sc.Container, blobName, fp, nil)
 	if err != nil {
-		return nil, fmt.Errorf("Azure: DownloadBlob: failed to read downloaded data %w", err)
+		return fmt.Errorf("CloudCost: Azure: DownloadBlobToFile: failed to download %w", err)
 	}
+	log.Infof("CloudCost: Azure: DownloadBlobToFile: retrieved %v of size %dMB", blobName, filesize/1024/1024)
 
-	return downloadedData.Bytes(), nil
+	return nil
 }

+ 8 - 2
pkg/cloud/config/controller.go

@@ -9,6 +9,7 @@ import (
 	"github.com/opencost/opencost/pkg/cloud"
 	"github.com/opencost/opencost/pkg/cloud/models"
 	"github.com/opencost/opencost/pkg/cloud/provider"
+	"github.com/opencost/opencost/pkg/env"
 )
 
 // configID identifies the source and the ID of a configuration to handle duplicate configs from multiple sources
@@ -47,8 +48,13 @@ type Controller struct {
 
 // NewController initializes an Config Controller
 func NewController(cp models.Provider) *Controller {
-	providerConfig := provider.ExtractConfigFromProviders(cp)
-	watchers := GetCloudBillingWatchers(providerConfig)
+	var watchers map[ConfigSource]cloud.KeyedConfigWatcher
+	if env.IsKubernetesEnabled() {
+		providerConfig := provider.ExtractConfigFromProviders(cp)
+		watchers = GetCloudBillingWatchers(providerConfig)
+	} else {
+		watchers = GetCloudBillingWatchers(nil)
+	}
 	ic := &Controller{
 		statuses: make(map[configID]*Status),
 		watchers: watchers,

+ 9 - 1
pkg/cloud/config/watcher.go

@@ -239,7 +239,13 @@ type MultiCloudWatcher struct {
 }
 
 func (mcw *MultiCloudWatcher) GetConfigs() []cloud.KeyedConfig {
-	multiConfigPath := path.Join(env.GetConfigPathWithDefault("/var/configs"), cloudIntegrationSecretPath)
+	var multiConfigPath string
+
+	if env.IsKubernetesEnabled() {
+		multiConfigPath = path.Join(env.GetConfigPathWithDefault("/var/configs"), cloudIntegrationSecretPath)
+	} else {
+		multiConfigPath = env.GetCloudCostConfigPath()
+	}
 	exists, err := fileutil.FileExists(multiConfigPath)
 	if err != nil {
 		log.Errorf("MultiCloudWatcher:  error checking file at '%s': %s", multiConfigPath, err.Error())
@@ -260,6 +266,8 @@ func (mcw *MultiCloudWatcher) GetConfigs() []cloud.KeyedConfig {
 		}
 	}
 
+	log.Debugf("MultiCloudWatcher GetConfigs: multiConfigPath: %s", multiConfigPath)
+
 	configurations := &Configurations{}
 	err = loadFile(multiConfigPath, configurations)
 	if err != nil {

+ 19 - 7
pkg/cmd/costmodel/costmodel.go

@@ -34,13 +34,22 @@ func Healthz(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
 
 func Execute(opts *CostModelOpts) error {
 	log.Infof("Starting cost-model version %s", version.FriendlyVersion())
-	a := costmodel.Initialize()
+	log.Infof("Kubernetes enabled: %t", env.IsKubernetesEnabled())
 
-	err := StartExportWorker(context.Background(), a.Model)
-	if err != nil {
-		log.Errorf("couldn't start CSV export worker: %v", err)
+	var a *costmodel.Accesses
+
+	if env.IsKubernetesEnabled() {
+		a = costmodel.Initialize()
+		err := StartExportWorker(context.Background(), a.Model)
+		if err != nil {
+			log.Errorf("couldn't start CSV export worker: %v", err)
+		}
+	} else {
+		a = costmodel.InitializeWithoutKubernetes()
+		log.Debugf("Cloud Cost config path: %s", env.GetCloudCostConfigPath())
 	}
 
+	log.Infof("Cloud Costs enabled: %t", env.IsCloudCostEnabled())
 	if env.IsCloudCostEnabled() {
 		repo := cloudcost.NewMemoryRepository()
 		a.CloudCostPipelineService = cloudcost.NewPipelineService(repo, a.CloudConfigController, cloudcost.DefaultIngestorConfiguration())
@@ -50,9 +59,12 @@ func Execute(opts *CostModelOpts) error {
 
 	rootMux := http.NewServeMux()
 	a.Router.GET("/healthz", Healthz)
-	a.Router.GET("/allocation", a.ComputeAllocationHandler)
-	a.Router.GET("/allocation/summary", a.ComputeAllocationHandlerSummary)
-	a.Router.GET("/assets", a.ComputeAssetsHandler)
+
+	if env.IsKubernetesEnabled() {
+		a.Router.GET("/allocation", a.ComputeAllocationHandler)
+		a.Router.GET("/allocation/summary", a.ComputeAllocationHandlerSummary)
+		a.Router.GET("/assets", a.ComputeAssetsHandler)
+	}
 
 	a.Router.GET("/cloudCost", a.CloudCostQueryService.GetCloudCostHandler())
 	a.Router.GET("/cloudCost/view/graph", a.CloudCostQueryService.GetCloudCostViewGraphHandler())

+ 28 - 0
pkg/costmodel/router.go

@@ -1830,6 +1830,34 @@ func Initialize(additionalConfigWatchers ...*watcher.ConfigMapWatcher) *Accesses
 	return a
 }
 
+func InitializeWithoutKubernetes() *Accesses {
+	var err error
+	if errorReportingEnabled {
+		err = sentry.Init(sentry.ClientOptions{Release: version.FriendlyVersion()})
+		if err != nil {
+			log.Infof("Failed to initialize sentry for error reporting")
+		} else {
+			err = errors.SetPanicHandler(handlePanic)
+			if err != nil {
+				log.Infof("Failed to set panic handler: %s", err)
+			}
+		}
+	}
+
+	a := &Accesses{
+		Router:                httprouter.New(),
+		CloudConfigController: cloudconfig.NewController(nil),
+		httpServices:          services.NewCostModelServices(),
+	}
+
+	a.Router.GET("/logs/level", a.GetLogLevel)
+	a.Router.POST("/logs/level", a.SetLogLevel)
+
+	a.httpServices.RegisterAll(a.Router)
+
+	return a
+}
+
 func writeErrorResponse(w http.ResponseWriter, code int, message string) {
 	out := map[string]string{
 		"message": message,

+ 11 - 0
pkg/env/costmodelenv.go

@@ -113,7 +113,10 @@ const (
 
 	DataRetentionDailyResolutionDaysEnvVar = "DATA_RETENTION_DAILY_RESOLUTION_DAYS"
 
+	// We assume that Kubernetes is enabled if there is a KUBERNETES_PORT environment variable present
+	KubernetesEnabledEnvVar         = "KUBERNETES_PORT"
 	CloudCostEnabledEnvVar          = "CLOUD_COST_ENABLED"
+	CloudCostConfigPath             = "CLOUD_COST_CONFIG_PATH"
 	CloudCostMonthToDateIntervalVar = "CLOUD_COST_MONTH_TO_DATE_INTERVAL"
 	CloudCostRefreshRateHoursEnvVar = "CLOUD_COST_REFRESH_RATE_HOURS"
 	CloudCostQueryWindowDaysEnvVar  = "CLOUD_COST_QUERY_WINDOW_DAYS"
@@ -616,10 +619,18 @@ func GetDataRetentionDailyResolutionDays() int64 {
 	return env.GetInt64(DataRetentionDailyResolutionDaysEnvVar, 15)
 }
 
+func IsKubernetesEnabled() bool {
+	return env.Get(KubernetesEnabledEnvVar, "") != ""
+}
+
 func IsCloudCostEnabled() bool {
 	return env.GetBool(CloudCostEnabledEnvVar, false)
 }
 
+func GetCloudCostConfigPath() string {
+	return env.Get(CloudCostConfigPath, "cloud-integration.json")
+}
+
 func GetCloudCostMonthToDateInterval() int {
 	return env.GetInt(CloudCostMonthToDateIntervalVar, 6)
 }

+ 69 - 0
pkg/env/costmodelenv_test.go

@@ -123,3 +123,72 @@ func TestGetExportCSVMaxDays(t *testing.T) {
 		})
 	}
 }
+
+func TestGetKubernetesEnabled(t *testing.T) {
+	tests := []struct {
+		name string
+		want bool
+		pre  func()
+	}{
+		{
+			name: "Ensure the default value is false",
+			want: false,
+		},
+		{
+			name: "Ensure the value is true when KUBERNETES_PORT has a value",
+			want: true,
+			pre: func() {
+				os.Setenv("KUBERNETES_PORT", "tcp://10.43.0.1:443")
+			},
+		},
+	}
+	for _, tt := range tests {
+		if tt.pre != nil {
+			tt.pre()
+		}
+		t.Run(tt.name, func(t *testing.T) {
+			if got := IsKubernetesEnabled(); got != tt.want {
+				t.Errorf("IsKubernetesEnabled() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+
+}
+
+func TestGetCloudCostConfigPath(t *testing.T) {
+	tests := []struct {
+		name string
+		want string
+		pre  func()
+	}{
+		{
+			name: "Ensure the default value is 'cloud-integration.json'",
+			want: "cloud-integration.json",
+		},
+		{
+			name: "Ensure the value is 'cloud-integration.json' when CLOUD_COST_CONFIG_PATH is set to ''",
+			want: "cloud-integration.json",
+			pre: func() {
+				os.Setenv("CLOUD_COST_CONFIG_PATH", "")
+			},
+		},
+		{
+			name: "Ensure the value is 'flying-pig.json' when CLOUD_COST_CONFIG_PATH is set to 'flying-pig.json'",
+			want: "flying-pig.json",
+			pre: func() {
+				os.Setenv("CLOUD_COST_CONFIG_PATH", "flying-pig.json")
+			},
+		},
+	}
+	for _, tt := range tests {
+		if tt.pre != nil {
+			tt.pre()
+		}
+		t.Run(tt.name, func(t *testing.T) {
+			if got := GetCloudCostConfigPath(); got != tt.want {
+				t.Errorf("GetCloudCostConfigPath() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+
+}