Преглед на файлове

Handle partitioned and updating billing exports (#2794)

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb преди 1 година
родител
ревизия
d866000029
променени са 2 файла, в които са добавени 111 реда и са изтрити 31 реда
  1. 98 26
      pkg/cloud/azure/storagebillingparser.go
  2. 13 5
      pkg/cloud/azure/storageconnection.go

+ 98 - 26
pkg/cloud/azure/storagebillingparser.go

@@ -3,6 +3,7 @@ package azure
 import (
 	"context"
 	"encoding/csv"
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
@@ -46,28 +47,31 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 		return err
 	}
 	ctx := context.Background()
+	// most recent blob list contains information on blob including name and lastMod time
 	// Example blobNames: [ export/myExport/20240101-20240131/myExport_758a42af-0731-4edb-b498-1e523bb40f12.csv ]
-	blobNames, err := asbp.getMostRecentBlobs(start, end, client, ctx)
+	blobInfos, err := asbp.getMostRecentBlobs(start, end, client, ctx)
 	if err != nil {
 		asbp.ConnectionStatus = cloud.FailedConnection
 		return err
 	}
 
-	if len(blobNames) == 0 && asbp.ConnectionStatus != cloud.SuccessfulConnection {
+	if len(blobInfos) == 0 && asbp.ConnectionStatus != cloud.SuccessfulConnection {
 		asbp.ConnectionStatus = cloud.MissingData
 		return nil
 	}
 
-	for _, blobName := range blobNames {
-		if env.IsAzureDownloadBillingDataToDisk() {
-			localPath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "db", "cloudcost")
-			localFilePath := filepath.Join(localPath, filepath.Base(blobName))
+	if env.IsAzureDownloadBillingDataToDisk() {
+		// clean up old files that have been saved to disk before downloading new ones
+		localPath := filepath.Join(env.GetConfigPathWithDefault(env.DefaultConfigMountPath), "db", "cloudcost")
+		if _, err := asbp.deleteFilesOlderThan7d(localPath); err != nil {
+			log.Warnf("CloudCost: Azure: ParseBillingData: failed to remove the following stale files: %v", err)
+		}
+		for _, blob := range blobInfos {
+			blobName := *blob.Name
 
-			if _, err := asbp.deleteFilesOlderThan7d(localPath); err != nil {
-				log.Warnf("CloudCost: Azure: ParseBillingData: failed to remove the following stale files: %v", err)
-			}
+			localFilePath := filepath.Join(localPath, filepath.Base(blobName))
 
-			err := asbp.DownloadBlobToFile(localFilePath, blobName, client, ctx)
+			err := asbp.DownloadBlobToFile(localFilePath, blob, client, ctx)
 			if err != nil {
 				asbp.ConnectionStatus = cloud.FailedConnection
 				return err
@@ -84,7 +88,11 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 				asbp.ConnectionStatus = cloud.ParseError
 				return err
 			}
-		} else {
+
+		}
+	} else {
+		for _, blobInfo := range blobInfos {
+			blobName := *blobInfo.Name
 			streamReader, err2 := asbp.StreamBlob(blobName, client)
 			if err2 != nil {
 				asbp.ConnectionStatus = cloud.FailedConnection
@@ -98,6 +106,7 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 			}
 		}
 	}
+
 	asbp.ConnectionStatus = cloud.SuccessfulConnection
 	return nil
 }
@@ -133,10 +142,10 @@ func (asbp *AzureStorageBillingParser) parseCSV(start, end time.Time, reader *cs
 	return nil
 }
 
-// getMostRecentBlobs returns a list of filepaths on the Azure Storage
+// getMostRecentBlobs returns a list of blobs in the Azure Storage
 // Container. It uses the "Last Modified Time" of the file to determine which
 // has the latest month-to-date billing data.
-func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time, client *azblob.Client, ctx context.Context) ([]string, error) {
+func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time, client *azblob.Client, ctx context.Context) ([]container.BlobItem, error) {
 	log.Infof("Azure Storage: retrieving most recent reports from: %v - %v", start, end)
 
 	// Get list of month substrings for months contained in the start to end range
@@ -144,7 +153,9 @@ func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time,
 	if err != nil {
 		return nil, err
 	}
-	mostRecentBlobs := make(map[string]container.BlobItem)
+
+	// Build map of blobs keyed by month string and blob name
+	blobsForMonth := make(map[string]map[string]container.BlobItem)
 
 	pager := client.NewListBlobsFlatPager(asbp.Container, &azblob.ListBlobsFlatOptions{
 		Include: container.ListBlobsInclude{Deleted: false, Versions: false},
@@ -167,27 +178,88 @@ func (asbp *AzureStorageBillingParser) getMostRecentBlobs(start, end time.Time,
 			}
 			for _, month := range monthStrs {
 				if strings.Contains(*blobInfo.Name, month) {
-					// check if blob is the newest seen for this month
-					if prevBlob, ok := mostRecentBlobs[month]; ok {
-						if prevBlob.Properties.CreationTime.After(*blobInfo.Properties.CreationTime) {
-							continue
-						}
+					if _, ok := blobsForMonth[month]; !ok {
+						blobsForMonth[month] = make(map[string]container.BlobItem)
 					}
-					mostRecentBlobs[month] = *blobInfo
+					blobsForMonth[month][*blobInfo.Name] = *blobInfo
 				}
 			}
 		}
 	}
 
-	// convert blob names into blob urls and move from map into ordered list of blob names
-	var blobNames []string
-	for _, month := range monthStrs {
-		if blob, ok := mostRecentBlobs[month]; ok {
-			blobNames = append(blobNames, *blob.Name)
+	// build list of most recent blobs that are needed to fulfil a query on the give date range
+	var blobs []container.BlobItem
+	for _, monthBlobs := range blobsForMonth {
+		// Find most recent blob
+		var mostRecentBlob *container.BlobItem
+		var mostRecentManifest *container.BlobItem
+
+		for name := range monthBlobs {
+			blob := monthBlobs[name]
+			lastMod := *blob.Properties.LastModified
+			// Handle manifest files
+			if strings.HasSuffix(*blob.Name, "manifest.json") {
+				if mostRecentManifest == nil {
+					mostRecentManifest = &blob
+
+					continue
+				}
+				if mostRecentManifest.Properties.LastModified.Before(lastMod) {
+					mostRecentManifest = &blob
+				}
+				// Only look at non-manifest blobs if manifests are not present
+			} else if mostRecentManifest == nil {
+				if mostRecentBlob == nil {
+					mostRecentBlob = &blob
+					continue
+				}
+				if mostRecentBlob.Properties.LastModified.Before(lastMod) {
+					mostRecentBlob = &blob
+				}
+			}
+		}
+
+		// In the absence of a manifest, add the most recent blob
+		if mostRecentManifest == nil {
+			if mostRecentBlob != nil {
+				blobs = append(blobs, *mostRecentBlob)
+			}
+			continue
+		}
+
+		// download manifest for the month
+		manifestBytes, err := asbp.DownloadBlob(*mostRecentManifest.Name, client, ctx)
+		if err != nil {
+			return nil, fmt.Errorf("failed to retrieve manifest %w", err)
+		}
+		
+		var manifest manifestJson
+		err = json.Unmarshal(manifestBytes, &manifest)
+		if err != nil {
+			return nil, fmt.Errorf("failed to unmarshal manifest %w", err)
+		}
+
+		// Add all partitioned blobs named in the manifest to the list of blobs to be retrieved
+		for _, mb := range manifest.Blobs {
+			namedBlob, ok := monthBlobs[mb.BlobName]
+			if !ok {
+				log.Errorf("AzureStorage: failed to find blob named in manifest '%s'", mb.BlobName)
+				continue
+			}
+			blobs = append(blobs, namedBlob)
 		}
 	}
 
-	return blobNames, nil
+	return blobs, nil
+}
+
+// manifestJson is a struct for unmarshalling manifest.json files associated with the azure billing export
+type manifestJson struct {
+	Blobs []manifestBlob `json:"blobs"`
+}
+
+type manifestBlob struct {
+	BlobName string `json:"blobName"`
 }
 
 // getMonthStrings returns a list of month strings in the format

+ 13 - 5
pkg/cloud/azure/storageconnection.go

@@ -9,6 +9,7 @@ import (
 	"strings"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/pkg/cloud"
 )
@@ -80,11 +81,18 @@ func (sc *StorageConnection) StreamBlob(blobName string, client *azblob.Client)
 }
 
 // DownloadBlobToFile downloads the Azure Billing CSV to a local file
-func (sc *StorageConnection) DownloadBlobToFile(localFilePath string, blobName string, client *azblob.Client, ctx context.Context) error {
-	// If file exists, don't download it again
-	if _, err := os.Stat(localFilePath); err == nil {
-		log.DedupedInfof(3, "CloudCost: Azure: DownloadBlobToFile: file %v already exists, not downloading %v", localFilePath, blobName)
-		return nil
+func (sc *StorageConnection) DownloadBlobToFile(localFilePath string, blob container.BlobItem, client *azblob.Client, ctx context.Context) error {
+	blobName := *blob.Name
+	// Check if file already exists
+	if fileInfo, err := os.Stat(localFilePath); err == nil {
+		blobModTime := *blob.Properties.LastModified
+		// Check if the blob was last modified before the file was modified, indicating that the
+		// file is the most recent version of the blob
+		if blobModTime.Before(fileInfo.ModTime()) {
+			log.Debugf("CloudCost: Azure: DownloadBlobToFile: file %s is more recent than correspondig blob %s", localFilePath, blobName)
+			return nil
+		}
+
 	}
 
 	// Create filepath