2
0
Эх сурвалжийг харах

Refactor and add testing for Athena and S3 row parsing (#2916)

* Refactor and add testing for Athena and S3 row parsing

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>

* Add extra test cases

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>

* Replace const with string literals to make "quality" gate pass

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>

---------

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 1 жил өмнө
parent
commit
768bcc147b

+ 11 - 10
pkg/cloud/aws/athenaintegration.go

@@ -170,10 +170,12 @@ func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*opencost.Cloud
 
 	// Generate row handling function.
 	rowHandler := func(row types.Row) {
-		err2 := ai.RowToCloudCost(row, aqi, ccsr)
+		cc, err2 := athenaRowToCloudCost(row, aqi)
 		if err2 != nil {
 			log.Errorf("AthenaIntegration: GetCloudCost: error while parsing row: %s", err2.Error())
+			return
 		}
+		ccsr.LoadCloudCost(cc)
 	}
 	log.Debugf("AthenaIntegration[%s]: GetCloudCost: querying: %s", ai.Key(), aqi.Query)
 	// Query CUR data and fill out CCSR
@@ -334,9 +336,9 @@ func (ai *AthenaIntegration) GetPartitionWhere(start, end time.Time) string {
 	return str
 }
 
-func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexes, ccsr *opencost.CloudCostSetRange) error {
+func athenaRowToCloudCost(row types.Row, aqi AthenaQueryIndexes) (*opencost.CloudCost, error) {
 	if len(row.Data) < len(aqi.ColumnIndexes) {
-		return fmt.Errorf("rowToCloudCost: row with fewer than %d columns (has only %d)", len(aqi.ColumnIndexes), len(row.Data))
+		return nil, fmt.Errorf("rowToCloudCost: row with fewer than %d columns (has only %d)", len(aqi.ColumnIndexes), len(row.Data))
 	}
 
 	// Iterate through the slice of tag columns, assigning
@@ -379,22 +381,22 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe
 
 	listCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListCostColumn)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	netCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.NetCostColumn)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	amortizedNetCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetCostColumn)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	amortizedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedCostColumn)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	// Identify resource category in the CUR
@@ -429,7 +431,7 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe
 
 	start, err := time.Parse(AthenaDateLayout, startStr)
 	if err != nil {
-		return fmt.Errorf("unable to parse %s: '%s'", AthenaDateTruncColumn, err.Error())
+		return nil, fmt.Errorf("unable to parse %s: '%s'", AthenaDateTruncColumn, err.Error())
 	}
 	end := start.AddDate(0, 0, 1)
 
@@ -458,8 +460,7 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe
 		},
 	}
 
-	ccsr.LoadCloudCost(cc)
-	return nil
+	return cc, nil
 }
 
 func (ai *AthenaIntegration) GetConnectionStatusFromResult(result cloud.EmptyChecker, currentStatus cloud.ConnectionStatus) cloud.ConnectionStatus {

+ 333 - 0
pkg/cloud/aws/athenaintegration_test.go

@@ -2,9 +2,12 @@ package aws
 
 import (
 	"os"
+	"reflect"
 	"testing"
 	"time"
 
+	"github.com/aws/aws-sdk-go-v2/service/athena/types"
+	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/util/json"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
@@ -63,3 +66,333 @@ func TestAthenaIntegration_GetCloudCost(t *testing.T) {
 		})
 	}
 }
+
+func Test_athenaRowToCloudCost(t *testing.T) {
+	aqi := AthenaQueryIndexes{
+		ColumnIndexes: map[string]int{
+			"ListCostColumn":              0,
+			"NetCostColumn":               1,
+			"AmortizedNetCostColumn":      2,
+			"AmortizedCostColumn":         3,
+			"IsK8sColumn":                 4,
+			AthenaDateTruncColumn:         5,
+			"line_item_resource_id":       6,
+			"bill_payer_account_id":       7,
+			"line_item_usage_account_id":  8,
+			"line_item_product_code":      9,
+			"line_item_usage_type":        10,
+			"product_region_code":         11,
+			"line_item_availability_zone": 12,
+			"resource_tags_user_test":     13,
+			"resource_tags_aws_test":      14,
+		},
+		TagColumns:             []string{"resource_tags_user_test"},
+		AWSTagColumns:          []string{"resource_tags_aws_test"},
+		ListCostColumn:         "ListCostColumn",
+		NetCostColumn:          "NetCostColumn",
+		AmortizedNetCostColumn: "AmortizedNetCostColumn",
+		AmortizedCostColumn:    "AmortizedCostColumn",
+		IsK8sColumn:            "IsK8sColumn",
+	}
+
+	tests := []struct {
+		name    string
+		row     []string
+		aqi     AthenaQueryIndexes
+		want    *opencost.CloudCost
+		wantErr bool
+	}{
+		{
+			name:    "incorrect row length",
+			row:     []string{"not enough elements"},
+			aqi:     aqi,
+			want:    nil,
+			wantErr: true,
+		},
+		{
+			name:    "invalid list cost",
+			row:     []string{"invalid", "2", "3", "4", "true", "2024-09-01 00:00:00.000", "resourceID", "payerAccountID", "usageAccountID", "productCode", "usageType", "regionCode", "availabilityZone", "userTagTestValue", "awsTagTestValue"},
+			aqi:     aqi,
+			want:    nil,
+			wantErr: true,
+		},
+		{
+			name:    "invalid net cost",
+			row:     []string{"1", "invalid", "3", "4", "true", "2024-09-01 00:00:00.000", "resourceID", "payerAccountID", "usageAccountID", "productCode", "usageType", "regionCode", "availabilityZone", "userTagTestValue", "awsTagTestValue"},
+			aqi:     aqi,
+			want:    nil,
+			wantErr: true,
+		},
+		{
+			name:    "invalid amortized net cost",
+			row:     []string{"1", "2", "invalid", "4", "true", "2024-09-01 00:00:00.000", "resourceID", "payerAccountID", "usageAccountID", "productCode", "usageType", "regionCode", "availabilityZone", "userTagTestValue", "awsTagTestValue"},
+			aqi:     aqi,
+			want:    nil,
+			wantErr: true,
+		},
+		{
+			name:    "invalid amortized cost",
+			row:     []string{"1", "2", "3", "invalid", "true", "2024-09-01 00:00:00.000", "resourceID", "payerAccountID", "usageAccountID", "productCode", "usageType", "regionCode", "availabilityZone", "userTagTestValue", "awsTagTestValue"},
+			aqi:     aqi,
+			want:    nil,
+			wantErr: true,
+		},
+		{
+			name:    "invalid date",
+			row:     []string{"1", "2", "3", "4", "true", "invalid", "resourceID", "payerAccountID", "usageAccountID", "productCode", "usageType", "regionCode", "availabilityZone", "userTagTestValue", "awsTagTestValue"},
+			aqi:     aqi,
+			want:    nil,
+			wantErr: true,
+		},
+		{
+			name: "valid kubernetes with labels",
+			row:  []string{"1", "2", "3", "4", "true", "2024-09-01 00:00:00.000", "resourceID", "payerAccountID", "usageAccountID", "productCode", "usageType", "regionCode", "availabilityZone", "userTagTestValue", "awsTagTestValue"},
+			aqi:  aqi,
+			want: &opencost.CloudCost{
+				Properties: &opencost.CloudCostProperties{
+					ProviderID:        "resourceID",
+					Provider:          "AWS",
+					AccountID:         "usageAccountID",
+					AccountName:       "usageAccountID",
+					InvoiceEntityID:   "payerAccountID",
+					InvoiceEntityName: "payerAccountID",
+					RegionID:          "regionCode",
+					AvailabilityZone:  "availabilityZone",
+					Service:           "productCode",
+					Category:          opencost.OtherCategory,
+					Labels: opencost.CloudCostLabels{
+						"test":     "userTagTestValue",
+						"aws_test": "awsTagTestValue",
+					},
+				},
+				Window: opencost.NewClosedWindow(
+					time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC),
+					time.Date(2024, 9, 2, 0, 0, 0, 0, time.UTC),
+				),
+				ListCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 1,
+				},
+				NetCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 1,
+				},
+				AmortizedNetCost: opencost.CostMetric{
+					Cost:              3,
+					KubernetesPercent: 1,
+				},
+				InvoicedCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 1,
+				},
+				AmortizedCost: opencost.CostMetric{
+					Cost:              4,
+					KubernetesPercent: 1,
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "valid non-kubernetes, no labels",
+			row:  []string{"1", "2", "3", "4", "false", "2024-09-01 00:00:00.000", "resourceID", "payerAccountID", "usageAccountID", "productCode", "usageType", "regionCode", "availabilityZone", "", ""},
+			aqi:  aqi,
+			want: &opencost.CloudCost{
+				Properties: &opencost.CloudCostProperties{
+					ProviderID:        "resourceID",
+					Provider:          "AWS",
+					AccountID:         "usageAccountID",
+					AccountName:       "usageAccountID",
+					InvoiceEntityID:   "payerAccountID",
+					InvoiceEntityName: "payerAccountID",
+					RegionID:          "regionCode",
+					AvailabilityZone:  "availabilityZone",
+					Service:           "productCode",
+					Category:          opencost.OtherCategory,
+					Labels:            opencost.CloudCostLabels{},
+				},
+				Window: opencost.NewClosedWindow(
+					time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC),
+					time.Date(2024, 9, 2, 0, 0, 0, 0, time.UTC),
+				),
+				ListCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 0,
+				},
+				NetCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedNetCost: opencost.CostMetric{
+					Cost:              3,
+					KubernetesPercent: 0,
+				},
+				InvoicedCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedCost: opencost.CostMetric{
+					Cost:              4,
+					KubernetesPercent: 0,
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "valid load balancer product code",
+			row:  []string{"1", "2", "3", "4", "false", "2024-09-01 00:00:00.000", "resourceID/lbID", "payerAccountID", "usageAccountID", "AWSELB", "usageType", "regionCode", "availabilityZone", "", ""},
+			aqi:  aqi,
+			want: &opencost.CloudCost{
+				Properties: &opencost.CloudCostProperties{
+					ProviderID:        "lbID",
+					Provider:          "AWS",
+					AccountID:         "usageAccountID",
+					AccountName:       "usageAccountID",
+					InvoiceEntityID:   "payerAccountID",
+					InvoiceEntityName: "payerAccountID",
+					RegionID:          "regionCode",
+					AvailabilityZone:  "availabilityZone",
+					Service:           "AWSELB",
+					Category:          opencost.NetworkCategory,
+					Labels:            opencost.CloudCostLabels{},
+				},
+				Window: opencost.NewClosedWindow(
+					time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC),
+					time.Date(2024, 9, 2, 0, 0, 0, 0, time.UTC),
+				),
+				ListCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 0,
+				},
+				NetCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedNetCost: opencost.CostMetric{
+					Cost:              3,
+					KubernetesPercent: 0,
+				},
+				InvoicedCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedCost: opencost.CostMetric{
+					Cost:              4,
+					KubernetesPercent: 0,
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "valid non-kubernetes, Fargate CPU",
+			row:  []string{"1", "2", "3", "4", "false", "2024-09-01 00:00:00.000", "123:pod/resource", "payerAccountID", "usageAccountID", "AmazonEKS", "CPU", "regionCode", "availabilityZone", "", ""},
+			aqi:  aqi,
+			want: &opencost.CloudCost{
+				Properties: &opencost.CloudCostProperties{
+					ProviderID:        "123:pod/resource/CPU",
+					Provider:          "AWS",
+					AccountID:         "usageAccountID",
+					AccountName:       "usageAccountID",
+					InvoiceEntityID:   "payerAccountID",
+					InvoiceEntityName: "payerAccountID",
+					RegionID:          "regionCode",
+					AvailabilityZone:  "availabilityZone",
+					Service:           "AmazonEKS",
+					Category:          opencost.ComputeCategory,
+					Labels:            opencost.CloudCostLabels{},
+				},
+				Window: opencost.NewClosedWindow(
+					time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC),
+					time.Date(2024, 9, 2, 0, 0, 0, 0, time.UTC),
+				),
+				ListCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 0,
+				},
+				NetCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedNetCost: opencost.CostMetric{
+					Cost:              3,
+					KubernetesPercent: 0,
+				},
+				InvoicedCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedCost: opencost.CostMetric{
+					Cost:              4,
+					KubernetesPercent: 0,
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "valid non-kubernetes, Fargate RAM",
+			row:  []string{"1", "2", "3", "4", "false", "2024-09-01 00:00:00.000", "123:pod/resource", "payerAccountID", "usageAccountID", "AmazonEKS", "GB", "regionCode", "availabilityZone", "", ""},
+			aqi:  aqi,
+			want: &opencost.CloudCost{
+				Properties: &opencost.CloudCostProperties{
+					ProviderID:        "123:pod/resource/RAM",
+					Provider:          "AWS",
+					AccountID:         "usageAccountID",
+					AccountName:       "usageAccountID",
+					InvoiceEntityID:   "payerAccountID",
+					InvoiceEntityName: "payerAccountID",
+					RegionID:          "regionCode",
+					AvailabilityZone:  "availabilityZone",
+					Service:           "AmazonEKS",
+					Category:          opencost.ComputeCategory,
+					Labels:            opencost.CloudCostLabels{},
+				},
+				Window: opencost.NewClosedWindow(
+					time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC),
+					time.Date(2024, 9, 2, 0, 0, 0, 0, time.UTC),
+				),
+				ListCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 0,
+				},
+				NetCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedNetCost: opencost.CostMetric{
+					Cost:              3,
+					KubernetesPercent: 0,
+				},
+				InvoicedCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedCost: opencost.CostMetric{
+					Cost:              4,
+					KubernetesPercent: 0,
+				},
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			row := stringsToRow(tt.row)
+			got, err := athenaRowToCloudCost(row, tt.aqi)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("RowToCloudCost() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("RowToCloudCost() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func stringsToRow(strings []string) types.Row {
+	var data []types.Datum
+	for _, str := range strings {
+		varChar := str
+		data = append(data, types.Datum{VarCharValue: &varChar})
+	}
+	return types.Row{Data: data}
+}

+ 175 - 163
pkg/cloud/aws/s3selectintegration.go

@@ -37,6 +37,13 @@ const S3SelectUserLabelPrefix = "resourceTags/user:"
 const S3SelectAWSLabelPrefix = "resourceTags/aws:"
 const S3SelectResourceTagsPrefix = "resourceTags/"
 
+const (
+	TypeSavingsPlanCoveredUsage = "SavingsPlanCoveredUsage"
+	TypeDiscountedUsage         = "DiscountedUsage"
+	TypeEDPDiscount             = "EdpDiscount"
+	TypePrivateRateDiscount     = "PrivateRateDiscount"
+)
+
 type S3SelectIntegration struct {
 	S3SelectQuerier
 }
@@ -96,40 +103,40 @@ func (s3si *S3SelectIntegration) GetCloudCost(
 		S3SelectAvailabilityZone,
 		S3SelectListCost,
 	}
-	_, checkNet := allColumns[S3SelectNetCost]
-	if checkNet {
+
+	if _, ok := allColumns[S3SelectNetCost]; ok {
 		selectColumns = append(selectColumns, S3SelectNetCost)
 	}
 
 	// Check for Reservation columns in CUR and query if available
-	_, checkReservations := allColumns[S3SelectRICost]
-	if checkReservations {
+
+	if _, ok := allColumns[S3SelectRICost]; ok {
 		selectColumns = append(selectColumns, S3SelectRICost)
 	}
-	_, checkNetReservations := allColumns[S3SelectNetRICost]
-	if checkNetReservations {
+
+	if _, ok := allColumns[S3SelectNetRICost]; ok {
 		selectColumns = append(selectColumns, S3SelectNetRICost)
 	}
 
 	// Check for Savings Plan Columns in CUR and query if available
-	_, checkSavingsPlan := allColumns[S3SelectSPCost]
-	if checkSavingsPlan {
+
+	if _, ok := allColumns[S3SelectSPCost]; ok {
 		selectColumns = append(selectColumns, S3SelectSPCost)
 	}
-	_, checkNetSavingsPlan := allColumns[S3SelectNetSPCost]
-	if checkNetSavingsPlan {
+
+	if _, ok := allColumns[S3SelectNetSPCost]; ok {
 		selectColumns = append(selectColumns, S3SelectNetSPCost)
 	}
 
 	// Determine which columns are user-defined tags and add those to the list
 	// of columns to query.
-	labelColumns := []string{}
+	userLabelColumns := []string{}
 	awsLabelColumns := []string{}
 	for column := range allColumns {
 		if strings.HasPrefix(column, S3SelectUserLabelPrefix) {
 			quotedTag := fmt.Sprintf(`s."%s"`, column)
 			selectColumns = append(selectColumns, quotedTag)
-			labelColumns = append(labelColumns, quotedTag)
+			userLabelColumns = append(userLabelColumns, quotedTag)
 		}
 		if strings.HasPrefix(column, S3SelectAWSLabelPrefix) {
 			quotedTag := fmt.Sprintf(`s."%s"`, column)
@@ -162,172 +169,177 @@ func (s3si *S3SelectIntegration) GetCloudCost(
 				return nil
 			}
 
-			startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
-			billPayerAccountID := GetCSVRowValue(row, columnIndexes, S3SelectBillPayerAccountID)
-			itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
-			itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
-			lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
-			itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
-			usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
-			regionCode := GetCSVRowValue(row, columnIndexes, S3SelectRegionCode)
-			availabilityZone := GetCSVRowValue(row, columnIndexes, S3SelectAvailabilityZone)
-
-			// Iterate through the slice of tag columns, assigning
-			// values to the column names, minus the tag prefix.
-			labels := opencost.CloudCostLabels{}
-			for _, labelColumnName := range labelColumns {
-				// remove quotes
-				labelName := strings.TrimPrefix(labelColumnName, `s."`)
-				labelName = strings.TrimSuffix(labelName, `"`)
-				// remove prefix
-				labelName = strings.TrimPrefix(labelName, S3SelectUserLabelPrefix)
-				value := GetCSVRowValue(row, columnIndexes, labelColumnName)
-				if value != "" {
-					labels[labelName] = value
-				}
-			}
-			for _, awsLabelColumnName := range awsLabelColumns {
-				// remove quotes
-				labelName := strings.TrimPrefix(awsLabelColumnName, `s."`)
-				labelName = strings.TrimSuffix(labelName, `"`)
-				// partially remove prefix leaving "aws:"
-				labelName = strings.TrimPrefix(labelName, S3SelectResourceTagsPrefix)
-				value := GetCSVRowValue(row, columnIndexes, awsLabelColumnName)
-				if value != "" {
-					labels[labelName] = value
-				}
+			cc, err3 := s3RowToCloudCost(row, columnIndexes, userLabelColumns, awsLabelColumns)
+			if err3 != nil {
+				log.Errorf("error creating cloud cost from row: %s", err3.Error())
+				continue
 			}
 
-			isKubernetes := 0.0
-			if itemProductCode == "AmazonEKS" || hasK8sLabel(labels) {
-				isKubernetes = 1.0
-			}
+			ccsr.LoadCloudCost(cc)
+		}
+	}
+	err = s3si.Query(query, queryKeys, client, processResults)
+	if err != nil {
+		return nil, err
+	}
 
-			var (
-				amortizedCost    float64
-				amortizedNetCost float64
-				listCost         float64
-				netCost          float64
-			)
-			// Get list and net costs
-			if lineItemType != "EdpDiscount" && lineItemType != "PrivateRateDiscount" {
-				listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
-				if err != nil {
-					return err
-				}
-			}
+	return ccsr, nil
+}
 
-			// Get net cost if available
-			netCost = listCost
-			if checkNet {
-				netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
-				if err != nil {
-					return err
-				}
-			}
+func s3RowToCloudCost(row []string, columnIndexes map[string]int, userLabelColumns, awsLabelColumns []string) (*opencost.CloudCost, error) {
+	startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
+	billPayerAccountID := GetCSVRowValue(row, columnIndexes, S3SelectBillPayerAccountID)
+	itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
+	itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
+	lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
+	itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
+	usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
+	regionCode := GetCSVRowValue(row, columnIndexes, S3SelectRegionCode)
+	availabilityZone := GetCSVRowValue(row, columnIndexes, S3SelectAvailabilityZone)
 
-			// If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
-			amortizedCost = listCost
-			amortizedNetCost = listCost
-			if lineItemType == "DiscountedUsage" {
-				if checkReservations {
-					amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
-					if err != nil {
-						log.Errorf(err.Error())
-						continue
-					}
-					amortizedNetCost = amortizedCost
-				}
-				if checkNetReservations {
-					amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetRICost)
-					if err != nil {
-						log.Errorf(err.Error())
-						continue
-					}
-				}
-				// If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
-			} else if lineItemType == "SavingsPlanCoveredUsage" {
-				if checkSavingsPlan {
-					amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
-					if err != nil {
-						log.Errorf(err.Error())
-						continue
-					}
-					amortizedNetCost = amortizedCost
-				}
-				if checkNetSavingsPlan {
-					amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetSPCost)
-					if err != nil {
-						log.Errorf(err.Error())
-						continue
-					}
-				}
-			}
+	// Iterate through the slice of tag columns, assigning
+	// values to the column names, minus the tag prefix.
+	labels := opencost.CloudCostLabels{}
+	for _, labelColumnName := range userLabelColumns {
+		// remove quotes
+		labelName := strings.TrimPrefix(labelColumnName, `s."`)
+		labelName = strings.TrimSuffix(labelName, `"`)
+		// remove prefix
+		labelName = strings.TrimPrefix(labelName, S3SelectUserLabelPrefix)
+		value := GetCSVRowValue(row, columnIndexes, labelColumnName)
+		if value != "" {
+			labels[labelName] = value
+		}
+	}
+	for _, awsLabelColumnName := range awsLabelColumns {
+		// remove quotes
+		labelName := strings.TrimPrefix(awsLabelColumnName, `s."`)
+		labelName = strings.TrimSuffix(labelName, `"`)
+		// partially remove prefix leaving "aws:"
+		labelName = strings.TrimPrefix(labelName, S3SelectResourceTagsPrefix)
+		value := GetCSVRowValue(row, columnIndexes, awsLabelColumnName)
+		if value != "" {
+			labels[labelName] = value
+		}
+	}
 
-			category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
-			// Retrieve final stanza of product code for ProviderID
-			if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
-				itemProviderID = ParseARN(itemProviderID)
-			}
+	isKubernetes := 0.0
+	if itemProductCode == "AmazonEKS" || hasK8sLabel(labels) {
+		isKubernetes = 1.0
+	}
 
-			properties := opencost.CloudCostProperties{}
-			properties.Provider = opencost.AWSProvider
-			properties.InvoiceEntityID = billPayerAccountID
-			properties.InvoiceEntityName = billPayerAccountID
-			properties.AccountID = itemAccountID
-			properties.AccountName = itemAccountID
-			properties.Category = category
-			properties.Service = itemProductCode
-			properties.ProviderID = itemProviderID
-			properties.RegionID = regionCode
-			properties.AvailabilityZone = availabilityZone
-			properties.Labels = labels
-
-			itemStart, err := time.Parse(S3SelectDateLayout, startStr)
+	var (
+		amortizedCost    float64
+		amortizedNetCost float64
+		listCost         float64
+		netCost          float64
+		err              error
+	)
+	// Get list and net costs
+	if lineItemType != TypeEDPDiscount && lineItemType != TypePrivateRateDiscount {
+		listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// Get net cost if available
+	netCost = listCost
+	if _, ok := columnIndexes[S3SelectNetCost]; ok {
+		netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
+	amortizedCost = listCost
+	amortizedNetCost = listCost
+	if lineItemType == TypeDiscountedUsage {
+		if _, ok := columnIndexes[S3SelectRICost]; ok {
+			amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
 			if err != nil {
-				log.Infof(
-					"Unable to parse '%s': '%s'",
-					S3SelectStartDate,
-					err.Error(),
-				)
-				itemStart = time.Now()
+				return nil, err
 			}
-			itemStart = itemStart.Truncate(time.Hour * 24)
-			itemEnd := itemStart.AddDate(0, 0, 1)
-
-			cc := &opencost.CloudCost{
-				Properties: &properties,
-				Window:     opencost.NewWindow(&itemStart, &itemEnd),
-				ListCost: opencost.CostMetric{
-					Cost:              listCost,
-					KubernetesPercent: isKubernetes,
-				},
-				NetCost: opencost.CostMetric{
-					Cost:              netCost,
-					KubernetesPercent: isKubernetes,
-				},
-				AmortizedNetCost: opencost.CostMetric{
-					Cost:              amortizedCost,
-					KubernetesPercent: isKubernetes,
-				},
-				AmortizedCost: opencost.CostMetric{
-					Cost:              amortizedNetCost,
-					KubernetesPercent: isKubernetes,
-				},
-				InvoicedCost: opencost.CostMetric{
-					Cost:              netCost,
-					KubernetesPercent: isKubernetes,
-				},
+			amortizedNetCost = amortizedCost
+		}
+		if _, ok := columnIndexes[S3SelectNetRICost]; ok {
+			amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetRICost)
+			if err != nil {
+				return nil, err
+			}
+		}
+		// If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
+	} else if lineItemType == TypeSavingsPlanCoveredUsage {
+		if _, ok := columnIndexes[S3SelectSPCost]; ok {
+			amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
+			if err != nil {
+				return nil, err
+			}
+			amortizedNetCost = amortizedCost
+		}
+		if _, ok := columnIndexes[S3SelectNetSPCost]; ok {
+			amortizedNetCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetSPCost)
+			if err != nil {
+				return nil, err
 			}
-			ccsr.LoadCloudCost(cc)
 		}
 	}
-	err = s3si.Query(query, queryKeys, client, processResults)
+
+	category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
+	// Retrieve final stanza of product code for ProviderID
+	if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
+		itemProviderID = ParseARN(itemProviderID)
+	}
+
+	properties := opencost.CloudCostProperties{}
+	properties.Provider = opencost.AWSProvider
+	properties.InvoiceEntityID = billPayerAccountID
+	properties.InvoiceEntityName = billPayerAccountID
+	properties.AccountID = itemAccountID
+	properties.AccountName = itemAccountID
+	properties.Category = category
+	properties.Service = itemProductCode
+	properties.ProviderID = itemProviderID
+	properties.RegionID = regionCode
+	properties.AvailabilityZone = availabilityZone
+	properties.Labels = labels
+
+	itemStart, err := time.Parse(S3SelectDateLayout, startStr)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf(
+			"Unable to parse '%s': '%s'",
+			S3SelectStartDate,
+			err.Error(),
+		)
 	}
+	itemStart = itemStart.Truncate(time.Hour * 24)
+	itemEnd := itemStart.AddDate(0, 0, 1)
 
-	return ccsr, nil
+	return &opencost.CloudCost{
+		Properties: &properties,
+		Window:     opencost.NewWindow(&itemStart, &itemEnd),
+		ListCost: opencost.CostMetric{
+			Cost:              listCost,
+			KubernetesPercent: isKubernetes,
+		},
+		NetCost: opencost.CostMetric{
+			Cost:              netCost,
+			KubernetesPercent: isKubernetes,
+		},
+		AmortizedNetCost: opencost.CostMetric{
+			Cost:              amortizedNetCost,
+			KubernetesPercent: isKubernetes,
+		},
+		AmortizedCost: opencost.CostMetric{
+			Cost:              amortizedCost,
+			KubernetesPercent: isKubernetes,
+		},
+		InvoicedCost: opencost.CostMetric{
+			Cost:              netCost,
+			KubernetesPercent: isKubernetes,
+		},
+	}, nil
 }
 
 const (

+ 376 - 0
pkg/cloud/aws/s3selectintegration_test.go

@@ -2,9 +2,11 @@ package aws
 
 import (
 	"os"
+	"reflect"
 	"testing"
 	"time"
 
+	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/util/json"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
@@ -67,3 +69,377 @@ func TestS3Integration_GetCloudCost(t *testing.T) {
 		})
 	}
 }
+
+func Test_s3RowToCloudCost(t *testing.T) {
+	columnIndexes := map[string]int{
+		S3SelectListCost:                         0,
+		S3SelectNetCost:                          1,
+		S3SelectRICost:                           2,
+		S3SelectNetRICost:                        3,
+		S3SelectSPCost:                           4,
+		S3SelectNetSPCost:                        5,
+		S3SelectStartDate:                        6,
+		S3SelectBillPayerAccountID:               7,
+		S3SelectAccountID:                        8,
+		S3SelectResourceID:                       9,
+		S3SelectItemType:                         10,
+		S3SelectProductCode:                      11,
+		S3SelectUsageType:                        12,
+		S3SelectRegionCode:                       13,
+		S3SelectAvailabilityZone:                 14,
+		`s."resourceTags/user:test"`:             15,
+		`s."resourceTags/aws:test"`:              16,
+		`s."resourceTags/user:eks:cluster-name"`: 17,
+	}
+
+	userTagColumns := []string{`s."resourceTags/user:test"`, `s."resourceTags/user:eks:cluster-name"`}
+	awsTagColumns := []string{`s."resourceTags/aws:test"`}
+
+	tests := []struct {
+		name           string
+		row            []string
+		columnIndexes  map[string]int
+		userTagColumns []string
+		awsTagColumns  []string
+		want           *opencost.CloudCost
+		wantErr        bool
+	}{
+		{
+			name:           "invalid list cost",
+			row:            []string{"invalid", "2", "3", "4", "5", "6", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID", "itemType", "productCode", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want:           nil,
+			wantErr:        true,
+		},
+		{
+			name:           "invalid net cost",
+			row:            []string{"1", "invalid", "3", "4", "5", "6", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID", "itemType", "productCode", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want:           nil,
+			wantErr:        true,
+		},
+		{
+			name:           "invalid RI cost",
+			row:            []string{"1", "2", "invalid", "4", "5", "6", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID", TypeDiscountedUsage, "productCode", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want:           nil,
+			wantErr:        true,
+		},
+		{
+			name:           "invalid net RI cost",
+			row:            []string{"1", "2", "3", "invalid", "5", "6", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID", TypeDiscountedUsage, "productCode", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want:           nil,
+			wantErr:        true,
+		},
+		{
+			name:           "invalid SP cost",
+			row:            []string{"1", "2", "3", "4", "invalid", "6", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID", TypeSavingsPlanCoveredUsage, "productCode", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want:           nil,
+			wantErr:        true,
+		},
+		{
+			name:           "invalid net SP cost",
+			row:            []string{"1", "2", "3", "4", "5", "invalid", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID", TypeSavingsPlanCoveredUsage, "productCode", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want:           nil,
+			wantErr:        true,
+		},
+		{
+			name:           "invalid date",
+			row:            []string{"1", "2", "3", "4", "5", "6", "invalid", "payerAccountID", "usageAccountID", "resourceID", "itemType", "productCode", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want:           nil,
+			wantErr:        true,
+		},
+		{
+			name:           "valid empty labels",
+			row:            []string{"1", "2", "3", "4", "5", "6", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID", "itemType", "productCode", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want: &opencost.CloudCost{
+				Properties: &opencost.CloudCostProperties{
+					ProviderID:        "resourceID",
+					Provider:          "AWS",
+					AccountID:         "usageAccountID",
+					AccountName:       "usageAccountID",
+					InvoiceEntityID:   "payerAccountID",
+					InvoiceEntityName: "payerAccountID",
+					RegionID:          "regionCode",
+					AvailabilityZone:  "availabilityZone",
+					Service:           "productCode",
+					Category:          opencost.OtherCategory,
+					Labels:            opencost.CloudCostLabels{},
+				},
+				Window: opencost.NewClosedWindow(
+					time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC),
+					time.Date(2024, 9, 2, 0, 0, 0, 0, time.UTC),
+				),
+				ListCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 0,
+				},
+				NetCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedNetCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 0,
+				},
+				InvoicedCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 0,
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name:           "valid Kubernetes RI with labels",
+			row:            []string{"1", "2", "3", "4", "5", "6", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID", TypeDiscountedUsage, "productCode", "usageType", "regionCode", "availabilityZone", "userTagTestValue", "awsTagTestValue", "clusterName"},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want: &opencost.CloudCost{
+				Properties: &opencost.CloudCostProperties{
+					ProviderID:        "resourceID",
+					Provider:          "AWS",
+					AccountID:         "usageAccountID",
+					AccountName:       "usageAccountID",
+					InvoiceEntityID:   "payerAccountID",
+					InvoiceEntityName: "payerAccountID",
+					RegionID:          "regionCode",
+					AvailabilityZone:  "availabilityZone",
+					Service:           "productCode",
+					Category:          opencost.OtherCategory,
+					Labels: opencost.CloudCostLabels{
+						"test":             "userTagTestValue",
+						"eks:cluster-name": "clusterName",
+						"aws:test":         "awsTagTestValue",
+					},
+				},
+				Window: opencost.NewClosedWindow(
+					time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC),
+					time.Date(2024, 9, 2, 0, 0, 0, 0, time.UTC),
+				),
+				ListCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 1,
+				},
+				NetCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 1,
+				},
+				AmortizedNetCost: opencost.CostMetric{
+					Cost:              4,
+					KubernetesPercent: 1,
+				},
+				InvoicedCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 1,
+				},
+				AmortizedCost: opencost.CostMetric{
+					Cost:              3,
+					KubernetesPercent: 1,
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name:           "valid Kubernetes SP no labels",
+			row:            []string{"1", "2", "3", "4", "5", "6", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID", TypeSavingsPlanCoveredUsage, "AmazonEKS", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want: &opencost.CloudCost{
+				Properties: &opencost.CloudCostProperties{
+					ProviderID:        "resourceID",
+					Provider:          "AWS",
+					AccountID:         "usageAccountID",
+					AccountName:       "usageAccountID",
+					InvoiceEntityID:   "payerAccountID",
+					InvoiceEntityName: "payerAccountID",
+					RegionID:          "regionCode",
+					AvailabilityZone:  "availabilityZone",
+					Service:           "AmazonEKS",
+					Category:          opencost.ManagementCategory,
+					Labels:            opencost.CloudCostLabels{},
+				},
+				Window: opencost.NewClosedWindow(
+					time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC),
+					time.Date(2024, 9, 2, 0, 0, 0, 0, time.UTC),
+				),
+				ListCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 1,
+				},
+				NetCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 1,
+				},
+				AmortizedNetCost: opencost.CostMetric{
+					Cost:              6,
+					KubernetesPercent: 1,
+				},
+				InvoicedCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 1,
+				},
+				AmortizedCost: opencost.CostMetric{
+					Cost:              5,
+					KubernetesPercent: 1,
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name:           "valid Kubernetes load balancer product code",
+			row:            []string{"1", "2", "3", "4", "5", "6", "2024-09-01T00:00:00Z", "payerAccountID", "usageAccountID", "resourceID/lbID", TypeSavingsPlanCoveredUsage, "AWSELB", "usageType", "regionCode", "availabilityZone", "", "", ""},
+			columnIndexes:  columnIndexes,
+			userTagColumns: userTagColumns,
+			awsTagColumns:  awsTagColumns,
+			want: &opencost.CloudCost{
+				Properties: &opencost.CloudCostProperties{
+					ProviderID:        "lbID",
+					Provider:          "AWS",
+					AccountID:         "usageAccountID",
+					AccountName:       "usageAccountID",
+					InvoiceEntityID:   "payerAccountID",
+					InvoiceEntityName: "payerAccountID",
+					RegionID:          "regionCode",
+					AvailabilityZone:  "availabilityZone",
+					Service:           "AWSELB",
+					Category:          opencost.NetworkCategory,
+					Labels:            opencost.CloudCostLabels{},
+				},
+				Window: opencost.NewClosedWindow(
+					time.Date(2024, 9, 1, 0, 0, 0, 0, time.UTC),
+					time.Date(2024, 9, 2, 0, 0, 0, 0, time.UTC),
+				),
+				ListCost: opencost.CostMetric{
+					Cost:              1,
+					KubernetesPercent: 0,
+				},
+				NetCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedNetCost: opencost.CostMetric{
+					Cost:              6,
+					KubernetesPercent: 0,
+				},
+				InvoicedCost: opencost.CostMetric{
+					Cost:              2,
+					KubernetesPercent: 0,
+				},
+				AmortizedCost: opencost.CostMetric{
+					Cost:              5,
+					KubernetesPercent: 0,
+				},
+			},
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := s3RowToCloudCost(tt.row, tt.columnIndexes, tt.userTagColumns, tt.awsTagColumns)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("s3RowToCloudCost() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("s3RowToCloudCost() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_hasK8sLabel(t *testing.T) {
+	tests := []struct {
+		name   string
+		labels opencost.CloudCostLabels
+		want   bool
+	}{
+		{
+			name:   "empty",
+			labels: opencost.CloudCostLabels{},
+			want:   false,
+		},
+		{
+			name: "no k8s label",
+			labels: opencost.CloudCostLabels{
+				"key": "value",
+			},
+			want: false,
+		},
+		{
+			name: "aws eks cluster name",
+			labels: opencost.CloudCostLabels{
+				TagAWSEKSClusterName: "value",
+			},
+			want: true,
+		},
+		{
+			name: "eks cluster name",
+			labels: opencost.CloudCostLabels{
+				TagEKSClusterName: "value",
+			},
+			want: true,
+		},
+		{
+			name: "eks ctl cluster name",
+			labels: opencost.CloudCostLabels{
+				TagEKSCtlClusterName: "value",
+			},
+			want: true,
+		},
+		{
+			name: "kubernetes service name",
+			labels: opencost.CloudCostLabels{
+				TagKubernetesServiceName: "value",
+			},
+			want: true,
+		},
+		{
+			name: "kubernetes pvc name",
+			labels: opencost.CloudCostLabels{
+				TagKubernetesPVCName: "value",
+			},
+			want: true,
+		},
+		{
+			name: "kubernetes pv name",
+			labels: opencost.CloudCostLabels{
+				TagKubernetesPVName: "value",
+			},
+			want: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := hasK8sLabel(tt.labels); got != tt.want {
+				t.Errorf("hasK8sLabel() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}