Просмотр исходного кода

Add lock on file check and download so that partially downloaded file… (#2857)

* Add a lock to the Azure storage integration file download to prevent accessing a file that has not finished downloading

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>

* fix lock duplication

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>

---------

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 1 год назад
Родитель
Коммит
87d1188dfb

+ 3 - 37
pkg/cloud/azure/storagebillingparser.go

@@ -69,7 +69,8 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 		for _, blob := range blobInfos {
 			blobName := *blob.Name
 
-			localFilePath := filepath.Join(localPath, filepath.Base(blobName))
+			// Use entire blob name to prevent collision with other files from previous months or other integrations (ex "part_0_0001.csv")
+			localFilePath := filepath.Join(localPath, strings.ReplaceAll(blobName, "/", "_"))
 
 			err := asbp.DownloadBlobToFile(localFilePath, blob, client, ctx)
 			if err != nil {
@@ -232,7 +233,7 @@ func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time,
 		if err != nil {
 			return nil, fmt.Errorf("failed to retrieve manifest %w", err)
 		}
-		
+
 		var manifest manifestJson
 		err = json.Unmarshal(manifestBytes, &manifest)
 		if err != nil {
@@ -294,38 +295,3 @@ func (asbp *AzureStorageBillingParser) timeToMonthString(input time.Time) string
 	endOfMonth := input.AddDate(0, 1, -input.Day())
 	return startOfMonth.Format(format) + "-" + endOfMonth.Format(format)
 }
-
-// deleteFilesOlderThan7d recursively walks the directory specified and deletes
-// files which have not been modified in the last 7 days. Returns a list of
-// files deleted.
-func (asbp *AzureStorageBillingParser) deleteFilesOlderThan7d(localPath string) ([]string, error) {
-	duration := 7 * 24 * time.Hour
-	cleaned := []string{}
-	errs := []string{}
-
-	if _, err := os.Stat(localPath); err != nil {
-		return cleaned, nil // localPath does not exist
-	}
-
-	filepath.Walk(localPath, func(path string, info os.FileInfo, err error) error {
-		if err != nil {
-			errs = append(errs, err.Error())
-			return err
-		}
-
-		if time.Since(info.ModTime()) > duration {
-			err := os.Remove(path)
-			if err != nil {
-				errs = append(errs, err.Error())
-			}
-			cleaned = append(cleaned, path)
-		}
-		return nil
-	})
-
-	if len(errs) == 0 {
-		return cleaned, nil
-	} else {
-		return cleaned, fmt.Errorf("deleteFilesOlderThan7d: %v", errs)
-	}
-}

+ 54 - 1
pkg/cloud/azure/storageconnection.go

@@ -7,6 +7,8 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
+	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
@@ -17,6 +19,7 @@ import (
 // StorageConnection provides access to Azure Storage
 type StorageConnection struct {
 	StorageConfiguration
+	lock             sync.Mutex
 	ConnectionStatus cloud.ConnectionStatus
 }
 
@@ -82,6 +85,9 @@ func (sc *StorageConnection) StreamBlob(blobName string, client *azblob.Client)
 
 // DownloadBlobToFile downloads the Azure Billing CSV to a local file
 func (sc *StorageConnection) DownloadBlobToFile(localFilePath string, blob container.BlobItem, client *azblob.Client, ctx context.Context) error {
+	// Lock to prevent accessing a file which may not be fully downloaded
+	sc.lock.Lock()
+	defer sc.lock.Unlock()
 	blobName := *blob.Name
 	// Check if file already exists
 	if fileInfo, err := os.Stat(localFilePath); err == nil {
@@ -107,12 +113,59 @@ func (sc *StorageConnection) DownloadBlobToFile(localFilePath string, blob conta
 	defer fp.Close()
 
 	// Download newest Azure Billing CSV to disk
+
+	// Time out to prevent deadlock on download
+	timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
+	defer cancel()
+
 	log.Infof("CloudCost: Azure: DownloadBlobToFile: retrieving blob: %v", blobName)
-	filesize, err := client.DownloadFile(ctx, sc.Container, blobName, fp, nil)
+	filesize, err := client.DownloadFile(timeoutCtx, sc.Container, blobName, fp, nil)
 	if err != nil {
+		// Clean up file from failed download
+		err2 := os.Remove(localFilePath)
+		if err2 != nil {
+			log.Errorf("CloudCost: Azure: DownloadBlobToFile: failed to remove file %s after failed download %s", localFilePath, err2.Error())
+		}
 		return fmt.Errorf("CloudCost: Azure: DownloadBlobToFile: failed to download %w", err)
 	}
 	log.Infof("CloudCost: Azure: DownloadBlobToFile: retrieved %v of size %dMB", blobName, filesize/1024/1024)
 
 	return nil
 }
+
+// deleteFilesOlderThan7d recursively walks the directory specified and deletes
+// files which have not been modified in the last 7 days. Returns a list of
+// files deleted.
+func (sc *StorageConnection) deleteFilesOlderThan7d(localPath string) ([]string, error) {
+	sc.lock.Lock()
+	defer sc.lock.Unlock()
+	duration := 7 * 24 * time.Hour
+	cleaned := []string{}
+	errs := []string{}
+
+	if _, err := os.Stat(localPath); err != nil {
+		return cleaned, nil // localPath does not exist
+	}
+
+	filepath.Walk(localPath, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			errs = append(errs, err.Error())
+			return err
+		}
+
+		if time.Since(info.ModTime()) > duration {
+			err := os.Remove(path)
+			if err != nil {
+				errs = append(errs, err.Error())
+			}
+			cleaned = append(cleaned, path)
+		}
+		return nil
+	})
+
+	if len(errs) == 0 {
+		return cleaned, nil
+	} else {
+		return cleaned, fmt.Errorf("deleteFilesOlderThan7d: %v", errs)
+	}
+}

+ 12 - 3
pkg/cloudcost/integration.go

@@ -57,15 +57,24 @@ func GetIntegrationFromConfig(kc cloud.KeyedConfig) CloudCostIntegration {
 	case *azure.StorageConnection:
 		return &azure.AzureStorageIntegration{
 			AzureStorageBillingParser: azure.AzureStorageBillingParser{
-				StorageConnection: *keyedConfig,
+				StorageConnection: azure.StorageConnection{
+					StorageConfiguration: keyedConfig.StorageConfiguration},
 			},
 		}
 	case *azure.AzureStorageBillingParser:
 		return &azure.AzureStorageIntegration{
-			AzureStorageBillingParser: *keyedConfig,
+			AzureStorageBillingParser: azure.AzureStorageBillingParser{
+				StorageConnection: azure.StorageConnection{
+					StorageConfiguration: keyedConfig.StorageConfiguration},
+			},
 		}
 	case *azure.AzureStorageIntegration:
-		return keyedConfig
+		return &azure.AzureStorageIntegration{
+			AzureStorageBillingParser: azure.AzureStorageBillingParser{
+				StorageConnection: azure.StorageConnection{
+					StorageConfiguration: keyedConfig.StorageConfiguration},
+			},
+		}
 	// S3SelectIntegration
 	case *aws.S3Configuration:
 		return &aws.S3SelectIntegration{