Просмотр исходного кода

Merge branch 'develop' into feat/arm-node-exporter

Mark 3 лет назад
Родитель
Сommit
16d30a357d

+ 0 - 10
pkg/cloud/aws/athenaconfiguration.go

@@ -3,7 +3,6 @@ package aws
 import (
 	"fmt"
 
-	"github.com/aws/aws-sdk-go-v2/service/athena"
 	"github.com/opencost/opencost/pkg/cloud/config"
 	"github.com/opencost/opencost/pkg/util/json"
 )
@@ -175,15 +174,6 @@ func (ac *AthenaConfiguration) UnmarshalJSON(b []byte) error {
 	return nil
 }
 
-func (ac *AthenaConfiguration) GetAthenaClient() (*athena.Client, error) {
-	cfg, err := ac.Authorizer.CreateAWSConfig(ac.Region)
-	if err != nil {
-		return nil, err
-	}
-	cli := athena.NewFromConfig(cfg)
-	return cli, nil
-}
-
 // ConvertAwsAthenaInfoToConfig takes a legacy config and generates a Config based on the presence of properties to match
 // legacy behavior
 func ConvertAwsAthenaInfoToConfig(aai AwsAthenaInfo) config.KeyedConfig {

+ 521 - 0
pkg/cloud/aws/athenaintegration.go

@@ -0,0 +1,521 @@
+package aws
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/service/athena/types"
+	"github.com/opencost/opencost/pkg/cloud"
+	"github.com/opencost/opencost/pkg/kubecost"
+	"github.com/opencost/opencost/pkg/log"
+	"github.com/opencost/opencost/pkg/util/timeutil"
+)
+
+const LabelColumnPrefix = "resource_tags_user_"
+
+// athenaDateLayout is the default AWS date format
+const AthenaDateLayout = "2006-01-02 15:04:05.000"
+
+// Cost Columns
+const AthenaPricingColumn = "line_item_unblended_cost"
+
+// Amortized Cost Columns
+const AthenaRIPricingColumn = "reservation_effective_cost"
+const AthenaSPPricingColumn = "savings_plan_savings_plan_effective_cost"
+
+// Net Cost Columns
+const AthenaNetPricingColumn = "line_item_net_unblended_cost"
+
+// Amortized Net Cost Columns
+const AthenaNetRIPricingColumn = "reservation_net_effective_cost"
+const AthenaNetSPPricingColumn = "savings_plan_net_savings_plan_effective_cost"
+
+// Category Columns
+const AthenaIsNode = "SUBSTRING(line_item_resource_id,1,2) = 'i-'"
+const AthenaIsVol = "SUBSTRING(line_item_resource_id, 1, 4) = 'vol-'"
+const AthenaIsNetwork = "line_item_usage_type LIKE '%Bytes'"
+
+// athenaDateTruncColumn Aggregates line items from the hourly level to daily. "line_item_usage_start_date" is used because at
+// all time values 00:00-23:00 it will truncate to the correct date.
+const AthenaDateColumn = "line_item_usage_start_date"
+const AthenaDateTruncColumn = "DATE_TRUNC('day'," + AthenaDateColumn + ") as usage_date"
+
+const AthenaWhereDateFmt = `line_item_usage_start_date >= date '%s' AND line_item_usage_start_date < date '%s'`
+const AthenaWhereUsage = "(line_item_line_item_type = 'Usage' OR line_item_line_item_type = 'DiscountedUsage' OR line_item_line_item_type = 'SavingsPlanCoveredUsage' OR line_item_line_item_type = 'EdpDiscount' OR line_item_line_item_type = 'PrivateRateDiscount')"
+
+// AthenaQueryIndexes is a struct for holding the context of a query
+type AthenaQueryIndexes struct {
+	Query                     string
+	ColumnIndexes             map[string]int
+	TagColumns                []string
+	ListCostColumn            string
+	ListK8sCostColumn         string
+	NetCostColumn             string
+	NetK8sCostColumn          string
+	AmortizedNetCostColumn    string
+	AmortizedNetK8sCostColumn string
+	AmortizedCostColumn       string
+	AmortizedK8sCostColumn    string
+	InvoicedCostColumn        string
+	InvoicedK8sCostColumn     string
+}
+
+type AthenaIntegration struct {
+	AthenaQuerier
+}
+
+// Query Athena for CUR data and build a new CloudCostSetRange containing the info
+func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*kubecost.CloudCostSetRange, error) {
+	log.Infof("AthenaIntegration[%s]: StoreCloudCost: %s", ai.Key(), kubecost.NewWindow(&start, &end).String())
+	// Query for all column names
+	allColumns, err := ai.GetColumns()
+	if err != nil {
+		return nil, fmt.Errorf("GetCloudCost: error getting Athena columns: %w", err)
+	}
+
+	// List known, hard-coded columns to query
+	groupByColumns := []string{
+		AthenaDateTruncColumn,
+		"line_item_resource_id",
+		"bill_payer_account_id",
+		"line_item_usage_account_id",
+		"line_item_product_code",
+		"line_item_usage_type",
+		AthenaIsNode,
+		AthenaIsVol,
+		AthenaIsNetwork,
+	}
+
+	// Create query indices
+	aqi := AthenaQueryIndexes{}
+
+	// Determine which columns are user-defined tags and add those to the list
+	// of columns to query.
+	for column := range allColumns {
+		if strings.HasPrefix(column, LabelColumnPrefix) {
+			groupByColumns = append(groupByColumns, column)
+			aqi.TagColumns = append(aqi.TagColumns, column)
+		}
+	}
+	var selectColumns []string
+
+	// Duplicate GroupBy Columns into select columns
+	selectColumns = append(selectColumns, groupByColumns...)
+
+	// Clean Up group by columns
+	ai.RemoveColumnAliases(groupByColumns)
+
+	// Build list cost column and add it to the select columns
+	listCostColumn := fmt.Sprintf("SUM(%s) as list_cost", ai.GetListCostColumn())
+	selectColumns = append(selectColumns, listCostColumn)
+	aqi.ListCostColumn = listCostColumn
+	listK8sCostColumn := fmt.Sprintf(
+		"SUM(%s) as list_kubernetes_cost",
+		ai.GetKubernetesCostColumn(allColumns, ai.GetListCostColumn()),
+	)
+	selectColumns = append(selectColumns, listK8sCostColumn)
+	aqi.ListK8sCostColumn = listK8sCostColumn
+
+	// Build net cost column and add it to select columns
+	netCostColumn := fmt.Sprintf("SUM(%s) as net_cost", ai.GetNetCostColumn(allColumns))
+	selectColumns = append(selectColumns, netCostColumn)
+	aqi.NetCostColumn = netCostColumn
+	netK8sCostColumn := fmt.Sprintf(
+		"SUM(%s) as net_kubernetes_cost",
+		ai.GetKubernetesCostColumn(allColumns, ai.GetNetCostColumn(allColumns)),
+	)
+	selectColumns = append(selectColumns, netK8sCostColumn)
+	aqi.NetK8sCostColumn = netK8sCostColumn
+
+	// Build amortized net cost column and add it to select columns
+	amortizedNetCostColumn := fmt.Sprintf("SUM(%s) as amortized_net_cost", ai.GetAmortizedNetCostColumn(allColumns))
+	selectColumns = append(selectColumns, amortizedNetCostColumn)
+	aqi.AmortizedNetCostColumn = amortizedNetCostColumn
+	amortizedNetK8sCostColumn := fmt.Sprintf(
+		"SUM(%s) as amortized_net_kubernetes_cost",
+		ai.GetKubernetesCostColumn(allColumns, ai.GetNetCostColumn(allColumns)),
+	)
+	selectColumns = append(selectColumns, amortizedNetK8sCostColumn)
+	aqi.AmortizedNetK8sCostColumn = amortizedNetK8sCostColumn
+
+	// Build Amortized cost column and add it to select columns
+	amortizedCostColumn := fmt.Sprintf("SUM(%s) as amortized_cost", ai.GetAmortizedCostCase(allColumns))
+	selectColumns = append(selectColumns, amortizedCostColumn)
+	aqi.AmortizedCostColumn = amortizedCostColumn
+	amortizedK8sCostColumn := fmt.Sprintf(
+		"SUM(%s) as amortized_kubernetes_cost",
+		ai.GetKubernetesCostColumn(allColumns, ai.GetAmortizedCostCase(allColumns)),
+	)
+	selectColumns = append(selectColumns, amortizedK8sCostColumn)
+	aqi.AmortizedK8sCostColumn = amortizedK8sCostColumn
+
+	// We are using Net Cost for Invoiced Cost for now as it is the closest approximation
+	invoicedCostColumn := netCostColumn
+	selectColumns = append(selectColumns, invoicedCostColumn)
+	aqi.InvoicedCostColumn = invoicedCostColumn
+	invoicedK8sCostColumn := netK8sCostColumn
+	selectColumns = append(selectColumns, invoicedK8sCostColumn)
+	aqi.InvoicedK8sCostColumn = invoicedK8sCostColumn
+
+	// Build map of query columns to use for parsing query
+	aqi.ColumnIndexes = map[string]int{}
+	for i, column := range selectColumns {
+		aqi.ColumnIndexes[column] = i
+	}
+	athenaWhereDate := fmt.Sprintf(AthenaWhereDateFmt, start.Format("2006-01-02"), end.Format("2006-01-02"))
+
+	// Query for all line items with a resource_id or from AWS Marketplace, which did not end before
+	// the range or start after it. This captures all costs with any amount of
+	// overlap with the range, for which we will only extract the relevant costs
+	whereConjuncts := []string{
+		athenaWhereDate,
+		AthenaWhereUsage,
+	}
+	columnStr := strings.Join(selectColumns, ", ")
+	whereClause := strings.Join(whereConjuncts, " AND ")
+	groupByStr := strings.Join(groupByColumns, ", ")
+	queryStr := `
+		SELECT %s
+		FROM %s
+		WHERE %s
+		GROUP BY %s
+	`
+	aqi.Query = fmt.Sprintf(queryStr, columnStr, ai.Table, whereClause, groupByStr)
+
+	CCSR, err := kubecost.NewCloudCostSetRange(start, end, timeutil.Day, ai.Key())
+	if err != nil {
+		return nil, err
+	}
+
+	// Generate row handling function.
+	rowHandler := func(row types.Row) {
+		err2 := ai.RowToCloudCost(row, aqi, CCSR)
+		if err2 != nil {
+			log.Errorf("AthenaIntegration: queryCloudCostCompute: error while parsing row: %s", err2.Error())
+		}
+	}
+	log.Debugf("AthenaIntegration[%s]: queryCloudCostCompute: querying: %s", ai.Key(), aqi.Query)
+	// Query CUR data and fill out CCSR
+	err = ai.Query(context.TODO(), aqi.Query, GetAthenaQueryFunc(rowHandler))
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO: May not be needed anymore?
+	for _, ccs := range CCSR.CloudCostSets {
+		log.Debugf("AthenaIntegration[%s]: queryCloudCostCompute: writing compute items for window %s: %d", ai.Key(), ccs.Window, len(ccs.CloudCosts))
+		ai.ConnectionStatus = ai.GetConnectionStatusFromResult(ccs, ai.ConnectionStatus)
+	}
+	return CCSR, nil
+
+}
+
+func (ai *AthenaIntegration) GetListCostColumn() string {
+	var listCostBuilder strings.Builder
+	listCostBuilder.WriteString("CASE line_item_line_item_type")
+	listCostBuilder.WriteString(" WHEN 'EdpDiscount' THEN 0")
+	listCostBuilder.WriteString(" WHEN 'PrivateRateDiscount' THEN 0")
+	listCostBuilder.WriteString(" ELSE ")
+	listCostBuilder.WriteString(AthenaPricingColumn)
+	listCostBuilder.WriteString(" END")
+	return listCostBuilder.String()
+}
+
+func (ai *AthenaIntegration) GetNetCostColumn(allColumns map[string]bool) string {
+	netCostColumn := ""
+	if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
+		netCostColumn = AthenaNetPricingColumn
+	} else { // Non-net for if there's no net pricing.
+		netCostColumn = AthenaPricingColumn
+	}
+	return netCostColumn
+}
+
+func (ai *AthenaIntegration) GetAmortizedNetCostColumn(allColumns map[string]bool) string {
+	amortizedNetCostCase := ""
+	if allColumns[AthenaNetPricingColumn] { // if Net pricing exists
+		amortizedNetCostCase = ai.GetAmortizedNetCostCase(allColumns)
+	} else { // Non-net for if there's no net pricing.
+		amortizedNetCostCase = ai.GetAmortizedCostCase(allColumns)
+	}
+	return amortizedNetCostCase
+}
+
+// getIsKubernetesColumn generates a boolean column which determines whether a line item is from kubernetes
+func (ai *AthenaIntegration) GetIsKubernetesColumn(allColumns map[string]bool) string {
+	return ai.GetIsKubernetesCase(allColumns)
+}
+
+// getKubernetesCostColumn generates a double column which determines the cost of k8s items in an aggregate
+func (ai *AthenaIntegration) GetKubernetesCostColumn(allColumns map[string]bool, pricingCase string) string {
+	k8sCase := ai.GetIsKubernetesCase(allColumns)
+	return fmt.Sprintf("CAST((%s) as double) * (%s)", k8sCase, pricingCase)
+
+}
+
+func (ai *AthenaIntegration) RemoveColumnAliases(columns []string) {
+	for i, column := range columns {
+		if strings.Contains(column, " as ") {
+			columnValues := strings.Split(column, " as ")
+			columns[i] = columnValues[0]
+		}
+	}
+}
+
+func (ai *AthenaIntegration) ConvertLabelToAWSTag(label string) string {
+	// if the label already has the column prefix assume that it is in the correct format
+	if strings.HasPrefix(label, LabelColumnPrefix) {
+		return label
+	}
+	// replace characters with underscore
+	tag := label
+	tag = strings.ReplaceAll(tag, ".", "_")
+	tag = strings.ReplaceAll(tag, "/", "_")
+	tag = strings.ReplaceAll(tag, ":", "_")
+	tag = strings.ReplaceAll(tag, "-", "_")
+	// add prefix and return
+	return LabelColumnPrefix + tag
+}
+
+func (ai *AthenaIntegration) GetAmortizedCostCase(allColumns map[string]bool) string {
+	// Use unblended costs if Reserved Instances/Savings Plans aren't in use
+	if !allColumns[AthenaRIPricingColumn] && !allColumns[AthenaSPPricingColumn] {
+		return AthenaPricingColumn
+	}
+
+	var costBuilder strings.Builder
+	costBuilder.WriteString("CASE line_item_line_item_type")
+	if allColumns[AthenaRIPricingColumn] {
+		costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
+		costBuilder.WriteString(AthenaRIPricingColumn)
+	}
+
+	if allColumns[AthenaSPPricingColumn] {
+		costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
+		costBuilder.WriteString(AthenaSPPricingColumn)
+	}
+
+	costBuilder.WriteString(" ELSE ")
+	costBuilder.WriteString(AthenaPricingColumn)
+	costBuilder.WriteString(" END")
+	return costBuilder.String()
+}
+
+func (ai *AthenaIntegration) GetAmortizedNetCostCase(allColumns map[string]bool) string {
+	// Use net unblended costs if Reserved Instances/Savings Plans aren't in use
+	if !allColumns[AthenaNetRIPricingColumn] && !allColumns[AthenaNetSPPricingColumn] {
+		return AthenaNetPricingColumn
+	}
+
+	var costBuilder strings.Builder
+	costBuilder.WriteString("CASE line_item_line_item_type")
+	if allColumns[AthenaNetRIPricingColumn] {
+		costBuilder.WriteString(" WHEN 'DiscountedUsage' THEN ")
+		costBuilder.WriteString(AthenaNetRIPricingColumn)
+	}
+
+	if allColumns[AthenaNetSPPricingColumn] {
+		costBuilder.WriteString(" WHEN 'SavingsPlanCoveredUsage' THEN ")
+		costBuilder.WriteString(AthenaNetSPPricingColumn)
+	}
+
+	costBuilder.WriteString(" ELSE ")
+	costBuilder.WriteString(AthenaNetPricingColumn)
+	costBuilder.WriteString(" END")
+	return costBuilder.String()
+}
+
+// GetIsKubernetesCase builds a "CASE" clause which attempts to determine if a line item is kubernetes based on labels
+// that may be available in the CUR
+func (ai *AthenaIntegration) GetIsKubernetesCase(allColumns map[string]bool) string {
+	// k8sColumns is a list of columns where the presence of a value indicates that a resource is part of a kubernetes cluster
+	k8sColumns := []string{
+		"resource_tags_aws_eks_cluster_name",
+		"resource_tags_user_eks_cluster_name",
+		"resource_tags_user_alpha_eksctl_io_cluster_name",
+		"resource_tags_user_kubernetes_io_service_name",
+		"resource_tags_user_kubernetes_io_created_for_pvc_name",
+		"resource_tags_user_kubernetes_io_created_for_pv_name",
+	}
+	var k8sBuilder strings.Builder
+
+	k8sBuilder.WriteString("CASE ")
+	// EKS is always kubernetes
+	k8sBuilder.WriteString("WHEN line_item_product_code = 'AmazonEKS' THEN TRUE ")
+	for _, k8sColumn := range k8sColumns {
+		if _, ok := allColumns[k8sColumn]; ok {
+			k8sBuilder.WriteString("WHEN ")
+			k8sBuilder.WriteString(k8sColumn)
+			k8sBuilder.WriteString(" <> '' THEN TRUE ")
+		}
+	}
+
+	k8sBuilder.WriteString("ELSE FALSE END")
+	return k8sBuilder.String()
+}
+
+func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexes, ccsr *kubecost.CloudCostSetRange) error {
+	if len(row.Data) < len(aqi.ColumnIndexes) {
+		return fmt.Errorf("rowToCloudCost: row with fewer than %d columns (has only %d)", len(aqi.ColumnIndexes), len(row.Data))
+	}
+
+	// Iterate through the slice of tag columns, assigning
+	// values to the column names, minus the tag prefix.
+	labels := kubecost.CloudCostLabels{}
+	labelValues := []string{}
+	for _, tagColumnName := range aqi.TagColumns {
+		labelName := strings.TrimPrefix(tagColumnName, LabelColumnPrefix)
+		value := GetAthenaRowValue(row, aqi.ColumnIndexes, tagColumnName)
+		if value != "" {
+			labels[labelName] = value
+			labelValues = append(labelValues, value)
+		}
+	}
+
+	invoiceEntityID := GetAthenaRowValue(row, aqi.ColumnIndexes, "bill_payer_account_id")
+	accountID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_account_id")
+	startStr := GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaDateTruncColumn)
+	providerID := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_resource_id")
+	productCode := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_product_code")
+	usageType := GetAthenaRowValue(row, aqi.ColumnIndexes, "line_item_usage_type")
+	isNode, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaIsNode))
+	isVol, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaIsVol))
+	isNetwork, _ := strconv.ParseBool(GetAthenaRowValue(row, aqi.ColumnIndexes, AthenaIsNetwork))
+
+	listCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListCostColumn)
+	if err != nil {
+		return err
+	}
+
+	listK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListK8sCostColumn)
+	if err != nil {
+		return err
+	}
+
+	netCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.NetCostColumn)
+	if err != nil {
+		return err
+	}
+
+	netK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.NetK8sCostColumn)
+	if err != nil {
+		return err
+	}
+
+	amortizedNetCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetCostColumn)
+	if err != nil {
+		return err
+	}
+
+	amortizedNetK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetK8sCostColumn)
+	if err != nil {
+		return err
+	}
+	amortizedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedCostColumn)
+	if err != nil {
+		return err
+	}
+
+	amortizedK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedK8sCostColumn)
+	if err != nil {
+		return err
+	}
+
+	invoicedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.InvoicedCostColumn)
+	if err != nil {
+		return err
+	}
+
+	invoicedK8sCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.InvoicedK8sCostColumn)
+	if err != nil {
+		return err
+	}
+
+	// Identify resource category in the CUR
+	category := SelectAWSCategory(isNode, isVol, isNetwork, providerID, productCode)
+
+	// Retrieve final stanza of product code for ProviderID
+	if productCode == "AWSELB" || productCode == "AmazonFSx" {
+		providerID = ParseARN(providerID)
+	}
+
+	if productCode == "AmazonEKS" && category == kubecost.ComputeCategory {
+		if strings.Contains(usageType, "CPU") {
+			providerID = fmt.Sprintf("%s/CPU", providerID)
+		} else if strings.Contains(usageType, "GB") {
+			providerID = fmt.Sprintf("%s/RAM", providerID)
+		}
+	}
+
+	properties := kubecost.CloudCostProperties{
+		ProviderID:      providerID,
+		Provider:        kubecost.AWSProvider,
+		AccountID:       accountID,
+		InvoiceEntityID: invoiceEntityID,
+		Service:         productCode,
+		Category:        category,
+		Labels:          labels,
+	}
+
+	start, err := time.Parse(AthenaDateLayout, startStr)
+	if err != nil {
+		return fmt.Errorf("unable to parse %s: '%s'", AthenaDateTruncColumn, err.Error())
+	}
+	end := start.AddDate(0, 0, 1)
+
+	cc := &kubecost.CloudCost{
+		Properties: &properties,
+		Window:     kubecost.NewWindow(&start, &end),
+		ListCost: kubecost.CostMetric{
+			Cost:              listCost,
+			KubernetesPercent: ai.CalculateK8sPercent(listCost, listK8sCost),
+		},
+		NetCost: kubecost.CostMetric{
+			Cost:              netCost,
+			KubernetesPercent: ai.CalculateK8sPercent(netCost, netK8sCost),
+		},
+		AmortizedNetCost: kubecost.CostMetric{
+			Cost:              amortizedNetCost,
+			KubernetesPercent: ai.CalculateK8sPercent(amortizedNetCost, amortizedNetK8sCost),
+		},
+		AmortizedCost: kubecost.CostMetric{
+			Cost:              amortizedCost,
+			KubernetesPercent: ai.CalculateK8sPercent(amortizedCost, amortizedK8sCost),
+		},
+		InvoicedCost: kubecost.CostMetric{
+			Cost:              invoicedCost,
+			KubernetesPercent: ai.CalculateK8sPercent(invoicedCost, invoicedK8sCost),
+		},
+	}
+
+	ccsr.LoadCloudCost(cc)
+	return nil
+}
+
+func (ai *AthenaIntegration) CalculateK8sPercent(cost, k8sCost float64) float64 {
+	// Calculate percent of cost that is k8s with the k8sCost
+	k8sPercent := 0.0
+	if k8sCost != 0.0 && cost != 0.0 {
+		k8sPercent = k8sCost / cost
+	}
+	return k8sPercent
+}
+
+func (ai *AthenaIntegration) GetConnectionStatusFromResult(result cloud.EmptyChecker, currentStatus cloud.ConnectionStatus) cloud.ConnectionStatus {
+	if result.IsEmpty() && currentStatus != cloud.SuccessfulConnection {
+		return cloud.MissingData
+	}
+	return cloud.SuccessfulConnection
+}
+
+func (ai *AthenaIntegration) GetConnectionStatus() string {
+	// initialize status if it has not done so; this can happen if the integration is inactive
+	if ai.ConnectionStatus.String() == "" {
+		ai.ConnectionStatus = cloud.InitialStatus
+	}
+
+	return ai.ConnectionStatus.String()
+}

+ 65 - 0
pkg/cloud/aws/athenaintegration_test.go

@@ -0,0 +1,65 @@
+package aws
+
+import (
+	"os"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/pkg/util/json"
+	"github.com/opencost/opencost/pkg/util/timeutil"
+)
+
+func GetCloudCost_Test(t *testing.T) {
+	athenaConfigPath := os.Getenv("ATHENA_CONFIGURATION")
+	if athenaConfigPath == "" {
+		t.Skip("skipping integration test, set environment variable ATHENA_CONFIGURATION")
+	}
+	athenaConfigBin, err := os.ReadFile(athenaConfigPath)
+	if err != nil {
+		t.Fatalf("failed to read config file: %s", err.Error())
+	}
+	var athenaConfig AthenaConfiguration
+	err = json.Unmarshal(athenaConfigBin, &athenaConfig)
+	if err != nil {
+		t.Fatalf("failed to unmarshal config from JSON: %s", err.Error())
+	}
+	testCases := map[string]struct {
+		integration *AthenaIntegration
+		start       time.Time
+		end         time.Time
+		expected    bool
+	}{
+		// No CUR data is expected within 2 days of now
+		"too_recent_window": {
+			integration: &AthenaIntegration{
+				AthenaQuerier: AthenaQuerier{
+					AthenaConfiguration: athenaConfig,
+				},
+			},
+			end:      time.Now(),
+			start:    time.Now().Add(-timeutil.Day),
+			expected: true,
+		},
+		// CUR data should be available
+		"last week window": {
+			integration: &AthenaIntegration{
+				AthenaQuerier: AthenaQuerier{
+					AthenaConfiguration: athenaConfig,
+				},
+			},
+			end:      time.Now().Add(-7 * timeutil.Day),
+			start:    time.Now().Add(-8 * timeutil.Day),
+			expected: false,
+		},
+	}
+	for name, testCase := range testCases {
+		t.Run(name, func(t *testing.T) {
+			actual, err := testCase.integration.GetCloudCost(testCase.start, testCase.end)
+			if err != nil {
+				t.Errorf("Other error during testing %s", err)
+			} else if actual.IsEmpty() != testCase.expected {
+				t.Errorf("Incorrect result, actual emptiness: %t, expected: %t", actual.IsEmpty(), testCase.expected)
+			}
+		})
+	}
+}

+ 53 - 2
pkg/cloud/aws/athenaquerier.go

@@ -8,6 +8,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/opencost/opencost/pkg/cloud"
 	cloudconfig "github.com/opencost/opencost/pkg/cloud/config"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
@@ -20,6 +21,7 @@ import (
 
 type AthenaQuerier struct {
 	AthenaConfiguration
+	ConnectionStatus cloud.ConnectionStatus
 }
 
 func (aq *AthenaQuerier) Equals(config cloudconfig.Config) bool {
@@ -31,9 +33,58 @@ func (aq *AthenaQuerier) Equals(config cloudconfig.Config) bool {
 	return aq.AthenaConfiguration.Equals(&thatConfig.AthenaConfiguration)
 }
 
+// GetColumns returns a list of the names of all columns in the configured
+// Athena table
+func (aq *AthenaQuerier) GetColumns() (map[string]bool, error) {
+	columnSet := map[string]bool{}
+
+	// This Query is supported by Athena tables and views
+	q := `SELECT column_name FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'`
+	query := fmt.Sprintf(q, aq.Database, aq.Table)
+
+	athenaErr := aq.Query(context.TODO(), query, GetAthenaQueryFunc(func(row types.Row) {
+		columnSet[*row.Data[0].VarCharValue] = true
+	}))
+
+	if athenaErr != nil {
+		return columnSet, athenaErr
+	}
+
+	if len(columnSet) == 0 {
+		log.Infof("No columns retrieved from Athena")
+	}
+
+	return columnSet, nil
+}
+
+func (aq *AthenaQuerier) Query(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
+	err := aq.Validate()
+	if err != nil {
+		aq.ConnectionStatus = cloud.InvalidConfiguration
+		return err
+	}
+
+	log.Debugf("AthenaQuerier[%s]: Performing Query: %s", aq.Key(), query)
+	err = aq.queryAthenaPaginated(ctx, query, fn)
+	if err != nil {
+		aq.ConnectionStatus = cloud.FailedConnection
+		return err
+	}
+	return nil
+}
+
+func (aq *AthenaQuerier) GetAthenaClient() (*athena.Client, error) {
+	cfg, err := aq.Authorizer.CreateAWSConfig(aq.Region)
+	if err != nil {
+		return nil, err
+	}
+	cli := athena.NewFromConfig(cfg)
+	return cli, nil
+}
+
 // QueryAthenaPaginated executes athena query and processes results. An error from this method indicates a
 // FAILED_CONNECTION CloudConnectionStatus and should immediately stop the caller to maintain the correct CloudConnectionStatus
-func (aq *AthenaQuerier) QueryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
+func (aq *AthenaQuerier) queryAthenaPaginated(ctx context.Context, query string, fn func(*athena.GetQueryResultsOutput) bool) error {
 
 	queryExecutionCtx := &types.QueryExecutionContext{
 		Database: aws.String(aq.Database),
@@ -54,7 +105,7 @@ func (aq *AthenaQuerier) QueryAthenaPaginated(ctx context.Context, query string,
 	}
 
 	// Create Athena Client
-	cli, err := aq.AthenaConfiguration.GetAthenaClient()
+	cli, err := aq.GetAthenaClient()
 
 	// Query Athena
 	startQueryExecutionOutput, err := cli.StartQueryExecution(ctx, startQueryExecutionInput)

+ 11 - 1
pkg/cloud/aws/provider.go

@@ -1779,7 +1779,17 @@ func (aws *AWS) findCostForDisk(disk *ec2Types.Volume) (*float64, error) {
 
 	key := "us-east-2" + "," + class
 
-	priceStr := aws.Pricing[key].PV.Cost
+	pricing, ok := aws.Pricing[key]
+	if !ok {
+		return nil, fmt.Errorf("no pricing data for key '%s'", key)
+	}
+	if pricing == nil {
+		return nil, fmt.Errorf("nil pricing data for key '%s'", key)
+	}
+	if pricing.PV == nil {
+		return nil, fmt.Errorf("pricing for key '%s' has nil PV", key)
+	}
+	priceStr := pricing.PV.Cost
 
 	price, err := strconv.ParseFloat(priceStr, 64)
 	if err != nil {

+ 12 - 0
pkg/cloud/cloudcostintegration.go

@@ -0,0 +1,12 @@
+package cloud
+
+import (
+	"time"
+
+	"github.com/opencost/opencost/pkg/kubecost"
+)
+
+// CloudCostIntegration is an interface for retrieving daily granularity CloudCost data for a given range
+type CloudCostIntegration interface {
+	GetCloudCost(time.Time, time.Time) (*kubecost.CloudCostSetRange, error)
+}

+ 5 - 0
pkg/cloud/connectionstatus.go

@@ -40,3 +40,8 @@ const (
 func (cs ConnectionStatus) String() string {
 	return string(cs)
 }
+
+// EmptyChecker provides an interface for to check if a result is empty which can be useful for setting a MissingData status
+type EmptyChecker interface {
+	IsEmpty() bool
+}

+ 7 - 0
pkg/costmodel/assets.go

@@ -133,6 +133,13 @@ func (cm *CostModel) ComputeAssets(start, end time.Time) (*kubecost.AssetSet, er
 		node.GPUCost = n.GPUCost
 		node.GPUCount = n.GPUCount
 		node.RAMCost = n.RAMCost
+
+		node.Overhead = &kubecost.NodeOverhead{
+			RamOverheadFraction: n.Overhead.RamOverheadFraction,
+			CpuOverheadFraction: n.Overhead.CpuOverheadFraction,
+			OverheadCostFraction: ((n.Overhead.CpuOverheadFraction * n.CPUCost) +
+				(n.Overhead.RamOverheadFraction * n.RAMCost)) / node.TotalCost(),
+		}
 		node.Discount = n.Discount
 		if n.Preemptible {
 			node.Preemptible = 1.0

+ 27 - 11
pkg/costmodel/cluster.go

@@ -472,6 +472,10 @@ func ClusterDisks(client prometheus.Client, provider models.Provider, start, end
 	return diskMap, nil
 }
 
+type NodeOverhead struct {
+	CpuOverheadFraction float64
+	RamOverheadFraction float64
+}
 type Node struct {
 	Cluster         string
 	Name            string
@@ -494,6 +498,7 @@ type Node struct {
 	CostPerCPUHr    float64
 	CostPerRAMGiBHr float64
 	CostPerGPUHr    float64
+	Overhead        *NodeOverhead
 }
 
 // GKE lies about the number of cores e2 nodes have. This table
@@ -567,9 +572,11 @@ func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.
 	optionalCtx := prom.NewNamedContext(client, prom.ClusterOptionalContextName)
 
 	queryNodeCPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_cpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
-	queryNodeCPUCores := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
+	queryNodeCPUCoresCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
+	queryNodeCPUCoresAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_cpu_cores[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
 	queryNodeRAMHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_ram_hourly_cost[%s])) by (%s, node, instance_type, provider_id) / 1024 / 1024 / 1024`, durStr, env.GetPromClusterLabel())
-	queryNodeRAMBytes := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
+	queryNodeRAMBytesCapacity := fmt.Sprintf(`avg(avg_over_time(kube_node_status_capacity_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
+	queryNodeRAMBytesAllocatable := fmt.Sprintf(`avg(avg_over_time(kube_node_status_allocatable_memory_bytes[%s])) by (%s, node)`, durStr, env.GetPromClusterLabel())
 	queryNodeGPUCount := fmt.Sprintf(`avg(avg_over_time(node_gpu_count[%s])) by (%s, node, provider_id)`, durStr, env.GetPromClusterLabel())
 	queryNodeGPUHourlyCost := fmt.Sprintf(`avg(avg_over_time(node_gpu_hourly_cost[%s])) by (%s, node, instance_type, provider_id)`, durStr, env.GetPromClusterLabel())
 	queryNodeCPUModeTotal := fmt.Sprintf(`sum(rate(node_cpu_seconds_total[%s:%dm])) by (kubernetes_node, %s, mode)`, durStr, minsPerResolution, env.GetPromClusterLabel())
@@ -581,9 +588,11 @@ func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.
 
 	// Return errors if these fail
 	resChNodeCPUHourlyCost := requiredCtx.QueryAtTime(queryNodeCPUHourlyCost, t)
-	resChNodeCPUCores := requiredCtx.QueryAtTime(queryNodeCPUCores, t)
+	resChNodeCPUCoresCapacity := requiredCtx.QueryAtTime(queryNodeCPUCoresCapacity, t)
+	resChNodeCPUCoresAllocatable := requiredCtx.QueryAtTime(queryNodeCPUCoresAllocatable, t)
 	resChNodeRAMHourlyCost := requiredCtx.QueryAtTime(queryNodeRAMHourlyCost, t)
-	resChNodeRAMBytes := requiredCtx.QueryAtTime(queryNodeRAMBytes, t)
+	resChNodeRAMBytesCapacity := requiredCtx.QueryAtTime(queryNodeRAMBytesCapacity, t)
+	resChNodeRAMBytesAllocatable := requiredCtx.QueryAtTime(queryNodeRAMBytesAllocatable, t)
 	resChNodeGPUCount := requiredCtx.QueryAtTime(queryNodeGPUCount, t)
 	resChNodeGPUHourlyCost := requiredCtx.QueryAtTime(queryNodeGPUHourlyCost, t)
 	resChActiveMins := requiredCtx.QueryAtTime(queryActiveMins, t)
@@ -596,11 +605,13 @@ func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.
 	resChLabels := optionalCtx.QueryAtTime(queryLabels, t)
 
 	resNodeCPUHourlyCost, _ := resChNodeCPUHourlyCost.Await()
-	resNodeCPUCores, _ := resChNodeCPUCores.Await()
+	resNodeCPUCoresCapacity, _ := resChNodeCPUCoresCapacity.Await()
+	resNodeCPUCoresAllocatable, _ := resChNodeCPUCoresAllocatable.Await()
 	resNodeGPUCount, _ := resChNodeGPUCount.Await()
 	resNodeGPUHourlyCost, _ := resChNodeGPUHourlyCost.Await()
 	resNodeRAMHourlyCost, _ := resChNodeRAMHourlyCost.Await()
-	resNodeRAMBytes, _ := resChNodeRAMBytes.Await()
+	resNodeRAMBytesCapacity, _ := resChNodeRAMBytesCapacity.Await()
+	resNodeRAMBytesAllocatable, _ := resChNodeRAMBytesAllocatable.Await()
 	resIsSpot, _ := resChIsSpot.Await()
 	resNodeCPUModeTotal, _ := resChNodeCPUModeTotal.Await()
 	resNodeRAMSystemPct, _ := resChNodeRAMSystemPct.Await()
@@ -633,8 +644,12 @@ func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.
 	clusterAndNameToTypeIntermediate := mergeTypeMaps(clusterAndNameToType1, clusterAndNameToType2)
 	clusterAndNameToType := mergeTypeMaps(clusterAndNameToTypeIntermediate, clusterAndNameToType3)
 
-	cpuCoresMap := buildCPUCoresMap(resNodeCPUCores)
-	ramBytesMap := buildRAMBytesMap(resNodeRAMBytes)
+	cpuCoresCapacityMap := buildCPUCoresMap(resNodeCPUCoresCapacity)
+	ramBytesCapacityMap := buildRAMBytesMap(resNodeRAMBytesCapacity)
+
+	cpuCoresAllocatableMap := buildCPUCoresMap(resNodeCPUCoresAllocatable)
+	ramBytesAllocatableMap := buildRAMBytesMap(resNodeRAMBytesAllocatable)
+	overheadMap := buildOverheadMap(ramBytesCapacityMap, ramBytesAllocatableMap, cpuCoresCapacityMap, cpuCoresAllocatableMap)
 
 	ramUserPctMap := buildRAMUserPctMap(resNodeRAMUserPct)
 	ramSystemPctMap := buildRAMSystemPctMap(resNodeRAMSystemPct)
@@ -643,13 +658,13 @@ func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.
 
 	labelsMap := buildLabelsMap(resLabels)
 
-	costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresMap)
-	costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesMap)
+	costTimesMinuteAndCount(activeDataMap, cpuCostMap, cpuCoresCapacityMap)
+	costTimesMinuteAndCount(activeDataMap, ramCostMap, ramBytesCapacityMap)
 	costTimesMinute(activeDataMap, gpuCostMap) // there's no need to do a weird "nodeIdentifierNoProviderID" type match since gpuCounts have a providerID
 
 	nodeMap := buildNodeMap(
 		cpuCostMap, ramCostMap, gpuCostMap, gpuCountMap,
-		cpuCoresMap, ramBytesMap, ramUserPctMap,
+		cpuCoresCapacityMap, ramBytesCapacityMap, ramUserPctMap,
 		ramSystemPctMap,
 		cpuBreakdownMap,
 		activeDataMap,
@@ -657,6 +672,7 @@ func ClusterNodes(cp models.Provider, client prometheus.Client, start, end time.
 		labelsMap,
 		clusterAndNameToType,
 		resolution,
+		overheadMap,
 	)
 
 	c, err := cp.GetConfig()

+ 43 - 0
pkg/costmodel/cluster_helpers.go

@@ -426,6 +426,43 @@ func buildCPUBreakdownMap(resNodeCPUModeTotal []*prom.QueryResult) map[nodeIdent
 	return cpuBreakdownMap
 }
 
+func buildOverheadMap(capRam, allocRam, capCPU, allocCPU map[nodeIdentifierNoProviderID]float64) map[nodeIdentifierNoProviderID]*NodeOverhead {
+	m := make(map[nodeIdentifierNoProviderID]*NodeOverhead, len(capRam))
+
+	for identifier, ramCapacity := range capRam {
+		allocatableRam, ok := allocRam[identifier]
+		if !ok {
+			log.Warnf("Could not find allocatable ram for node %s", identifier.Name)
+			continue
+		}
+		overheadBytes := ramCapacity - allocatableRam
+		m[identifier] = &NodeOverhead{
+			RamOverheadFraction: overheadBytes / ramCapacity,
+		}
+	}
+
+	for identifier, cpuCapacity := range capCPU {
+		allocatableCPU, ok := allocCPU[identifier]
+		if !ok {
+			log.Warnf("Could not find allocatable cpu for node %s", identifier.Name)
+			continue
+		}
+
+		overhead := cpuCapacity - allocatableCPU
+
+		if _, found := m[identifier]; found {
+			m[identifier].CpuOverheadFraction = overhead / cpuCapacity
+		} else {
+			m[identifier] = &NodeOverhead{
+				CpuOverheadFraction: overhead / cpuCapacity,
+			}
+		}
+
+	}
+
+	return m
+}
+
 func buildRAMUserPctMap(resNodeRAMUserPct []*prom.QueryResult) map[nodeIdentifierNoProviderID]float64 {
 
 	m := make(map[nodeIdentifierNoProviderID]float64)
@@ -707,6 +744,7 @@ func buildNodeMap(
 	labelsMap map[nodeIdentifierNoProviderID]map[string]string,
 	clusterAndNameToType map[nodeIdentifierNoProviderID]string,
 	res time.Duration,
+	overheadMap map[nodeIdentifierNoProviderID]*NodeOverhead,
 ) map[NodeIdentifier]*Node {
 
 	nodeMap := make(map[NodeIdentifier]*Node)
@@ -784,6 +822,11 @@ func buildNodeMap(
 		if labels, ok := labelsMap[clusterAndNameID]; ok {
 			nodePtr.Labels = labels
 		}
+
+		if overhead, ok := overheadMap[clusterAndNameID]; ok {
+			nodePtr.Overhead = overhead
+		}
+
 	}
 
 	return nodeMap

+ 15 - 0
pkg/costmodel/cluster_helpers_test.go

@@ -150,6 +150,7 @@ func TestBuildNodeMap(t *testing.T) {
 		labelsMap            map[nodeIdentifierNoProviderID]map[string]string
 		clusterAndNameToType map[nodeIdentifierNoProviderID]string
 		expected             map[NodeIdentifier]*Node
+		overheadMap          map[nodeIdentifierNoProviderID]*NodeOverhead
 	}{
 		{
 			name:     "empty",
@@ -657,6 +658,15 @@ func TestBuildNodeMap(t *testing.T) {
 					Name:    "node1",
 				}: "e2-medium", // for this node type
 			},
+			overheadMap: map[nodeIdentifierNoProviderID]*NodeOverhead{
+				{
+					Cluster: "cluster1",
+					Name:    "node1",
+				}: {
+					CpuOverheadFraction: 0.5,
+					RamOverheadFraction: 0.25,
+				}, // for this node type
+			},
 			expected: map[NodeIdentifier]*Node{
 				{
 					Cluster:    "cluster1",
@@ -671,6 +681,10 @@ func TestBuildNodeMap(t *testing.T) {
 					CPUCores:     partialCPUMap["e2-medium"],
 					CPUBreakdown: &ClusterCostsBreakdown{},
 					RAMBreakdown: &ClusterCostsBreakdown{},
+					Overhead: &NodeOverhead{
+						CpuOverheadFraction: 0.5,
+						RamOverheadFraction: 0.25,
+					},
 				},
 			},
 		},
@@ -688,6 +702,7 @@ func TestBuildNodeMap(t *testing.T) {
 				testCase.labelsMap,
 				testCase.clusterAndNameToType,
 				time.Minute,
+				testCase.overheadMap,
 			)
 
 			if !reflect.DeepEqual(result, testCase.expected) {

+ 16 - 0
pkg/kubecost/asset.go

@@ -1753,6 +1753,14 @@ func (n *Network) String() string {
 	return toString(n)
 }
 
+// NodeOverhead represents the delta between the allocatable resources
+// of the node and the node nameplate capacity
+type NodeOverhead struct {
+	CpuOverheadFraction  float64
+	RamOverheadFraction  float64
+	OverheadCostFraction float64
+}
+
 // Node is an Asset representing a single node in a cluster
 type Node struct {
 	Properties   *AssetProperties
@@ -1773,6 +1781,7 @@ type Node struct {
 	RAMCost      float64
 	Discount     float64
 	Preemptible  float64
+	Overhead     *NodeOverhead // @bingen:field[version=19]
 }
 
 // NewNode creates and returns a new Node Asset
@@ -2001,6 +2010,13 @@ func (n *Node) add(that *Node) {
 	n.GPUCost += that.GPUCost
 	n.RAMCost += that.RAMCost
 	n.Adjustment += that.Adjustment
+
+	if n.Overhead != nil && that.Overhead != nil {
+
+		n.Overhead.RamOverheadFraction = (n.Overhead.RamOverheadFraction*n.RAMCost + that.Overhead.RamOverheadFraction*that.RAMCost) / totalRAMCost
+		n.Overhead.CpuOverheadFraction = (n.Overhead.CpuOverheadFraction*n.CPUCost + that.Overhead.CpuOverheadFraction*that.CPUCost) / totalCPUCost
+		n.Overhead.OverheadCostFraction = ((n.Overhead.CpuOverheadFraction * n.CPUCost) + (n.Overhead.RamOverheadFraction * n.RAMCost)) / n.TotalCost()
+	}
 }
 
 // Clone returns a deep copy of the given Node

+ 4 - 0
pkg/kubecost/asset_json.go

@@ -494,7 +494,11 @@ func (n *Node) MarshalJSON() ([]byte, error) {
 	jsonEncodeFloat64(buffer, "gpuCount", n.GPUs(), ",")
 	jsonEncodeFloat64(buffer, "ramCost", n.RAMCost, ",")
 	jsonEncodeFloat64(buffer, "adjustment", n.Adjustment, ",")
+	if n.Overhead != nil {
+		jsonEncode(buffer, "overhead", n.Overhead, ",")
+	}
 	jsonEncodeFloat64(buffer, "totalCost", n.TotalCost(), "")
+
 	buffer.WriteString("}")
 	return buffer.Bytes(), nil
 }

+ 3 - 2
pkg/kubecost/bingen.go

@@ -26,7 +26,7 @@ package kubecost
 // @bingen:generate:CoverageSet
 
 // Asset Version Set: Includes Asset pipeline specific resources
-// @bingen:set[name=Assets,version=18]
+// @bingen:set[name=Assets,version=19]
 // @bingen:generate:Any
 // @bingen:generate:Asset
 // @bingen:generate:AssetLabels
@@ -41,6 +41,7 @@ package kubecost
 // @bingen:generate:LoadBalancer
 // @bingen:generate:Network
 // @bingen:generate:Node
+// @bingen:generate:NodeOverhead
 // @bingen:generate:SharedAsset
 // @bingen:end
 
@@ -73,7 +74,7 @@ package kubecost
 // @bingen:generate:AuditSetRange
 // @bingen:end
 
-// @bingen:set[name=CloudCost,version=1]
+// @bingen:set[name=CloudCost,version=2]
 // @bingen:generate:CloudCost
 // @bingen:generate:CostMetric
 // @bingen:generate[stringtable]:CloudCostSet

+ 15 - 3
pkg/kubecost/cloudcost.go

@@ -18,10 +18,11 @@ type CloudCost struct {
 	NetCost          CostMetric           `json:"netCost"`
 	AmortizedNetCost CostMetric           `json:"amortizedNetCost"`
 	InvoicedCost     CostMetric           `json:"invoicedCost"`
+	AmortizedCost    CostMetric           `json:"amortizedCost"`
 }
 
 // NewCloudCost instantiates a new CloudCost
-func NewCloudCost(start, end time.Time, ccProperties *CloudCostProperties, kubernetesPercent, listCost, netCost, amortizedNetCost, invoicedCost float64) *CloudCost {
+func NewCloudCost(start, end time.Time, ccProperties *CloudCostProperties, kubernetesPercent, listCost, netCost, amortizedNetCost, invoicedCost, amortizedCost float64) *CloudCost {
 	return &CloudCost{
 		Properties: ccProperties,
 		Window:     NewWindow(&start, &end),
@@ -38,7 +39,11 @@ func NewCloudCost(start, end time.Time, ccProperties *CloudCostProperties, kuber
 			KubernetesPercent: kubernetesPercent,
 		},
 		InvoicedCost: CostMetric{
-			Cost:              listCost,
+			Cost:              invoicedCost,
+			KubernetesPercent: kubernetesPercent,
+		},
+		AmortizedCost: CostMetric{
+			Cost:              amortizedCost,
 			KubernetesPercent: kubernetesPercent,
 		},
 	}
@@ -52,6 +57,7 @@ func (cc *CloudCost) Clone() *CloudCost {
 		NetCost:          cc.NetCost.Clone(),
 		AmortizedNetCost: cc.AmortizedNetCost.Clone(),
 		InvoicedCost:     cc.InvoicedCost.Clone(),
+		AmortizedCost:    cc.AmortizedCost.Clone(),
 	}
 }
 
@@ -65,7 +71,8 @@ func (cc *CloudCost) Equal(that *CloudCost) bool {
 		cc.ListCost.Equal(that.ListCost) &&
 		cc.NetCost.Equal(that.NetCost) &&
 		cc.AmortizedNetCost.Equal(that.AmortizedNetCost) &&
-		cc.InvoicedCost.Equal(that.InvoicedCost)
+		cc.InvoicedCost.Equal(that.InvoicedCost) &&
+		cc.AmortizedCost.Equal(that.AmortizedCost)
 }
 
 func (cc *CloudCost) add(that *CloudCost) {
@@ -81,6 +88,7 @@ func (cc *CloudCost) add(that *CloudCost) {
 	cc.NetCost = cc.NetCost.add(that.NetCost)
 	cc.AmortizedNetCost = cc.AmortizedNetCost.add(that.AmortizedNetCost)
 	cc.InvoicedCost = cc.InvoicedCost.add(that.InvoicedCost)
+	cc.AmortizedCost = cc.AmortizedCost.add(that.AmortizedCost)
 
 	cc.Window = cc.Window.Expand(that.Window)
 }
@@ -131,6 +139,8 @@ func (cc *CloudCost) GetCostMetric(costMetricName string) (CostMetric, error) {
 		return cc.AmortizedNetCost, nil
 	case InvoicedCostMetric:
 		return cc.InvoicedCost, nil
+	case AmortizedCostMetric:
+		return cc.AmortizedCost, nil
 	}
 	return CostMetric{}, fmt.Errorf("invalid Cost Metric: %s", costMetricName)
 }
@@ -486,6 +496,7 @@ func (ccsr *CloudCostSetRange) LoadCloudCost(cloudCost *CloudCost) {
 				NetCost:          cloudCost.NetCost.percent(pct),
 				AmortizedNetCost: cloudCost.AmortizedNetCost.percent(pct),
 				InvoicedCost:     cloudCost.InvoicedCost.percent(pct),
+				AmortizedCost:    cloudCost.AmortizedCost.percent(pct),
 			}
 		}
 
@@ -507,6 +518,7 @@ const (
 	NetCostMetric          string = "NetCost"
 	AmortizedNetCostMetric string = "AmortizedNetCost"
 	InvoicedCostMetric     string = "InvoicedCost"
+	AmortizedCostMetric    string = "AmortizedCost"
 )
 
 type CostMetric struct {

+ 12 - 0
pkg/kubecost/cloudcost_test.go

@@ -43,6 +43,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 					NetCost:          CostMetric{Cost: 80, KubernetesPercent: 1},
 					AmortizedNetCost: CostMetric{Cost: 90, KubernetesPercent: 1},
 					InvoicedCost:     CostMetric{Cost: 95, KubernetesPercent: 1},
+					AmortizedCost:    CostMetric{Cost: 85, KubernetesPercent: 1},
 				},
 			},
 			ccsr: emtpyCCSR.Clone(),
@@ -58,6 +59,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 							NetCost:          CostMetric{Cost: 80, KubernetesPercent: 1},
 							AmortizedNetCost: CostMetric{Cost: 90, KubernetesPercent: 1},
 							InvoicedCost:     CostMetric{Cost: 95, KubernetesPercent: 1},
+							AmortizedCost:    CostMetric{Cost: 85, KubernetesPercent: 1},
 						},
 					},
 				},
@@ -82,6 +84,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 					NetCost:          CostMetric{Cost: 80, KubernetesPercent: 1},
 					AmortizedNetCost: CostMetric{Cost: 90, KubernetesPercent: 1},
 					InvoicedCost:     CostMetric{Cost: 95, KubernetesPercent: 1},
+					AmortizedCost:    CostMetric{Cost: 85, KubernetesPercent: 1},
 				},
 			},
 			ccsr: emtpyCCSR.Clone(),
@@ -97,6 +100,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 							NetCost:          CostMetric{Cost: 40, KubernetesPercent: 1},
 							AmortizedNetCost: CostMetric{Cost: 45, KubernetesPercent: 1},
 							InvoicedCost:     CostMetric{Cost: 47.5, KubernetesPercent: 1},
+							AmortizedCost:    CostMetric{Cost: 42.5, KubernetesPercent: 1},
 						},
 					},
 				},
@@ -111,6 +115,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 							NetCost:          CostMetric{Cost: 40, KubernetesPercent: 1},
 							AmortizedNetCost: CostMetric{Cost: 45, KubernetesPercent: 1},
 							InvoicedCost:     CostMetric{Cost: 47.5, KubernetesPercent: 1},
+							AmortizedCost:    CostMetric{Cost: 42.5, KubernetesPercent: 1},
 						},
 					},
 				},
@@ -130,6 +135,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 					NetCost:          CostMetric{Cost: 80, KubernetesPercent: 1},
 					AmortizedNetCost: CostMetric{Cost: 90, KubernetesPercent: 1},
 					InvoicedCost:     CostMetric{Cost: 95, KubernetesPercent: 1},
+					AmortizedCost:    CostMetric{Cost: 85, KubernetesPercent: 1},
 				},
 			},
 			ccsr: emtpyCCSR.Clone(),
@@ -145,6 +151,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 							NetCost:          CostMetric{Cost: 40, KubernetesPercent: 1},
 							AmortizedNetCost: CostMetric{Cost: 45, KubernetesPercent: 1},
 							InvoicedCost:     CostMetric{Cost: 47.5, KubernetesPercent: 1},
+							AmortizedCost:    CostMetric{Cost: 42.5, KubernetesPercent: 1},
 						},
 					},
 				},
@@ -169,6 +176,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 					NetCost:          CostMetric{Cost: 80, KubernetesPercent: 1},
 					AmortizedNetCost: CostMetric{Cost: 90, KubernetesPercent: 1},
 					InvoicedCost:     CostMetric{Cost: 95, KubernetesPercent: 1},
+					AmortizedCost:    CostMetric{Cost: 85, KubernetesPercent: 1},
 				},
 			},
 			ccsr: emtpyCCSR.Clone(),
@@ -194,6 +202,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 							NetCost:          CostMetric{Cost: 40, KubernetesPercent: 1},
 							AmortizedNetCost: CostMetric{Cost: 45, KubernetesPercent: 1},
 							InvoicedCost:     CostMetric{Cost: 47.5, KubernetesPercent: 1},
+							AmortizedCost:    CostMetric{Cost: 42.5, KubernetesPercent: 1},
 						},
 					},
 				},
@@ -208,6 +217,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 					NetCost:          CostMetric{Cost: 40, KubernetesPercent: 1},
 					AmortizedNetCost: CostMetric{Cost: 60, KubernetesPercent: 1},
 					InvoicedCost:     CostMetric{Cost: 50, KubernetesPercent: 1},
+					AmortizedCost:    CostMetric{Cost: 80, KubernetesPercent: 1},
 				},
 				{
 					Properties:       ccProperties1,
@@ -216,6 +226,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 					NetCost:          CostMetric{Cost: 60, KubernetesPercent: 0},
 					AmortizedNetCost: CostMetric{Cost: 40, KubernetesPercent: 0},
 					InvoicedCost:     CostMetric{Cost: 50, KubernetesPercent: 0},
+					AmortizedCost:    CostMetric{Cost: 20, KubernetesPercent: 0},
 				},
 			},
 			ccsr: emtpyCCSR.Clone(),
@@ -236,6 +247,7 @@ func TestCloudCost_LoadCloudCost(t *testing.T) {
 							NetCost:          CostMetric{Cost: 100, KubernetesPercent: 0.4},
 							AmortizedNetCost: CostMetric{Cost: 100, KubernetesPercent: 0.6},
 							InvoicedCost:     CostMetric{Cost: 100, KubernetesPercent: 0.5},
+							AmortizedCost:    CostMetric{Cost: 100, KubernetesPercent: 0.8},
 						},
 					},
 				},

+ 168 - 2
pkg/kubecost/kubecost_codecs.go

@@ -37,7 +37,7 @@ const (
 	DefaultCodecVersion uint8 = 17
 
 	// AssetsCodecVersion is used for any resources listed in the Assets version set
-	AssetsCodecVersion uint8 = 18
+	AssetsCodecVersion uint8 = 19
 
 	// AllocationCodecVersion is used for any resources listed in the Allocation version set
 	AllocationCodecVersion uint8 = 16
@@ -46,7 +46,7 @@ const (
 	AuditCodecVersion uint8 = 1
 
 	// CloudCostCodecVersion is used for any resources listed in the CloudCost version set
-	CloudCostCodecVersion uint8 = 1
+	CloudCostCodecVersion uint8 = 2
 )
 
 //--------------------------------------------------------------------------
@@ -86,6 +86,7 @@ var typeMap map[string]reflect.Type = map[string]reflect.Type{
 	"LoadBalancer":                  reflect.TypeOf((*LoadBalancer)(nil)).Elem(),
 	"Network":                       reflect.TypeOf((*Network)(nil)).Elem(),
 	"Node":                          reflect.TypeOf((*Node)(nil)).Elem(),
+	"NodeOverhead":                  reflect.TypeOf((*NodeOverhead)(nil)).Elem(),
 	"PVAllocation":                  reflect.TypeOf((*PVAllocation)(nil)).Elem(),
 	"PVKey":                         reflect.TypeOf((*PVKey)(nil)).Elem(),
 	"RawAllocationOnlyData":         reflect.TypeOf((*RawAllocationOnlyData)(nil)).Elem(),
@@ -4753,6 +4754,14 @@ func (target *CloudCost) MarshalBinaryWithContext(ctx *EncodingContext) (err err
 	}
 	// --- [end][write][struct](CostMetric) ---
 
+	// --- [begin][write][struct](CostMetric) ---
+	buff.WriteInt(0) // [compatibility, unused]
+	errG := target.AmortizedCost.MarshalBinaryWithContext(ctx)
+	if errG != nil {
+		return errG
+	}
+	// --- [end][write][struct](CostMetric) ---
+
 	return nil
 }
 
@@ -4874,6 +4883,16 @@ func (target *CloudCost) UnmarshalBinaryWithContext(ctx *DecodingContext) (err e
 	target.InvoicedCost = *f
 	// --- [end][read][struct](CostMetric) ---
 
+	// --- [begin][read][struct](CostMetric) ---
+	g := &CostMetric{}
+	buff.ReadInt() // [compatibility, unused]
+	errG := g.UnmarshalBinaryWithContext(ctx)
+	if errG != nil {
+		return errG
+	}
+	target.AmortizedCost = *g
+	// --- [end][read][struct](CostMetric) ---
+
 	return nil
 }
 
@@ -7712,6 +7731,20 @@ func (target *Node) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
 	buff.WriteFloat64(target.RAMCost)     // write float64
 	buff.WriteFloat64(target.Discount)    // write float64
 	buff.WriteFloat64(target.Preemptible) // write float64
+	if target.Overhead == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][struct](NodeOverhead) ---
+		buff.WriteInt(0) // [compatibility, unused]
+		errG := target.Overhead.MarshalBinaryWithContext(ctx)
+		if errG != nil {
+			return errG
+		}
+		// --- [end][write][struct](NodeOverhead) ---
+
+	}
 	return nil
 }
 
@@ -7923,6 +7956,139 @@ func (target *Node) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error)
 	ll := buff.ReadFloat64() // read float64
 	target.Preemptible = ll
 
+	// field version check
+	if uint8(19) <= version {
+		if buff.ReadUInt8() == uint8(0) {
+			target.Overhead = nil
+		} else {
+			// --- [begin][read][struct](NodeOverhead) ---
+			mm := &NodeOverhead{}
+			buff.ReadInt() // [compatibility, unused]
+			errG := mm.UnmarshalBinaryWithContext(ctx)
+			if errG != nil {
+				return errG
+			}
+			target.Overhead = mm
+			// --- [end][read][struct](NodeOverhead) ---
+
+		}
+	} else {
+		target.Overhead = nil
+
+	}
+
+	return nil
+}
+
+//--------------------------------------------------------------------------
+//  NodeOverhead
+//--------------------------------------------------------------------------
+
+// MarshalBinary serializes the internal properties of this NodeOverhead instance
+// into a byte array
+func (target *NodeOverhead) MarshalBinary() (data []byte, err error) {
+	ctx := &EncodingContext{
+		Buffer: util.NewBuffer(),
+		Table:  nil,
+	}
+
+	e := target.MarshalBinaryWithContext(ctx)
+	if e != nil {
+		return nil, e
+	}
+
+	encBytes := ctx.Buffer.Bytes()
+	return encBytes, nil
+}
+
+// MarshalBinaryWithContext serializes the internal properties of this NodeOverhead instance
+// into a byte array leveraging a predefined context.
+func (target *NodeOverhead) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
+	// panics are recovered and propagated as errors
+	defer func() {
+		if r := recover(); r != nil {
+			if e, ok := r.(error); ok {
+				err = e
+			} else if s, ok := r.(string); ok {
+				err = fmt.Errorf("Unexpected panic: %s", s)
+			} else {
+				err = fmt.Errorf("Unexpected panic: %+v", r)
+			}
+		}
+	}()
+
+	buff := ctx.Buffer
+	buff.WriteUInt8(AssetsCodecVersion) // version
+
+	buff.WriteFloat64(target.CpuOverheadFraction)  // write float64
+	buff.WriteFloat64(target.RamOverheadFraction)  // write float64
+	buff.WriteFloat64(target.OverheadCostFraction) // write float64
+	return nil
+}
+
+// UnmarshalBinary uses the data passed byte array to set all the internal properties of
+// the NodeOverhead type
+func (target *NodeOverhead) UnmarshalBinary(data []byte) error {
+	var table []string
+	buff := util.NewBufferFromBytes(data)
+
+	// string table header validation
+	if isBinaryTag(data, BinaryTagStringTable) {
+		buff.ReadBytes(len(BinaryTagStringTable)) // strip tag length
+		tl := buff.ReadInt()                      // table length
+		if tl > 0 {
+			table = make([]string, tl, tl)
+			for i := 0; i < tl; i++ {
+				table[i] = buff.ReadString()
+			}
+		}
+	}
+
+	ctx := &DecodingContext{
+		Buffer: buff,
+		Table:  table,
+	}
+
+	err := target.UnmarshalBinaryWithContext(ctx)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// UnmarshalBinaryWithContext uses the context containing a string table and binary buffer to set all the internal properties of
+// the NodeOverhead type
+func (target *NodeOverhead) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error) {
+	// panics are recovered and propagated as errors
+	defer func() {
+		if r := recover(); r != nil {
+			if e, ok := r.(error); ok {
+				err = e
+			} else if s, ok := r.(string); ok {
+				err = fmt.Errorf("Unexpected panic: %s", s)
+			} else {
+				err = fmt.Errorf("Unexpected panic: %+v", r)
+			}
+		}
+	}()
+
+	buff := ctx.Buffer
+	version := buff.ReadUInt8()
+
+	if version > AssetsCodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling NodeOverhead. Expected %d or less, got %d", AssetsCodecVersion, version)
+	}
+
+	a := buff.ReadFloat64() // read float64
+	target.CpuOverheadFraction = a
+
+	b := buff.ReadFloat64() // read float64
+	target.RamOverheadFraction = b
+
+	c := buff.ReadFloat64() // read float64
+	target.OverheadCostFraction = c
+
 	return nil
 }