Przeglądaj źródła

Add Gzip Azure support (#3572)

Signed-off-by: HMetcalfeW <106991365+HMetcalfeW@users.noreply.github.com>
Hunter Metcalfe 3 miesięcy temu
rodzic
commit
8c2b4595b6

+ 6 - 0
pkg/cloud/azure/resources/billingexports/values/test_azure_billing.csv

@@ -0,0 +1,6 @@
+invoiceId,previousInvoiceId,billingAccountId,billingAccountName,billingProfileId,billingProfileName,invoiceSectionId,invoiceSectionName,resellerName,resellerMpnId,costCenter,billingPeriodEndDate,billingPeriodStartDate,servicePeriodEndDate,servicePeriodStartDate,date,serviceFamily,productOrderId,productOrderName,consumedService,meterId,meterName,meterCategory,meterSubCategory,meterRegion,ProductId,ProductName,SubscriptionId,subscriptionName,publisherType,publisherId,publisherName,resourceGroupName,ResourceId,resourceLocation,location,effectivePrice,quantity,unitOfMeasure,chargeType,billingCurrency,pricingCurrency,costInBillingCurrency,costInPricingCurrency,costInUsd,paygCostInBillingCurrency,paygCostInUsd,exchangeRatePricingToBilling,exchangeRateDate,isAzureCreditEligible,serviceInfo1,serviceInfo2,additionalInfo,tags,PayGPrice,frequency,term,reservationId,reservationName,pricingModel,unitPrice,costAllocationRuleName,benefitId,benefitName,provider
+,,test-billing-account-001,Test Company Inc,test-profile-001,Test Billing Profile,test-section-001,Test Section,,,,,,11/01/2024,10/01/2024,10/15/2024,Compute,test-order-001,Azure plan,Microsoft.Compute,test-meter-001,D2s v3,Virtual Machines,Dsv3 Series,eastus,TEST001,Virtual Machines Dsv3 Series,test-sub-001,Test Subscription,Microsoft,,Microsoft,test-rg-001,/subscriptions/test-sub-001/resourceGroups/test-rg-001/providers/Microsoft.Compute/virtualMachines/test-vm-001,eastus,US East,0.096,24,1 Hour,Usage,USD,USD,2.304,2.304,2.304,2.304,2.304,1,10/01/2024,True,,,"{""environment"":""test"",""project"":""opencost""}","{""costcenter"":""engineering"",""app"":""test""}",0.096,UsageBased,,,,OnDemand,0.096,,,,Azure
+,,test-billing-account-001,Test Company Inc,test-profile-001,Test Billing Profile,test-section-001,Test Section,,,,,,11/01/2024,10/01/2024,10/15/2024,Storage,test-order-002,Azure plan,Microsoft.Storage,test-meter-002,LRS Data Stored,Storage,General Block Blob,eastus,TEST002,Block Blob Storage,test-sub-001,Test Subscription,Microsoft,,Microsoft,test-rg-002,/subscriptions/test-sub-001/resourceGroups/test-rg-002/providers/Microsoft.Storage/storageAccounts/teststorage001,eastus,US East,0.0184,100,1 GB/Month,Usage,USD,USD,1.84,1.84,1.84,1.84,1.84,1,10/01/2024,True,,,"{""redundancy"":""LRS"",""tier"":""Standard""}","{""costcenter"":""engineering"",""app"":""storage""}",0.0184,UsageBased,,,,OnDemand,0.0184,,,,Azure
+,,test-billing-account-001,Test Company Inc,test-profile-001,Test Billing Profile,test-section-001,Test Section,,,,,,11/01/2024,10/01/2024,10/15/2024,Networking,test-order-003,Azure plan,Microsoft.Network,test-meter-003,Standard IPv4 Static Public IP,Virtual Network,IP Addresses,,TEST003,IP Addresses - Standard IPv4,test-sub-001,Test Subscription,Microsoft,,Microsoft,test-rg-003,/subscriptions/test-sub-001/resourceGroups/test-rg-003/providers/Microsoft.Network/publicIPAddresses/test-ip-001,eastus,US East,0.005,24,1 Hour,Usage,USD,USD,0.12,0.12,0.12,0.12,0.12,1,10/01/2024,True,,,"{""type"":""static"",""version"":""ipv4""}","{""costcenter"":""engineering"",""app"":""network""}",0.005,UsageBased,,,,OnDemand,0.005,,,,Azure
+,,test-billing-account-001,Test Company Inc,test-profile-001,Test Billing Profile,test-section-001,Test Section,,,,,,11/01/2024,10/01/2024,10/16/2024,Compute,test-order-004,Azure plan,Microsoft.Compute,test-meter-004,E4 v3,Virtual Machines,Ev3 Series,westus2,TEST004,Virtual Machines Ev3 Series,test-sub-002,Test Subscription 2,Microsoft,,Microsoft,test-rg-004,/subscriptions/test-sub-002/resourceGroups/test-rg-004/providers/Microsoft.Compute/virtualMachines/test-vm-002,westus2,US West 2,0.201,12,1 Hour,Usage,USD,USD,2.412,2.412,2.412,2.412,2.412,1,10/01/2024,True,,,"{""environment"":""production"",""project"":""opencost""}","{""costcenter"":""operations"",""app"":""prod""}",0.201,UsageBased,,,,OnDemand,0.201,,,,Azure
+,,test-billing-account-001,Test Company Inc,test-profile-001,Test Billing Profile,test-section-001,Test Section,,,,,,11/01/2024,10/01/2024,10/16/2024,Databases,test-order-005,Azure plan,Microsoft.Sql,test-meter-005,S2 DTUs,SQL Database,Single Database,,TEST005,SQL Database Single S2,test-sub-002,Test Subscription 2,Microsoft,,Microsoft,test-rg-005,/subscriptions/test-sub-002/resourceGroups/test-rg-005/providers/Microsoft.Sql/servers/testserver/databases/testdb,eastus,US East,1.5,24,1 Hour,Usage,USD,USD,36.0,36.0,36.0,36.0,36.0,1,10/01/2024,True,,,"{""tier"":""Standard"",""dtu"":""50""}","{""costcenter"":""engineering"",""app"":""database""}",1.5,UsageBased,,,,OnDemand,1.5,,,,Azure

BIN
pkg/cloud/azure/resources/billingexports/values/test_azure_billing.csv.gz


+ 60 - 14
pkg/cloud/azure/storagebillingparser.go

@@ -1,6 +1,7 @@
 package azure
 
 import (
+	"compress/gzip"
 	"context"
 	"encoding/csv"
 	"encoding/json"
@@ -32,6 +33,58 @@ func (asbp *AzureStorageBillingParser) Equals(config cloud.Config) bool {
 	return asbp.StorageConnection.Equals(&thatConfig.StorageConnection)
 }
 
+// decompressIfGzipped returns a reader over the decompressed contents of r when blobName ends in
+// ".gz" (compared case-insensitively); otherwise it returns r wrapped in a no-op closer. Closing the result never closes r itself, so the caller still owns r.
+func decompressIfGzipped(r io.Reader, blobName string) (io.ReadCloser, error) {
+	if strings.HasSuffix(strings.ToLower(blobName), ".gz") { // detection is by name only; the content is not sniffed
+		gr, err := gzip.NewReader(r) // reads the gzip header right away, so non-gzip data is rejected here
+		if err != nil {
+			return nil, fmt.Errorf("failed to create gzip reader for %s: %w", blobName, err)
+		}
+		return gr, nil
+	}
+	// Not gzip-named: io.NopCloser lets callers defer Close() the same way on both paths.
+	return io.NopCloser(r), nil
+}
+
+// processLocalBillingFile opens localFilePath, gunzips it when blobName ends in ".gz", and passes the CSV to parseCSV with the given window and resultFn; errors from any step are returned to the caller.
+func (asbp *AzureStorageBillingParser) processLocalBillingFile(localFilePath, blobName string, start, end time.Time, resultFn AzureBillingResultFunc) error {
+	fp, err := os.Open(localFilePath)
+	if err != nil {
+		return err
+	}
+	defer fp.Close() // the wrapper created below does not close the file, so it is closed here
+
+	// Compression is decided from blobName, not from localFilePath.
+	reader, err := decompressIfGzipped(fp, blobName)
+	if err != nil {
+		return err
+	}
+	defer reader.Close() // deferred last, so it runs before fp.Close()
+
+	err = asbp.parseCSV(start, end, csv.NewReader(reader), resultFn)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// processStreamBillingData gunzips streamReader when blobName ends in ".gz" and passes the CSV to parseCSV with the given window and resultFn; streamReader itself is never closed here.
+func (asbp *AzureStorageBillingParser) processStreamBillingData(streamReader io.Reader, blobName string, start, end time.Time, resultFn AzureBillingResultFunc) error {
+	// Compression is decided from blobName only; the stream content is not inspected.
+	reader, err := decompressIfGzipped(streamReader, blobName)
+	if err != nil {
+		return err
+	}
+	defer reader.Close() // releases the gzip wrapper (or is a no-op); does not close streamReader
+
+	err = asbp.parseCSV(start, end, csv.NewReader(reader), resultFn)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 type AzureBillingResultFunc func(*BillingRowValues) error // callback handed to parseCSV; presumably invoked once per parsed billing row — confirm in parseCSV
 
 func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, resultFn AzureBillingResultFunc) error {
@@ -79,32 +132,25 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 				return err
 			}
 
-			fp, err := os.Open(localFilePath)
-			if err != nil {
-				asbp.ConnectionStatus = cloud.FailedConnection
-				return err
-			}
-			defer fp.Close()
-			err = asbp.parseCSV(start, end, csv.NewReader(fp), resultFn)
+			err = asbp.processLocalBillingFile(localFilePath, blobName, start, end, resultFn)
 			if err != nil {
 				asbp.ConnectionStatus = cloud.ParseError
 				return err
 			}
-
 		}
 	} else {
 		for _, blobInfo := range blobInfos {
 			blobName := *blobInfo.Name
-			streamReader, err2 := asbp.StreamBlob(blobName, client)
-			if err2 != nil {
+			streamReader, err := asbp.StreamBlob(blobName, client)
+			if err != nil {
 				asbp.ConnectionStatus = cloud.FailedConnection
-				return err2
+				return err
 			}
 
-			err2 = asbp.parseCSV(start, end, csv.NewReader(streamReader), resultFn)
-			if err2 != nil {
+			err = asbp.processStreamBillingData(streamReader, blobName, start, end, resultFn)
+			if err != nil {
 				asbp.ConnectionStatus = cloud.ParseError
-				return err2
+				return err
 			}
 		}
 	}

+ 348 - 2
pkg/cloud/azure/storagebillingparser_test.go

@@ -1,13 +1,18 @@
 package azure
 
 import (
+	"bytes"
+	"compress/gzip"
+	"io"
+	"os"
+	"strings"
 	"testing"
 	"time"
 )
 
 func TestAzureStorageBillingParser_getMonthStrings(t *testing.T) {
 	asbp := AzureStorageBillingParser{}
-	loc, _ := time.LoadLocation("UTC")
+	loc := time.UTC // Use time.UTC constant instead of LoadLocation
 	testCases := map[string]struct {
 		start    time.Time
 		end      time.Time
@@ -51,7 +56,7 @@ func TestAzureStorageBillingParser_getMonthStrings(t *testing.T) {
 }
 
 func TestAzureStorageBillingParser_parseCSV(t *testing.T) {
-	loc, _ := time.LoadLocation("UTC")
+	loc := time.UTC // Use time.UTC constant instead of LoadLocation
 	start := time.Date(2021, 2, 1, 00, 00, 00, 00, loc)
 	end := time.Date(2021, 2, 3, 00, 00, 00, 00, loc)
 	tests := map[string]struct {
@@ -202,3 +207,344 @@ func TestAzureStorageBillingParser_parseCSV(t *testing.T) {
 
 	}
 }
+
+func TestAzureStorageBillingParser_processLocalBillingFile(t *testing.T) { // parses the plain and the gzipped CSV fixture from disk and expects 5 rows from each
+	loc := time.UTC
+	start := time.Date(2024, 10, 1, 0, 0, 0, 0, loc)
+	end := time.Date(2024, 11, 30, 0, 0, 0, 0, loc) // window spans the plain fixture's rows, which are dated 10/15/2024 and 10/16/2024
+
+	testCases := map[string]struct {
+		fileName     string
+		expectedRows int
+		expectError  bool // no case sets this to true yet, so the expected-error branch below is unexercised
+	}{
+		"Gzipped file": {
+			fileName:     "test_azure_billing.csv.gz",
+			expectedRows: 5,
+			expectError:  false,
+		},
+		"Non-gzipped file": {
+			fileName:     "test_azure_billing.csv",
+			expectedRows: 5,
+			expectError:  false,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			asbp := &AzureStorageBillingParser{} // zero-value parser
+			filePath := valueCasesPath + tc.fileName // valueCasesPath is declared elsewhere in the package; presumably the fixture directory with a trailing separator — confirm
+
+			var rowCount int
+			resultFn := func(abv *BillingRowValues) error { // counts every row handed back by the parser
+				rowCount++
+				if abv == nil {
+					t.Error("Received nil BillingRowValues")
+				}
+				return nil
+			}
+
+			err := asbp.processLocalBillingFile(filePath, tc.fileName, start, end, resultFn) // the file name doubles as the blob name, which drives gzip detection
+			if tc.expectError && err == nil {
+				t.Error("Expected error but got none")
+			}
+			if !tc.expectError && err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+
+			if rowCount != tc.expectedRows {
+				t.Errorf("Expected %d rows, got %d rows", tc.expectedRows, rowCount)
+			}
+		})
+	}
+}
+
+func TestAzureStorageBillingParser_processStreamBillingData(t *testing.T) { // feeds the plain and the gzipped CSV fixture through an in-memory reader and expects 5 rows from each
+	loc := time.UTC
+	start := time.Date(2024, 10, 1, 0, 0, 0, 0, loc)
+	end := time.Date(2024, 11, 30, 0, 0, 0, 0, loc) // window spans the plain fixture's rows, which are dated 10/15/2024 and 10/16/2024
+
+	testCases := map[string]struct {
+		fileName     string
+		expectedRows int
+	}{
+		"Gzipped stream": {
+			fileName:     "test_azure_billing.csv.gz",
+			expectedRows: 5,
+		},
+		"Non-gzipped stream": {
+			fileName:     "test_azure_billing.csv",
+			expectedRows: 5,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			asbp := &AzureStorageBillingParser{} // zero-value parser
+
+			// Load the fixture fully into memory; a bytes.Reader stands in for the blob stream.
+			data, err := os.ReadFile(valueCasesPath + tc.fileName)
+			if err != nil {
+				t.Fatalf("Failed to read test file: %v", err)
+			}
+			streamReader := bytes.NewReader(data)
+
+			var rowCount int
+			resultFn := func(abv *BillingRowValues) error { // counts every row handed back by the parser
+				rowCount++
+				if abv == nil {
+					t.Error("Received nil BillingRowValues")
+				}
+				return nil
+			}
+
+			err = asbp.processStreamBillingData(streamReader, tc.fileName, start, end, resultFn) // the file name doubles as the blob name, which drives gzip detection
+			if err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+
+			if rowCount != tc.expectedRows {
+				t.Errorf("Expected %d rows, got %d rows", tc.expectedRows, rowCount)
+			}
+		})
+	}
+}
+
+func TestDecompressIfGzipped(t *testing.T) { // table test: the reader returned for each blob name must yield the original content
+	testCases := map[string]struct {
+		blobName    string
+		content     string
+		shouldGzip  bool // whether the test compresses content before handing it over; every case pairs true with a .gz-style name
+		expectError bool // no case sets this to true, so the expected-error branch below is unexercised
+	}{
+		"Gzipped file with .gz extension": {
+			blobName:    "billing_export.csv.gz",
+			content:     "test,data\n1,2\n",
+			shouldGzip:  true,
+			expectError: false,
+		},
+		"Gzipped file with .GZ extension (case insensitive)": {
+			blobName:    "billing_export.CSV.GZ",
+			content:     "test,data\n1,2\n",
+			shouldGzip:  true,
+			expectError: false,
+		},
+		"Non-gzipped CSV file": {
+			blobName:    "billing_export.csv",
+			content:     "test,data\n1,2\n",
+			shouldGzip:  false,
+			expectError: false,
+		},
+		"Non-gzipped file without extension": {
+			blobName:    "billing_export",
+			content:     "test,data\n1,2\n",
+			shouldGzip:  false,
+			expectError: false,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			var inputReader io.Reader
+
+			if tc.shouldGzip {
+				// Compress the content into an in-memory buffer
+				var buf bytes.Buffer
+				gw := gzip.NewWriter(&buf)
+				_, err := gw.Write([]byte(tc.content))
+				if err != nil {
+					t.Fatalf("Failed to write gzip content: %v", err)
+				}
+				gw.Close() // finishes the gzip stream; the returned error is ignored
+				inputReader = &buf
+			} else {
+				// Hand the content over uncompressed
+				inputReader = strings.NewReader(tc.content)
+			}
+
+			// Function under test: the wrapper is chosen from tc.blobName
+			reader, err := decompressIfGzipped(inputReader, tc.blobName)
+			if tc.expectError {
+				if err == nil {
+					t.Errorf("Expected error but got none")
+				}
+				return
+			}
+
+			if err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+			defer reader.Close()
+
+			// Whatever the path, the bytes read back must equal the original content
+			output, err := io.ReadAll(reader)
+			if err != nil {
+				t.Fatalf("Failed to read from reader: %v", err)
+			}
+
+			if string(output) != tc.content {
+				t.Errorf("Content mismatch. Expected: %q, Got: %q", tc.content, string(output))
+			}
+		})
+	}
+}
+
+func TestDecompressIfGzipped_InvalidGzip(t *testing.T) {
+	// A .gz-named blob whose bytes are not gzip must be rejected when the reader is created, not on first read
+	blobName := "invalid.csv.gz"
+	invalidData := strings.NewReader("this is not gzipped data")
+
+	reader, err := decompressIfGzipped(invalidData, blobName)
+	if err == nil { // failure path: a reader was handed back for garbage input
+		if reader != nil {
+			reader.Close() // release the unexpected reader before reporting the failure
+		}
+		t.Error("Expected error for invalid gzip data, but got none")
+	}
+}
+
+func TestDecompressIfGzipped_EmptyGzipFile(t *testing.T) {
+	// A gzip stream with no payload must open without error and decompress to zero bytes
+	blobName := "empty.csv.gz"
+	var buf bytes.Buffer
+	gw := gzip.NewWriter(&buf)
+	gw.Close() // closing without any Write is what the test relies on to produce the empty stream
+
+	reader, err := decompressIfGzipped(&buf, blobName)
+	if err != nil {
+		t.Fatalf("Unexpected error for empty gzip file: %v", err)
+	}
+	defer reader.Close()
+
+	output, err := io.ReadAll(reader)
+	if err != nil {
+		t.Fatalf("Failed to read empty gzip file: %v", err)
+	}
+
+	if len(output) != 0 {
+		t.Errorf("Expected empty output, got %d bytes", len(output))
+	}
+}
+
+// TestDecompressIfGzipped_MultipleFiles round-trips a mix of gzip and plain inputs, one subtest per file,
+// and checks each output against its input; it does not itself assert anything about resource cleanup.
+func TestDecompressIfGzipped_MultipleFiles(t *testing.T) {
+	testFiles := []struct {
+		name       string
+		content    string
+		shouldGzip bool // whether the test compresses content before handing it over
+	}{
+		{"file1.csv.gz", "data1,data2\nvalue1,value2\n", true},
+		{"file2.csv", "data3,data4\nvalue3,value4\n", false},
+		{"file3.csv.GZ", "data5,data6\nvalue5,value6\n", true},
+	}
+
+	for _, tf := range testFiles {
+		t.Run(tf.name, func(t *testing.T) {
+			var input io.Reader
+			if tf.shouldGzip {
+				var buf bytes.Buffer
+				gw := gzip.NewWriter(&buf)
+				_, err := gw.Write([]byte(tf.content))
+				if err != nil {
+					t.Fatalf("Failed to write gzip data: %v", err)
+				}
+				gw.Close() // finishes the gzip stream; the returned error is ignored
+				input = &buf
+			} else {
+				input = strings.NewReader(tf.content)
+			}
+
+			reader, err := decompressIfGzipped(input, tf.name) // the file name is used as the blob name
+			if err != nil {
+				t.Fatalf("Failed to decompress %s: %v", tf.name, err)
+			}
+			defer reader.Close()
+
+			output, err := io.ReadAll(reader)
+			if err != nil {
+				t.Fatalf("Failed to read from reader for %s: %v", tf.name, err)
+			}
+
+			if string(output) != tf.content {
+				t.Errorf("Content mismatch for %s. Expected: %q, Got: %q", tf.name, tf.content, string(output))
+			}
+		})
+	}
+}
+
+// TestDecompressIfGzipped_CaseInsensitiveExtension checks that every upper/lower-case spelling of ".gz" selects the gzip path.
+func TestDecompressIfGzipped_CaseInsensitiveExtension(t *testing.T) {
+	testCases := []string{
+		"file.gz",
+		"file.GZ",
+		"file.Gz",
+		"file.gZ",
+	}
+
+	content := "test,data\n1,2\n"
+	for _, blobName := range testCases {
+		t.Run(blobName, func(t *testing.T) {
+			var buf bytes.Buffer // input is always gzip-compressed, so a name that missed the gzip path would return compressed bytes and fail the comparison
+			gw := gzip.NewWriter(&buf)
+			_, err := gw.Write([]byte(content))
+			if err != nil {
+				t.Fatalf("Failed to write gzip data: %v", err)
+			}
+			gw.Close() // finishes the gzip stream; the returned error is ignored
+
+			reader, err := decompressIfGzipped(&buf, blobName)
+			if err != nil {
+				t.Fatalf("Failed to decompress %s: %v", blobName, err)
+			}
+			defer reader.Close()
+
+			output, err := io.ReadAll(reader)
+			if err != nil {
+				t.Fatalf("Failed to read from reader: %v", err)
+			}
+
+			if string(output) != content {
+				t.Errorf("Content mismatch. Expected: %q, Got: %q", content, string(output))
+			}
+		})
+	}
+}
+
+// TestDecompressIfGzipped_LargeFile round-trips a multi-kilobyte CSV through gzip and decompressIfGzipped and checks the content is unchanged.
+func TestDecompressIfGzipped_LargeFile(t *testing.T) {
+	// Build the CSV: one header line followed by 1000 identical data rows
+	var contentBuilder strings.Builder
+	contentBuilder.WriteString("col1,col2,col3,col4\n")
+	for i := 0; i < 1000; i++ {
+		contentBuilder.WriteString("value1,value2,value3,value4\n")
+	}
+	content := contentBuilder.String()
+
+	// Compress it into an in-memory buffer
+	var buf bytes.Buffer
+	gw := gzip.NewWriter(&buf)
+	_, err := gw.Write([]byte(content))
+	if err != nil {
+		t.Fatalf("Failed to write gzip data: %v", err)
+	}
+	gw.Close() // finishes the gzip stream; the returned error is ignored
+
+	blobName := "large_file.csv.gz"
+	reader, err := decompressIfGzipped(&buf, blobName)
+	if err != nil {
+		t.Fatalf("Failed to decompress large file: %v", err)
+	}
+	defer reader.Close()
+
+	output, err := io.ReadAll(reader)
+	if err != nil {
+		t.Fatalf("Failed to read large file: %v", err)
+	}
+
+	if string(output) != content { // only the lengths are reported on mismatch, to keep the failure message short
+		t.Errorf("Content mismatch for large file. Expected %d bytes, got %d bytes", len(content), len(output))
+	}
+
+	t.Logf("Successfully processed large gzipped file with %d bytes", len(output))
+}