Bläddra i källkod

Merge branch 'develop' into kaelan-prefix-bucket-backup

Ajay Tripathy 3 år sedan
förälder
incheckning
4650c726f4

+ 1 - 1
pkg/cmd/agent/agent.go

@@ -195,7 +195,7 @@ func Execute(opts *AgentOpts) error {
 
 	var clusterInfoProvider clusters.ClusterInfoProvider
 	if env.IsExportClusterInfoEnabled() {
-		clusterInfoConf := confManager.ConfigFileAt(path.Join(configPrefix, " cluster-info.json"))
+		clusterInfoConf := confManager.ConfigFileAt(path.Join(configPrefix, "cluster-info.json"))
 		clusterInfoProvider = costmodel.NewClusterInfoWriteOnRequest(localClusterInfo, clusterInfoConf)
 	} else {
 		clusterInfoProvider = localClusterInfo

+ 20 - 15
pkg/cmd/commands.go

@@ -26,20 +26,24 @@ const (
 // Execute runs the root command for the application. By default, if no command argument is provided,
 // on the command line, the cost-model is executed by default.
 //
-// This function accepts a costModelCmd parameter to provide support for various cost-model implementations
-// (ie: open source, enterprise).
-func Execute(costModelCmd *cobra.Command) error {
+// This function accepts a costModelCmd and agentCmd parameters to provide support for alternate
+// implementations for cost-model and/or agent. If the passed in costModelCmd and/or agentCmd are nil,
+// then the respective defaults from opencost will be used.
+//
+// Any additional commands passed in will be added to the root command.
+func Execute(costModelCmd *cobra.Command, cmds ...*cobra.Command) error {
 	// use the open-source cost-model if a command is not provided
 	if costModelCmd == nil {
 		costModelCmd = newCostModelCommand()
 	}
 
-	// validate the command being passed
-	if err := validate(costModelCmd); err != nil {
+	// validate the commands being passed
+	if err := validate(costModelCmd, CommandCostModel); err != nil {
 		return err
 	}
 
-	rootCmd := newRootCommand(costModelCmd)
+	// prepend the -agent command and create a new root command
+	rootCmd := newRootCommand(costModelCmd, cmds...)
 
 	// in the event that no directive/command is passed, we want to default to using the cost-model command
 	// cobra doesn't provide a way within the API to do this, so we'll prepend the command if it is omitted.
@@ -58,8 +62,8 @@ func Execute(costModelCmd *cobra.Command) error {
 }
 
 // newRootCommand creates a new root command which will act as a sub-command router for the
-// cost-model application
-func newRootCommand(costModelCmd *cobra.Command) *cobra.Command {
+// cost-model application.
+func newRootCommand(costModelCmd *cobra.Command, cmds ...*cobra.Command) *cobra.Command {
 	cmd := &cobra.Command{
 		Use:          commandRoot,
 		SilenceUsage: true,
@@ -81,8 +85,10 @@ func newRootCommand(costModelCmd *cobra.Command) *cobra.Command {
 
 	// add the modes of operation
 	cmd.AddCommand(
-		costModelCmd,
-		newAgentCommand(),
+		append([]*cobra.Command{
+			costModelCmd,
+			newAgentCommand(),
+		}, cmds...)...,
 	)
 
 	return cmd
@@ -131,11 +137,10 @@ func newAgentCommand() *cobra.Command {
 	return agentCmd
 }
 
-// validate will check to ensure that the cost model command passed in has a use equal to the
-// CommandCostModel to ensure that the default command matches.
-func validate(costModelCommand *cobra.Command) error {
-	if costModelCommand.Use != CommandCostModel {
-		return fmt.Errorf("Incompatible 'cost-model' command provided to run-time.")
+// validate checks the command's use to see if it matches an expected command name.
+func validate(cmd *cobra.Command, command string) error {
+	if cmd.Use != command {
+		return fmt.Errorf("Incompatible '%s' command provided to run-time.", command)
 	}
 	return nil
 }

+ 21 - 1
pkg/costmodel/allocation.go

@@ -43,6 +43,7 @@ const (
 	queryFmtNetInternetCostPerGiB    = `avg(avg_over_time(kubecost_network_internet_egress_cost{}[%s])) by (%s)`
 	queryFmtNetReceiveBytes          = `sum(increase(container_network_receive_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
 	queryFmtNetTransferBytes         = `sum(increase(container_network_transmit_bytes_total{pod!="", container="POD"}[%s])) by (pod_name, pod, namespace, %s)`
+	queryFmtNodeLabels               = `avg_over_time(kube_node_labels[%s])`
 	queryFmtNamespaceLabels          = `avg_over_time(kube_namespace_labels[%s])`
 	queryFmtNamespaceAnnotations     = `avg_over_time(kube_namespace_annotations[%s])`
 	queryFmtPodLabels                = `avg_over_time(kube_pod_labels[%s])`
@@ -393,6 +394,12 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	queryNetInternetCostPerGiB := fmt.Sprintf(queryFmtNetInternetCostPerGiB, durStr, env.GetPromClusterLabel())
 	resChNetInternetCostPerGiB := ctx.QueryAtTime(queryNetInternetCostPerGiB, end)
 
+	var resChNodeLabels prom.QueryResultsChan
+	if env.GetAllocationNodeLabelsEnabled() {
+		queryNodeLabels := fmt.Sprintf(queryFmtNodeLabels, durStr)
+		resChNodeLabels = ctx.QueryAtTime(queryNodeLabels, end)
+	}
+
 	queryNamespaceLabels := fmt.Sprintf(queryFmtNamespaceLabels, durStr)
 	resChNamespaceLabels := ctx.QueryAtTime(queryNamespaceLabels, end)
 
@@ -465,6 +472,12 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	resNetInternetGiB, _ := resChNetInternetGiB.Await()
 	resNetInternetCostPerGiB, _ := resChNetInternetCostPerGiB.Await()
 
+	var resNodeLabels []*prom.QueryResult
+	if env.GetAllocationNodeLabelsEnabled() {
+		if env.GetAllocationNodeLabelsEnabled() {
+			resNodeLabels, _ = resChNodeLabels.Await()
+		}
+	}
 	resNamespaceLabels, _ := resChNamespaceLabels.Await()
 	resNamespaceAnnotations, _ := resChNamespaceAnnotations.Await()
 	resPodLabels, _ := resChPodLabels.Await()
@@ -512,11 +525,18 @@ func (cm *CostModel) computeAllocation(start, end time.Time, resolution time.Dur
 	// Other than that case, Allocations should be associated with pods by the
 	// above functions.
 
+	// At this point, we expect "Node" to be set by one of the above functions
+	// (e.g. applyCPUCoresAllocated, etc.) -- otherwise, node labels will fail
+	// to correctly apply to the pods.
+	var nodeLabels map[nodeKey]map[string]string
+	if env.GetAllocationNodeLabelsEnabled() {
+		nodeLabels = resToNodeLabels(resNodeLabels)
+	}
 	namespaceLabels := resToNamespaceLabels(resNamespaceLabels)
 	podLabels := resToPodLabels(resPodLabels, podUIDKeyMap, ingestPodUID)
 	namespaceAnnotations := resToNamespaceAnnotations(resNamespaceAnnotations)
 	podAnnotations := resToPodAnnotations(resPodAnnotations, podUIDKeyMap, ingestPodUID)
-	applyLabels(podMap, namespaceLabels, podLabels)
+	applyLabels(podMap, nodeLabels, namespaceLabels, podLabels)
 	applyAnnotations(podMap, namespaceAnnotations, podAnnotations)
 
 	podDeploymentMap := labelsToPodControllerMap(podLabels, resToDeploymentLabels(resDeploymentLabels))

+ 61 - 4
pkg/costmodel/allocation_helpers.go

@@ -244,6 +244,7 @@ func applyCPUCoresAllocated(podMap map[podKey]*pod, resCPUCoresAllocated []*prom
 				continue
 			}
 			thisPod.Allocations[container].Properties.Node = node
+			thisPod.Node = node
 		}
 	}
 }
@@ -301,6 +302,7 @@ func applyCPUCoresRequested(podMap map[podKey]*pod, resCPUCoresRequested []*prom
 				continue
 			}
 			thisPod.Allocations[container].Properties.Node = node
+			thisPod.Node = node
 		}
 	}
 }
@@ -449,6 +451,7 @@ func applyRAMBytesAllocated(podMap map[podKey]*pod, resRAMBytesAllocated []*prom
 				continue
 			}
 			thisPod.Allocations[container].Properties.Node = node
+			thisPod.Node = node
 		}
 	}
 }
@@ -503,6 +506,7 @@ func applyRAMBytesRequested(podMap map[podKey]*pod, resRAMBytesRequested []*prom
 				continue
 			}
 			pod.Allocations[container].Properties.Node = node
+			pod.Node = node
 		}
 	}
 }
@@ -766,6 +770,42 @@ func applyNetworkAllocation(podMap map[podKey]*pod, resNetworkGiB []*prom.QueryR
 	}
 }
 
+func resToNodeLabels(resNodeLabels []*prom.QueryResult) map[nodeKey]map[string]string {
+	nodeLabels := map[nodeKey]map[string]string{}
+
+	for _, res := range resNodeLabels {
+		nodeKey, err := resultNodeKey(res, env.GetPromClusterLabel(), "node")
+		if err != nil {
+			continue
+		}
+
+		if _, ok := nodeLabels[nodeKey]; !ok {
+			nodeLabels[nodeKey] = map[string]string{}
+		}
+
+		for _, rawK := range env.GetAllocationNodeLabelsIncludeList() {
+			labels := res.GetLabels()
+
+			// Sanitize the given label name to match Prometheus formatting
+			// e.g. topology.kubernetes.io/zone => topology_kubernetes_io_zone
+			k := prom.SanitizeLabelName(rawK)
+			if v, ok := labels[k]; ok {
+				nodeLabels[nodeKey][k] = v
+				continue
+			}
+
+			// Try with the "label_" prefix, if not found
+			// e.g. topology_kubernetes_io_zone => label_topology_kubernetes_io_zone
+			k = fmt.Sprintf("label_%s", k)
+			if v, ok := labels[k]; ok {
+				nodeLabels[nodeKey][k] = v
+			}
+		}
+	}
+
+	return nodeLabels
+}
+
 func resToNamespaceLabels(resNamespaceLabels []*prom.QueryResult) map[namespaceKey]map[string]string {
 	namespaceLabels := map[namespaceKey]map[string]string{}
 
@@ -878,21 +918,34 @@ func resToPodAnnotations(resPodAnnotations []*prom.QueryResult, podUIDKeyMap map
 	return podAnnotations
 }
 
-func applyLabels(podMap map[podKey]*pod, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
+func applyLabels(podMap map[podKey]*pod, nodeLabels map[nodeKey]map[string]string, namespaceLabels map[namespaceKey]map[string]string, podLabels map[podKey]map[string]string) {
 	for podKey, pod := range podMap {
 		for _, alloc := range pod.Allocations {
 			allocLabels := alloc.Properties.Labels
 			if allocLabels == nil {
 				allocLabels = make(map[string]string)
 			}
-			// Apply namespace labels first, then pod labels so that pod labels
-			// overwrite namespace labels.
-			nsKey := podKey.namespaceKey // newNamespaceKey(podKey.Cluster, podKey.Namespace)
+
+			// Apply node labels first, then namespace labels, then pod labels
+			// so that pod labels overwrite namespace labels, which overwrite
+			// node labels.
+
+			if nodeLabels != nil {
+				nodeKey := newNodeKey(pod.Key.Cluster, pod.Node)
+				if labels, ok := nodeLabels[nodeKey]; ok {
+					for k, v := range labels {
+						allocLabels[k] = v
+					}
+				}
+			}
+
+			nsKey := podKey.namespaceKey
 			if labels, ok := namespaceLabels[nsKey]; ok {
 				for k, v := range labels {
 					allocLabels[k] = v
 				}
 			}
+
 			if labels, ok := podLabels[podKey]; ok {
 				for k, v := range labels {
 					allocLabels[k] = v
@@ -2008,6 +2061,8 @@ func getUnmountedPodForCluster(window kubecost.Window, podMap map[podKey]*pod, c
 		thisPod.Allocations[container].Properties.Pod = podName
 		thisPod.Allocations[container].Properties.Container = container
 
+		thisPod.Node = node
+
 		podMap[thisPodKey] = thisPod
 	}
 	return thisPod
@@ -2039,6 +2094,8 @@ func getUnmountedPodForNamespace(window kubecost.Window, podMap map[podKey]*pod,
 		thisPod.Allocations[container].Properties.Pod = podName
 		thisPod.Allocations[container].Properties.Container = container
 
+		thisPod.Node = node
+
 		podMap[thisPodKey] = thisPod
 	}
 	return thisPod

+ 3 - 1
pkg/costmodel/allocation_types.go

@@ -2,8 +2,9 @@ package costmodel
 
 import (
 	"fmt"
-	"github.com/opencost/opencost/pkg/kubecost"
 	"time"
+
+	"github.com/opencost/opencost/pkg/kubecost"
 )
 
 // pod describes a running pod's start and end time within a Window and
@@ -13,6 +14,7 @@ type pod struct {
 	Start       time.Time
 	End         time.Time
 	Key         podKey
+	Node        string
 	Allocations map[string]*kubecost.Allocation
 }
 

+ 8 - 8
pkg/costmodel/clusterinfo.go

@@ -24,23 +24,23 @@ var (
 
 // writeReportingFlags writes the reporting flags to the cluster info map
 func writeReportingFlags(clusterInfo map[string]string) {
-	clusterInfo["logCollection"] = fmt.Sprintf("%t", logCollectionEnabled)
-	clusterInfo["productAnalytics"] = fmt.Sprintf("%t", productAnalyticsEnabled)
-	clusterInfo["errorReporting"] = fmt.Sprintf("%t", errorReportingEnabled)
-	clusterInfo["valuesReporting"] = fmt.Sprintf("%t", valuesReportingEnabled)
+	clusterInfo[clusters.ClusterInfoLogCollectionKey] = fmt.Sprintf("%t", logCollectionEnabled)
+	clusterInfo[clusters.ClusterInfoProductAnalyticsKey] = fmt.Sprintf("%t", productAnalyticsEnabled)
+	clusterInfo[clusters.ClusterInfoErrorReportingKey] = fmt.Sprintf("%t", errorReportingEnabled)
+	clusterInfo[clusters.ClusterInfoValuesReportingKey] = fmt.Sprintf("%t", valuesReportingEnabled)
 }
 
 // writeClusterProfile writes the data associated with the cluster profile
 func writeClusterProfile(clusterInfo map[string]string) {
-	clusterInfo["clusterProfile"] = clusterProfile
+	clusterInfo[clusters.ClusterInfoProfileKey] = clusterProfile
 }
 
 // writeThanosFlags includes the configured thanos flags on the cluster info
 func writeThanosFlags(clusterInfo map[string]string) {
 	// Include Thanos Offset Duration if Applicable
-	clusterInfo["thanosEnabled"] = fmt.Sprintf("%t", thanos.IsEnabled())
+	clusterInfo[clusters.ClusterInfoThanosEnabledKey] = fmt.Sprintf("%t", thanos.IsEnabled())
 	if thanos.IsEnabled() {
-		clusterInfo["thanosOffset"] = thanos.Offset()
+		clusterInfo[clusters.ClusterInfoThanosOffsetKey] = thanos.Offset()
 	}
 }
 
@@ -65,7 +65,7 @@ func (dlcip *localClusterInfoProvider) GetClusterInfo() map[string]string {
 		if err != nil {
 			log.Infof("Could not get k8s version info: %s", err.Error())
 		} else if v != nil {
-			data["version"] = v.Major + "." + v.Minor
+			data[clusters.ClusterInfoVersionKey] = v.Major + "." + v.Minor
 		}
 	} else {
 		log.Infof("Could not get k8s version info: %s", err.Error())

+ 86 - 62
pkg/costmodel/clusters/clustermap.go

@@ -22,6 +22,25 @@ const (
 	LoadRetryDelay time.Duration = 10 * time.Second
 )
 
+// The following constants are used as keys into the cluster info map data structure
+const (
+	ClusterInfoIdKey               = "id"
+	ClusterInfoNameKey             = "name"
+	ClusterInfoProviderKey         = "provider"
+	ClusterInfoProjectKey          = "project"
+	ClusterInfoAccountKey          = "account"
+	ClusterInfoRegionKey           = "region"
+	ClusterInfoProvisionerKey      = "provisioner"
+	ClusterInfoProfileKey          = "clusterProfile"
+	ClusterInfoLogCollectionKey    = "logCollection"
+	ClusterInfoProductAnalyticsKey = "productAnalytics"
+	ClusterInfoErrorReportingKey   = "errorReporting"
+	ClusterInfoValuesReportingKey  = "valuesReporting"
+	ClusterInfoThanosEnabledKey    = "thanosEnabled"
+	ClusterInfoThanosOffsetKey     = "thanosOffset"
+	ClusterInfoVersionKey          = "version"
+)
+
 // prometheus query offset to apply to each non-range query
 // package scope to prevent calling duration parse each use
 var promQueryOffset = env.GetPrometheusQueryOffset()
@@ -73,12 +92,6 @@ type ClusterMap interface {
 	// NameIDFor returns an identifier in the format "<clusterName>/<clusterID>" if the cluster has an
 	// assigned name. Otherwise, just the clusterID is returned.
 	NameIDFor(clusterID string) string
-
-	// SplitNameID splits the nameID back into a separate id and name field
-	SplitNameID(nameID string) (id string, name string)
-
-	// StopRefresh stops the automatic internal map refresh
-	StopRefresh()
 }
 
 // ClusterInfoProvider is a contract which is capable of performing cluster info lookups.
@@ -89,7 +102,7 @@ type ClusterInfoProvider interface {
 
 // ClusterMap keeps records of all known cost-model clusters.
 type PrometheusClusterMap struct {
-	lock        *sync.RWMutex
+	lock        sync.RWMutex
 	client      prometheus.Client
 	clusters    map[string]*ClusterInfo
 	clusterInfo ClusterInfoProvider
@@ -101,7 +114,6 @@ func NewClusterMap(client prometheus.Client, cip ClusterInfoProvider, refresh ti
 	stop := make(chan struct{})
 
 	cm := &PrometheusClusterMap{
-		lock:        new(sync.RWMutex),
 		client:      client,
 		clusters:    make(map[string]*ClusterInfo),
 		clusterInfo: cip,
@@ -233,61 +245,12 @@ func (pcm *PrometheusClusterMap) loadClusters() (map[string]*ClusterInfo, error)
 func (pcm *PrometheusClusterMap) getLocalClusterInfo() (*ClusterInfo, error) {
 	info := pcm.clusterInfo.GetClusterInfo()
 
-	var id string
-	var name string
-
-	if i, ok := info["id"]; ok {
-		id = i
-	} else {
-		return nil, fmt.Errorf("Local Cluster Info Missing ID")
-	}
-	if n, ok := info["name"]; ok {
-		name = n
-	} else {
-		return nil, fmt.Errorf("Local Cluster Info Missing Name")
-	}
-
-	var clusterProfile string
-	var provider string
-	var account string
-	var project string
-	var region string
-	var provisioner string
-
-	if cp, ok := info["clusterProfile"]; ok {
-		clusterProfile = cp
-	}
-
-	if pvdr, ok := info["provider"]; ok {
-		provider = pvdr
-	}
-
-	if acct, ok := info["account"]; ok {
-		account = acct
-	}
-
-	if proj, ok := info["project"]; ok {
-		project = proj
-	}
-
-	if reg, ok := info["region"]; ok {
-		region = reg
-	}
-
-	if pvsr, ok := info["provisioner"]; ok {
-		provisioner = pvsr
+	clusterInfo, err := MapToClusterInfo(info)
+	if err != nil {
+		return nil, fmt.Errorf("Parsing Local Cluster Info Failed: %s", err)
 	}
 
-	return &ClusterInfo{
-		ID:          id,
-		Name:        name,
-		Profile:     clusterProfile,
-		Provider:    provider,
-		Account:     account,
-		Project:     project,
-		Region:      region,
-		Provisioner: provisioner,
-	}, nil
+	return clusterInfo, nil
 }
 
 // refreshClusters loads the clusters and updates the internal map
@@ -371,7 +334,8 @@ func (pcm *PrometheusClusterMap) NameIDFor(clusterID string) string {
 	return clusterID
 }
 
-func (pcm *PrometheusClusterMap) SplitNameID(nameID string) (id string, name string) {
+// SplitNameID is a helper method that removes the common split format and returns
+func SplitNameID(nameID string) (id string, name string) {
 	if !strings.Contains(nameID, "/") {
 		id = nameID
 		name = ""
@@ -391,3 +355,63 @@ func (pcm *PrometheusClusterMap) StopRefresh() {
 		pcm.stop = nil
 	}
 }
+
+// MapToClusterInfo returns a ClusterInfo using parsed data from a string map. If
+// parsing the map fails for id and/or name, an error is returned.
+func MapToClusterInfo(info map[string]string) (*ClusterInfo, error) {
+	var id string
+	var name string
+
+	if i, ok := info[ClusterInfoIdKey]; ok {
+		id = i
+	} else {
+		return nil, fmt.Errorf("Cluster Info Missing ID")
+	}
+	if n, ok := info[ClusterInfoNameKey]; ok {
+		name = n
+	} else {
+		name = id
+	}
+
+	var clusterProfile string
+	var provider string
+	var account string
+	var project string
+	var region string
+	var provisioner string
+
+	if cp, ok := info[ClusterInfoProfileKey]; ok {
+		clusterProfile = cp
+	}
+
+	if pvdr, ok := info[ClusterInfoProviderKey]; ok {
+		provider = pvdr
+	}
+
+	if acct, ok := info[ClusterInfoAccountKey]; ok {
+		account = acct
+	}
+
+	if proj, ok := info[ClusterInfoProjectKey]; ok {
+		project = proj
+	}
+
+	if reg, ok := info[ClusterInfoRegionKey]; ok {
+		region = reg
+	}
+
+	if pvsr, ok := info[ClusterInfoProvisionerKey]; ok {
+		provisioner = pvsr
+	}
+
+	return &ClusterInfo{
+		ID:          id,
+		Name:        name,
+		Profile:     clusterProfile,
+		Provider:    provider,
+		Account:     account,
+		Project:     project,
+		Region:      region,
+		Provisioner: provisioner,
+	}, nil
+}

+ 9 - 1
pkg/costmodel/costmodel.go

@@ -1044,7 +1044,7 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 
 		if newCnode.GPU != "" && newCnode.GPUCost == "" {
 			// We couldn't find a gpu cost, so fix cpu and ram, then accordingly
-			log.Debugf("GPU without cost found for %s, calculating...", cp.GetKey(nodeLabels, n).Features())
+			log.Infof("GPU without cost found for %s, calculating...", cp.GetKey(nodeLabels, n).Features())
 
 			defaultCPU, err := strconv.ParseFloat(cfg.CPU, 64)
 			if err != nil {
@@ -1181,6 +1181,14 @@ func (cm *CostModel) GetNodeCost(cp costAnalyzerCloud.Provider) (map[string]*cos
 					log.Warnf("Could not parse total node price")
 					return nil, err
 				}
+				if newCnode.GPUCost != "" {
+					gpuPrice, err := strconv.ParseFloat(newCnode.GPUCost, 64)
+					if err != nil {
+						log.Warnf("Could not parse node gpu price")
+						return nil, err
+					}
+					nodePrice = nodePrice - gpuPrice // remove the gpuPrice from the total, we're just costing out RAM and CPU.
+				}
 			} else {
 				nodePrice, err = strconv.ParseFloat(newCnode.VCPUCost, 64) // all the price was allocated to the CPU
 				if err != nil {

+ 1 - 1
pkg/costmodel/key.go

@@ -2,9 +2,9 @@ package costmodel
 
 import (
 	"fmt"
-	"github.com/opencost/opencost/pkg/kubecost"
 
 	"github.com/opencost/opencost/pkg/env"
+	"github.com/opencost/opencost/pkg/kubecost"
 	"github.com/opencost/opencost/pkg/prom"
 )
 

+ 32 - 0
pkg/env/costmodelenv.go

@@ -92,6 +92,9 @@ const (
 	IngestPodUIDEnvVar = "INGEST_POD_UID"
 
 	ETLReadOnlyMode = "ETL_READ_ONLY"
+
+	AllocationNodeLabelsEnabled     = "ALLOCATION_NODE_LABELS_ENABLED"
+	AllocationNodeLabelsIncludeList = "ALLOCATION_NODE_LABELS_INCLUDE_LIST"
 )
 
 var offsetRegex = regexp.MustCompile(`^(\+|-)(\d\d):(\d\d)$`)
@@ -503,3 +506,32 @@ func GetPromClusterLabel() string {
 func IsIngestingPodUID() bool {
 	return GetBool(IngestPodUIDEnvVar, false)
 }
+
+func GetAllocationNodeLabelsEnabled() bool {
+	return GetBool(AllocationNodeLabelsEnabled, true)
+}
+
+var defaultAllocationNodeLabelsIncludeList []string = []string{
+	"cloud.google.com/gke-nodepool",
+	"eks.amazonaws.com/nodegroup",
+	"kubernetes.azure.com/agentpool",
+	"node.kubernetes.io/instance-type",
+	"topology.kubernetes.io/region",
+	"topology.kubernetes.io/zone",
+}
+
+func GetAllocationNodeLabelsIncludeList() []string {
+	// If node labels are not enabled, return an empty list.
+	if !GetAllocationNodeLabelsEnabled() {
+		return []string{}
+	}
+
+	list := GetList(AllocationNodeLabelsIncludeList, ",")
+
+	// If node labels are enabled, but the white list is empty, use defaults.
+	if len(list) == 0 {
+		return defaultAllocationNodeLabelsIncludeList
+	}
+
+	return list
+}

+ 4 - 4
pkg/filter/util/cloudcostaggregate.go

@@ -17,12 +17,12 @@ func CloudCostAggregateFilterFromParams(pmr mapper.PrimitiveMapReader) filter.Fi
 		Filters: []filter.Filter[*kubecost.CloudCostAggregate]{},
 	}
 
-	if raw := pmr.GetList("filterAccounts", ","); len(raw) > 0 {
-		filter.Filters = append(filter.Filters, filterV1SingleValueFromList(raw, kubecost.CloudCostAccountProp))
+	if raw := pmr.GetList("filterBillingIDs", ","); len(raw) > 0 {
+		filter.Filters = append(filter.Filters, filterV1SingleValueFromList(raw, kubecost.CloudCostBillingIDProp))
 	}
 
-	if raw := pmr.GetList("filterProjects", ","); len(raw) > 0 {
-		filter.Filters = append(filter.Filters, filterV1SingleValueFromList(raw, kubecost.CloudCostProjectProp))
+	if raw := pmr.GetList("filterWorkGroupIDs", ","); len(raw) > 0 {
+		filter.Filters = append(filter.Filters, filterV1SingleValueFromList(raw, kubecost.CloudCostWorkGroupIDProp))
 	}
 
 	if raw := pmr.GetList("filterProviders", ","); len(raw) > 0 {

+ 2 - 2
pkg/kubecost/bingen.go

@@ -73,7 +73,7 @@ package kubecost
 // @bingen:generate:AuditSetRange
 // @bingen:end
 
-// @bingen:set[name=CloudCostAggregate,version=1]
+// @bingen:set[name=CloudCostAggregate,version=2]
 // @bingen:generate:CloudCostAggregate
 // @bingen:generate[stringtable]:CloudCostAggregateSet
 // @bingen:generate:CloudCostAggregateSetRange
@@ -81,7 +81,7 @@ package kubecost
 // @bingen:generate:CloudCostAggregateLabels
 // @bingen:end
 
-// @bingen:set[name=CloudCostItem,version=1]
+// @bingen:set[name=CloudCostItem,version=2]
 // @bingen:generate:CloudCostItem
 // @bingen:generate[stringtable]:CloudCostItemSet
 // @bingen:generate:CloudCostItemSetRange

+ 95 - 52
pkg/kubecost/cloudcostaggregate.go

@@ -11,33 +11,33 @@ import (
 )
 
 const (
-	CloudCostAccountProp  string = "account"
-	CloudCostProjectProp  string = "project"
-	CloudCostProviderProp string = "provider"
-	CloudCostServiceProp  string = "service"
-	CloudCostLabelProp    string = "label"
+	CloudCostBillingIDProp   string = "billingID"
+	CloudCostWorkGroupIDProp string = "workGroupID"
+	CloudCostProviderProp    string = "provider"
+	CloudCostServiceProp     string = "service"
+	CloudCostLabelProp       string = "label"
 )
 
 // CloudCostAggregateProperties unique property set for CloudCostAggregate within a window
 type CloudCostAggregateProperties struct {
-	Provider   string `json:"provider"`
-	Account    string `json:"account"`
-	Project    string `json:"project"`
-	Service    string `json:"service"`
-	LabelValue string `json:"label"`
+	Provider    string `json:"provider"`
+	WorkGroupID string `json:"workGroupID"`
+	BillingID   string `json:"billingID"`
+	Service     string `json:"service"`
+	LabelValue  string `json:"label"`
 }
 
 func (ccap CloudCostAggregateProperties) Equal(that CloudCostAggregateProperties) bool {
 	return ccap.Provider == that.Provider &&
-		ccap.Account == that.Account &&
-		ccap.Project == that.Project &&
+		ccap.WorkGroupID == that.WorkGroupID &&
+		ccap.BillingID == that.BillingID &&
 		ccap.Service == that.Service &&
 		ccap.LabelValue == that.LabelValue
 }
 
 func (ccap CloudCostAggregateProperties) Key(props []string) string {
 	if len(props) == 0 {
-		return fmt.Sprintf("%s/%s/%s/%s/%s", ccap.Provider, ccap.Account, ccap.Project, ccap.Service, ccap.LabelValue)
+		return fmt.Sprintf("%s/%s/%s/%s/%s", ccap.Provider, ccap.BillingID, ccap.WorkGroupID, ccap.Service, ccap.LabelValue)
 	}
 
 	keys := make([]string, len(props))
@@ -49,13 +49,13 @@ func (ccap CloudCostAggregateProperties) Key(props []string) string {
 			if ccap.Provider != "" {
 				key = ccap.Provider
 			}
-		case CloudCostAccountProp:
-			if ccap.Account != "" {
-				key = ccap.Account
+		case CloudCostBillingIDProp:
+			if ccap.BillingID != "" {
+				key = ccap.BillingID
 			}
-		case CloudCostProjectProp:
-			if ccap.Project != "" {
-				key = ccap.Project
+		case CloudCostWorkGroupIDProp:
+			if ccap.WorkGroupID != "" {
+				key = ccap.WorkGroupID
 			}
 		case CloudCostServiceProp:
 			if ccap.Service != "" {
@@ -83,7 +83,7 @@ type CloudCostAggregate struct {
 	Properties        CloudCostAggregateProperties `json:"properties"`
 	KubernetesPercent float64                      `json:"kubernetesPercent"`
 	Cost              float64                      `json:"cost"`
-	Credit            float64                      `json:"credit"`
+	NetCost           float64                      `json:"netCost"`
 }
 
 func (cca *CloudCostAggregate) Clone() *CloudCostAggregate {
@@ -91,7 +91,7 @@ func (cca *CloudCostAggregate) Clone() *CloudCostAggregate {
 		Properties:        cca.Properties,
 		KubernetesPercent: cca.KubernetesPercent,
 		Cost:              cca.Cost,
-		Credit:            cca.Credit,
+		NetCost:           cca.NetCost,
 	}
 }
 
@@ -101,7 +101,7 @@ func (cca *CloudCostAggregate) Equal(that *CloudCostAggregate) bool {
 	}
 
 	return cca.Cost == that.Cost &&
-		cca.Credit == that.Credit &&
+		cca.NetCost == that.NetCost &&
 		cca.Properties.Equal(that.Properties)
 }
 
@@ -115,10 +115,10 @@ func (cca *CloudCostAggregate) StringProperty(prop string) (string, error) {
 	}
 
 	switch prop {
-	case CloudCostAccountProp:
-		return cca.Properties.Account, nil
-	case CloudCostProjectProp:
-		return cca.Properties.Project, nil
+	case CloudCostBillingIDProp:
+		return cca.Properties.BillingID, nil
+	case CloudCostWorkGroupIDProp:
+		return cca.Properties.WorkGroupID, nil
 	case CloudCostProviderProp:
 		return cca.Properties.Provider, nil
 	case CloudCostServiceProp:
@@ -146,12 +146,12 @@ func (cca *CloudCostAggregate) add(that *CloudCostAggregate) {
 	}
 
 	cca.Cost = sumCost
-	cca.Credit += that.Credit
+	cca.NetCost += that.NetCost
 	cca.KubernetesPercent = k8sPct
 }
 
 type CloudCostAggregateSet struct {
-	CloudCostAggregates   map[string]*CloudCostAggregate `json:"items"`
+	CloudCostAggregates   map[string]*CloudCostAggregate `json:"aggregates"`
 	AggregationProperties []string                       `json:"-"`
 	Integration           string                         `json:"-"`
 	LabelName             string                         `json:"labelName,omitempty"`
@@ -351,51 +351,94 @@ func (ccas *CloudCostAggregateSet) Merge(that *CloudCostAggregateSet) (*CloudCos
 	return result, nil
 }
 
-func GetCloudCostAggregateSets(start, end time.Time, windowDuration time.Duration, integration string, labelName string) ([]*CloudCostAggregateSet, error) {
-	windows, err := GetWindows(start, end, windowDuration)
+type CloudCostAggregateSetRange struct {
+	CloudCostAggregateSets []*CloudCostAggregateSet `json:"sets"`
+	Window                 Window                   `json:"window"`
+}
+
+// NewCloudCostAggregateSetRange create a CloudCostAggregateSetRange containing CloudCostItemSets with windows of equal duration
+// the duration between start and end must be divisible by the window duration argument
+func NewCloudCostAggregateSetRange(start, end time.Time, window time.Duration, integration string, labelName string) (*CloudCostAggregateSetRange, error) {
+	windows, err := GetWindows(start, end, window)
 	if err != nil {
 		return nil, err
 	}
 
 	// Build slice of CloudCostAggregateSet to cover the range
-	CloudCostAggregateSets := []*CloudCostAggregateSet{}
-	for _, w := range windows {
+	cloudCostAggregateSets := make([]*CloudCostAggregateSet, len(windows))
+	for i, w := range windows {
 		ccas := NewCloudCostAggregateSet(*w.Start(), *w.End())
 		ccas.Integration = integration
 		ccas.LabelName = labelName
-		CloudCostAggregateSets = append(CloudCostAggregateSets, ccas)
+		cloudCostAggregateSets[i] = ccas
 	}
-	return CloudCostAggregateSets, nil
+	return &CloudCostAggregateSetRange{
+		Window:                 NewWindow(&start, &end),
+		CloudCostAggregateSets: cloudCostAggregateSets,
+	}, nil
 }
 
-// LoadCloudCostAggregateSets creates and loads CloudCostAggregates into provided CloudCostAggregateSets. This method makes it so
-// that the input windows do not have to match the one day frame of the Athena queries. CloudCostAggregates being generated from a
-// CUR which may be the identical except for the pricing model used (default, RI or savings plan)
-// are accumulated here so that the resulting CloudCostAggregate with the 1d window has the correct price for the entire day.
-func LoadCloudCostAggregateSets(itemStart time.Time, itemEnd time.Time, properties CloudCostAggregateProperties, K8sPercent, cost, credit float64, CloudCostAggregateSets []*CloudCostAggregateSet) {
-	// Disperse cost of the current item across one or more CloudCostAggregates in
+// LoadCloudCostAggregate loads CloudCostAggregates into existing CloudCostAggregateSets of the CloudCostAggregateSetRange.
+// This function service to aggregate and distribute costs over predefined windows
+// If all or a portion of the window of the CloudCostAggregate is outside of the windows of the existing CloudCostAggregateSets,
+// that portion of the CloudCostAggregate's cost will not be inserted
+func (ccasr *CloudCostAggregateSetRange) LoadCloudCostAggregate(window Window, cloudCostAggregate *CloudCostAggregate) {
+	if window.IsOpen() {
+		log.Errorf("CloudCostItemSetRange: LoadCloudCostItem: invalid window %s", window.String())
+		return
+	}
+
+	totalPct := 0.0
+
+	// Distribute cost of the current item across one or more CloudCostAggregates in
 	// across each relevant CloudCostAggregateSet. Stop when the end of the current
 	// block reaches the item's end time or the end of the range.
-	for _, ccas := range CloudCostAggregateSets {
-		pct := ccas.GetWindow().GetPercentInWindow(itemStart, itemEnd)
-
-		// Insert an CloudCostAggregate with that cost into the CloudCostAggregateSet at the given index
-		cca := &CloudCostAggregate{
-			Properties:        properties,
-			KubernetesPercent: K8sPercent * pct,
-			Cost:              cost * pct,
-			Credit:            credit * pct,
+	for _, ccas := range ccasr.CloudCostAggregateSets {
+		pct := ccas.GetWindow().GetPercentInWindow(window)
+		if pct == 0 {
+			continue
+		}
+		cca := cloudCostAggregate
+		// If the current set Window only contains a portion of the CloudCostItem Window, insert costs relative to that portion
+		if pct < 1.0 {
+			cca = &CloudCostAggregate{
+				Properties:        cloudCostAggregate.Properties,
+				KubernetesPercent: cloudCostAggregate.KubernetesPercent * pct,
+				Cost:              cloudCostAggregate.Cost * pct,
+				NetCost:           cloudCostAggregate.NetCost * pct,
+			}
 		}
 		err := ccas.insertByProperty(cca, nil)
 		if err != nil {
 			log.Errorf("LoadCloudCostAggregateSets: failed to load CloudCostAggregate with key %s and window %s", cca.Key(nil), ccas.GetWindow().String())
 		}
+
+		// If all cost has been inserted then finish
+		totalPct += pct
+		if totalPct >= 1.0 {
+			return
+		}
 	}
 }
 
-type CloudCostAggregateSetRange struct {
-	CloudCostAggregateSets []*CloudCostAggregateSet `json:"sets"`
-	Window                 Window                   `json:"window"`
+func (ccasr *CloudCostAggregateSetRange) Clone() *CloudCostAggregateSetRange {
+	ccasSlice := make([]*CloudCostAggregateSet, len(ccasr.CloudCostAggregateSets))
+	for i, ccas := range ccasr.CloudCostAggregateSets {
+		ccasSlice[i] = ccas.Clone()
+	}
+	return &CloudCostAggregateSetRange{
+		Window:                 ccasr.Window.Clone(),
+		CloudCostAggregateSets: ccasSlice,
+	}
+}
+
+func (ccasr *CloudCostAggregateSetRange) IsEmpty() bool {
+	for _, ccas := range ccasr.CloudCostAggregateSets {
+		if !ccas.IsEmpty() {
+			return false
+		}
+	}
+	return true
 }
 
 func (ccasr *CloudCostAggregateSetRange) Accumulate() (*CloudCostAggregateSet, error) {

+ 271 - 0
pkg/kubecost/cloudcostaggregate_test.go

@@ -0,0 +1,271 @@
+package kubecost
+
+import (
+	"github.com/opencost/opencost/pkg/util/timeutil"
+	"testing"
+	"time"
+)
+
+var ccaProperties1 = CloudCostAggregateProperties{
+	Provider:    "provider1",
+	WorkGroupID: "workgroup1",
+	BillingID:   "billing1",
+	Service:     "service1",
+	LabelValue:  "labelValue1",
+}
+
+// TestCloudCostAggregate_LoadCloudCostAggregate checks that loaded CloudCostAggregates end up in the correct set in the
+// correct proportions
+func TestCloudCostAggregate_LoadCloudCostAggregate(t *testing.T) {
+	// create values for 3 day Range tests
+	end := RoundBack(time.Now().UTC(), timeutil.Day)
+	start := end.Add(-3 * timeutil.Day)
+	dayWindows, _ := GetWindows(start, end, timeutil.Day)
+	emtpyCASSR, _ := NewCloudCostAggregateSetRange(start, end, timeutil.Day, "integration", "label")
+	testCases := map[string]struct {
+		cca      []*CloudCostAggregate
+		windows  []Window
+		ccasr    *CloudCostAggregateSetRange
+		expected []*CloudCostAggregateSet
+	}{
+		"Load Single Day On Grid": {
+			cca: []*CloudCostAggregate{
+				{
+					Properties:        ccaProperties1,
+					KubernetesPercent: 1,
+					Cost:              100,
+					NetCost:           80,
+				},
+			},
+			windows: []Window{
+				dayWindows[0],
+			},
+			ccasr: emtpyCASSR.Clone(),
+			expected: []*CloudCostAggregateSet{
+				{
+					Integration: "integration",
+					LabelName:   "label",
+					Window:      dayWindows[0],
+					CloudCostAggregates: map[string]*CloudCostAggregate{
+						ccaProperties1.Key(nil): {
+							Properties:        ccaProperties1,
+							KubernetesPercent: 1,
+							Cost:              100,
+							NetCost:           80,
+						},
+					},
+				},
+				{
+					Integration:         "integration",
+					LabelName:           "label",
+					Window:              dayWindows[1],
+					CloudCostAggregates: map[string]*CloudCostAggregate{},
+				},
+				{
+					Integration:         "integration",
+					LabelName:           "label",
+					Window:              dayWindows[2],
+					CloudCostAggregates: map[string]*CloudCostAggregate{},
+				},
+			},
+		},
+		"Load Single Day Off Grid": {
+			cca: []*CloudCostAggregate{
+				{
+					Properties:        ccaProperties1,
+					KubernetesPercent: 1,
+					Cost:              100,
+					NetCost:           80,
+				},
+			},
+			windows: []Window{
+				NewClosedWindow(start.Add(12*time.Hour), start.Add(36*time.Hour)),
+			},
+			ccasr: emtpyCASSR.Clone(),
+			expected: []*CloudCostAggregateSet{
+				{
+					Integration: "integration",
+					LabelName:   "label",
+					Window:      dayWindows[0],
+					CloudCostAggregates: map[string]*CloudCostAggregate{
+						ccaProperties1.Key(nil): {
+							Properties:        ccaProperties1,
+							KubernetesPercent: 1,
+							Cost:              50,
+							NetCost:           40,
+						},
+					},
+				},
+				{
+					Integration: "integration",
+					LabelName:   "label",
+					Window:      dayWindows[1],
+					CloudCostAggregates: map[string]*CloudCostAggregate{
+						ccaProperties1.Key(nil): {
+							Properties:        ccaProperties1,
+							KubernetesPercent: 1,
+							Cost:              50,
+							NetCost:           40,
+						},
+					},
+				},
+				{
+					Integration:         "integration",
+					LabelName:           "label",
+					Window:              dayWindows[2],
+					CloudCostAggregates: map[string]*CloudCostAggregate{},
+				},
+			},
+		},
+		"Load Single Day Off Grid Before Range Window": {
+			cca: []*CloudCostAggregate{
+				{
+					Properties:        ccaProperties1,
+					KubernetesPercent: 1,
+					Cost:              100,
+					NetCost:           80,
+				},
+			},
+			windows: []Window{
+				NewClosedWindow(start.Add(-12*time.Hour), start.Add(12*time.Hour)),
+			},
+			ccasr: emtpyCASSR.Clone(),
+			expected: []*CloudCostAggregateSet{
+				{
+					Integration: "integration",
+					LabelName:   "label",
+					Window:      dayWindows[0],
+					CloudCostAggregates: map[string]*CloudCostAggregate{
+						ccaProperties1.Key(nil): {
+							Properties:        ccaProperties1,
+							KubernetesPercent: 1,
+							Cost:              50,
+							NetCost:           40,
+						},
+					},
+				},
+				{
+					Integration:         "integration",
+					LabelName:           "label",
+					Window:              dayWindows[1],
+					CloudCostAggregates: map[string]*CloudCostAggregate{},
+				},
+				{
+					Integration:         "integration",
+					LabelName:           "label",
+					Window:              dayWindows[2],
+					CloudCostAggregates: map[string]*CloudCostAggregate{},
+				},
+			},
+		},
+		"Load Single Day Off Grid After Range Window": {
+			cca: []*CloudCostAggregate{
+				{
+					Properties:        ccaProperties1,
+					KubernetesPercent: 1,
+					Cost:              100,
+					NetCost:           80,
+				},
+			},
+			windows: []Window{
+				NewClosedWindow(end.Add(-12*time.Hour), end.Add(12*time.Hour)),
+			},
+			ccasr: emtpyCASSR.Clone(),
+			expected: []*CloudCostAggregateSet{
+				{
+					Integration:         "integration",
+					LabelName:           "label",
+					Window:              dayWindows[0],
+					CloudCostAggregates: map[string]*CloudCostAggregate{},
+				},
+				{
+					Integration:         "integration",
+					LabelName:           "label",
+					Window:              dayWindows[1],
+					CloudCostAggregates: map[string]*CloudCostAggregate{},
+				},
+				{
+					Integration: "integration",
+					LabelName:   "label",
+					Window:      dayWindows[2],
+					CloudCostAggregates: map[string]*CloudCostAggregate{
+						ccaProperties1.Key(nil): {
+							Properties:        ccaProperties1,
+							KubernetesPercent: 1,
+							Cost:              50,
+							NetCost:           40,
+						},
+					},
+				},
+			},
+		},
+		"Single Day Kubecost Percent": {
+			cca: []*CloudCostAggregate{
+				{
+					Properties:        ccaProperties1,
+					KubernetesPercent: 1,
+					Cost:              75,
+					NetCost:           60,
+				},
+				{
+					Properties:        ccaProperties1,
+					KubernetesPercent: 0,
+					Cost:              25,
+					NetCost:           20,
+				},
+			},
+			windows: []Window{
+				dayWindows[1],
+				dayWindows[1],
+			},
+			ccasr: emtpyCASSR.Clone(),
+			expected: []*CloudCostAggregateSet{
+				{
+					Integration:         "integration",
+					LabelName:           "label",
+					Window:              dayWindows[0],
+					CloudCostAggregates: map[string]*CloudCostAggregate{},
+				},
+				{
+					Integration: "integration",
+					LabelName:   "label",
+					Window:      dayWindows[1],
+					CloudCostAggregates: map[string]*CloudCostAggregate{
+						ccaProperties1.Key(nil): {
+							Properties:        ccaProperties1,
+							KubernetesPercent: 0.75,
+							Cost:              100,
+							NetCost:           80,
+						},
+					},
+				},
+				{
+					Integration: "integration",
+					LabelName:   "label",
+					Window:      dayWindows[2],
+					CloudCostAggregates: map[string]*CloudCostAggregate{},
+				},
+			},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			// load Cloud Cost Aggregates
+			for i, cca := range tc.cca {
+				tc.ccasr.LoadCloudCostAggregate(tc.windows[i], cca)
+			}
+
+			if len(tc.ccasr.CloudCostAggregateSets) != len(tc.expected) {
+				t.Errorf("the CloudCostAggregateSetRanges did not have the expected length")
+			}
+
+			for i, ccas := range tc.ccasr.CloudCostAggregateSets {
+				if !ccas.Equal(tc.expected[i]) {
+					t.Errorf("CloudCostAggregateSet at index: %d did not match expected", i)
+				}
+			}
+		})
+	}
+
+}

+ 87 - 48
pkg/kubecost/cloudcostitem.go

@@ -35,20 +35,20 @@ func (ccil CloudCostItemLabels) Equal(that CloudCostItemLabels) bool {
 }
 
 type CloudCostItemProperties struct {
-	ProviderID string              `json:"providerID,omitempty"`
-	Provider   string              `json:"provider,omitempty"`
-	Account    string              `json:"account,omitempty"`
-	Project    string              `json:"project,omitempty"`
-	Service    string              `json:"service,omitempty"`
-	Category   string              `json:"category,omitempty"`
-	Labels     CloudCostItemLabels `json:"labels,omitempty"`
+	ProviderID  string              `json:"providerID,omitempty"`
+	Provider    string              `json:"provider,omitempty"`
+	WorkGroupID string              `json:"workGroupID,omitempty"`
+	BillingID   string              `json:"billingID,omitempty"`
+	Service     string              `json:"service,omitempty"`
+	Category    string              `json:"category,omitempty"`
+	Labels      CloudCostItemLabels `json:"labels,omitempty"`
 }
 
 func (ccip CloudCostItemProperties) Equal(that CloudCostItemProperties) bool {
 	return ccip.ProviderID == that.ProviderID &&
 		ccip.Provider == that.Provider &&
-		ccip.Account == that.Account &&
-		ccip.Project == that.Project &&
+		ccip.WorkGroupID == that.WorkGroupID &&
+		ccip.BillingID == that.BillingID &&
 		ccip.Service == that.Service &&
 		ccip.Category == that.Category &&
 		ccip.Labels.Equal(that.Labels)
@@ -56,18 +56,18 @@ func (ccip CloudCostItemProperties) Equal(that CloudCostItemProperties) bool {
 
 func (ccip CloudCostItemProperties) Clone() CloudCostItemProperties {
 	return CloudCostItemProperties{
-		ProviderID: ccip.ProviderID,
-		Provider:   ccip.Provider,
-		Account:    ccip.Account,
-		Project:    ccip.Project,
-		Service:    ccip.Service,
-		Category:   ccip.Category,
-		Labels:     ccip.Labels.Clone(),
+		ProviderID:  ccip.ProviderID,
+		Provider:    ccip.Provider,
+		WorkGroupID: ccip.WorkGroupID,
+		BillingID:   ccip.BillingID,
+		Service:     ccip.Service,
+		Category:    ccip.Category,
+		Labels:      ccip.Labels.Clone(),
 	}
 }
 
 func (ccip CloudCostItemProperties) Key() string {
-	return fmt.Sprintf("%s/%s/%s/%s/%s/%s", ccip.Provider, ccip.Account, ccip.Project, ccip.Category, ccip.Service, ccip.ProviderID)
+	return fmt.Sprintf("%s/%s/%s/%s/%s/%s", ccip.Provider, ccip.BillingID, ccip.WorkGroupID, ccip.Category, ccip.Service, ccip.ProviderID)
 }
 
 // CloudCostItem represents a CUR line item, identifying a cloud resource and
@@ -77,7 +77,7 @@ type CloudCostItem struct {
 	IsKubernetes bool
 	Window       Window
 	Cost         float64
-	Credit       float64
+	NetCost      float64
 }
 
 func (cci *CloudCostItem) Clone() *CloudCostItem {
@@ -86,7 +86,7 @@ func (cci *CloudCostItem) Clone() *CloudCostItem {
 		IsKubernetes: cci.IsKubernetes,
 		Window:       cci.Window.Clone(),
 		Cost:         cci.Cost,
-		Credit:       cci.Credit,
+		NetCost:      cci.NetCost,
 	}
 }
 
@@ -99,7 +99,7 @@ func (cci *CloudCostItem) Equal(that *CloudCostItem) bool {
 		cci.IsKubernetes == that.IsKubernetes &&
 		cci.Window.Equal(that.Window) &&
 		cci.Cost == that.Cost &&
-		cci.Credit == that.Credit
+		cci.NetCost == that.NetCost
 }
 
 func (cci *CloudCostItem) Key() string {
@@ -113,7 +113,7 @@ func (cci *CloudCostItem) add(that *CloudCostItem) {
 	}
 
 	cci.Cost += that.Cost
-	cci.Credit += that.Credit
+	cci.NetCost += that.NetCost
 	cci.Window = cci.Window.Expand(that.Window)
 }
 
@@ -271,51 +271,90 @@ func (ccis *CloudCostItemSet) Merge(that *CloudCostItemSet) (*CloudCostItemSet,
 	return result, nil
 }
 
-// GetCloudCostItemSets
-func GetCloudCostItemSets(start time.Time, end time.Time, window time.Duration, integration string) ([]*CloudCostItemSet, error) {
+type CloudCostItemSetRange struct {
+	CloudCostItemSets []*CloudCostItemSet `json:"sets"`
+	Window            Window              `json:"window"`
+}
+
+// NewCloudCostItemSetRange create a CloudCostItemSetRange containing CloudCostItemSets with windows of equal duration
+// the duration between start and end must be divisible by the window duration argument
+func NewCloudCostItemSetRange(start time.Time, end time.Time, window time.Duration, integration string) (*CloudCostItemSetRange, error) {
 	windows, err := GetWindows(start, end, window)
 	if err != nil {
 		return nil, err
 	}
 
 	// Build slice of CloudCostItemSet to cover the range
-	CloudCostItemSets := []*CloudCostItemSet{}
-	for _, w := range windows {
+	cloudCostItemSets := make([]*CloudCostItemSet, len(windows))
+	for i, w := range windows {
 		ccis := NewCloudCostItemSet(*w.Start(), *w.End())
 		ccis.Integration = integration
-		CloudCostItemSets = append(CloudCostItemSets, ccis)
+		cloudCostItemSets[i] = ccis
+	}
+	return &CloudCostItemSetRange{
+		Window:            NewWindow(&start, &end),
+		CloudCostItemSets: cloudCostItemSets,
+	}, nil
+}
+
+func (ccisr *CloudCostItemSetRange) Clone() *CloudCostItemSetRange {
+	ccisSlice := make([]*CloudCostItemSet, len(ccisr.CloudCostItemSets))
+	for i, ccis := range ccisr.CloudCostItemSets {
+		ccisSlice[i] = ccis.Clone()
+	}
+	return &CloudCostItemSetRange{
+		Window:            ccisr.Window.Clone(),
+		CloudCostItemSets: ccisSlice,
 	}
-	return CloudCostItemSets, nil
 }
 
-// LoadCloudCostItemSets creates and loads CloudCostItems into provided CloudCostItemSets. This method makes it so
-// that the input windows do not have to match the one day frame of the Athena queries. CloudCostItems being generated from a
-// CUR which may be the identical except for the pricing model used (default, RI or savings plan)
+// LoadCloudCostItem loads CloudCostItems into existing CloudCostItemSets of the CloudCostItemSetRange.
+// This function service to aggregate and distribute costs over predefined windows
 // are accumulated here so that the resulting CloudCostItem with the 1d window has the correct price for the entire day.
-func LoadCloudCostItemSets(itemStart time.Time, itemEnd time.Time, properties CloudCostItemProperties, isK8s bool, cost, credit float64, CloudCostItemSets []*CloudCostItemSet) {
+// If all or a portion of the window of the CloudCostItem is outside of the windows of the existing CloudCostItemSets,
+// that portion of the CloudCostItem's cost will not be inserted
+func (ccisr *CloudCostItemSetRange) LoadCloudCostItem(cloudCostItem *CloudCostItem) {
+	window := cloudCostItem.Window
+	if window.IsOpen() {
+		log.Errorf("CloudCostItemSetRange: LoadCloudCostItem: invalid window %s", window.String())
+		return
+	}
+
+	totalPct := 0.0
 
-	// Disperse cost of the current item across one or more CloudCostItems in
+	// Distribute cost of the current item across one or more CloudCostItems in
 	// across each relevant CloudCostItemSet. Stop when the end of the current
 	// block reaches the item's end time or the end of the range.
-	for _, ccis := range CloudCostItemSets {
-		pct := ccis.GetWindow().GetPercentInWindow(itemStart, itemEnd)
-
-		// Insert an CloudCostItem with that cost into the CloudCostItemSet at the given index
-		cci := &CloudCostItem{
-			Properties:   properties,
-			IsKubernetes: isK8s,
-			Window:       ccis.GetWindow(),
-			Cost:         cost * pct,
-			Credit:       credit * pct,
+	for _, ccis := range ccisr.CloudCostItemSets {
+		setWindow := ccis.Window
+
+		// get percent of item window contained in set window
+		pct := setWindow.GetPercentInWindow(window)
+		if pct == 0 {
+			continue
+		}
+
+		cci := cloudCostItem
+		// If the current set Window only contains a portion of the CloudCostItem Window, insert costs relative to that portion
+		if pct < 1.0 {
+			cci = &CloudCostItem{
+				Properties:   cloudCostItem.Properties,
+				IsKubernetes: cloudCostItem.IsKubernetes,
+				Window:       window.Contract(setWindow),
+				Cost:         cloudCostItem.Cost * pct,
+				NetCost:      cloudCostItem.NetCost * pct,
+			}
 		}
+
 		err := ccis.Insert(cci)
 		if err != nil {
-			log.Errorf("LoadCloudCostItemSets: failed to load CloudCostItem with key %s and window %s: %s", cci.Key(), ccis.GetWindow().String(), err.Error())
+			log.Errorf("CloudCostItemSetRange: LoadCloudCostItem: failed to load CloudCostItem with key %s and window %s: %s", cci.Key(), ccis.GetWindow().String(), err.Error())
 		}
-	}
-}
 
-type CloudCostItemSetRange struct {
-	CloudCostItemSets []*CloudCostItemSet `json:"sets"`
-	Window            Window              `json:"window"`
+		// If all cost has been inserted then finish
+		totalPct += pct
+		if totalPct >= 1.0 {
+			return
+		}
+	}
 }

+ 256 - 0
pkg/kubecost/cloudcostitem_test.go

@@ -0,0 +1,256 @@
+package kubecost
+
+import (
+	"github.com/opencost/opencost/pkg/util/timeutil"
+	"testing"
+	"time"
+)
+
+var cciProperties1 = CloudCostItemProperties{
+	ProviderID:  "providerid1",
+	Provider:    "provider1",
+	WorkGroupID: "workgroup1",
+	BillingID:   "billing1",
+	Service:     "service1",
+	Category:    "category1",
+	Labels: map[string]string{
+		"label1": "value1",
+		"label2": "value2",
+	},
+}
+
+// TestCloudCostItem_LoadCloudCostItem checks that loaded CloudCostItems end up in the correct set in the
+// correct proportions
+func TestCloudCostItem_LoadCloudCostItem(t *testing.T) {
+	// create values for 3 day Range tests
+	end := RoundBack(time.Now().UTC(), timeutil.Day)
+	start := end.Add(-3 * timeutil.Day)
+	dayWindows, _ := GetWindows(start, end, timeutil.Day)
+	emtpyCCISR, _ := NewCloudCostItemSetRange(start, end, timeutil.Day, "integration")
+	testCases := map[string]struct {
+		cci      []*CloudCostItem
+		ccisr    *CloudCostItemSetRange
+		expected []*CloudCostItemSet
+	}{
+		"Load Single Day On Grid": {
+			cci: []*CloudCostItem{
+				{
+					Properties:   cciProperties1,
+					Window:       dayWindows[0],
+					IsKubernetes: true,
+					Cost:         100,
+					NetCost:      80,
+				},
+			},
+			ccisr: emtpyCCISR.Clone(),
+			expected: []*CloudCostItemSet{
+				{
+					Integration: "integration",
+					Window:      dayWindows[0],
+					CloudCostItems: map[string]*CloudCostItem{
+						cciProperties1.Key(): {
+							Properties:   cciProperties1,
+							Window:       dayWindows[0],
+							IsKubernetes: true,
+							Cost:         100,
+							NetCost:      80,
+						},
+					},
+				},
+				{
+					Integration:    "integration",
+					Window:         dayWindows[1],
+					CloudCostItems: map[string]*CloudCostItem{},
+				},
+				{
+					Integration:    "integration",
+					Window:         dayWindows[2],
+					CloudCostItems: map[string]*CloudCostItem{},
+				},
+			},
+		},
+		"Load Single Day Off Grid": {
+			cci: []*CloudCostItem{
+				{
+					Properties:   cciProperties1,
+					Window:       NewClosedWindow(start.Add(12*time.Hour), start.Add(36*time.Hour)),
+					IsKubernetes: true,
+					Cost:         100,
+					NetCost:      80,
+				},
+			},
+			ccisr: emtpyCCISR.Clone(),
+			expected: []*CloudCostItemSet{
+				{
+					Integration: "integration",
+					Window:      dayWindows[0],
+					CloudCostItems: map[string]*CloudCostItem{
+						cciProperties1.Key(): {
+							Properties:   cciProperties1,
+							Window:       NewClosedWindow(start.Add(12*time.Hour), start.Add(24*time.Hour)),
+							IsKubernetes: true,
+							Cost:         50,
+							NetCost:      40,
+						},
+					},
+				},
+				{
+					Integration: "integration",
+					Window:      dayWindows[1],
+					CloudCostItems: map[string]*CloudCostItem{
+						cciProperties1.Key(): {
+							Properties:   cciProperties1,
+							Window:       NewClosedWindow(start.Add(24*time.Hour), start.Add(36*time.Hour)),
+							IsKubernetes: true,
+							Cost:         50,
+							NetCost:      40,
+						},
+					},
+				},
+				{
+					Integration:    "integration",
+					Window:         dayWindows[2],
+					CloudCostItems: map[string]*CloudCostItem{},
+				},
+			},
+		},
+		"Load Single Day Off Grid Before Range Window": {
+			cci: []*CloudCostItem{
+				{
+					Properties:   cciProperties1,
+					Window:       NewClosedWindow(start.Add(-12*time.Hour), start.Add(12*time.Hour)),
+					IsKubernetes: true,
+					Cost:         100,
+					NetCost:      80,
+				},
+			},
+			ccisr: emtpyCCISR.Clone(),
+			expected: []*CloudCostItemSet{
+				{
+					Integration: "integration",
+					Window:      dayWindows[0],
+					CloudCostItems: map[string]*CloudCostItem{
+						cciProperties1.Key(): {
+							Properties:   cciProperties1,
+							Window:       NewClosedWindow(start, start.Add(12*time.Hour)),
+							IsKubernetes: true,
+							Cost:         50,
+							NetCost:      40,
+						},
+					},
+				},
+				{
+					Integration:    "integration",
+					Window:         dayWindows[1],
+					CloudCostItems: map[string]*CloudCostItem{},
+				},
+				{
+					Integration:    "integration",
+					Window:         dayWindows[2],
+					CloudCostItems: map[string]*CloudCostItem{},
+				},
+			},
+		},
+		"Load Single Day Off Grid After Range Window": {
+			cci: []*CloudCostItem{
+				{
+					Properties:   cciProperties1,
+					Window:       NewClosedWindow(end.Add(-12*time.Hour), end.Add(12*time.Hour)),
+					IsKubernetes: true,
+					Cost:         100,
+					NetCost:      80,
+				},
+			},
+			ccisr: emtpyCCISR.Clone(),
+			expected: []*CloudCostItemSet{
+				{
+					Integration:    "integration",
+					Window:         dayWindows[0],
+					CloudCostItems: map[string]*CloudCostItem{},
+				},
+				{
+					Integration:    "integration",
+					Window:         dayWindows[1],
+					CloudCostItems: map[string]*CloudCostItem{},
+				},
+				{
+					Integration: "integration",
+					Window:      dayWindows[2],
+					CloudCostItems: map[string]*CloudCostItem{
+						cciProperties1.Key(): {
+							Properties:   cciProperties1,
+							Window:       NewClosedWindow(end.Add(-12*time.Hour), end),
+							IsKubernetes: true,
+							Cost:         50,
+							NetCost:      40,
+						},
+					},
+				},
+			},
+		},
+		"Single Day Kubecost Percent": {
+			cci: []*CloudCostItem{
+				{
+					Properties:   cciProperties1,
+					Window:       dayWindows[1],
+					IsKubernetes: true,
+					Cost:         75,
+					NetCost:      60,
+				},
+				{
+					Properties:   cciProperties1,
+					Window:       dayWindows[1],
+					IsKubernetes: true,
+					Cost:         25,
+					NetCost:      20,
+				},
+			},
+			ccisr: emtpyCCISR.Clone(),
+			expected: []*CloudCostItemSet{
+				{
+					Integration:    "integration",
+					Window:         dayWindows[0],
+					CloudCostItems: map[string]*CloudCostItem{},
+				},
+				{
+					Integration: "integration",
+					Window:      dayWindows[1],
+					CloudCostItems: map[string]*CloudCostItem{
+						cciProperties1.Key(): {
+							Properties:   cciProperties1,
+							Window:       dayWindows[1],
+							IsKubernetes: true,
+							Cost:         100,
+							NetCost:      80,
+						},
+					},
+				},
+				{
+					Integration:    "integration",
+					Window:         dayWindows[2],
+					CloudCostItems: map[string]*CloudCostItem{},
+				},
+			},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			// load Cloud Cost Items
+			for _, cci := range tc.cci {
+				tc.ccisr.LoadCloudCostItem(cci)
+			}
+
+			if len(tc.ccisr.CloudCostItemSets) != len(tc.expected) {
+				t.Errorf("the CloudCostItemSetRanges did not have the expected length")
+			}
+
+			for i, ccis := range tc.ccisr.CloudCostItemSets {
+				if !ccis.Equal(tc.expected[i]) {
+					t.Errorf("CloudCostItemSet at index: %d did not match expected", i)
+				}
+			}
+		})
+	}
+
+}

+ 22 - 22
pkg/kubecost/kubecost_codecs.go

@@ -33,9 +33,6 @@ const (
 )
 
 const (
-	// DefaultCodecVersion is used for any resources listed in the Default version set
-	DefaultCodecVersion uint8 = 17
-
 	// AssetsCodecVersion is used for any resources listed in the Assets version set
 	AssetsCodecVersion uint8 = 18
 
@@ -46,10 +43,13 @@ const (
 	AuditCodecVersion uint8 = 1
 
 	// CloudCostAggregateCodecVersion is used for any resources listed in the CloudCostAggregate version set
-	CloudCostAggregateCodecVersion uint8 = 1
+	CloudCostAggregateCodecVersion uint8 = 2
 
 	// CloudCostItemCodecVersion is used for any resources listed in the CloudCostItem version set
-	CloudCostItemCodecVersion uint8 = 1
+	CloudCostItemCodecVersion uint8 = 2
+
+	// DefaultCodecVersion is used for any resources listed in the Default version set
+	DefaultCodecVersion uint8 = 17
 )
 
 //--------------------------------------------------------------------------
@@ -4715,7 +4715,7 @@ func (target *CloudCostAggregate) MarshalBinaryWithContext(ctx *EncodingContext)
 
 	buff.WriteFloat64(target.KubernetesPercent) // write float64
 	buff.WriteFloat64(target.Cost)              // write float64
-	buff.WriteFloat64(target.Credit)            // write float64
+	buff.WriteFloat64(target.NetCost)           // write float64
 	return nil
 }
 
@@ -4790,7 +4790,7 @@ func (target *CloudCostAggregate) UnmarshalBinaryWithContext(ctx *DecodingContex
 	target.Cost = c
 
 	d := buff.ReadFloat64() // read float64
-	target.Credit = d
+	target.NetCost = d
 
 	return nil
 }
@@ -4842,16 +4842,16 @@ func (target *CloudCostAggregateProperties) MarshalBinaryWithContext(ctx *Encodi
 		buff.WriteString(target.Provider) // write string
 	}
 	if ctx.IsStringTable() {
-		b := ctx.Table.AddOrGet(target.Account)
+		b := ctx.Table.AddOrGet(target.WorkGroupID)
 		buff.WriteInt(b) // write table index
 	} else {
-		buff.WriteString(target.Account) // write string
+		buff.WriteString(target.WorkGroupID) // write string
 	}
 	if ctx.IsStringTable() {
-		c := ctx.Table.AddOrGet(target.Project)
+		c := ctx.Table.AddOrGet(target.BillingID)
 		buff.WriteInt(c) // write table index
 	} else {
-		buff.WriteString(target.Project) // write string
+		buff.WriteString(target.BillingID) // write string
 	}
 	if ctx.IsStringTable() {
 		d := ctx.Table.AddOrGet(target.Service)
@@ -4940,7 +4940,7 @@ func (target *CloudCostAggregateProperties) UnmarshalBinaryWithContext(ctx *Deco
 		e = buff.ReadString() // read string
 	}
 	d := e
-	target.Account = d
+	target.WorkGroupID = d
 
 	var h string
 	if ctx.IsStringTable() {
@@ -4950,7 +4950,7 @@ func (target *CloudCostAggregateProperties) UnmarshalBinaryWithContext(ctx *Deco
 		h = buff.ReadString() // read string
 	}
 	g := h
-	target.Project = g
+	target.BillingID = g
 
 	var m string
 	if ctx.IsStringTable() {
@@ -5469,8 +5469,8 @@ func (target *CloudCostItem) MarshalBinaryWithContext(ctx *EncodingContext) (err
 	}
 	// --- [end][write][struct](Window) ---
 
-	buff.WriteFloat64(target.Cost)   // write float64
-	buff.WriteFloat64(target.Credit) // write float64
+	buff.WriteFloat64(target.Cost)    // write float64
+	buff.WriteFloat64(target.NetCost) // write float64
 	return nil
 }
 
@@ -5555,7 +5555,7 @@ func (target *CloudCostItem) UnmarshalBinaryWithContext(ctx *DecodingContext) (e
 	target.Cost = d
 
 	e := buff.ReadFloat64() // read float64
-	target.Credit = e
+	target.NetCost = e
 
 	return nil
 }
@@ -5613,16 +5613,16 @@ func (target *CloudCostItemProperties) MarshalBinaryWithContext(ctx *EncodingCon
 		buff.WriteString(target.Provider) // write string
 	}
 	if ctx.IsStringTable() {
-		c := ctx.Table.AddOrGet(target.Account)
+		c := ctx.Table.AddOrGet(target.WorkGroupID)
 		buff.WriteInt(c) // write table index
 	} else {
-		buff.WriteString(target.Account) // write string
+		buff.WriteString(target.WorkGroupID) // write string
 	}
 	if ctx.IsStringTable() {
-		d := ctx.Table.AddOrGet(target.Project)
+		d := ctx.Table.AddOrGet(target.BillingID)
 		buff.WriteInt(d) // write table index
 	} else {
-		buff.WriteString(target.Project) // write string
+		buff.WriteString(target.BillingID) // write string
 	}
 	if ctx.IsStringTable() {
 		e := ctx.Table.AddOrGet(target.Service)
@@ -5748,7 +5748,7 @@ func (target *CloudCostItemProperties) UnmarshalBinaryWithContext(ctx *DecodingC
 		h = buff.ReadString() // read string
 	}
 	g := h
-	target.Account = g
+	target.WorkGroupID = g
 
 	var m string
 	if ctx.IsStringTable() {
@@ -5758,7 +5758,7 @@ func (target *CloudCostItemProperties) UnmarshalBinaryWithContext(ctx *DecodingC
 		m = buff.ReadString() // read string
 	}
 	l := m
-	target.Project = l
+	target.BillingID = l
 
 	var p string
 	if ctx.IsStringTable() {

+ 9 - 4
pkg/kubecost/window.go

@@ -3,6 +3,7 @@ package kubecost
 import (
 	"bytes"
 	"fmt"
+	"github.com/opencost/opencost/pkg/log"
 	"math"
 	"regexp"
 	"strconv"
@@ -710,14 +711,18 @@ func (w Window) DurationOffsetStrings() (string, string) {
 //     pct :=  4.0 / 16.0 = 0.250 for window 1
 //     pct := 10.0 / 16.0 = 0.625 for window 2
 //     pct :=  2.0 / 16.0 = 0.125 for window 3
-func (w Window) GetPercentInWindow(itemStart time.Time, itemEnd time.Time) float64 {
+func (w Window) GetPercentInWindow(that Window) float64 {
+	if that.IsOpen() {
+		log.Errorf("Window: GetPercentInWindow: invalid window %s", that.String())
+		return 0
+	}
 
-	s := itemStart
+	s := *that.Start()
 	if s.Before(*w.Start()) {
 		s = *w.Start()
 	}
 
-	e := itemEnd
+	e := *that.End()
 	if e.After(*w.End()) {
 		e = *w.End()
 	}
@@ -727,7 +732,7 @@ func (w Window) GetPercentInWindow(itemStart time.Time, itemEnd time.Time) float
 		return 0.0
 	}
 
-	totalMins := itemEnd.Sub(itemStart).Minutes()
+	totalMins := that.Duration().Minutes()
 
 	pct := mins / totalMins
 	return pct

+ 2 - 1
pkg/kubecost/window_test.go

@@ -915,7 +915,8 @@ func TestWindow_GetPercentInWindow(t *testing.T) {
 	}
 	for name, tc := range testcases {
 		t.Run(name, func(t *testing.T) {
-			if actual := tc.window.GetPercentInWindow(tc.itemStart, tc.itemEnd); actual != tc.expected {
+			thatWindow := NewWindow(&tc.itemStart, &tc.itemEnd)
+			if actual := tc.window.GetPercentInWindow(thatWindow); actual != tc.expected {
 				t.Errorf("GetPercentInWindow() = %v, want %v", actual, tc.expected)
 			}
 		})

+ 11 - 0
pkg/util/maputil/maputil.go

@@ -0,0 +1,11 @@
+package maputil
+
+// Map applies a transformation function to each value within a map to get a new map containing the
+// transformed values.
+func Map[K comparable, V any, T any](m map[K]V, transform func(V) T) map[K]T {
+	result := make(map[K]T, len(m))
+	for k, v := range m {
+		result[k] = transform(v)
+	}
+	return result
+}

+ 11 - 0
pkg/util/sliceutil/sliceutil.go

@@ -0,0 +1,11 @@
+package sliceutil
+
+// Map accepts a slice of T and applies a transformation function to each index of a
+// slice, which are inserted into a new slice of type U.
+func Map[T any, U any](s []T, transform func(T) U) []U {
+	result := make([]U, len(s))
+	for i := 0; i < len(s); i++ {
+		result[i] = transform(s[i])
+	}
+	return result
+}