Răsfoiți Sursa

Merge pull request #2199 from opencost/v1.106-patch/aws-integration-improvements

Improve AWS integrations
Sean Holcomb 2 ani în urmă
părinte
comite
bb967a0f71

+ 103 - 170
pkg/cloud/aws/athenaintegration.go

@@ -29,14 +29,16 @@ const AthenaSPPricingColumn = "savings_plan_savings_plan_effective_cost"
 // Net Cost Columns
 const AthenaNetPricingColumn = "line_item_net_unblended_cost"
 
+var AthenaNetPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetPricingColumn, AthenaPricingColumn)
+
 // Amortized Net Cost Columns
 const AthenaNetRIPricingColumn = "reservation_net_effective_cost"
+
+var AthenaNetRIPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetRIPricingColumn, AthenaRIPricingColumn)
+
 const AthenaNetSPPricingColumn = "savings_plan_net_savings_plan_effective_cost"
 
-// Category Columns
-const AthenaIsNode = "SUBSTRING(line_item_resource_id,1,2) = 'i-'"
-const AthenaIsVol = "SUBSTRING(line_item_resource_id, 1, 4) = 'vol-'"
-const AthenaIsNetwork = "line_item_usage_type LIKE '%Bytes'"
+var AthenaNetSPPricingCoalesce = fmt.Sprintf("COALESCE(%s, %s, 0)", AthenaNetSPPricingColumn, AthenaSPPricingColumn)
 
 // athenaDateTruncColumn Aggregates line items from the hourly level to daily. "line_item_usage_start_date" is used because at
 // all time values 00:00-23:00 it will truncate to the correct date.
@@ -48,19 +50,14 @@ const AthenaWhereUsage = "(line_item_line_item_type = 'Usage' OR line_item_line_
 
 // AthenaQueryIndexes is a struct for holding the context of a query
 type AthenaQueryIndexes struct {
-	Query                     string
-	ColumnIndexes             map[string]int
-	TagColumns                []string
-	ListCostColumn            string
-	ListK8sCostColumn         string
-	NetCostColumn             string
-	NetK8sCostColumn          string
-	AmortizedNetCostColumn    string
-	AmortizedNetK8sCostColumn string
-	AmortizedCostColumn       string
-	AmortizedK8sCostColumn    string
-	InvoicedCostColumn        string
-	InvoicedK8sCostColumn     string
+	Query                  string
+	ColumnIndexes          map[string]int
+	TagColumns             []string
+	ListCostColumn         string
+	NetCostColumn          string
+	AmortizedNetCostColumn string
+	AmortizedCostColumn    string
+	IsK8sColumn            string
 }
 
 type AthenaIntegration struct {
@@ -84,14 +81,16 @@ func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*kubecost.Cloud
 		"line_item_usage_account_id",
 		"line_item_product_code",
 		"line_item_usage_type",
-		AthenaIsNode,
-		AthenaIsVol,
-		AthenaIsNetwork,
 	}
 
 	// Create query indices
 	aqi := AthenaQueryIndexes{}
 
+	// Add is k8s column
+	isK8sColumn := ai.GetIsKubernetesColumn(allColumns)
+	groupByColumns = append(groupByColumns, isK8sColumn)
+	aqi.IsK8sColumn = isK8sColumn
+
 	// Determine which columns are user-defined tags and add those to the list
 	// of columns to query.
 	for column := range allColumns {
@@ -109,69 +108,39 @@ func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*kubecost.Cloud
 	ai.RemoveColumnAliases(groupByColumns)
 
 	// Build list cost column and add it to the select columns
-	listCostColumn := fmt.Sprintf("SUM(%s) as list_cost", ai.GetListCostColumn())
+	listCostColumn := ai.GetListCostColumn()
 	selectColumns = append(selectColumns, listCostColumn)
 	aqi.ListCostColumn = listCostColumn
-	listK8sCostColumn := fmt.Sprintf(
-		"SUM(%s) as list_kubernetes_cost",
-		ai.GetKubernetesCostColumn(allColumns, ai.GetListCostColumn()),
-	)
-	selectColumns = append(selectColumns, listK8sCostColumn)
-	aqi.ListK8sCostColumn = listK8sCostColumn
 
 	// Build net cost column and add it to select columns
-	netCostColumn := fmt.Sprintf("SUM(%s) as net_cost", ai.GetNetCostColumn(allColumns))
+	netCostColumn := ai.GetNetCostColumn(allColumns)
 	selectColumns = append(selectColumns, netCostColumn)
 	aqi.NetCostColumn = netCostColumn
-	netK8sCostColumn := fmt.Sprintf(
-		"SUM(%s) as net_kubernetes_cost",
-		ai.GetKubernetesCostColumn(allColumns, ai.GetNetCostColumn(allColumns)),
-	)
-	selectColumns = append(selectColumns, netK8sCostColumn)
-	aqi.NetK8sCostColumn = netK8sCostColumn
 
 	// Build amortized net cost column and add it to select columns
-	amortizedNetCostColumn := fmt.Sprintf("SUM(%s) as amortized_net_cost", ai.GetAmortizedNetCostColumn(allColumns))
+	amortizedNetCostColumn := ai.GetAmortizedNetCostColumn(allColumns)
 	selectColumns = append(selectColumns, amortizedNetCostColumn)
 	aqi.AmortizedNetCostColumn = amortizedNetCostColumn
-	amortizedNetK8sCostColumn := fmt.Sprintf(
-		"SUM(%s) as amortized_net_kubernetes_cost",
-		ai.GetKubernetesCostColumn(allColumns, ai.GetNetCostColumn(allColumns)),
-	)
-	selectColumns = append(selectColumns, amortizedNetK8sCostColumn)
-	aqi.AmortizedNetK8sCostColumn = amortizedNetK8sCostColumn
 
 	// Build Amortized cost column and add it to select columns
-	amortizedCostColumn := fmt.Sprintf("SUM(%s) as amortized_cost", ai.GetAmortizedCostCase(allColumns))
+	amortizedCostColumn := ai.GetAmortizedCostColumn(allColumns)
 	selectColumns = append(selectColumns, amortizedCostColumn)
 	aqi.AmortizedCostColumn = amortizedCostColumn
-	amortizedK8sCostColumn := fmt.Sprintf(
-		"SUM(%s) as amortized_kubernetes_cost",
-		ai.GetKubernetesCostColumn(allColumns, ai.GetAmortizedCostCase(allColumns)),
-	)
-	selectColumns = append(selectColumns, amortizedK8sCostColumn)
-	aqi.AmortizedK8sCostColumn = amortizedK8sCostColumn
-
-	// We are using Net Cost for Invoiced Cost for now as it is the closest approximation
-	invoicedCostColumn := netCostColumn
-	selectColumns = append(selectColumns, invoicedCostColumn)
-	aqi.InvoicedCostColumn = invoicedCostColumn
-	invoicedK8sCostColumn := netK8sCostColumn
-	selectColumns = append(selectColumns, invoicedK8sCostColumn)
-	aqi.InvoicedK8sCostColumn = invoicedK8sCostColumn
 
 	// Build map of query columns to use for parsing query
 	aqi.ColumnIndexes = map[string]int{}
 	for i, column := range selectColumns {
 		aqi.ColumnIndexes[column] = i
 	}
-	athenaWhereDate := fmt.Sprintf(AthenaWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
+	whereDate := fmt.Sprintf(AthenaWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
+	wherePartitions := ai.GetPartitionWhere(start, end)
 
 	// Query for all line items with a resource_id or from AWS Marketplace, which did not end before
 	// the range or start after it. This captures all costs with any amount of
 	// overlap with the range, for which we will only extract the relevant costs
 	whereConjuncts := []string{
-		athenaWhereDate,
+		wherePartitions,
+		whereDate,
 		AthenaWhereUsage,
 	}
 	columnStr := strings.Join(selectColumns, ", ")
@@ -204,10 +173,8 @@ func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*kubecost.Cloud
 		return nil, err
 	}
 
-	for _, ccs := range ccsr.CloudCostSets {
-		log.Debugf("AthenaIntegration[%s]: GetCloudCost: writing compute items for window %s: %d", ai.Key(), ccs.Window, len(ccs.CloudCosts))
-		ai.ConnectionStatus = ai.GetConnectionStatusFromResult(ccs, ai.ConnectionStatus)
-	}
+	ai.ConnectionStatus = ai.GetConnectionStatusFromResult(ccsr, ai.ConnectionStatus)
+
 	return ccsr, nil
 
 }
@@ -220,17 +187,22 @@ func (ai *AthenaIntegration) GetListCostColumn() string {
 	listCostBuilder.WriteString(" ELSE ")
 	listCostBuilder.WriteString(AthenaPricingColumn)
 	listCostBuilder.WriteString(" END")
-	return listCostBuilder.String()
+	return fmt.Sprintf("SUM(%s) as list_cost", listCostBuilder.String())
 }
 
 func (ai *AthenaIntegration) GetNetCostColumn(allColumns map[string]bool) string {
 	netCostColumn := ""
 	if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
-		netCostColumn = AthenaNetPricingColumn
+		netCostColumn = AthenaNetPricingCoalesce
 	} else { // Non-net for if there's no net pricing.
 		netCostColumn = AthenaPricingColumn
 	}
-	return netCostColumn
+	return fmt.Sprintf("SUM(%s) as net_cost", netCostColumn)
+}
+
+func (ai *AthenaIntegration) GetAmortizedCostColumn(allColumns map[string]bool) string {
+	amortizedCostCase := ai.GetAmortizedCostCase(allColumns)
+	return fmt.Sprintf("SUM(%s) as amortized_cost", amortizedCostCase)
 }
 
 func (ai *AthenaIntegration) GetAmortizedNetCostColumn(allColumns map[string]bool) string {
@@ -240,43 +212,7 @@ func (ai *AthenaIntegration) GetAmortizedNetCostColumn(allColumns map[string]boo
 	} else { // Non-net for if there's no net pricing.
 		amortizedNetCostCase = ai.GetAmortizedCostCase(allColumns)
 	}
-	return amortizedNetCostCase
-}
-
-// getIsKubernetesColumn generates a boolean column which determines whether a line item is from kubernetes
-func (ai *AthenaIntegration) GetIsKubernetesColumn(allColumns map[string]bool) string {
-	return ai.GetIsKubernetesCase(allColumns)
-}
-
-// getKubernetesCostColumn generates a double column which determines the cost of k8s items in an aggregate
-func (ai *AthenaIntegration) GetKubernetesCostColumn(allColumns map[string]bool, pricingCase string) string {
-	k8sCase := ai.GetIsKubernetesCase(allColumns)
-	return fmt.Sprintf("CAST((%s) as double) * (%s)", k8sCase, pricingCase)
-
-}
-
-func (ai *AthenaIntegration) RemoveColumnAliases(columns []string) {
-	for i, column := range columns {
-		if strings.Contains(column, " as ") {
-			columnValues := strings.Split(column, " as ")
-			columns[i] = columnValues[0]
-		}
-	}
-}
-
-func (ai *AthenaIntegration) ConvertLabelToAWSTag(label string) string {
-	// if the label already has the column prefix assume that it is in the correct format
-	if strings.HasPrefix(label, LabelColumnPrefix) {
-		return label
-	}
-	// replace characters with underscore
-	tag := label
-	tag = strings.ReplaceAll(tag, ".", "_")
-	tag = strings.ReplaceAll(tag, "/", "_")
-	tag = strings.ReplaceAll(tag, ":", "_")
-	tag = strings.ReplaceAll(tag, "-", "_")
-	// add prefix and return
-	return LabelColumnPrefix + tag
+	return fmt.Sprintf("SUM(%s) as amortized_net_cost", amortizedNetCostCase)
 }
 
 func (ai *AthenaIntegration) GetAmortizedCostCase(allColumns map[string]bool) string {
@@ -306,32 +242,58 @@ func (ai *AthenaIntegration) GetAmortizedCostCase(allColumns map[string]bool) st
 func (ai *AthenaIntegration) GetAmortizedNetCostCase(allColumns map[string]bool) string {
 	// Use net unblended costs if Reserved Instances/Savings Plans aren't in use
 	if !allColumns[AthenaNetRIPricingColumn] && !allColumns[AthenaNetSPPricingColumn] {
-		return AthenaNetPricingColumn
+		return AthenaNetPricingCoalesce
 	}
 
 	var costBuilder strings.Builder
 	costBuilder.WriteString("CASE line_item_line_item_type")
 	if allColumns[AthenaNetRIPricingColumn] {
 		costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
-		costBuilder.WriteString(AthenaNetRIPricingColumn)
+		costBuilder.WriteString(AthenaNetRIPricingCoalesce)
 	}
 
 	if allColumns[AthenaNetSPPricingColumn] {
 		costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
-		costBuilder.WriteString(AthenaNetSPPricingColumn)
+		costBuilder.WriteString(AthenaNetSPPricingCoalesce)
 	}
 
 	costBuilder.WriteString(" ELSE ")
-	costBuilder.WriteString(AthenaNetPricingColumn)
+	costBuilder.WriteString(AthenaNetPricingCoalesce)
 	costBuilder.WriteString(" END")
 	return costBuilder.String()
 }
 
-// GetIsKubernetesCase builds a "CASE" clause which attempts to determine if a line item is kubernetes based on labels
-// that may be available in the CUR
-func (ai *AthenaIntegration) GetIsKubernetesCase(allColumns map[string]bool) string {
-	// k8sColumns is a list of columns where the presence of a value indicates that a resource is part of a kubernetes cluster
-	k8sColumns := []string{
+func (ai *AthenaIntegration) RemoveColumnAliases(columns []string) {
+	for i, column := range columns {
+		if strings.Contains(column, " as ") {
+			columnValues := strings.Split(column, " as ")
+			columns[i] = columnValues[0]
+		}
+	}
+}
+
+func (ai *AthenaIntegration) ConvertLabelToAWSTag(label string) string {
+	// if the label already has the column prefix assume that it is in the correct format
+	if strings.HasPrefix(label, LabelColumnPrefix) {
+		return label
+	}
+	// replace characters with underscore
+	tag := label
+	tag = strings.ReplaceAll(tag, ".", "_")
+	tag = strings.ReplaceAll(tag, "/", "_")
+	tag = strings.ReplaceAll(tag, ":", "_")
+	tag = strings.ReplaceAll(tag, "-", "_")
+	// add prefix and return
+	return LabelColumnPrefix + tag
+}
+
+// GetIsKubernetesColumn builds a column that determines if a row represents kubernetes spend
+func (ai *AthenaIntegration) GetIsKubernetesColumn(allColumns map[string]bool) string {
+	disjuncts := []string{
+		"line_item_product_code = 'AmazonEKS'", // EKS is always kubernetes
+	}
+	// tagColumns is a list of columns where the presence of a value indicates that a resource is part of a kubernetes cluster
+	tagColumns := []string{
 		"resource_tags_aws_eks_cluster_name",
 		"resource_tags_user_eks_cluster_name",
 		"resource_tags_user_alpha_eksctl_io_cluster_name",
@@ -339,21 +301,28 @@ func (ai *AthenaIntegration) GetIsKubernetesCase(allColumns map[string]bool) str
 		"resource_tags_user_kubernetes_io_created_for_pvc_name",
 		"resource_tags_user_kubernetes_io_created_for_pv_name",
 	}
-	var k8sBuilder strings.Builder
 
-	k8sBuilder.WriteString("CASE ")
-	// EKS is always kubernetes
-	k8sBuilder.WriteString("WHEN line_item_product_code = 'AmazonEKS' THEN TRUE ")
-	for _, k8sColumn := range k8sColumns {
-		if _, ok := allColumns[k8sColumn]; ok {
-			k8sBuilder.WriteString("WHEN ")
-			k8sBuilder.WriteString(k8sColumn)
-			k8sBuilder.WriteString(" <> '' THEN TRUE ")
+	for _, tagColumn := range tagColumns {
+		// if tag column is present in the CUR check for it
+		if _, ok := allColumns[tagColumn]; ok {
+			disjunctStr := fmt.Sprintf("%s <> ''", tagColumn)
+			disjuncts = append(disjuncts, disjunctStr)
 		}
 	}
 
-	k8sBuilder.WriteString("ELSE FALSE END")
-	return k8sBuilder.String()
+	return fmt.Sprintf("(%s) as is_kubernetes", strings.Join(disjuncts, " OR "))
+}
+
+func (ai *AthenaIntegration) GetPartitionWhere(start, end time.Time) string {
+	month := time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, time.UTC)
+	endMonth := time.Date(end.Year(), end.Month(), 1, 0, 0, 0, 0, time.UTC)
+	var disjuncts []string
+	for !month.After(endMonth) {
+		disjuncts = append(disjuncts, fmt.Sprintf("(year = '%d' AND month = '%d')", month.Year(), month.Month()))
+		month = month.AddDate(0, 1, 0)
+	}
+	str := fmt.Sprintf("(%s)", strings.Join(disjuncts, " OR "))
+	return str
 }
 
 func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexes, ccsr *kubecost.CloudCostSetRange) error {
@@ -380,16 +349,13 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe
 	providerID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_resource_id")
 	productCode := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_product_code")
 	usageType := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_type")
-	isNode, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaIsNode))
-	isVol, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaIsVol))
-	isNetwork, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaIsNetwork))
-
-	listCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListCostColumn)
-	if err != nil {
-		return err
+	isK8s, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, aqi.IsK8sColumn))
+	k8sPct := 0.0
+	if isK8s {
+		k8sPct = 1.0
 	}
 
-	listK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListK8sCostColumn)
+	listCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListCostColumn)
 	if err != nil {
 		return err
 	}
@@ -399,42 +365,18 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe
 		return err
 	}
 
-	netK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.NetK8sCostColumn)
-	if err != nil {
-		return err
-	}
-
 	amortizedNetCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetCostColumn)
 	if err != nil {
 		return err
 	}
 
-	amortizedNetK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetK8sCostColumn)
-	if err != nil {
-		return err
-	}
 	amortizedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedCostColumn)
 	if err != nil {
 		return err
 	}
 
-	amortizedK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedK8sCostColumn)
-	if err != nil {
-		return err
-	}
-
-	invoicedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.InvoicedCostColumn)
-	if err != nil {
-		return err
-	}
-
-	invoicedK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.InvoicedK8sCostColumn)
-	if err != nil {
-		return err
-	}
-
 	// Identify resource category in the CUR
-	category := SelectAWSCategory(isNode, isVol, isNetwork, providerID, productCode)
+	category := SelectAWSCategory(providerID, usageType, productCode)
 
 	// Retrieve final stanza of product code for ProviderID
 	if productCode == "AWSELB" || productCode == "AmazonFSx" {
@@ -470,23 +412,23 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe
 		Window:     kubecost.NewWindow(&start, &end),
 		ListCost: kubecost.CostMetric{
 			Cost:              listCost,
-			KubernetesPercent: ai.CalculateK8sPercent(listCost, listK8sCost),
+			KubernetesPercent: k8sPct,
 		},
 		NetCost: kubecost.CostMetric{
 			Cost:              netCost,
-			KubernetesPercent: ai.CalculateK8sPercent(netCost, netK8sCost),
+			KubernetesPercent: k8sPct,
 		},
 		AmortizedNetCost: kubecost.CostMetric{
 			Cost:              amortizedNetCost,
-			KubernetesPercent: ai.CalculateK8sPercent(amortizedNetCost, amortizedNetK8sCost),
+			KubernetesPercent: k8sPct,
 		},
 		AmortizedCost: kubecost.CostMetric{
 			Cost:              amortizedCost,
-			KubernetesPercent: ai.CalculateK8sPercent(amortizedCost, amortizedK8sCost),
+			KubernetesPercent: k8sPct,
 		},
 		InvoicedCost: kubecost.CostMetric{
-			Cost:              invoicedCost,
-			KubernetesPercent: ai.CalculateK8sPercent(invoicedCost, invoicedK8sCost),
+			Cost:              netCost, // We are using Net Cost for Invoiced Cost for now as it is the closest approximation
+			KubernetesPercent: k8sPct,
 		},
 	}
 
@@ -494,15 +436,6 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe
 	return nil
 }
 
-func (ai *AthenaIntegration) CalculateK8sPercent(cost, k8sCost float64) float64 {
-	// Calculate percent of cost that is k8s with the k8sCost
-	k8sPercent := 0.0
-	if k8sCost != 0.0 && cost != 0.0 {
-		k8sPercent = k8sCost / cost
-	}
-	return k8sPercent
-}
-
 func (ai *AthenaIntegration) GetConnectionStatusFromResult(result cloud.EmptyChecker, currentStatus cloud.ConnectionStatus) cloud.ConnectionStatus {
 	if result.IsEmpty() && currentStatus != cloud.SuccessfulConnection {
 		return cloud.MissingData

+ 4 - 4
pkg/cloud/aws/athenaquerier.go

@@ -190,18 +190,18 @@ func GetAthenaRowValueFloat(row types.Row, queryColumnIndexes map[string]int, co
 	return cost, nil
 }
 
-func SelectAWSCategory(isNode, isVol, isNetwork bool, providerID, service string) string {
+func SelectAWSCategory(providerID, usageType, service string) string {
 	// Network has the highest priority and is based on the usage type ending in "Bytes"
-	if isNetwork {
+	if strings.HasSuffix(usageType, "Bytes") {
 		return kubecost.NetworkCategory
 	}
 	// The node and volume conditions are mutually exclusive.
 	// Provider ID has prefix "i-"
-	if isNode {
+	if strings.HasPrefix(providerID, "i-") {
 		return kubecost.ComputeCategory
 	}
 	// Provider ID has prefix "vol-"
-	if isVol {
+	if strings.HasPrefix(providerID, "vol-") {
 		return kubecost.StorageCategory
 	}
 

+ 36 - 43
pkg/cloud/aws/s3selectintegration.go

@@ -4,7 +4,6 @@ import (
 	"encoding/csv"
 	"fmt"
 	"io"
-	"strconv"
 	"strings"
 	"time"
 
@@ -14,26 +13,23 @@ import (
 	"github.com/opencost/opencost/pkg/util/timeutil"
 )
 
-const s3SelectDateLayout = "2006-01-02T15:04:05Z"
+const S3SelectDateLayout = "2006-01-02T15:04:05Z"
 
 // S3Object is aliased as "s" in queries
-const s3SelectAccountID = `s."bill/PayerAccountId"`
+const S3SelectAccountID = `s."bill/PayerAccountId"`
 
-const s3SelectItemType = `s."lineItem/LineItemType"`
-const s3SelectStartDate = `s."lineItem/UsageStartDate"`
-const s3SelectProductCode = `s."lineItem/ProductCode"`
-const s3SelectResourceID = `s."lineItem/ResourceId"`
+const S3SelectItemType = `s."lineItem/LineItemType"`
+const S3SelectStartDate = `s."lineItem/UsageStartDate"`
+const S3SelectProductCode = `s."lineItem/ProductCode"`
+const S3SelectResourceID = `s."lineItem/ResourceId"`
+const S3SelectUsageType = `s."lineItem/UsageType"`
 
-const s3SelectIsNode = `SUBSTRING(s."lineItem/ResourceId",1,2) = 'i-'`
-const s3SelectIsVol = `SUBSTRING(s."lineItem/ResourceId", 1, 4) = 'vol-'`
-const s3SelectIsNetwork = `s."lineItem/UsageType" LIKE '%Bytes'`
-
-const s3SelectListCost = `s."lineItem/UnblendedCost"`
-const s3SelectNetCost = `s."lineItem/NetUnblendedCost"`
+const S3SelectListCost = `s."lineItem/UnblendedCost"`
+const S3SelectNetCost = `s."lineItem/NetUnblendedCost"`
 
 // These two may be used for Amortized<Net>Cost
-const s3SelectRICost = `s."reservation/EffectiveCost"`
-const s3SelectSPCost = `s."savingsPlan/SavingsPlanEffectiveCost"`
+const S3SelectRICost = `s."reservation/EffectiveCost"`
+const S3SelectSPCost = `s."savingsPlan/SavingsPlanEffectiveCost"`
 
 type S3SelectIntegration struct {
 	S3SelectQuerier
@@ -93,27 +89,25 @@ func (s3si *S3SelectIntegration) GetCloudCost(
 	formattedStart := start.Format("2006-01-02")
 	formattedEnd := end.Format("2006-01-02")
 	selectColumns := []string{
-		s3SelectStartDate,
-		s3SelectAccountID,
-		s3SelectResourceID,
-		s3SelectItemType,
-		s3SelectProductCode,
-		s3SelectIsNode,
-		s3SelectIsVol,
-		s3SelectIsNetwork,
-		s3SelectListCost,
+		S3SelectStartDate,
+		S3SelectAccountID,
+		S3SelectResourceID,
+		S3SelectItemType,
+		S3SelectProductCode,
+		S3SelectUsageType,
+		S3SelectListCost,
 	}
 	// OC equivalent to KCM env flags relevant at all?
 	// Check for Reservation columns in CUR and query if available
-	checkReservations := allColumns[s3SelectRICost]
+	checkReservations := allColumns[S3SelectRICost]
 	if checkReservations {
-		selectColumns = append(selectColumns, s3SelectRICost)
+		selectColumns = append(selectColumns, S3SelectRICost)
 	}
 
 	// Check for Savings Plan Columns in CUR and query if available
-	checkSavingsPlan := allColumns[s3SelectSPCost]
+	checkSavingsPlan := allColumns[S3SelectSPCost]
 	if checkSavingsPlan {
-		selectColumns = append(selectColumns, s3SelectSPCost)
+		selectColumns = append(selectColumns, S3SelectSPCost)
 	}
 
 	// Build map of query columns to use for parsing query
@@ -149,39 +143,38 @@ func (s3si *S3SelectIntegration) GetCloudCost(
 				return nil
 			}
 
-			startStr := GetCSVRowValue(row, columnIndexes, s3SelectStartDate)
-			itemAccountID := GetCSVRowValue(row, columnIndexes, s3SelectAccountID)
-			itemProviderID := GetCSVRowValue(row, columnIndexes, s3SelectResourceID)
-			lineItemType := GetCSVRowValue(row, columnIndexes, s3SelectItemType)
-			itemProductCode := GetCSVRowValue(row, columnIndexes, s3SelectProductCode)
-			isNode, _ := strconv.ParseBool(GetCSVRowValue(row, columnIndexes, s3SelectIsNode))
-			isVol, _ := strconv.ParseBool(GetCSVRowValue(row, columnIndexes, s3SelectIsVol))
-			isNetwork, _ := strconv.ParseBool(GetCSVRowValue(row, columnIndexes, s3SelectIsNetwork))
+			startStr := GetCSVRowValue(row, columnIndexes, S3SelectStartDate)
+			itemAccountID := GetCSVRowValue(row, columnIndexes, S3SelectAccountID)
+			itemProviderID := GetCSVRowValue(row, columnIndexes, S3SelectResourceID)
+			lineItemType := GetCSVRowValue(row, columnIndexes, S3SelectItemType)
+			itemProductCode := GetCSVRowValue(row, columnIndexes, S3SelectProductCode)
+			usageType := GetCSVRowValue(row, columnIndexes, S3SelectUsageType)
+
 			var (
 				amortizedCost float64
 				listCost      float64
 				netCost       float64
 			)
 			// Get list and net costs
-			listCost, err = GetCSVRowValueFloat(row, columnIndexes, s3SelectListCost)
+			listCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectListCost)
 			if err != nil {
 				return err
 			}
-			netCost, err = GetCSVRowValueFloat(row, columnIndexes, s3SelectNetCost)
+			netCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectNetCost)
 			if err != nil {
 				return err
 			}
 
 			// If there is a reservation_reservation_a_r_n on the line item use the awsRIPricingSUMColumn as cost
 			if checkReservations && lineItemType == "DiscountedUsage" {
-				amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, s3SelectRICost)
+				amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectRICost)
 				if err != nil {
 					log.Errorf(err.Error())
 					continue
 				}
 				// If there is a lineItemType of SavingsPlanCoveredUsage use the awsSPPricingSUMColumn
 			} else if checkSavingsPlan && lineItemType == "SavingsPlanCoveredUsage" {
-				amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, s3SelectSPCost)
+				amortizedCost, err = GetCSVRowValueFloat(row, columnIndexes, S3SelectSPCost)
 				if err != nil {
 					log.Errorf(err.Error())
 					continue
@@ -190,7 +183,7 @@ func (s3si *S3SelectIntegration) GetCloudCost(
 				// Default to listCost
 				amortizedCost = listCost
 			}
-			category := SelectAWSCategory(isNode, isVol, isNetwork, itemProductCode, "")
+			category := SelectAWSCategory(itemProviderID, usageType, itemProductCode)
 			// Retrieve final stanza of product code for ProviderID
 			if itemProductCode == "AWSELB" || itemProductCode == "AmazonFSx" {
 				itemProviderID = ParseARN(itemProviderID)
@@ -203,11 +196,11 @@ func (s3si *S3SelectIntegration) GetCloudCost(
 			properties.Service = itemProductCode
 			properties.ProviderID = itemProviderID
 
-			itemStart, err := time.Parse(s3SelectDateLayout, startStr)
+			itemStart, err := time.Parse(S3SelectDateLayout, startStr)
 			if err != nil {
 				log.Infof(
 					"Unable to parse '%s': '%s'",
-					s3SelectStartDate,
+					S3SelectStartDate,
 					err.Error(),
 				)
 				itemStart = time.Now()