Kaynağa Gözat

Merge branch 'develop' into cloudclost-k8sless

Matt Ray 2 yıl önce
ebeveyn
işleme
b645a8b829

+ 59 - 8
pkg/cloud/azure/storagebillingparser.go

@@ -1,11 +1,12 @@
 package azure
 
 import (
-	"bytes"
 	"context"
 	"encoding/csv"
 	"fmt"
 	"io"
+	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
@@ -13,6 +14,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/pkg/cloud"
+	"github.com/opencost/opencost/pkg/env"
 )
 
 // AzureStorageBillingParser accesses billing data stored in CSV files in Azure Storage
@@ -44,6 +46,7 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 		return err
 	}
 	ctx := context.Background()
+	// Example blobNames: [ export/myExport/20240101-20240131/myExport_758a42af-0731-4edb-b498-1e523bb40f12.csv ]
 	blobNames, err := asbp.getMostRecentBlobs(start, end, client, ctx)
 	if err != nil {
 		asbp.ConnectionStatus = cloud.FailedConnection
@@ -56,17 +59,30 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 	}
 
 	for _, blobName := range blobNames {
-		blobBytes, err2 := asbp.DownloadBlob(blobName, client, ctx)
-		if err2 != nil {
+		localPath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "db", "cloudcost")
+		localFilePath := filepath.Join(localPath, filepath.Base(blobName))
+
+		if _, err := asbp.deleteFilesOlderThan7d(localPath); err != nil {
+			log.Warnf("CloudCost: Azure: ParseBillingData: failed to remove the following stale files: %v", err)
+		}
+
+		err := asbp.DownloadBlobToFile(localFilePath, blobName, client, ctx)
+		if err != nil {
+			asbp.ConnectionStatus = cloud.FailedConnection
+			return err
+		}
+
+		fp, err := os.Open(localFilePath)
+		if err != nil {
 			asbp.ConnectionStatus = cloud.FailedConnection
-			return err2
+			return err
 		}
-		err2 = asbp.parseCSV(start, end, csv.NewReader(bytes.NewReader(blobBytes)), resultFn)
-		if err2 != nil {
+		defer fp.Close()
+		err = asbp.parseCSV(start, end, csv.NewReader(fp), resultFn)
+		if err != nil {
 			asbp.ConnectionStatus = cloud.ParseError
-			return err2
+			return err
 		}
-
 	}
 	asbp.ConnectionStatus = cloud.SuccessfulConnection
 	return nil
@@ -184,3 +200,38 @@ func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string
 	endOfMonth := input.AddDate(0, 1, -input.Day())
 	return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
 }
+
+// deleteFilesOlderThan7d recursively walks the directory specified and deletes
+// files which have not been modified in the last 7 days. Returns a list of
+// files deleted.
+func (asbp *AzureStorageBillingParser) deleteFilesOlderThan7d(localPath string) ([]string, error) {
+	duration := 7 * 24 * time.Hour
+	cleaned := []string{}
+	errs := []string{}
+
+	if _, err := os.Stat(localPath); err != nil {
+		return cleaned, nil // localPath does not exist
+	}
+
+	filepath.Walk(localPath, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			errs = append(errs, err.Error())
+			return err
+		}
+
+		if time.Since(info.ModTime()) > duration {
+			err := os.Remove(path)
+			if err != nil {
+				errs = append(errs, err.Error())
+			}
+			cleaned = append(cleaned, path)
+		}
+		return nil
+	})
+
+	if len(errs) == 0 {
+		return cleaned, nil
+	} else {
+		return cleaned, fmt.Errorf("deleteFilesOlderThan7d: %v", errs)
+	}
+}

+ 23 - 14
pkg/cloud/azure/storageconnection.go

@@ -1,9 +1,10 @@
 package azure
 
 import (
-	"bytes"
 	"context"
 	"fmt"
+	"os"
+	"path/filepath"
 	"strings"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
@@ -45,24 +46,32 @@ func (sc *StorageConnection) getBlobURLTemplate() string {
 	return "https://%s.blob.core.windows.net/%s"
 }
 
-func (sc *StorageConnection) DownloadBlob(blobName string, client *azblob.Client, ctx context.Context) ([]byte, error) {
-	log.Infof("Azure Storage: retrieving blob: %v", blobName)
+// DownloadBlobToFile downloads the Azure Billing CSV to a local file
+func (sc *StorageConnection) DownloadBlobToFile(localFilePath string, blobName string, client *azblob.Client, ctx context.Context) error {
+	// If file exists, don't download it again
+	if _, err := os.Stat(localFilePath); err == nil {
+		log.DedupedInfof(3, "CloudCost: Azure: DownloadBlobToFile: file %v already exists, not downloading %v", localFilePath, blobName)
+		return nil
+	}
 
-	downloadResponse, err := client.DownloadStream(ctx, sc.Container, blobName, nil)
+	// Create filepath
+	dir := filepath.Dir(localFilePath)
+	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+		return fmt.Errorf("CloudCost: Azure: DownloadBlobToFile: failed to create directory %w", err)
+	}
+	fp, err := os.Create(localFilePath)
 	if err != nil {
-		return nil, fmt.Errorf("Azure: DownloadBlob: failed to download %w", err)
+		return fmt.Errorf("CloudCost: Azure: DownloadBlobToFile: failed to create file %w", err)
 	}
-	// NOTE: automatically retries are performed if the connection fails
-	retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{})
-	defer retryReader.Close()
-
-	// read the body into a buffer
-	downloadedData := bytes.Buffer{}
+	defer fp.Close()
 
-	_, err = downloadedData.ReadFrom(retryReader)
+	// Download newest Azure Billing CSV to disk
+	log.Infof("CloudCost: Azure: DownloadBlobToFile: retrieving blob: %v", blobName)
+	filesize, err := client.DownloadFile(ctx, sc.Container, blobName, fp, nil)
 	if err != nil {
-		return nil, fmt.Errorf("Azure: DownloadBlob: failed to read downloaded data %w", err)
+		return fmt.Errorf("CloudCost: Azure: DownloadBlobToFile: failed to download %w", err)
 	}
+	log.Infof("CloudCost: Azure: DownloadBlobToFile: retrieved %v of size %dMB", blobName, filesize/1024/1024)
 
-	return downloadedData.Bytes(), nil
+	return nil
 }