Преглед на файлове

Metrics Synthesis and Bingen WAL (#3726)

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Matt Bolt преди 3 седмици
родител
ревизия
2e70e0efc2

+ 18 - 6
core/pkg/exporter/encoder.go

@@ -30,12 +30,24 @@ type BinaryMarshalerPtr[T any] interface {
 
 // BingenEncoder[T, U] is a generic encoder that uses the BinaryMarshaler interface to encode data.
 // It supports any type T that implements the encoding.BinaryMarshaler interface.
-type BingenEncoder[T any, U BinaryMarshalerPtr[T]] struct{}
+type BingenEncoder[T any, U BinaryMarshalerPtr[T]] struct {
+	fileExt string
+}
 
 // NewBingenEncoder creates an `Encoder[T]` implementation which supports binary encoding for the `T`
-// type.
+// type, and doesn't have a file extension.
 func NewBingenEncoder[T any, U BinaryMarshalerPtr[T]]() Encoder[T] {
-	return new(BingenEncoder[T, U])
+	return &BingenEncoder[T, U]{
+		fileExt: "",
+	}
+}
+
+// NewBingenFileEncoder creates a new `Encoder[T]` implementation which supports binary encoding for the
+// `T` type. The returned encoder reports "bingen" (note: no leading dot) from FileExt().
+func NewBingenFileEncoder[T any, U BinaryMarshalerPtr[T]]() Encoder[T] {
+	return &BingenEncoder[T, U]{
+		fileExt: "bingen",
+	}
+}
 
 // Encode encodes the provided data of type T into a byte slice using the BinaryMarshaler interface.
@@ -44,10 +56,10 @@ func (b *BingenEncoder[T, U]) Encode(data *T) ([]byte, error) {
 	return bingenData.MarshalBinary()
 }
 
-// FileExt returns the file extension for the encoded data. In this case, it returns an empty string
-// to indicate that there is no specific file extension for the binary encoded data.
+// FileExt returns the configured file extension for the encoded data. This may be an empty
+// string when no file extension is configured, or a non-empty value such as "bingen".
 func (b *BingenEncoder[T, U]) FileExt() string {
-	return ""
+	return b.fileExt
 }
 
 // JSONEncoder[T] is a generic encoder that uses the JSON encoding format to encode data.

+ 38 - 0
core/pkg/filter/text/text_test.go

@@ -0,0 +1,38 @@
+package text
+
+import (
+	"testing"
+
+	"github.com/opencost/opencost/core/pkg/filter/allocation"
+	"github.com/opencost/opencost/core/pkg/filter/ast"
+	"github.com/opencost/opencost/core/pkg/filter/transform"
+)
+
+// no compiler passes applied for comparisons
+var allocCompiler = NewTextCompiler()
+
+// This can be used to determine the rewritten filter after filters.
+var _allocCompilerWithPasses = NewTextCompiler(
+	transform.PrometheusKeySanitizePass(),
+	transform.UnallocatedReplacementPass(),
+)
+
+// AST parser for allocation syntax
+var allocParser ast.FilterParser = allocation.NewAllocationFilterParser()
+
+// TestThings round-trips a representative allocation filter string through the AST
+// parser and the text compiler, asserting the compiled output matches the input.
+func TestThings(t *testing.T) {
+	filter := `namespace:"kubecost" + label[app]:"cost_analyzer" + annotation[a1]:"b2" + cluster:"cluster-one" + node!:"node-123","node-456" + controllerName:"kubecost-cost-analyzer","kubecost-prometheus-server" + controllerKind!:"daemonset","statefulset","job" + container!:"123-abc_foo" + pod!:"aaaaaaaaaaaaaaaaaaaaaaaaa" + services!~:"abc123"`
+
+	tree, err := allocParser.Parse(filter)
+	if err != nil {
+		t.Fatalf("Unexpected parse error: %s", err)
+	}
+	t.Logf("%s", ast.ToPreOrderString(tree))
+
+	result, err := allocCompiler.Compile(tree)
+	// the compile error was previously ignored; fail fast before comparing output
+	if err != nil {
+		t.Fatalf("Unexpected compile error: %s", err)
+	}
+	t.Logf("Result: %s", result)
+
+	if result != filter {
+		t.Fatalf("Expected original filter:\n%s\nto match string compiled filter:\n%s\n", filter, result)
+	}
+}

+ 355 - 0
core/pkg/filter/text/textcompiler.go

@@ -0,0 +1,355 @@
+package text
+
+import (
+	"fmt"
+	"slices"
+	"strings"
+
+	"github.com/opencost/opencost/core/pkg/filter/ast"
+	"github.com/opencost/opencost/core/pkg/filter/transform"
+	"github.com/opencost/opencost/core/pkg/filter/util"
+)
+
+// TextCompiler is a filter compiler implementation that will compile the filter AST back
+// into the filter query text.
+type TextCompiler struct {
+	// passes are the pre-compile transformations applied to the AST before text generation
+	passes []transform.CompilerPass
+}
+
+// NewTextCompiler creates a new TextCompiler instance that will compile the filter AST
+// back into the filter query text, after running all pre-compile transformations.
+func NewTextCompiler(passes ...transform.CompilerPass) *TextCompiler {
+	return &TextCompiler{
+		passes: passes,
+	}
+}
+
+// Compile accepts an `ast.FilterNode` tree and builds out the filter text that was used to
+// build the tree in the first place. It returns an empty string for void/empty filters, and
+// the sentinel "[all-fail]" when the tree contains a ContradictionOp (which has no textual
+// filter representation).
+func (tc *TextCompiler) Compile(filter ast.FilterNode) (string, error) {
+	// apply compiler passes on parsed ast
+	var err error
+	filter, err = transform.ApplyAll(filter, tc.passes)
+	if err != nil {
+		return "", fmt.Errorf("applying compiler passes: %w", err)
+	}
+
+	// if the root node is a void op, empty filter
+	if _, ok := filter.(*ast.VoidOp); ok {
+		return "", nil
+	}
+
+	var isContradictionOp bool
+	var result TextOp
+	currentOps := util.NewStack[TextGroupOp]()
+
+	// groupState is the shared Enter/Exit stack bookkeeping for all grouping ops
+	// (and, or, not): push a new group on Enter; on Exit, fold the closed group
+	// into its parent, or set it as the final result when it is the root group.
+	groupState := func(state ast.TraversalState, create func() TextGroupOp) {
+		if state == ast.TraversalStateEnter {
+			currentOps.Push(create())
+		} else if state == ast.TraversalStateExit {
+			if currentOps.Length() > 1 {
+				current := currentOps.Pop().Close()
+				currentOps.Top().Add(current)
+			} else {
+				result = currentOps.Pop().Close()
+			}
+		}
+	}
+
+	// addOp adds a leaf op to the current group, or promotes it to the final
+	// result when there is no enclosing group.
+	addOp := func(op TextOp) {
+		if currentOps.Length() == 0 {
+			result = op
+		} else {
+			currentOps.Top().Add(op)
+		}
+	}
+
+	// handleLeaf is the ast walker func. group ops get pushed onto a stack on
+	// the Enter state, and popped on the Exit state. Any ops between Enter and
+	// Exit are added to the group. If there are no more groups on the stack after
+	// an Exit state, we set the result to the final group.
+	handleLeaf := func(leaf ast.FilterNode, state ast.TraversalState) {
+		switch n := leaf.(type) {
+		case *ast.AndOp:
+			groupState(state, func() TextGroupOp { return newGroupOp("+") })
+		case *ast.OrOp:
+			groupState(state, func() TextGroupOp { return newGroupOp("|") })
+		case *ast.NotOp:
+			groupState(state, func() TextGroupOp { return newNotOp() })
+		// Special case here, these can only be created programmatically and
+		// don't have a filter variant, but we will represent it as a special
+		// string at the end of the compile action
+		case *ast.ContradictionOp:
+			isContradictionOp = true
+			addOp(NoOp)
+		case *ast.EqualOp:
+			addOp(newComparisonOp(":", n.Left, n.Right))
+		case *ast.ContainsOp:
+			addOp(newComparisonOp("~:", n.Left, n.Right))
+		case *ast.ContainsPrefixOp:
+			addOp(newComparisonOp("<~:", n.Left, n.Right))
+		case *ast.ContainsSuffixOp:
+			addOp(newComparisonOp("~>:", n.Left, n.Right))
+		}
+	}
+
+	ast.PreOrderTraversal(filter, handleLeaf)
+
+	// if we discover a contradiction op, we reject all inputs
+	// this isn't able to be expressed via a filter string
+	if isContradictionOp {
+		return "[all-fail]", nil
+	}
+
+	if result == nil {
+		return "", nil
+	}
+
+	// for group ops, trim the root level parens
+	strResult := result.String()
+	if rootOp, ok := result.(*GroupOp); ok {
+		if rootOp.Symbol == "|" || rootOp.Symbol == "+" {
+			strResult = strResult[1 : len(strResult)-1]
+		}
+	}
+
+	return strResult, nil
+}
+
+//--------------------------------------------------------------------------
+//  TextOp Abstractions
+//--------------------------------------------------------------------------
+
+// TextOp is a basic operation from which we generate a string representing the recreation
+// of the filter from the AST.
+type TextOp interface {
+	String() string
+}
+
+// TextGroupOp is a grouping operation like and, or, or not.
+type TextGroupOp interface {
+	TextOp
+
+	// Add appends a new operation to the group
+	Add(TextOp)
+
+	// Close collapses any inline reductions to the negation or multi-compare operations
+	Close() TextOp
+}
+
+//--------------------------------------------------------------------------
+//  Ops
+//--------------------------------------------------------------------------
+
+// NoOp is the singleton ContradictionOp value; it renders as an empty string.
+const NoOp ContradictionOp = ContradictionOp("")
+
+// ContradictionOp is the TextOp implementation for a reject-all (contradiction) filter
+// operation, which has no textual filter representation.
+type ContradictionOp string
+
+func (no ContradictionOp) String() string { return "" }
+
+// GroupOp is an And/Or grouping of child text ops joined by Symbol ("+" for and, "|" for or).
+type GroupOp struct {
+	Symbol string
+	Ops    []TextOp
+}
+
+// newGroupOp creates a new grouping operation with the op symbol.
+func newGroupOp(symbol string) *GroupOp {
+	return &GroupOp{
+		Symbol: symbol,
+	}
+}
+
+// Add appends a text op as part of the group.
+func (a *GroupOp) Add(m TextOp) {
+	// this merges identical comparisons - this is a bit of a pre-optimization for Close()
+	// that combines ie: (foo:"bar" + foo:"baz") into (foo:"bar","baz")
+	if compOp, ok := m.(*ComparisonOp); ok {
+		sym := compOp.Symbol
+		left := compOp.Left
+		r := compOp.Right
+
+		// look for identical symbol and identifiers, also ensure there isn't a repeat
+		// value.
+		// NOTE(review): an exactly repeated value falls through both `continue`s and is
+		// appended as a separate op below rather than dropped — confirm this duplicate
+		// round-trip behavior is intended.
+		for _, op := range a.Ops {
+			if currOp, ook := op.(*ComparisonOp); ook {
+				if currOp.Symbol != sym || !currOp.Left.Equal(left) || currOp.Right == r {
+					continue
+				}
+				if slices.Contains(currOp.Other, r) {
+					continue
+				}
+
+				// found a match, merge comparison operands
+				currOp.Other = append(currOp.Other, r)
+				return
+			}
+		}
+	}
+
+	a.Ops = append(a.Ops, m)
+}
+
+// Close collapses a single-child group down to the child itself.
+func (a *GroupOp) Close() TextOp {
+	if len(a.Ops) == 1 {
+		return a.Ops[0]
+	}
+
+	return a
+}
+
+// String generates the group op text using the configured symbol.
+func (a *GroupOp) String() string {
+	return writeGroupOp(a.Symbol, a.Ops...)
+}
+
+// ComparisonOp is your standard boolean expression used in the filters we need to
+// express as merely a symbol and operands. Other holds additional right-hand values
+// merged from identical comparisons (rendered as a comma-separated quoted list).
+type ComparisonOp struct {
+	Symbol string
+	Left   ast.Identifier
+	Right  string
+	Other  []string
+}
+
+// newComparisonOp creates a new comparison op with a symbol, identifier, and value.
+func newComparisonOp(symbol string, left ast.Identifier, right string) *ComparisonOp {
+	return &ComparisonOp{
+		Symbol: symbol,
+		Left:   left,
+		Right:  right,
+	}
+}
+
+// String renders the comparison as identifier, symbol, and quoted value list.
+func (a *ComparisonOp) String() string {
+	return writeOp(a.Symbol, a.Left, a.Right, a.Other...)
+}
+
+// NotOp is a negation that contains a single op to negate.
+type NotOp struct {
+	Op TextOp
+}
+
+// newNotOp creates an empty negation; the inner op is supplied via Add.
+func newNotOp() *NotOp {
+	return new(NotOp)
+}
+
+// Add sets (replaces) the single op this negation wraps.
+func (a *NotOp) Add(m TextOp) {
+	a.Op = m
+}
+
+// Close reduces a negated comparison to its "!"-prefixed comparison form; group
+// (and nil) inner ops are left wrapped for String() to handle.
+func (a *NotOp) Close() TextOp {
+	if a.Op == nil {
+		return a
+	}
+
+	switch innerOp := a.Op.(type) {
+	case *GroupOp:
+		return a
+	case *ComparisonOp:
+		merged := newComparisonOp("!"+innerOp.Symbol, innerOp.Left, innerOp.Right)
+		merged.Other = innerOp.Other
+		return merged
+	}
+
+	return a
+}
+
+// Because our tree will treat 'foo !: bar' as '!(foo : bar)' we can easily convert back into the originating negation
+// depending on the inner op by prepending a '!'
+func (a *NotOp) String() string {
+	if a.Op == nil {
+		return ""
+	}
+
+	switch innerOp := a.Op.(type) {
+	case *GroupOp:
+		return "!" + writeGroupOp("", innerOp)
+	case *ComparisonOp:
+		// same "!"-prefix reduction as Close(), for the case where Close was not applied
+		merged := newComparisonOp("!"+innerOp.Symbol, innerOp.Left, innerOp.Right)
+		merged.Other = innerOp.Other
+		return merged.String()
+	}
+
+	return ""
+}
+
+//--------------------------------------------------------------------------
+//  Helpers
+//--------------------------------------------------------------------------
+
+// writeGroupOp writes all of the provided operands joined by the operation symbol,
+// wrapping multi-operand groups in parens. A single operand is emitted bare, and
+// zero operands produce an empty string.
+func writeGroupOp(op string, operands ...TextOp) string {
+	if len(operands) == 0 {
+		return ""
+	}
+	if len(operands) == 1 {
+		return operands[0].String()
+	}
+
+	sep := fmt.Sprintf(" %s ", op)
+
+	var sb strings.Builder
+	sb.WriteRune('(')
+	sb.WriteString(operands[0].String())
+	for _, f := range operands[1:] {
+		sb.WriteString(sep)
+		sb.WriteString(f.String())
+	}
+	sb.WriteRune(')')
+
+	return sb.String()
+}
+
+// writeOp generates a basic comparison operation: the identifier, the op symbol, and
+// one or more double-quoted values separated by commas.
+func writeOp(op string, left ast.Identifier, right string, additional ...string) string {
+	var sb strings.Builder
+	sb.WriteString(left.String())
+	sb.WriteString(op)
+	sb.WriteRune('"')
+	sb.WriteString(right)
+	sb.WriteRune('"')
+	for _, other := range additional {
+		sb.WriteRune(',')
+		sb.WriteRune('"')
+		sb.WriteString(other)
+		sb.WriteRune('"')
+	}
+	return sb.String()
+}

+ 4 - 4
modules/collector-source/pkg/collector/collector.go

@@ -331,7 +331,7 @@ func NewLocalStorageBytesMetricCollector() *metric.MetricCollector {
 
 // count(
 //
-//	node_total_hourly_cost{
+//	kube_node_labels{
 //		<some_custom_filter>
 //	}
 //
@@ -339,7 +339,7 @@ func NewLocalStorageBytesMetricCollector() *metric.MetricCollector {
 func NewLocalStorageActiveMinutesMetricCollector() *metric.MetricCollector {
 	return metric.NewMetricCollector(
 		metric.LocalStorageActiveMinutesID,
-		metric.NodeTotalHourlyCost,
+		metric.KubeNodeLabels,
 		[]string{
 			source.NodeLabel,
 			source.ProviderIDLabel,
@@ -477,7 +477,7 @@ func NewNodeLabelsMetricCollector() *metric.MetricCollector {
 }
 
 //	avg(
-//		node_total_hourly_cost{
+//		kube_node_labels{
 //			<some_custom_filter>
 //		}
 //	) by (node, cluster_id, provider_id)[%s:%dm]
@@ -485,7 +485,7 @@ func NewNodeLabelsMetricCollector() *metric.MetricCollector {
 func NewNodeActiveMinutesMetricCollector() *metric.MetricCollector {
 	return metric.NewMetricCollector(
 		metric.NodeActiveMinutesID,
-		metric.NodeTotalHourlyCost,
+		metric.KubeNodeLabels,
 		[]string{
 			source.NodeLabel,
 			source.ProviderIDLabel,

+ 11 - 1
modules/collector-source/pkg/collector/datasource.go

@@ -13,6 +13,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/storage"
 	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric/synthetic"
 	"github.com/opencost/opencost/modules/collector-source/pkg/scrape"
 	"github.com/opencost/opencost/modules/collector-source/pkg/util"
 )
@@ -81,6 +82,15 @@ func NewCollectorDataSource(
 		}
 	}
 
+	// synthesizer collects specific metric types and generates new metrics to pass
+	// along with the original metrics into the updater
+	metricSynthesizer := synthetic.NewMetricSynthesizers(
+		updater,
+		synthetic.NewContainerMemoryAllocationSynthesizer(),
+		synthetic.NewContainerCpuAllocationSynthesizer(),
+	)
+	updater = metricSynthesizer
+
 	diagnosticsModule := metric.NewDiagnosticsModule()
 	scrapeController := scrape.NewScrapeController(
 		config.ClusterUID,
@@ -110,7 +120,7 @@ func NewCollectorDataSource(
 }
 
 func (c *collectorDataSource) RegisterEndPoints(router *httprouter.Router) {
-	return
+	// intentionally empty: the collector data source registers no HTTP endpoints
 }
 
 func (c *collectorDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {

+ 3 - 0
modules/collector-source/pkg/collector/metricsquerier_test.go

@@ -124,6 +124,9 @@ func GetMockCollectorProvider() StoreProvider {
 		source.ServiceLabel:    "service2",
 	}
 
+	collector.Update(metric.KubeNodeLabels, node1Info, 0, start, nil)
+	collector.Update(metric.KubeNodeLabels, node1Info, 0, end, nil)
+
 	collector.Update(metric.NodeTotalHourlyCost, node1Info, 0, start, nil)
 	collector.Update(metric.NodeTotalHourlyCost, node1Info, 0, end, nil)
 

+ 3 - 1
modules/collector-source/pkg/metric/aggregator/uptime.go

@@ -32,8 +32,10 @@ func (a *uptimeAggregator) Update(value float64, timestamp time.Time, additional
 	defer a.lock.Unlock()
 	if a.start == nil {
 		a.start = &timestamp
+		return
 	}
-	if !timestamp.Equal(*a.start) {
+
+	if timestamp.After(*a.start) {
 		a.end = &timestamp
 	}
 }

+ 6 - 0
modules/collector-source/pkg/metric/bingen.go

@@ -0,0 +1,6 @@
+package metric
+
+// @bingen:generate[streamable,stringtable]:UpdateSet
+// @bingen:generate:Update
+
+//go:generate bingen -package=metric -version=1 -buffer=github.com/opencost/opencost/core/pkg/util

+ 1123 - 0
modules/collector-source/pkg/metric/metric_codecs.go

@@ -0,0 +1,1123 @@
+////////////////////////////////////////////////////////////////////////////////
+//
+//                             DO NOT MODIFY
+//
+//                          ┻━┻ ︵ヽ(`Д´)ノ︵ ┻━┻
+//
+//
+//            This source file was automatically generated by bingen.
+//
+////////////////////////////////////////////////////////////////////////////////
+
+package metric
+
+import (
+	"fmt"
+	"io"
+	"iter"
+	"os"
+	"reflect"
+	"strings"
+	"sync"
+	"time"
+	"unsafe"
+
+	util "github.com/opencost/opencost/core/pkg/util"
+)
+
+const (
+	// GeneratorPackageName is the package the generator is targeting
+	GeneratorPackageName string = "metric"
+)
+
+// BinaryTags represent the formatting tag used for specific optimization features
+const (
+	// BinaryTagStringTable is written and/or read prior to the existence of a string
+	// table (where each index is encoded as a string entry in the resource)
+	BinaryTagStringTable string = "BGST"
+)
+
+const (
+	// DefaultCodecVersion is used for any resources listed in the Default version set
+	DefaultCodecVersion uint8 = 1
+)
+
+//--------------------------------------------------------------------------
+//  Configuration
+//--------------------------------------------------------------------------
+
+var (
+	bingenConfigLock sync.RWMutex
+	bingenConfig     *BingenConfiguration = DefaultBingenConfiguration()
+)
+
+// BingenConfiguration is used to set any custom configuration in the way files are encoded
+// or decoded.
+type BingenConfiguration struct {
+	// FileBackedStringTableEnabled enables the use of file-backed string tables for streaming
+	// bingen decoding.
+	FileBackedStringTableEnabled bool
+
+	// FileBackedStringTableDir is the directory to write the string table files for reading.
+	FileBackedStringTableDir string
+}
+
+// DefaultBingenConfiguration creates the default implementation of the bingen configuration
+// and returns it.
+func DefaultBingenConfiguration() *BingenConfiguration {
+	return &BingenConfiguration{
+		FileBackedStringTableEnabled: false,
+		FileBackedStringTableDir:     os.TempDir(),
+	}
+}
+
+// ConfigureBingen accepts a new *BingenConfiguration instance which updates the internal decoder
+// and encoder behavior.
+func ConfigureBingen(config *BingenConfiguration) {
+	bingenConfigLock.Lock()
+	defer bingenConfigLock.Unlock()
+
+	if config == nil {
+		config = DefaultBingenConfiguration()
+	}
+	bingenConfig = config
+}
+
+// IsBingenFileBackedStringTableEnabled accessor for file backed string table configuration
+func IsBingenFileBackedStringTableEnabled() bool {
+	bingenConfigLock.RLock()
+	defer bingenConfigLock.RUnlock()
+
+	return bingenConfig.FileBackedStringTableEnabled
+}
+
+// BingenFileBackedStringTableDir returns the directory configured for file backed string tables.
+func BingenFileBackedStringTableDir() string {
+	bingenConfigLock.RLock()
+	defer bingenConfigLock.RUnlock()
+
+	return bingenConfig.FileBackedStringTableDir
+}
+
+//--------------------------------------------------------------------------
+//  Type Map
+//--------------------------------------------------------------------------
+
+// Generated type map for resolving interface implementations to
+// to concrete types
+var typeMap map[string]reflect.Type = map[string]reflect.Type{
+	"Update":    reflect.TypeFor[Update](),
+	"UpdateSet": reflect.TypeFor[UpdateSet](),
+}
+
+//--------------------------------------------------------------------------
+//  Type Helpers
+//--------------------------------------------------------------------------
+
+// isBinaryTag returns true when the first bytes in the provided binary matches the tag
+func isBinaryTag(data []byte, tag string) bool {
+	if len(data) < len(tag) {
+		return false
+	}
+
+	return string(data[:len(tag)]) == tag
+}
+
+// isReaderBinaryTag is used to peek the header for an io.Reader Buffer
+func isReaderBinaryTag(buff *util.Buffer, tag string) bool {
+	data, err := buff.Peek(len(tag))
+	if err != nil && err != io.EOF {
+		panic(fmt.Sprintf("called Peek() on a non buffered reader: %s", err))
+	}
+	if len(data) < len(tag) {
+		return false
+	}
+
+	return string(data[:len(tag)]) == tag
+}
+
+// appendBytes combines a and b into a new byte array
+func appendBytes(a []byte, b []byte) []byte {
+	al := len(a)
+	bl := len(b)
+	tl := al + bl
+
+	// allocate a new byte array for the combined
+	// use native copy for speedy byte copying
+	result := make([]byte, tl)
+	copy(result, a)
+	copy(result[al:], b)
+
+	return result
+}
+
+// typeToString determines the basic properties of the type, the qualifier, package path, and
+// type name, and returns the qualified type
+func typeToString(f interface{}) string {
+	qual := ""
+	t := reflect.TypeOf(f)
+	if t.Kind() == reflect.Ptr {
+		t = t.Elem()
+		qual = "*"
+	}
+
+	return fmt.Sprintf("%s%s.%s", qual, t.PkgPath(), t.Name())
+}
+
+// resolveType uses the name of a type and returns the package, base type name, and whether
+// or not it's a pointer.
+func resolveType(t string) (pkg string, name string, isPtr bool) {
+	isPtr = t[:1] == "*"
+	if isPtr {
+		t = t[1:]
+	}
+
+	slashIndex := strings.LastIndex(t, "/")
+	if slashIndex >= 0 {
+		t = t[slashIndex+1:]
+	}
+	parts := strings.Split(t, ".")
+	if parts[0] == GeneratorPackageName {
+		parts[0] = ""
+	}
+
+	pkg = parts[0]
+	name = parts[1]
+	return
+}
+
+//--------------------------------------------------------------------------
+//  Stream Helpers
+//--------------------------------------------------------------------------
+
+// StreamFactoryFunc is an alias for a func that creates a BingenStream implementation.
+type StreamFactoryFunc func(io.Reader) BingenStream
+
+// Generated streamable factory map for finding the specific new stream methods
+// by T type
+var streamFactoryMap map[reflect.Type]StreamFactoryFunc = map[reflect.Type]StreamFactoryFunc{
+	reflect.TypeFor[UpdateSet](): NewUpdateSetStream,
+}
+
+// NewStreamFor accepts an io.Reader, and returns a new BingenStream for the generic T
+// type provided _if_ it is a registered bingen type that is annotated as 'streamable'. See
+// the streamFactoryMap for generated type listings.
+func NewStreamFor[T any](reader io.Reader) (BingenStream, error) {
+	typeKey := reflect.TypeFor[T]()
+
+	factory, ok := streamFactoryMap[typeKey]
+	if !ok {
+		return nil, fmt.Errorf("the type: %s is not a registered bingen streamable type", typeKey.Name())
+	}
+
+	return factory(reader), nil
+}
+
+// BingenStream is the stream interface for all streamable types
+type BingenStream interface {
+	// Stream returns the iterator which will stream each field of the target type and
+	// return the field info as well as the value.
+	Stream() iter.Seq2[BingenFieldInfo, *BingenValue]
+
+	// Close will close any dynamic io.Reader used to stream in the fields
+	Close()
+
+	// Error returns an error if one occurred during the process of streaming the type's fields.
+	// This can be checked after iterating through the Stream().
+	Error() error
+}
+
+// BingenValue contains the value of a field as well as any index/key associated with that value.
+type BingenValue struct {
+	Value any
+	Index any
+}
+
+// IsNil is just a method accessor way to check to see if the value returned was nil
+func (bv *BingenValue) IsNil() bool {
+	return bv == nil
+}
+
+// creates a single BingenValue instance without a key or index
+func singleV(value any) *BingenValue {
+	return &BingenValue{
+		Value: value,
+	}
+}
+
+// creates a pair of key/index and value.
+func pairV(index any, value any) *BingenValue {
+	return &BingenValue{
+		Value: value,
+		Index: index,
+	}
+}
+
+// BingenFieldInfo contains the type of the field being streamed as well as the name of the field.
+type BingenFieldInfo struct {
+	Type reflect.Type
+	Name string
+}
+
+//--------------------------------------------------------------------------
+//  String Table Writer
+//--------------------------------------------------------------------------
+
+// StringTableWriter maps strings to specific indices for encoding
+type StringTableWriter struct {
+	l       sync.Mutex
+	indices map[string]int
+	next    int
+}
+
+// NewStringTableWriter Creates a new StringTableWriter instance with provided contents
+func NewStringTableWriter(contents ...string) *StringTableWriter {
+	st := &StringTableWriter{
+		indices: make(map[string]int, len(contents)),
+		next:    len(contents),
+	}
+
+	for i, entry := range contents {
+		st.indices[entry] = i
+	}
+
+	return st
+}
+
+// AddOrGet atomically retrieves a string entry's index if it exist. Otherwise, it will
+// add the entry and return the index.
+func (st *StringTableWriter) AddOrGet(s string) int {
+	st.l.Lock()
+	defer st.l.Unlock()
+
+	if ind, ok := st.indices[s]; ok {
+		return ind
+	}
+
+	current := st.next
+	st.next++
+
+	st.indices[s] = current
+	return current
+}
+
+// ToSlice Converts the contents to a string array for encoding.
+func (st *StringTableWriter) ToSlice() []string {
+	st.l.Lock()
+	defer st.l.Unlock()
+
+	if st.next == 0 {
+		return []string{}
+	}
+
+	sl := make([]string, st.next)
+	for s, i := range st.indices {
+		sl[i] = s
+	}
+	return sl
+}
+
+// ToBytes Converts the contents to a binary encoded representation
+func (st *StringTableWriter) ToBytes() []byte {
+	buff := util.NewBuffer()
+	buff.WriteBytes([]byte(BinaryTagStringTable)) // bingen table header
+
+	strs := st.ToSlice()
+
+	buff.WriteInt(len(strs)) // table length
+	for _, s := range strs {
+		buff.WriteString(s)
+	}
+
+	return buff.Bytes()
+}
+
+//--------------------------------------------------------------------------
+//  String Table Reader
+//--------------------------------------------------------------------------
+
+// StringTableReader is the interface used to read the string table from the decoding.
+type StringTableReader interface {
+	// At returns the string entry at a specific index, or panics on out of bounds.
+	At(index int) string
+
+	// Len returns the total number of strings loaded in the string table.
+	Len() int
+
+	// Close will clear the loaded table, and drop any external resources used.
+	Close() error
+}
+
+// SliceStringTableReader is a basic pre-loaded []string that provides index-based access.
+// The cost of this implementation is holding all strings in memory, which provides faster
+// lookup performance for memory usage.
+type SliceStringTableReader struct {
+	table []string
+}
+
+// NewSliceStringTableReaderFrom creates a new SliceStringTableReader instance loading
+// data directly from the buffer. The buffer's position should start at the table length.
+func NewSliceStringTableReaderFrom(buffer *util.Buffer) StringTableReader {
+	// table length
+	tl := buffer.ReadInt()
+
+	var table []string
+	if tl > 0 {
+		table = make([]string, tl)
+		for i := range tl {
+			table[i] = buffer.ReadString()
+		}
+	}
+
+	return &SliceStringTableReader{
+		table: table,
+	}
+}
+
+// At returns the string entry at a specific index, or panics on out of bounds.
+func (sstr *SliceStringTableReader) At(index int) string {
+	if index < 0 || index >= len(sstr.table) {
+		panic(fmt.Errorf("%s: string table index out of bounds: %d", GeneratorPackageName, index))
+	}
+
+	return sstr.table[index]
+}
+
+// Len returns the total number of strings loaded in the string table.
+func (sstr *SliceStringTableReader) Len() int {
+	if sstr == nil {
+		return 0
+	}
+
+	return len(sstr.table)
+}
+
+// Close for the slice tables just nils out the slice and returns
+func (sstr *SliceStringTableReader) Close() error {
+	sstr.table = nil
+	return nil
+}
+
+// fileStringRef maps a bingen string-table index to a payload stored in a temp file.
+type fileStringRef struct {
+	off    int64
+	length int
+}
+
+// FileStringTableReader leverages a local file to write string table data for lookup. On
+// memory focused systems, this allows a slower parse with a significant decrease in memory
+// usage. This implementation is often paired with streaming readers for high throughput with
+// reduced memory usage.
+type FileStringTableReader struct {
+	// f is the temp file holding the raw string payloads
+	f    *os.File
+	// refs maps each string-table index to its (offset, length) in f
+	refs []fileStringRef
+}
+
+// NewFileStringTableReaderFrom reads the length-prefixed string payloads from buffer and
+// appends each payload to a new temp file in dir. It does not retain full strings in
+// memory; only (offset, length) references into the temp file are kept.
+func NewFileStringTableReaderFrom(buffer *util.Buffer, dir string) StringTableReader {
+	// helper func to cast a string in-place to a byte slice.
+	// NOTE: Return value is READ-ONLY. DO NOT MODIFY!
+	byteSliceFor := func(s string) []byte {
+		return unsafe.Slice(unsafe.StringData(s), len(s))
+	}
+
+	err := os.MkdirAll(dir, 0755)
+	if err != nil {
+		panic(fmt.Errorf("%s: failed to create string table directory: %w", GeneratorPackageName, err))
+	}
+
+	f, err := os.CreateTemp(dir, fmt.Sprintf("%s-bgst-*", GeneratorPackageName))
+	if err != nil {
+		panic(fmt.Errorf("%s: failed to create string table file: %w", GeneratorPackageName, err))
+	}
+
+	// on any write failure below, close the temp file before the panic unwinds
+	var writeErr error
+	defer func() {
+		if writeErr != nil {
+			_ = f.Close()
+		}
+	}()
+
+	// table length
+	tl := buffer.ReadInt()
+
+	var refs []fileStringRef
+	if tl > 0 {
+		refs = make([]fileStringRef, tl)
+
+		for i := range tl {
+			payload := byteSliceFor(buffer.ReadString())
+
+			var off int64
+			if len(payload) > 0 {
+				off, err = f.Seek(0, io.SeekEnd)
+				if err != nil {
+					writeErr = fmt.Errorf("%s: failed to seek string table file: %w", GeneratorPackageName, err)
+					panic(writeErr)
+				}
+				if _, err := f.Write(payload); err != nil {
+					writeErr = fmt.Errorf("%s: failed to write string table entry %d: %w", GeneratorPackageName, i, err)
+					panic(writeErr)
+				}
+			}
+
+			refs[i] = fileStringRef{
+				off:    off,
+				length: len(payload),
+			}
+		}
+	}
+
+	return &FileStringTableReader{
+		f:    f,
+		refs: refs,
+	}
+}
+
+// At returns the string from the internal file using the reference's offset and length.
+func (fstr *FileStringTableReader) At(index int) string {
+	if fstr == nil || fstr.f == nil {
+		panic(fmt.Errorf("%s: failed to read file string table data", GeneratorPackageName))
+	}
+	if index < 0 || index >= len(fstr.refs) {
+		panic(fmt.Errorf("%s: string table index out of bounds: %d", GeneratorPackageName, index))
+	}
+
+	ref := fstr.refs[index]
+	if ref.length == 0 {
+		return ""
+	}
+
+	b := make([]byte, ref.length)
+	_, err := fstr.f.ReadAt(b, ref.off)
+	if err != nil {
+		return ""
+	}
+
+	// cast the allocated bytes to a string in-place, as we
+	// were the ones that allocated the bytes
+	return unsafe.String(unsafe.SliceData(b), len(b))
+}
+
+// Len returns the total number of strings loaded in the string table.
+func (fstr *FileStringTableReader) Len() int {
+	if fstr == nil {
+		return 0
+	}
+
+	return len(fstr.refs)
+}
+
+// Close for the file string table reader closes the file and deletes it.
+func (fstr *FileStringTableReader) Close() error {
+	if fstr == nil || fstr.f == nil {
+		return nil
+	}
+
+	path := fstr.f.Name()
+	err := fstr.f.Close()
+	fstr.f = nil
+	fstr.refs = nil
+
+	if path != "" {
+		_ = os.Remove(path)
+	}
+
+	return err
+}
+
+//--------------------------------------------------------------------------
+//  Codec Context
+//--------------------------------------------------------------------------
+
+// EncodingContext is a context object passed to the encoders to ensure reuse of buffer
+// and table data.
+type EncodingContext struct {
+	Buffer *util.Buffer       // destination buffer the encoders write into
+	Table  *StringTableWriter // optional string table; nil disables string interning
+}
+
+// IsStringTable returns true if the table is available.
+func (ec *EncodingContext) IsStringTable() bool {
+	return ec.Table != nil
+}
+
+// DecodingContext is a context object passed to the decoders to ensure parent objects
+// reuse as much data as possible.
+type DecodingContext struct {
+	Buffer *util.Buffer      // source buffer the decoders read from
+	Table  StringTableReader // optional string table; nil when the payload carries none
+}
+
+// NewDecodingContextFromBytes creates a new DecodingContext instance using a byte slice.
+//
+// When the payload starts with the string table binary tag, the tag is stripped and a
+// slice-backed string table reader is constructed over the in-memory data.
+func NewDecodingContextFromBytes(data []byte) *DecodingContext {
+	var table StringTableReader
+
+	buff := util.NewBufferFromBytes(data)
+
+	// string table header validation
+	if isBinaryTag(data, BinaryTagStringTable) {
+		buff.ReadBytes(len(BinaryTagStringTable)) // strip tag length
+
+		// always use a slice string table with a byte array since the
+		// data is already in memory
+		table = NewSliceStringTableReaderFrom(buff)
+	}
+
+	return &DecodingContext{
+		Buffer: buff,
+		Table:  table,
+	}
+}
+
+// NewDecodingContextFromReader creates a new DecodingContext instance using an io.Reader
+// implementation.
+//
+// When the stream starts with the string table binary tag, the tag is stripped and the
+// string table reader implementation is selected by configuration: a temp-file backed
+// reader when file backing is enabled, otherwise an in-memory slice reader.
+func NewDecodingContextFromReader(reader io.Reader) *DecodingContext {
+	var table StringTableReader
+
+	buff := util.NewBufferFromReader(reader)
+
+	if isReaderBinaryTag(buff, BinaryTagStringTable) {
+		buff.ReadBytes(len(BinaryTagStringTable)) // strip tag length
+
+		// create correct string table implementation
+		if IsBingenFileBackedStringTableEnabled() {
+			table = NewFileStringTableReaderFrom(buff, BingenFileBackedStringTableDir())
+		} else {
+			table = NewSliceStringTableReaderFrom(buff)
+		}
+	}
+
+	return &DecodingContext{
+		Buffer: buff,
+		Table:  table,
+	}
+}
+
+// IsStringTable returns true if the table is available and non-empty.
+func (dc *DecodingContext) IsStringTable() bool {
+	return dc.Table != nil && dc.Table.Len() > 0
+}
+
+// Close will ensure that any string table resources are cleaned up. Close is
+// idempotent: the table is nilled out after the first call.
+//
+// NOTE(review): only the string table is closed here; the buffer is not
+// explicitly released — confirm util.Buffer requires no cleanup.
+func (dc *DecodingContext) Close() {
+	if dc.Table != nil {
+		_ = dc.Table.Close()
+		dc.Table = nil
+	}
+}
+
+//--------------------------------------------------------------------------
+//  Binary Codec
+//--------------------------------------------------------------------------
+
+// BinEncoder is an encoding interface which defines a context based marshal contract.
+// Implementations write their binary form into the context's buffer, registering
+// strings with the context's table when one is present.
+type BinEncoder interface {
+	MarshalBinaryWithContext(*EncodingContext) error
+}
+
+// BinDecoder is a decoding interface which defines a context based unmarshal contract.
+// Implementations read their binary form from the context's buffer, resolving string
+// table indices through the context's table when one is present.
+type BinDecoder interface {
+	UnmarshalBinaryWithContext(*DecodingContext) error
+}
+
+//--------------------------------------------------------------------------
+//  Update
+//--------------------------------------------------------------------------
+
+// MarshalBinary serializes the internal properties of this Update instance
+// into a byte array. A standalone Update uses no string table (Table is nil),
+// so all strings are written inline.
+func (target *Update) MarshalBinary() (data []byte, err error) {
+	ctx := &EncodingContext{
+		Buffer: util.NewBuffer(),
+		Table:  nil,
+	}
+
+	e := target.MarshalBinaryWithContext(ctx)
+	if e != nil {
+		return nil, e
+	}
+
+	encBytes := ctx.Buffer.Bytes()
+	return encBytes, nil
+}
+
+// MarshalBinaryWithContext serializes the internal properties of this Update instance
+// into a byte array leveraging a predefined context.
+//
+// Wire layout (in write order): codec version (uint8), Name (table index or inline
+// string), nil marker byte + optional Labels map, Value (float64), nil marker byte +
+// optional AdditionalInfo map. Because Go map iteration order is randomized, two
+// encodings of the same Update may differ byte-wise while decoding identically.
+// Buffer writes panic on failure; the deferred recover converts panics into errors.
+func (target *Update) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
+	// panics are recovered and propagated as errors
+	defer func() {
+		if r := recover(); r != nil {
+			if e, ok := r.(error); ok {
+				err = e
+			} else if s, ok := r.(string); ok {
+				err = fmt.Errorf("Unexpected panic: %s", s)
+			} else {
+				err = fmt.Errorf("Unexpected panic: %+v", r)
+			}
+		}
+	}()
+
+	buff := ctx.Buffer
+	buff.WriteUInt8(DefaultCodecVersion) // version
+
+	if ctx.IsStringTable() {
+		a := ctx.Table.AddOrGet(target.Name)
+		buff.WriteInt(a) // write table index
+	} else {
+		buff.WriteString(target.Name) // write string
+	}
+	if target.Labels == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][map](map[string]string) ---
+		buff.WriteInt(len(target.Labels)) // map length
+		for v, z := range target.Labels {
+			if ctx.IsStringTable() {
+				b := ctx.Table.AddOrGet(v)
+				buff.WriteInt(b) // write table index
+			} else {
+				buff.WriteString(v) // write string
+			}
+			if ctx.IsStringTable() {
+				c := ctx.Table.AddOrGet(z)
+				buff.WriteInt(c) // write table index
+			} else {
+				buff.WriteString(z) // write string
+			}
+		}
+		// --- [end][write][map](map[string]string) ---
+
+	}
+	buff.WriteFloat64(target.Value) // write float64
+	if target.AdditionalInfo == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][map](map[string]string) ---
+		buff.WriteInt(len(target.AdditionalInfo)) // map length
+		for vv, zz := range target.AdditionalInfo {
+			if ctx.IsStringTable() {
+				d := ctx.Table.AddOrGet(vv)
+				buff.WriteInt(d) // write table index
+			} else {
+				buff.WriteString(vv) // write string
+			}
+			if ctx.IsStringTable() {
+				e := ctx.Table.AddOrGet(zz)
+				buff.WriteInt(e) // write table index
+			} else {
+				buff.WriteString(zz) // write string
+			}
+		}
+		// --- [end][write][map](map[string]string) ---
+
+	}
+	return nil
+}
+
+// UnmarshalBinary uses the data passed byte array to set all the internal properties of
+// the Update type. The decoding context (and any string table it opened) is closed
+// before returning.
+func (target *Update) UnmarshalBinary(data []byte) error {
+	ctx := NewDecodingContextFromBytes(data)
+	defer ctx.Close()
+	err := target.UnmarshalBinaryWithContext(ctx)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// UnmarshalBinaryFromReader uses the io.Reader data to set all the internal properties of
+// the Update type. The decoding context (and any string table it opened) is closed
+// before returning.
+func (target *Update) UnmarshalBinaryFromReader(reader io.Reader) error {
+	ctx := NewDecodingContextFromReader(reader)
+	defer ctx.Close()
+	err := target.UnmarshalBinaryWithContext(ctx)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// UnmarshalBinaryWithContext uses the context containing a string table and binary buffer to set all the internal properties of
+// the Update type.
+//
+// The read order mirrors MarshalBinaryWithContext: codec version (uint8), Name,
+// optional Labels map, Value (float64), optional AdditionalInfo map. Strings are
+// read as table indices when the context has a populated string table and as
+// inline strings otherwise. Payloads with a version greater than
+// DefaultCodecVersion are rejected; buffer read panics on malformed input are
+// recovered into returned errors.
+func (target *Update) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error) {
+	// panics are recovered and propagated as errors
+	defer func() {
+		if r := recover(); r != nil {
+			if e, ok := r.(error); ok {
+				err = e
+			} else if s, ok := r.(string); ok {
+				err = fmt.Errorf("Unexpected panic: %s", s)
+			} else {
+				err = fmt.Errorf("Unexpected panic: %+v", r)
+			}
+		}
+	}()
+
+	buff := ctx.Buffer
+	version := buff.ReadUInt8()
+
+	if version > DefaultCodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling Update. Expected %d or less, got %d", DefaultCodecVersion, version)
+	}
+
+	var b string
+	if ctx.IsStringTable() {
+		c := buff.ReadInt() // read string index
+		b = ctx.Table.At(c)
+	} else {
+		b = buff.ReadString() // read string
+	}
+	a := b
+	target.Name = a
+
+	if buff.ReadUInt8() == uint8(0) {
+		target.Labels = nil
+	} else {
+		// --- [begin][read][map](map[string]string) ---
+		e := buff.ReadInt() // map len
+		d := make(map[string]string, e)
+		for i := 0; i < e; i++ {
+			var v string
+			var g string
+			if ctx.IsStringTable() {
+				h := buff.ReadInt() // read string index
+				g = ctx.Table.At(h)
+			} else {
+				g = buff.ReadString() // read string
+			}
+			f := g
+			v = f
+
+			var z string
+			var m string
+			if ctx.IsStringTable() {
+				n := buff.ReadInt() // read string index
+				m = ctx.Table.At(n)
+			} else {
+				m = buff.ReadString() // read string
+			}
+			l := m
+			z = l
+
+			d[v] = z
+		}
+		target.Labels = d
+		// --- [end][read][map](map[string]string) ---
+
+	}
+	o := buff.ReadFloat64() // read float64
+	target.Value = o
+
+	if buff.ReadUInt8() == uint8(0) {
+		target.AdditionalInfo = nil
+	} else {
+		// --- [begin][read][map](map[string]string) ---
+		q := buff.ReadInt() // map len
+		p := make(map[string]string, q)
+		for j := 0; j < q; j++ {
+			var vv string
+			var s string
+			if ctx.IsStringTable() {
+				t := buff.ReadInt() // read string index
+				s = ctx.Table.At(t)
+			} else {
+				s = buff.ReadString() // read string
+			}
+			r := s
+			vv = r
+
+			var zz string
+			var w string
+			if ctx.IsStringTable() {
+				x := buff.ReadInt() // read string index
+				w = ctx.Table.At(x)
+			} else {
+				w = buff.ReadString() // read string
+			}
+			u := w
+			zz = u
+
+			p[vv] = zz
+		}
+		target.AdditionalInfo = p
+		// --- [end][read][map](map[string]string) ---
+
+	}
+	return nil
+}
+
+//--------------------------------------------------------------------------
+//  UpdateSet
+//--------------------------------------------------------------------------
+
+// MarshalBinary serializes the internal properties of this UpdateSet instance
+// into a byte array. A fresh string table is used to intern strings, and the
+// serialized table bytes are prepended to the encoded payload (see appendBytes).
+func (target *UpdateSet) MarshalBinary() (data []byte, err error) {
+	ctx := &EncodingContext{
+		Buffer: util.NewBuffer(),
+		Table:  NewStringTableWriter(),
+	}
+
+	e := target.MarshalBinaryWithContext(ctx)
+	if e != nil {
+		return nil, e
+	}
+
+	encBytes := ctx.Buffer.Bytes()
+	sTableBytes := ctx.Table.ToBytes()
+	merged := appendBytes(sTableBytes, encBytes)
+	return merged, nil
+}
+
+// MarshalBinaryWithContext serializes the internal properties of this UpdateSet instance
+// into a byte array leveraging a predefined context.
+//
+// Wire layout (in write order): codec version (uint8), length-prefixed time.Time
+// binary for Timestamp, nil marker byte + optional Updates slice where each element
+// is preceded by an unused compatibility int. Buffer write panics are recovered
+// into returned errors.
+func (target *UpdateSet) MarshalBinaryWithContext(ctx *EncodingContext) (err error) {
+	// panics are recovered and propagated as errors
+	defer func() {
+		if r := recover(); r != nil {
+			if e, ok := r.(error); ok {
+				err = e
+			} else if s, ok := r.(string); ok {
+				err = fmt.Errorf("Unexpected panic: %s", s)
+			} else {
+				err = fmt.Errorf("Unexpected panic: %+v", r)
+			}
+		}
+	}()
+
+	buff := ctx.Buffer
+	buff.WriteUInt8(DefaultCodecVersion) // version
+
+	// --- [begin][write][reference](time.Time) ---
+	a, errA := target.Timestamp.MarshalBinary()
+	if errA != nil {
+		return errA
+	}
+	buff.WriteInt(len(a))
+	buff.WriteBytes(a)
+	// --- [end][write][reference](time.Time) ---
+
+	if target.Updates == nil {
+		buff.WriteUInt8(uint8(0)) // write nil byte
+	} else {
+		buff.WriteUInt8(uint8(1)) // write non-nil byte
+
+		// --- [begin][write][slice]([]Update) ---
+		buff.WriteInt(len(target.Updates)) // array length
+		for i := 0; i < len(target.Updates); i++ {
+			// --- [begin][write][struct](Update) ---
+			buff.WriteInt(0) // [compatibility, unused]
+			errB := target.Updates[i].MarshalBinaryWithContext(ctx)
+			if errB != nil {
+				return errB
+			}
+			// --- [end][write][struct](Update) ---
+
+		}
+		// --- [end][write][slice]([]Update) ---
+
+	}
+	return nil
+}
+
+// UnmarshalBinary uses the data passed byte array to set all the internal properties of
+// the UpdateSet type. The decoding context (and any string table it opened) is closed
+// before returning.
+func (target *UpdateSet) UnmarshalBinary(data []byte) error {
+	ctx := NewDecodingContextFromBytes(data)
+	defer ctx.Close()
+	err := target.UnmarshalBinaryWithContext(ctx)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// UnmarshalBinaryFromReader uses the io.Reader data to set all the internal properties of
+// the UpdateSet type. The decoding context (and any string table it opened) is closed
+// before returning.
+func (target *UpdateSet) UnmarshalBinaryFromReader(reader io.Reader) error {
+	ctx := NewDecodingContextFromReader(reader)
+	defer ctx.Close()
+	err := target.UnmarshalBinaryWithContext(ctx)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// UnmarshalBinaryWithContext uses the context containing a string table and binary buffer to set all the internal properties of
+// the UpdateSet type.
+//
+// Reads mirror MarshalBinaryWithContext: codec version (uint8), length-prefixed
+// time.Time binary, then the optional Updates slice (each element preceded by an
+// unused compatibility int). Payloads newer than DefaultCodecVersion are
+// rejected; buffer read panics are recovered into returned errors.
+func (target *UpdateSet) UnmarshalBinaryWithContext(ctx *DecodingContext) (err error) {
+	// panics are recovered and propagated as errors
+	defer func() {
+		if r := recover(); r != nil {
+			if e, ok := r.(error); ok {
+				err = e
+			} else if s, ok := r.(string); ok {
+				err = fmt.Errorf("Unexpected panic: %s", s)
+			} else {
+				err = fmt.Errorf("Unexpected panic: %+v", r)
+			}
+		}
+	}()
+
+	buff := ctx.Buffer
+	version := buff.ReadUInt8()
+
+	if version > DefaultCodecVersion {
+		return fmt.Errorf("Invalid Version Unmarshaling UpdateSet. Expected %d or less, got %d", DefaultCodecVersion, version)
+	}
+
+	// --- [begin][read][reference](time.Time) ---
+	a := &time.Time{}
+	b := buff.ReadInt()    // byte array length
+	c := buff.ReadBytes(b) // byte array
+	errA := a.UnmarshalBinary(c)
+	if errA != nil {
+		return errA
+	}
+	target.Timestamp = *a
+	// --- [end][read][reference](time.Time) ---
+
+	if buff.ReadUInt8() == uint8(0) {
+		target.Updates = nil
+	} else {
+		// --- [begin][read][slice]([]Update) ---
+		e := buff.ReadInt() // array len
+		d := make([]Update, e)
+		for i := 0; i < e; i++ {
+			// --- [begin][read][struct](Update) ---
+			g := &Update{}
+			buff.ReadInt() // [compatibility, unused]
+			errB := g.UnmarshalBinaryWithContext(ctx)
+			if errB != nil {
+				return errB
+			}
+			f := *g
+			// --- [end][read][struct](Update) ---
+
+			d[i] = f
+		}
+		target.Updates = d
+		// --- [end][read][slice]([]Update) ---
+
+	}
+	return nil
+}
+
+//--------------------------------------------------------------------------
+//  UpdateSetStream
+//--------------------------------------------------------------------------
+
+// UpdateSetStream is a single use field stream for the contents of an UpdateSet instance. Instead of creating an instance and populating
+// the fields on that instance, we provide a streaming iterator which yields (BingenFieldInfo, *BingenValue) tuples for each
+// streamable element. All slices and maps will be flattened one depth and each element streamed individually.
+type UpdateSetStream struct {
+	reader io.Reader        // underlying source; closed by Close when it implements io.Closer
+	ctx    *DecodingContext // decoding state shared across the stream
+	err    error            // first error encountered while streaming, surfaced via Error()
+}
+
+// Close closes the internal io.Reader used to read and parse the UpdateSet fields.
+// This should be called once the stream is no longer needed.
+// NOTE(review): any error from the underlying closer is discarded — confirm
+// best-effort close is intended here.
+func (stream *UpdateSetStream) Close() {
+	if closer, ok := stream.reader.(io.Closer); ok {
+		closer.Close()
+	}
+	stream.ctx.Close()
+}
+
+// Error returns an error if one occurred during the process of streaming the UpdateSet.
+// This can be checked after iterating through the Stream().
+func (stream *UpdateSetStream) Error() error {
+	return stream.err
+}
+
+// NewUpdateSetStream creates a new UpdateSetStream, which uses the io.Reader data to stream all internal fields of an UpdateSet instance.
+func NewUpdateSetStream(reader io.Reader) BingenStream {
+	ctx := NewDecodingContextFromReader(reader)
+
+	return &UpdateSetStream{
+		ctx:    ctx,
+		reader: reader,
+	}
+}
+
+// Stream returns the iterator which will stream each field of the target type.
+//
+// Fields are yielded in wire order: first the Timestamp, then each element of the
+// Updates slice individually (a single nil value is yielded when the slice itself
+// is nil). The iterator is single-use; decode failures stop iteration and are
+// surfaced via Error(). Returning false from yield stops the stream early.
+func (stream *UpdateSetStream) Stream() iter.Seq2[BingenFieldInfo, *BingenValue] {
+	return func(yield func(BingenFieldInfo, *BingenValue) bool) {
+		var fi BingenFieldInfo
+
+		ctx := stream.ctx
+		buff := ctx.Buffer
+		version := buff.ReadUInt8()
+
+		if version > DefaultCodecVersion {
+			stream.err = fmt.Errorf("Invalid Version Unmarshaling UpdateSet. Expected %d or less, got %d", DefaultCodecVersion, version)
+			return
+		}
+
+		fi = BingenFieldInfo{
+			Type: reflect.TypeFor[time.Time](),
+			Name: "Timestamp",
+		}
+
+		// --- [begin][read][reference](time.Time) ---
+		b := &time.Time{}
+		c := buff.ReadInt()    // byte array length
+		d := buff.ReadBytes(c) // byte array
+		errA := b.UnmarshalBinary(d)
+		if errA != nil {
+			stream.err = errA
+			return
+		}
+		a := *b
+		// --- [end][read][reference](time.Time) ---
+
+		if !yield(fi, singleV(a)) {
+			return
+		}
+		fi = BingenFieldInfo{
+			Type: reflect.TypeFor[[]Update](),
+			Name: "Updates",
+		}
+
+		if buff.ReadUInt8() == uint8(0) {
+			if !yield(fi, nil) {
+				return
+			}
+		} else {
+			// --- [begin][read][streaming-slice]([]Update) ---
+			e := buff.ReadInt() // array len
+			for i := 0; i < e; i++ {
+				// --- [begin][read][struct](Update) ---
+				g := &Update{}
+				buff.ReadInt() // [compatibility, unused]
+				errB := g.UnmarshalBinaryWithContext(ctx)
+				if errB != nil {
+					stream.err = errB
+					return
+				}
+				f := *g
+				// --- [end][read][struct](Update) ---
+
+				if !yield(fi, pairV(i, f)) {
+					return
+				}
+			}
+			// --- [end][read][streaming-slice]([]Update) ---
+
+		}
+	}
+}

+ 4 - 4
modules/collector-source/pkg/metric/metrics.go

@@ -25,6 +25,10 @@ const (
 	ServiceSelectorLabels                                 = "service_selector_labels"
 	StatefulSetMatchLabels                                = "statefulSet_match_labels"
 	KubeReplicasetOwner                                   = "kube_replicaset_owner"
+	ContainerCPUAllocation                                = "container_cpu_allocation"
+	ContainerMemoryAllocationBytes                        = "container_memory_allocation_bytes"
+	ContainerGPUAllocation                                = "container_gpu_allocation"
+	PodPVCAllocation                                      = "pod_pvc_allocation"
 	ResourceQuotaInfo                                     = "resourcequota_info"
 	KubeResourceQuotaSpecResourceRequests                 = "resourcequota_spec_resource_requests"
 	KubeResourceQuotaSpecResourceLimits                   = "resourcequota_spec_resource_limits"
@@ -54,10 +58,6 @@ const (
 	NodeGPUHourlyCost                    = "node_gpu_hourly_cost"
 	NodeGPUCount                         = "node_gpu_count"
 	KubecostNodeIsSpot                   = "kubecost_node_is_spot"
-	ContainerCPUAllocation               = "container_cpu_allocation"
-	ContainerMemoryAllocationBytes       = "container_memory_allocation_bytes"
-	ContainerGPUAllocation               = "container_gpu_allocation"
-	PodPVCAllocation                     = "pod_pvc_allocation"
 
 	// Stat Summary Metrics
 	NodeCPUSecondsTotal                = "node_cpu_seconds_total"

+ 283 - 0
modules/collector-source/pkg/metric/synthetic/cpuallocation.go

@@ -0,0 +1,283 @@
+package synthetic
+
+import (
+	"maps"
+	"math"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+)
+
+// CpuUsageMetric contains the last two samples of a CPU instant metric.
+type CpuUsageMetric struct {
+	current *InstantMetric // most recent sample, or nil after a Shift
+	prev    *InstantMetric // sample before current, or nil until two samples arrive
+}
+
+// NewCpuUsageMetric creates a new cpu usage metric initialized to the provided instant metric
+// data.
+func NewCpuUsageMetric(t time.Time, m *metric.Update) *CpuUsageMetric {
+	return new(CpuUsageMetric).Push(t, m)
+}
+
+// Push accepts new instant metric data, advances any current data to previous, and sets the new
+// current to the provided metric. It returns the receiver for chaining and must be
+// called on a non-nil receiver.
+func (usage *CpuUsageMetric) Push(t time.Time, m *metric.Update) *CpuUsageMetric {
+	if usage.current == nil {
+		usage.current = &InstantMetric{t, m}
+		return usage
+	}
+
+	usage.prev = usage.current
+	usage.current = &InstantMetric{t, m}
+	return usage
+}
+
+// Labels returns the labels for any current if it exists first, then looks to any previous data next.
+// The returned map is the update's own label map, not a copy; callers should clone it
+// before mutating. An empty map is returned when no samples exist.
+func (usage *CpuUsageMetric) Labels() map[string]string {
+	if usage.current != nil {
+		return usage.current.update.Labels
+	}
+	if usage.prev != nil {
+		return usage.prev.update.Labels
+	}
+
+	return map[string]string{}
+}
+
+// IsValid returns true when usage is non-nil, the current instant metric is non-nil, and the previous
+// instant metric is non-nil (i.e. a rate can be computed from two samples).
+func (usage *CpuUsageMetric) IsValid() bool {
+	return usage != nil && usage.current != nil && usage.prev != nil
+}
+
+// IsEmpty returns true when there are no valid samples.
+func (usage *CpuUsageMetric) IsEmpty() bool {
+	return usage == nil || (usage.current == nil && usage.prev == nil)
+}
+
+// Value returns the irate of the two metric samples if they exist, and 0 if they don't.
+// The rate is (current - prev) / elapsed seconds; 0 is returned when fewer than two
+// samples exist or the elapsed interval is non-positive.
+// NOTE(review): a counter reset (current < prev) yields a negative rate here —
+// confirm downstream consumers tolerate or filter negative values.
+func (usage *CpuUsageMetric) Value() float64 {
+	if usage.current == nil || usage.prev == nil {
+		return 0.0
+	}
+
+	v1, t1 := usage.current.update.Value, usage.current.timestamp
+	v2, t2 := usage.prev.update.Value, usage.prev.timestamp
+	seconds := t1.Sub(t2).Seconds()
+	if seconds <= 0.0 {
+		return 0.0
+	}
+
+	irate := (v1 - v2) / seconds
+	return irate
+}
+
+// Shift will set the previous to the current metric, and set the current metric to nil.
+// It is safe to call on a nil receiver (no-op).
+func (usage *CpuUsageMetric) Shift() {
+	if usage == nil {
+		return
+	}
+
+	usage.prev = usage.current
+	usage.current = nil
+}
+
+// ContainerCpuAllocationMetric is the grouping unit for cpu usage and cpu request metrics.
+type ContainerCpuAllocationMetric struct {
+	requestMetric *metric.Update  // latest cpu request sample for the container, or nil
+	usageMetric   *CpuUsageMetric // rolling two-sample cpu usage buffer, or nil
+}
+
+// IsValid returns true if we can synthesize an update from the samples available:
+// either a request sample exists or the usage buffer holds two samples.
+func (cmam *ContainerCpuAllocationMetric) IsValid() bool {
+	return cmam.requestMetric != nil || cmam.usageMetric.IsValid()
+}
+
+// Synthesize returns a new CpuAllocation metric update with the max(request, usage).
+// When only one side is available, that side's value is used, and NaN values are
+// coerced to 0. Callers must check IsValid() first: the fallthrough assumes the
+// usage buffer is valid when no request sample exists.
+func (cmam *ContainerCpuAllocationMetric) Synthesize() metric.Update {
+	if cmam.requestMetric != nil && cmam.usageMetric.IsValid() {
+		req := cmam.requestMetric.Value
+		if math.IsNaN(req) {
+			log.Debugf("NaN value found during cpu allocation synthesis for requests.")
+			req = 0.0
+		}
+
+		used := cmam.usageMetric.Value()
+		if math.IsNaN(used) {
+			log.Debugf("NaN value found during cpu allocation synthesis for used.")
+			used = 0.0
+		}
+
+		// TODO: validate and merge labels if they both have keys?
+		labels := maps.Clone(cmam.usageMetric.Labels())
+
+		return metric.Update{
+			Name:   metric.ContainerCPUAllocation,
+			Labels: labels,
+			Value:  max(req, used),
+		}
+	} else if cmam.requestMetric != nil {
+		req := cmam.requestMetric.Value
+		if math.IsNaN(req) {
+			log.Debugf("NaN value found during cpu allocation synthesis for requests.")
+			req = 0.0
+		}
+
+		// drop the "extra" labels
+		labels := maps.Clone(cmam.requestMetric.Labels)
+		delete(labels, source.ResourceLabel)
+		delete(labels, source.UnitLabel)
+
+		return metric.Update{
+			Name:   metric.ContainerCPUAllocation,
+			Labels: labels,
+			Value:  req,
+		}
+	}
+
+	// not possible for both request and usage to be nil, so we can assume only used is
+	// valid here
+	used := cmam.usageMetric.Value()
+	if math.IsNaN(used) {
+		log.Debugf("NaN value found during cpu allocation synthesis for used.")
+		used = 0.0
+	}
+
+	labels := maps.Clone(cmam.usageMetric.Labels())
+
+	return metric.Update{
+		Name:   metric.ContainerCPUAllocation,
+		Labels: labels,
+		Value:  used,
+	}
+}
+
+// IsEmpty returns true if there are no valid samples to extract from.
+func (cmam *ContainerCpuAllocationMetric) IsEmpty() bool {
+	return cmam.requestMetric == nil && cmam.usageMetric.IsEmpty()
+}
+
+// Cycle will advance the usage sample buffer and clear the request sample, preparing
+// the grouping unit for the next scrape.
+func (cmam *ContainerCpuAllocationMetric) Cycle() {
+	cmam.requestMetric = nil
+	cmam.usageMetric.Shift()
+}
+
+// ContainerCpuAllocationSynthesizer is a MetricSynthesizer that leverages pod uid and container name grouping
+// to match relevant request and usage metrics to build the cpu allocation data.
+type ContainerCpuAllocationSynthesizer struct {
+	byPod map[string]map[string]*ContainerCpuAllocationMetric // pod UID -> container name -> samples
+}
+
+// NewContainerCpuAllocationSynthesizer creates a new ContainerCpuAllocationSynthesizer which synthesizes
+// metric updates for ContainerCPUAllocation from cpu requests and cpu usage metrics.
+func NewContainerCpuAllocationSynthesizer() *ContainerCpuAllocationSynthesizer {
+	return &ContainerCpuAllocationSynthesizer{
+		byPod: make(map[string]map[string]*ContainerCpuAllocationMetric),
+	}
+}
+
+// Process only processes cpu requests and cpu usage metrics; all other updates are
+// ignored.
+func (cmas *ContainerCpuAllocationSynthesizer) Process(t time.Time, update *metric.Update) {
+	switch update.Name {
+	case metric.KubePodContainerResourceRequests:
+		cmas.addRequestsMetric(update)
+	case metric.ContainerCPUUsageSecondsTotal:
+		cmas.addUsageMetric(t, update)
+	}
+}
+
+// Synthesize will synthesize all valid synthesizers within the pod/container mapping.
+func (cmas *ContainerCpuAllocationSynthesizer) Synthesize() []metric.Update {
+	var updates []metric.Update
+
+	for _, pod := range cmas.byPod {
+		for _, synthesizer := range pod {
+			isValid := synthesizer.IsValid()
+			if isValid {
+				updates = append(updates, synthesizer.Synthesize())
+			}
+		}
+	}
+
+	return updates
+}
+
+// Clear for the CpuAllocationSynthesis must cycle the samples, and only remove them if there is no
+// more valid sample data remaining. (Deleting map entries while ranging is safe in Go.)
+func (cmas *ContainerCpuAllocationSynthesizer) Clear() {
+	for podKey, pod := range cmas.byPod {
+		for synthKey, synthesizer := range pod {
+			synthesizer.Cycle()
+			if synthesizer.IsEmpty() {
+				delete(pod, synthKey)
+			}
+		}
+		if len(pod) == 0 {
+			delete(cmas.byPod, podKey)
+		}
+	}
+}
+
+// addRequestsMetric records a cpu request sample for the update's pod/container,
+// creating the grouping entry on first sight and overwriting any prior request.
+func (cmas *ContainerCpuAllocationSynthesizer) addRequestsMetric(update *metric.Update) {
+	if !cmas.isValidRequests(update.Labels) {
+		return
+	}
+
+	podUID := update.Labels[source.UIDLabel]
+	container := update.Labels[source.ContainerLabel]
+	if _, ok := cmas.byPod[podUID]; !ok {
+		cmas.byPod[podUID] = make(map[string]*ContainerCpuAllocationMetric)
+	}
+
+	if _, ok := cmas.byPod[podUID][container]; !ok {
+		cmas.byPod[podUID][container] = &ContainerCpuAllocationMetric{
+			requestMetric: update,
+		}
+	} else {
+		cmas.byPod[podUID][container].requestMetric = update
+	}
+}
+
+// addUsageMetric records a cpu usage sample for the update's pod/container, pushing
+// it into the rolling two-sample buffer (creating the entry/buffer on first sight).
+func (cmas *ContainerCpuAllocationSynthesizer) addUsageMetric(t time.Time, update *metric.Update) {
+	if !cmas.isValidUsage(update.Labels) {
+		return
+	}
+
+	podUID := update.Labels[source.UIDLabel]
+	container := update.Labels[source.ContainerLabel]
+	if _, ok := cmas.byPod[podUID]; !ok {
+		cmas.byPod[podUID] = make(map[string]*ContainerCpuAllocationMetric)
+	}
+
+	if _, ok := cmas.byPod[podUID][container]; !ok {
+		cmas.byPod[podUID][container] = &ContainerCpuAllocationMetric{
+			usageMetric: NewCpuUsageMetric(t, update),
+		}
+	} else {
+		cpuAllocMetric := cmas.byPod[podUID][container]
+		if cpuAllocMetric.usageMetric == nil {
+			cpuAllocMetric.usageMetric = NewCpuUsageMetric(t, update)
+		} else {
+			cpuAllocMetric.usageMetric.Push(t, update)
+		}
+	}
+}
+
+// isValidRequests reports whether the labels describe a cpu (core) request for a real
+// container: the pause container ("POD") is excluded, and container, node, and pod
+// UID labels must all be present.
+func (cmas *ContainerCpuAllocationSynthesizer) isValidRequests(labels map[string]string) bool {
+	return labels[source.ResourceLabel] == "cpu" &&
+		labels[source.UnitLabel] == "core" &&
+		labels[source.ContainerLabel] != "POD" &&
+		labels[source.ContainerLabel] != "" &&
+		labels[source.NodeLabel] != "" &&
+		labels[source.UIDLabel] != ""
+}
+
+// isValidUsage reports whether the labels describe usage for a real container.
+// NOTE(review): unlike isValidRequests, the node label is not required here —
+// confirm that asymmetry is intentional.
+func (cmas *ContainerCpuAllocationSynthesizer) isValidUsage(labels map[string]string) bool {
+	return labels[source.ContainerLabel] != "POD" &&
+		labels[source.ContainerLabel] != "" &&
+		labels[source.UIDLabel] != ""
+}

+ 69 - 0
modules/collector-source/pkg/metric/synthetic/metricsynthesizer.go

@@ -0,0 +1,69 @@
+package synthetic
+
+import (
+	"time"
+
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+)
+
+// InstantMetric is a metric update that happened at a specific timestamp.
+type InstantMetric struct {
+	timestamp time.Time      // when the sample was observed
+	update    *metric.Update // the observed metric update
+}
+
+// MetricSynthesizer is an implementation prototype for an object capable of processing
+// a stream of metric updates, and then synthesizing new metric updates based on the processed
+// data. The expected lifecycle per scrape is: Process each update, then Synthesize, then Clear.
+type MetricSynthesizer interface {
+	// Process accepts individual Updates from an UpdateSet for processing. Once all Updates
+	// have been processed, call Synthesize() to generate any additional updates.
+	Process(t time.Time, update *metric.Update)
+
+	// Synthesize will generate all synthetic Update instances after processing all existing updates
+	// in a set.
+	Synthesize() []metric.Update
+
+	// Clear resets or cycles the current state of the processed metrics to prepare for the next scrape.
+	Clear()
+}
+
+// MetricSynthesizers implements the `metric.Updater` interface, to accept a `metric.UpdateSet` of metric updates,
+// pipes each `metric.Update` into the registered MetricSynthesizer instances for processing, and then synthesizes
+// new metric updates to append.
+type MetricSynthesizers struct {
+	synthesizers []MetricSynthesizer // ordered list; each sees every update in the set
+	next         metric.Updater      // downstream updater invoked after synthesis
+}
+
+// NewMetricSynthesizers creates a new set of metric synthesizers, which acts as an updater decorator to append
+// all newly synthesized metrics onto the existing update set before passing it along to the next updater.
+func NewMetricSynthesizers(next metric.Updater, synthesizers ...MetricSynthesizer) *MetricSynthesizers {
+	return &MetricSynthesizers{
+		synthesizers: synthesizers,
+		next:         next,
+	}
+}
+
+// Update runs every update in the set through each registered synthesizer, appends
+// the synthesized updates onto the set, clears synthesizer state for the next
+// scrape, and forwards the (possibly extended) set to the next updater.
+func (ms *MetricSynthesizers) Update(set *metric.UpdateSet) {
+	ts := set.Timestamp
+
+	// first pass is to have all synthesizers process all updates
+	for _, synthesizer := range ms.synthesizers {
+		for i := range len(set.Updates) {
+			// each synthesizer receives a pointer to a fresh copy, so any
+			// pointer it retains does not alias the set's backing slice
+			update := set.Updates[i]
+			synthesizer.Process(ts, &update)
+		}
+	}
+
+	// second pass is to have the synthesizers generate all synthetic updates
+	for _, synthesizer := range ms.synthesizers {
+		updates := synthesizer.Synthesize()
+		if len(updates) != 0 {
+			set.Updates = append(set.Updates, updates...)
+		}
+		synthesizer.Clear()
+	}
+
+	ms.next.Update(set)
+}

+ 422 - 0
modules/collector-source/pkg/metric/synthetic/metricsynthesizer_test.go

@@ -0,0 +1,422 @@
+package synthetic
+
+import (
+	"maps"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/core/pkg/util"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+)
+
+var _ metric.Updater = (*FuncUpdater)(nil)
+
+type FuncUpdater struct {
+	f func(*metric.UpdateSet)
+}
+
+func NewFuncUpdater(f func(*metric.UpdateSet)) *FuncUpdater {
+	return &FuncUpdater{f}
+}
+
+func (fu *FuncUpdater) Update(set *metric.UpdateSet) {
+	fu.f(set)
+}
+
+func toMemoryResource(m map[string]string) map[string]string {
+	mm := maps.Clone(m)
+	mm[source.ResourceLabel] = "memory"
+	mm[source.UnitLabel] = "byte"
+	return mm
+}
+
+func toCpuResource(m map[string]string) map[string]string {
+	mm := maps.Clone(m)
+	mm[source.ResourceLabel] = "cpu"
+	mm[source.UnitLabel] = "core"
+	return mm
+}
+
+func findMetric(t *testing.T, set *metric.UpdateSet, name string, container string) *metric.Update {
+	t.Helper()
+
+	var metric *metric.Update
+	for _, update := range set.Updates {
+		if update.Name == name && update.Labels[source.ContainerLabel] == container {
+			metric = &update
+			break
+		}
+	}
+
+	return metric
+}
+
+func assertMetricValue(t *testing.T, set *metric.UpdateSet, name string, container string, value float64) {
+	t.Helper()
+
+	metric := findMetric(t, set, name, container)
+	if metric == nil {
+		t.Fatalf("Failed to Locate a %s Metric for Container: %s\n", name, container)
+		return
+	}
+
+	if !util.IsApproximately(metric.Value, value) {
+		t.Fatalf("Expected %f for %s [Container: %s], got: %f\n", value, name, container, metric.Value)
+		return
+	}
+}
+
+func assertMetricExists(t *testing.T, set *metric.UpdateSet, name string, container string) {
+	t.Helper()
+
+	metric := findMetric(t, set, name, container)
+	if metric == nil {
+		t.Fatalf("Failed to Locate a %s Metric for Container: %s\n", name, container)
+		return
+	}
+}
+
+func assertNoMetricExists(t *testing.T, set *metric.UpdateSet, name string, container string) {
+	t.Helper()
+
+	metric := findMetric(t, set, name, container)
+	if metric != nil {
+		t.Fatalf("Expected metric to not exist: %s Metric for Container: %s\n", name, container)
+		return
+	}
+}
+
+func TestMetricSynthesizerRAMAllocation(t *testing.T) {
+	pod1Info := map[string]string{
+		source.NamespaceLabel: "namespace1",
+		source.NodeLabel:      "node1",
+		source.InstanceLabel:  "node1",
+		source.PodLabel:       "pod1",
+		source.UIDLabel:       "pod-uuid1",
+	}
+
+	container1Info := map[string]string{
+		source.NamespaceLabel: "namespace1",
+		source.NodeLabel:      "node1",
+		source.InstanceLabel:  "node1",
+		source.PodLabel:       "pod1",
+		source.UIDLabel:       "pod-uuid1",
+		source.ContainerLabel: "container1",
+	}
+
+	container2Info := map[string]string{
+		source.NamespaceLabel: "kube-system",
+		source.NodeLabel:      "node1",
+		source.InstanceLabel:  "node1",
+		source.PodLabel:       "pod2",
+		source.UIDLabel:       "pod-uuid2",
+		source.ContainerLabel: "container2",
+	}
+
+	const startingCPUSeconds float64 = 506000.0
+
+	updateSet1 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC),
+		Updates: []metric.Update{
+			// container1 has both requests and usage
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toMemoryResource(container1Info),
+				Value:  4.0 * 1024 * 1024 * 1024,
+			},
+			{
+				Name:   metric.ContainerMemoryWorkingSetBytes,
+				Labels: maps.Clone(container1Info),
+				Value:  5.5 * 1024 * 1024 * 1024,
+			},
+			// container2 only has usage
+			{
+				Name:   metric.ContainerMemoryWorkingSetBytes,
+				Labels: maps.Clone(container2Info),
+				Value:  1.5 * 1024 * 1024 * 1024,
+			},
+			// add some additional metrics to test filtering
+			{
+				Name:   metric.KubeNamespaceLabels,
+				Labels: maps.Clone(pod1Info),
+				Value:  0,
+			},
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toCpuResource(container1Info),
+				Value:  20,
+			},
+		},
+	}
+
+	updateSet2 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 0, 30, 0, time.UTC),
+		Updates: []metric.Update{
+			// container1 has both requests and usage
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toMemoryResource(container1Info),
+				Value:  4.0 * 1024 * 1024 * 1024,
+			},
+			{
+				Name:   metric.ContainerMemoryWorkingSetBytes,
+				Labels: maps.Clone(container1Info),
+				Value:  3.0 * 1024 * 1024 * 1024,
+			},
+			// container2 only has usage
+			{
+				Name:   metric.ContainerMemoryWorkingSetBytes,
+				Labels: maps.Clone(container2Info),
+				Value:  2.5 * 1024 * 1024 * 1024,
+			},
+			// add some additional metrics to test filtering
+			{
+				Name:   metric.KubeNamespaceLabels,
+				Labels: maps.Clone(pod1Info),
+				Value:  0,
+			},
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toCpuResource(container1Info),
+				Value:  75,
+			},
+		},
+	}
+
+	updateSet3 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 1, 0, 0, time.UTC),
+		Updates: []metric.Update{
+			// container1 has both requests and usage
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toMemoryResource(container1Info),
+				Value:  4.0 * 1024 * 1024 * 1024,
+			},
+			{
+				Name:   metric.ContainerMemoryWorkingSetBytes,
+				Labels: maps.Clone(container1Info),
+				Value:  6.0 * 1024 * 1024 * 1024,
+			},
+			// container2 only has usage
+			{
+				Name:   metric.ContainerMemoryWorkingSetBytes,
+				Labels: maps.Clone(container2Info),
+				Value:  1.75 * 1024 * 1024 * 1024,
+			},
+			// add some additional metrics to test filtering
+			{
+				Name:   metric.KubeNamespaceLabels,
+				Labels: maps.Clone(pod1Info),
+				Value:  0,
+			},
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toCpuResource(container1Info),
+				Value:  135,
+			},
+		},
+	}
+
+	scrape := 0
+	updater := NewFuncUpdater(func(us *metric.UpdateSet) {
+		// first scrape:
+		//  - container1: max(4.0gb, 5.5gb)
+		//  - container2: 1.5gb
+		if scrape == 0 {
+			assertMetricValue(t, us, metric.ContainerMemoryAllocationBytes, "container1", 5.5*1024*1024*1024)
+			assertMetricValue(t, us, metric.ContainerMemoryAllocationBytes, "container2", 1.5*1024*1024*1024)
+		}
+
+		// second scrape
+		//  - container1: max(4.0gb, 3.0gb)
+		//  - container2: 2.5gb
+		if scrape == 1 {
+			assertMetricValue(t, us, metric.ContainerMemoryAllocationBytes, "container1", 4.0*1024*1024*1024)
+			assertMetricValue(t, us, metric.ContainerMemoryAllocationBytes, "container2", 2.5*1024*1024*1024)
+		}
+
+		// third scrape
+		//  - container1: max(4.0gb, 6.0gb)
+		//  - container2: 1.75gb
+		if scrape == 2 {
+			assertMetricValue(t, us, metric.ContainerMemoryAllocationBytes, "container1", 6.0*1024*1024*1024)
+			assertMetricValue(t, us, metric.ContainerMemoryAllocationBytes, "container2", 1.75*1024*1024*1024)
+		}
+
+		scrape += 1
+	})
+
+	metricSynth := NewMetricSynthesizers(updater, NewContainerCpuAllocationSynthesizer(), NewContainerMemoryAllocationSynthesizer())
+
+	metricSynth.Update(updateSet1)
+	metricSynth.Update(updateSet2)
+	metricSynth.Update(updateSet3)
+}
+
+func TestMetricSynthesizerCPUAllocation(t *testing.T) {
+	pod1Info := map[string]string{
+		source.NamespaceLabel: "namespace1",
+		source.NodeLabel:      "node1",
+		source.InstanceLabel:  "node1",
+		source.PodLabel:       "pod1",
+		source.UIDLabel:       "pod-uuid1",
+	}
+
+	container1Info := map[string]string{
+		source.NamespaceLabel: "namespace1",
+		source.NodeLabel:      "node1",
+		source.InstanceLabel:  "node1",
+		source.PodLabel:       "pod1",
+		source.UIDLabel:       "pod-uuid1",
+		source.ContainerLabel: "container1",
+	}
+
+	container2Info := map[string]string{
+		source.NamespaceLabel: "kube-system",
+		source.NodeLabel:      "node1",
+		source.InstanceLabel:  "node1",
+		source.PodLabel:       "pod2",
+		source.UIDLabel:       "pod-uuid2",
+		source.ContainerLabel: "container2",
+	}
+
+	const startingCPUSeconds float64 = 506000.0
+
+	updateSet1 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC),
+		Updates: []metric.Update{
+			// container1 has both requests and usage
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toCpuResource(container1Info),
+				Value:  0.2,
+			},
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container1Info),
+				Value:  startingCPUSeconds,
+			},
+			// container2 only has usage
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container2Info),
+				Value:  startingCPUSeconds,
+			},
+			// add some additional metrics to test filtering
+			{
+				Name:   metric.KubeNamespaceLabels,
+				Labels: maps.Clone(pod1Info),
+				Value:  0,
+			},
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toMemoryResource(container1Info),
+				Value:  2.5 * 1024.0 * 1024.0 * 1024.0,
+			},
+		},
+	}
+
+	updateSet2 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 0, 30, 0, time.UTC),
+		Updates: []metric.Update{
+			// container1 has both requests and usage
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toCpuResource(container1Info),
+				Value:  0.2,
+			},
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container1Info),
+				Value:  startingCPUSeconds + 40.0,
+			},
+			// container2 only has usage
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container2Info),
+				Value:  startingCPUSeconds + 30.0,
+			},
+			// add some additional metrics to test filtering
+			{
+				Name:   metric.KubeNamespaceLabels,
+				Labels: maps.Clone(pod1Info),
+				Value:  0,
+			},
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toMemoryResource(container1Info),
+				Value:  2.5 * 1024.0 * 1024.0 * 1024.0,
+			},
+		},
+	}
+
+	updateSet3 := &metric.UpdateSet{
+		Timestamp: time.Date(2026, time.January, 1, 0, 1, 0, 0, time.UTC),
+		Updates: []metric.Update{
+			// container1 has both requests and usage
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toCpuResource(container1Info),
+				Value:  0.2,
+			},
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container1Info),
+				Value:  startingCPUSeconds + 40.0 + 5.0,
+			},
+			// container2 only has usage
+			{
+				Name:   metric.ContainerCPUUsageSecondsTotal,
+				Labels: maps.Clone(container2Info),
+				Value:  startingCPUSeconds + 30.0 + 30.0,
+			},
+			// add some additional metrics to test filtering
+			{
+				Name:   metric.KubeNamespaceLabels,
+				Labels: maps.Clone(pod1Info),
+				Value:  0,
+			},
+			{
+				Name:   metric.KubePodContainerResourceRequests,
+				Labels: toMemoryResource(container1Info),
+				Value:  2.5 * 1024.0 * 1024.0 * 1024.0,
+			},
+		},
+	}
+
+	scrape := 0
+	updater := NewFuncUpdater(func(us *metric.UpdateSet) {
+		// first scrape:
+		//  - container1: alloc = request
+		//  - container2: no metric
+		if scrape == 0 {
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container1", 0.2)
+			assertNoMetricExists(t, us, metric.ContainerCPUAllocation, "container2")
+		}
+
+		// second scrape
+		//  - container1: alloc = 40s/30s = 1.33
+		//  - container2: alloc = 30s/30s = 1.0
+		if scrape == 1 {
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container1", 1.33333333)
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container2", 1.0)
+		}
+
+		// third scrape
+		//  - container1: alloc = 5.0/30.0s = ~0.17, which is below the request, so alloc = request again (0.2)
+		//  - container2: alloc = 30s/30s = 1.0
+		if scrape == 2 {
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container1", 0.2)
+			assertMetricValue(t, us, metric.ContainerCPUAllocation, "container2", 1.0)
+		}
+
+		scrape += 1
+	})
+
+	metricSynth := NewMetricSynthesizers(updater, NewContainerCpuAllocationSynthesizer(), NewContainerMemoryAllocationSynthesizer())
+
+	metricSynth.Update(updateSet1)
+	metricSynth.Update(updateSet2)
+	metricSynth.Update(updateSet3)
+}

+ 173 - 0
modules/collector-source/pkg/metric/synthetic/ramallocation.go

@@ -0,0 +1,173 @@
+package synthetic
+
+import (
+	"maps"
+	"math"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/modules/collector-source/pkg/metric"
+)
+
+// ContainerMemoryAllocationMetric is the grouping unit for memory usage and request
+type ContainerMemoryAllocationMetric struct {
+	requestMetric *metric.Update
+	usageMetric   *metric.Update
+}
+
+// Synthesize returns a new ContainerMemoryAllocationBytes metric update with the max(request, usage)
+func (cmam *ContainerMemoryAllocationMetric) Synthesize() metric.Update {
+	if cmam.requestMetric != nil && cmam.usageMetric != nil {
+		req := cmam.requestMetric.Value
+		if math.IsNaN(req) {
+			log.Debugf("NaN value found during memory allocation synthesis for requests.")
+			req = 0.0
+		}
+
+		used := cmam.usageMetric.Value
+		if math.IsNaN(used) {
+			log.Debugf("NaN value found during memory allocation synthesis for used.")
+			used = 0.0
+		}
+
+		// TODO: validate and merge labels if they both have keys?
+		labels := maps.Clone(cmam.usageMetric.Labels)
+
+		return metric.Update{
+			Name:   metric.ContainerMemoryAllocationBytes,
+			Labels: labels,
+			Value:  max(req, used),
+		}
+	} else if cmam.requestMetric != nil {
+		req := cmam.requestMetric.Value
+		if math.IsNaN(req) {
+			log.Debugf("NaN value found during memory allocation synthesis for requests.")
+			req = 0.0
+		}
+
+		// drop the "extra" labels
+		labels := maps.Clone(cmam.requestMetric.Labels)
+		delete(labels, source.ResourceLabel)
+		delete(labels, source.UnitLabel)
+
+		return metric.Update{
+			Name:   metric.ContainerMemoryAllocationBytes,
+			Labels: labels,
+			Value:  req,
+		}
+	}
+
+	// not possible for both request and usage to be nil, so we can assume only used is
+	// valid here
+	used := cmam.usageMetric.Value
+	if math.IsNaN(used) {
+		log.Debugf("NaN value found during memory allocation synthesis for used.")
+		used = 0.0
+	}
+
+	labels := maps.Clone(cmam.usageMetric.Labels)
+
+	return metric.Update{
+		Name:   metric.ContainerMemoryAllocationBytes,
+		Labels: labels,
+		Value:  used,
+	}
+}
+
+// ContainerMemoryAllocationSynthesizer is a MetricSynthesizer that leverages pod uid and container name grouping
+// to match relevant request and usage metrics to build the memory allocation data.
+type ContainerMemoryAllocationSynthesizer struct {
+	byPod map[string]map[string]*ContainerMemoryAllocationMetric
+}
+
+// NewContainerMemoryAllocationSynthesizer creates a new ContainerMemoryAllocationSynthesizer which synthesizes
+// metric updates for ContainerMemoryAllocationBytes from ram requests and ram usage metrics.
+func NewContainerMemoryAllocationSynthesizer() *ContainerMemoryAllocationSynthesizer {
+	return &ContainerMemoryAllocationSynthesizer{
+		byPod: make(map[string]map[string]*ContainerMemoryAllocationMetric),
+	}
+}
+
+// Process accepts metric updates and only records updates relevant to memory allocation.
+func (cmas *ContainerMemoryAllocationSynthesizer) Process(t time.Time, update *metric.Update) {
+	switch update.Name {
+	case metric.KubePodContainerResourceRequests:
+		cmas.addRequestsMetric(update)
+	case metric.ContainerMemoryWorkingSetBytes:
+		cmas.addUsageMetric(update)
+	}
+}
+
+// Synthesize generates all new memory allocation metrics
+func (cmas *ContainerMemoryAllocationSynthesizer) Synthesize() []metric.Update {
+	var updates []metric.Update
+
+	for _, pod := range cmas.byPod {
+		for _, synthesizer := range pod {
+			updates = append(updates, synthesizer.Synthesize())
+		}
+	}
+
+	return updates
+}
+
+// Clear drops the current metric mapping and creates a new map ready to process next metrics collection.
+func (cmas *ContainerMemoryAllocationSynthesizer) Clear() {
+	cmas.byPod = make(map[string]map[string]*ContainerMemoryAllocationMetric)
+}
+
+func (cmas *ContainerMemoryAllocationSynthesizer) addRequestsMetric(update *metric.Update) {
+	if !cmas.isValidRequests(update.Labels) {
+		return
+	}
+
+	podUID := update.Labels[source.UIDLabel]
+	container := update.Labels[source.ContainerLabel]
+	if _, ok := cmas.byPod[podUID]; !ok {
+		cmas.byPod[podUID] = make(map[string]*ContainerMemoryAllocationMetric)
+	}
+
+	if _, ok := cmas.byPod[podUID][container]; !ok {
+		cmas.byPod[podUID][container] = &ContainerMemoryAllocationMetric{
+			requestMetric: update,
+		}
+	} else {
+		cmas.byPod[podUID][container].requestMetric = update
+	}
+}
+
+func (cmas *ContainerMemoryAllocationSynthesizer) addUsageMetric(update *metric.Update) {
+	if !cmas.isValidUsage(update.Labels) {
+		return
+	}
+
+	podUID := update.Labels[source.UIDLabel]
+	container := update.Labels[source.ContainerLabel]
+	if _, ok := cmas.byPod[podUID]; !ok {
+		cmas.byPod[podUID] = make(map[string]*ContainerMemoryAllocationMetric)
+	}
+
+	if _, ok := cmas.byPod[podUID][container]; !ok {
+		cmas.byPod[podUID][container] = &ContainerMemoryAllocationMetric{
+			usageMetric: update,
+		}
+	} else {
+		cmas.byPod[podUID][container].usageMetric = update
+	}
+}
+
+func (cmas *ContainerMemoryAllocationSynthesizer) isValidRequests(labels map[string]string) bool {
+	return labels[source.ResourceLabel] == "memory" &&
+		labels[source.UnitLabel] == "byte" &&
+		labels[source.ContainerLabel] != "POD" &&
+		labels[source.ContainerLabel] != "" &&
+		labels[source.NodeLabel] != "" &&
+		labels[source.UIDLabel] != ""
+}
+
+func (cmas *ContainerMemoryAllocationSynthesizer) isValidUsage(labels map[string]string) bool {
+	return labels[source.ContainerLabel] != "POD" &&
+		labels[source.ContainerLabel] != "" &&
+		labels[source.UIDLabel] != ""
+}

+ 11 - 2
modules/collector-source/pkg/metric/walinator.go

@@ -50,9 +50,10 @@ func NewWalinator(
 	}
 	pathFormatter, err := pathing.NewEventStoragePathFormatter(applicationName, clusterID, CollectorEventName)
 	if err != nil {
-		return nil, fmt.Errorf("filed to create path formatter for scrape controller: %s", err.Error())
+		return nil, fmt.Errorf("failed to create path formatter for scrape controller: %s", err.Error())
 	}
-	encoder := exporter.NewGZipEncoder(exporter.NewJSONEncoder[UpdateSet]())
+
+	encoder := exporter.NewBingenFileEncoder[UpdateSet]()
 	exp := exporter.NewEventStorageExporter(
 		pathFormatter,
 		encoder,
@@ -144,7 +145,15 @@ func deserializeUpdateSet(ext string, b []byte) (*UpdateSet, error) {
 		}
 
 		return deserializeUpdateSet(strings.TrimSuffix(ext, ".gz"), decompressed)
+	case "bingen":
+		updateSet := new(UpdateSet)
+		err := updateSet.UnmarshalBinary(b)
+		if err != nil {
+			return nil, fmt.Errorf("failed to unmarshal bingen: %w", err)
+		}
+		return updateSet, nil
 	}
+
 	return nil, fmt.Errorf("unrecognized extension: '%s'", ext)
 }
 

+ 287 - 2
modules/collector-source/pkg/scrape/clustercache.go

@@ -3,6 +3,7 @@ package scrape
 import (
 	"fmt"
 	"slices"
+	"strconv"
 	"strings"
 
 	"github.com/kubecost/events"
@@ -19,6 +20,8 @@ import (
 	"k8s.io/apimachinery/pkg/util/validation"
 )
 
+const unmountedPVsContainer = "unmounted-pvs"
+
 type ClusterCacheScraper struct {
 	clusterCache clustercache.ClusterCache
 }
@@ -214,10 +217,21 @@ func (ccs *ClusterCacheScraper) scrapeNamespaces(namespaces []*clustercache.Name
 
 func (ccs *ClusterCacheScraper) ScrapePods() []metric.Update {
 	pods := ccs.clusterCache.GetAllPods()
-	return ccs.scrapePods(pods)
+	pvcs := ccs.clusterCache.GetAllPersistentVolumeClaims()
+
+	return ccs.scrapePods(pods, pvcs)
 }
 
-func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Update {
+func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod, pvcs []*clustercache.PersistentVolumeClaim) []metric.Update {
+	// this is only populated if we find gpu resources being requested
+	var nodesGpuInfo map[string]*NodeGpuInfo
+
+	// pv allocation and unmounted pvs
+	pvcInfo := getPvcsInfo(pvcs)
+
+	// pod info by uid
+	podInfoByUid := make(map[string]map[string]string)
+
 	var scrapeResults []metric.Update
 	for _, pod := range pods {
 		podInfo := map[string]string{
@@ -228,6 +242,8 @@ func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Up
 			source.InstanceLabel:  pod.Spec.NodeName,
 		}
 
+		podInfoByUid[string(pod.UID)] = podInfo
+
 		// pod labels
 		labelNames, labelValues := promutil.KubeLabelsToLabels(pod.Labels)
 		podLabels := util.ToMap(labelNames, labelValues)
@@ -248,6 +264,23 @@ func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Up
 			AdditionalInfo: podAnnotations,
 		})
 
+		// Determine PVC use data for Pod
+		claimed := make(map[string]struct{})
+		for _, volume := range pod.Spec.Volumes {
+			if volume.PersistentVolumeClaim != nil {
+				name := volume.PersistentVolumeClaim.ClaimName
+				key := pod.Namespace + "," + name
+				if _, seen := claimed[key]; seen {
+					continue
+				}
+
+				if pvc, ok := pvcInfo[key]; ok {
+					pvc.PodsClaimed = append(pvc.PodsClaimed, string(pod.UID))
+					claimed[key] = struct{}{}
+				}
+			}
+		}
+
 		// Pod owner metric
 		for _, owner := range pod.OwnerReferences {
 			ownerInfo := maps.Clone(podInfo)
@@ -274,6 +307,9 @@ func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Up
 		}
 
 		for _, container := range pod.Spec.Containers {
+			// gpu "requests" is either the request or limit if it exists
+			var gpuRequest *float64
+
 			containerInfo := maps.Clone(podInfo)
 			containerInfo[source.ContainerLabel] = container.Name
 			// Requests
@@ -299,6 +335,12 @@ func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Up
 						Labels: resourceRequestInfo,
 						Value:  value,
 					})
+
+					// set gpu request if it exists
+					if isGpuResourceName(resourceName) {
+						gpuRequestValue := value
+						gpuRequest = &gpuRequestValue
+					}
 				}
 			}
 
@@ -325,8 +367,81 @@ func (ccs *ClusterCacheScraper) scrapePods(pods []*clustercache.Pod) []metric.Up
 						Labels: resourceLimitInfo,
 						Value:  value,
 					})
+
+					// if we didn't set a gpuRequest previously and the limit is a gpu resource,
+					// set it to the limit
+					if gpuRequest == nil && isGpuResourceName(resourceName) {
+						gpuRequestValue := value
+						gpuRequest = &gpuRequestValue
+					}
 				}
 			}
+
+			// handle the GPU allocation metric here IFF there exists a request/limit for GPUs
+			// we only load the node gpu data map if we run into a container with gpu requests/limits
+			if gpuRequest != nil {
+				if nodesGpuInfo == nil {
+					nodesGpuInfo = ccs.getNodesGpuInfo()
+				}
+
+				gpuAlloc := *gpuRequest
+				if nodeGpuInfo, ok := nodesGpuInfo[pod.Spec.NodeName]; ok {
+					if nodeGpuInfo != nil && nodeGpuInfo.VGPU != 0 {
+						gpuAlloc = gpuAlloc * (nodeGpuInfo.GPU / nodeGpuInfo.VGPU)
+					}
+				}
+
+				scrapeResults = append(scrapeResults, metric.Update{
+					Name:   metric.ContainerGPUAllocation,
+					Labels: maps.Clone(containerInfo),
+					Value:  gpuAlloc,
+				})
+			}
+		}
+	}
+
+	// Iterate through PVC Info after the pods have been tallied and export
+	// allocation metrics based on the number of other pods claiming the volume
+	for _, pvc := range pvcInfo {
+		// unmounted pvs get full allocation
+		if len(pvc.PodsClaimed) == 0 {
+			labels := map[string]string{
+				source.PodLabel:       unmountedPVsContainer,
+				source.NamespaceLabel: pvc.Namespace,
+				source.UIDLabel:       "",
+				source.NodeLabel:      "",
+				source.InstanceLabel:  "",
+				source.PVCLabel:       pvc.Claim,
+				source.PVLabel:        pvc.VolumeName,
+			}
+
+			scrapeResults = append(scrapeResults, metric.Update{
+				Name:   metric.PodPVCAllocation,
+				Labels: labels,
+				Value:  pvc.Requests,
+			})
+
+			continue
+		}
+
+		// pods get a proportion of pv allocation
+		value := pvc.Requests / float64(len(pvc.PodsClaimed))
+
+		for _, podUid := range pvc.PodsClaimed {
+			podInfo, ok := podInfoByUid[podUid]
+			if !ok {
+				continue
+			}
+
+			pvcLabels := maps.Clone(podInfo)
+			pvcLabels[source.PVCLabel] = pvc.Claim
+			pvcLabels[source.PVLabel] = pvc.VolumeName
+
+			scrapeResults = append(scrapeResults, metric.Update{
+				Name:   metric.PodPVCAllocation,
+				Labels: pvcLabels,
+				Value:  value,
+			})
 		}
 	}
 
@@ -632,6 +747,75 @@ func (ccs *ClusterCacheScraper) scrapeResourceQuotas(resourceQuotas []*clusterca
 	return scrapeResults
 }
 
+// PvcInfo is used to store information about a pvc for tracking volume usage.
+type PvcInfo struct {
+	Class       string
+	Claim       string
+	Namespace   string
+	VolumeName  string
+	Requests    float64
+	PodsClaimed []string
+}
+
+func getPvcsInfo(pvcs []*clustercache.PersistentVolumeClaim) map[string]*PvcInfo {
+	toReturn := make(map[string]*PvcInfo)
+
+	for _, pvc := range pvcs {
+		ns := pvc.Namespace
+		pvcName := pvc.Name
+		volumeName := pvc.Spec.VolumeName
+		pvClass := getPersistentVolumeClaimClass(pvc)
+		requests := float64(pvc.Spec.Resources.Requests.Storage().Value())
+
+		key := ns + "," + pvcName
+		toReturn[key] = &PvcInfo{
+			Class:      pvClass,
+			Claim:      pvcName,
+			Namespace:  ns,
+			VolumeName: volumeName,
+			Requests:   requests,
+		}
+	}
+
+	return toReturn
+}
+
+// NodeGpuInfo contains the gpu count and vgpu counts for nodes
+type NodeGpuInfo struct {
+	GPU  float64
+	VGPU float64
+}
+
+func (ccs *ClusterCacheScraper) getNodesGpuInfo() map[string]*NodeGpuInfo {
+	// use a closure to cache allocatableVGPU result instead of calculating
+	// it every time we need it
+	var allocatableVGPUs *float64
+	allocVGPUs := func() (float64, error) {
+		if allocatableVGPUs != nil {
+			return *allocatableVGPUs, nil
+		}
+
+		vgpu, err := getAllocatableVGPUs(ccs.clusterCache.GetAllDaemonSets())
+		if err != nil {
+			return vgpu, err
+		}
+		allocatableVGPUs = &vgpu
+		return *allocatableVGPUs, nil
+	}
+
+	var nodeGpuMap map[string]*NodeGpuInfo = make(map[string]*NodeGpuInfo)
+	for _, node := range ccs.clusterCache.GetAllNodes() {
+		info, err := gpuInfoFor(node, allocVGPUs)
+		if err != nil {
+			log.Warnf("Failed to retrieve GPU Info for Node: %s - %s", node.Name, err)
+			continue
+		}
+		nodeGpuMap[node.Name] = info
+	}
+
+	return nodeGpuMap
+}
+
 // getPersistentVolumeClaimClass returns StorageClassName. If no storage class was
 // requested, it returns "".
 func getPersistentVolumeClaimClass(claim *clustercache.PersistentVolumeClaim) string {
@@ -691,6 +875,10 @@ func toResourceUnitValue(resourceName v1.ResourceName, quantity resource.Quantit
 	return
 }
 
+func isGpuResourceName(name v1.ResourceName) bool {
+	return name == "nvidia.com/gpu" || name == "k8s.amazonaws.com/vgpu"
+}
+
 // isHugePageResourceName checks for a huge page container resource name
 func isHugePageResourceName(name v1.ResourceName) bool {
 	return strings.HasPrefix(string(name), v1.ResourceHugePagesPrefix)
@@ -722,3 +910,100 @@ func isNativeResource(name v1.ResourceName) bool {
 func isPrefixedNativeResource(name v1.ResourceName) bool {
 	return strings.Contains(string(name), v1.ResourceDefaultNamespacePrefix)
 }
+
+// gpuInfoFor gets the Node GPUs and VGPUs using the node data from k8s. Returns nil if GPUs could not be located for the node.
+func gpuInfoFor(
+	n *clustercache.Node,
+	allocatedVGPUs func() (float64, error),
+) (*NodeGpuInfo, error) {
+	g, hasGpu := n.Status.Capacity["nvidia.com/gpu"]
+	_, hasReplicas := n.Labels["nvidia.com/gpu.replicas"]
+
+	// Case 1: Standard NVIDIA GPU
+	if hasGpu && g.Value() != 0 && !hasReplicas {
+		return &NodeGpuInfo{
+			GPU:  float64(g.Value()),
+			VGPU: float64(g.Value()),
+		}, nil
+	}
+
+	// Case 2: NVIDIA GPU with GPU Feature Discovery (GFD) Pod enabled.
+	// Ref: https://docs.nvidia.com/datacenter/cloud-native/gpu-operator/latest/gpu-sharing.html#verifying-the-gpu-time-slicing-configuration
+	// Ref: https://github.com/NVIDIA/k8s-device-plugin/blob/d899752a424818428f744a946d32b132ea2c0cf1/internal/lm/resource_test.go#L44-L45
+	// Ref: https://github.com/NVIDIA/k8s-device-plugin/blob/d899752a424818428f744a946d32b132ea2c0cf1/internal/lm/resource_test.go#L103-L118
+	if hasReplicas {
+		resultGPU := 0.0
+		resultVGPU := 0.0
+
+		if c, ok := n.Labels["nvidia.com/gpu.count"]; ok {
+			var err error
+			resultGPU, err = strconv.ParseFloat(c, 64)
+			if err != nil {
+				return nil, fmt.Errorf("could not parse label \"nvidia.com/gpu.count\": %v", err)
+			}
+		}
+
+		if s, ok := n.Status.Capacity["nvidia.com/gpu.shared"]; ok { // GFD configured `renameByDefault=true`
+			resultVGPU = float64(s.Value())
+		} else if g, ok := n.Status.Capacity["nvidia.com/gpu"]; ok { // GFD configured `renameByDefault=false`
+			resultVGPU = float64(g.Value())
+		} else {
+			resultVGPU = resultGPU
+		}
+
+		return &NodeGpuInfo{
+			GPU:  resultGPU,
+			VGPU: resultVGPU,
+		}, nil
+	}
+
+	// Case 3: AWS vGPU
+	if vgpu, ok := n.Status.Capacity["k8s.amazonaws.com/vgpu"]; ok {
+		vgpuCount, err := allocatedVGPUs()
+		if err != nil {
+			return nil, err
+		}
+
+		vgpuCoeff := 10.0
+		if vgpuCount > 0.0 {
+			vgpuCoeff = vgpuCount
+		}
+
+		if vgpu.Value() != 0 {
+			resultGPU := float64(vgpu.Value()) / vgpuCoeff
+			resultVGPU := float64(vgpu.Value())
+			return &NodeGpuInfo{
+				GPU:  resultGPU,
+				VGPU: resultVGPU,
+			}, nil
+		}
+	}
+
+	// No GPU found
+	return nil, nil
+}
+
+func getAllocatableVGPUs(daemonsets []*clustercache.DaemonSet) (float64, error) {
+	vgpuCount := 0.0
+
+	for _, ds := range daemonsets {
+		dsContainerList := &ds.SpecContainers
+		for _, ctnr := range *dsContainerList {
+			if ctnr.Args != nil {
+				for _, arg := range ctnr.Args {
+					if strings.Contains(arg, "--vgpu=") {
+						vgpus, err := strconv.ParseFloat(arg[strings.IndexByte(arg, '=')+1:], 64)
+						if err != nil {
+							log.Errorf("failed to parse vgpu allocation string %s: %v", arg, err)
+							continue
+						}
+						vgpuCount = vgpus
+						return vgpuCount, nil
+					}
+
+				}
+			}
+		}
+	}
+	return vgpuCount, nil
+}

+ 32 - 1
modules/collector-source/pkg/scrape/clustercache_test.go

@@ -311,6 +311,7 @@ func Test_kubernetesScraper_scrapePods(t *testing.T) {
 	start1, _ := time.Parse(time.RFC3339, Start1Str)
 
 	type scrape struct {
+		PVCs      []*clustercache.PersistentVolumeClaim
 		Pods      []*clustercache.Pod
 		Timestamp time.Time
 	}
@@ -323,6 +324,22 @@ func Test_kubernetesScraper_scrapePods(t *testing.T) {
 			name: "simple",
 			scrapes: []scrape{
 				{
+					PVCs: []*clustercache.PersistentVolumeClaim{
+						{
+							Name:      "pvc1",
+							Namespace: "namespace1",
+							UID:       "uuid1",
+							Spec: v1.PersistentVolumeClaimSpec{
+								VolumeName:       "vol1",
+								StorageClassName: util.Ptr("storageClass1"),
+								Resources: v1.VolumeResourceRequirements{
+									Requests: v1.ResourceList{
+										v1.ResourceStorage: resource.MustParse("4096"),
+									},
+								},
+							},
+						},
+					},
 					Pods: []*clustercache.Pod{
 						{
 							Name:      "pod1",
@@ -494,6 +511,20 @@ func Test_kubernetesScraper_scrapePods(t *testing.T) {
 					Value:          1024,
 					AdditionalInfo: nil,
 				},
+				{
+					Name: metric.PodPVCAllocation,
+					Labels: map[string]string{
+						source.InstanceLabel:  "",
+						source.NamespaceLabel: "namespace1",
+						source.NodeLabel:      "",
+						source.PVLabel:        "vol1",
+						source.PVCLabel:       "pvc1",
+						source.PodLabel:       "unmounted-pvs",
+						source.UIDLabel:       "",
+					},
+					Value:          4096,
+					AdditionalInfo: nil,
+				},
 			},
 		},
 	}
@@ -502,7 +533,7 @@ func Test_kubernetesScraper_scrapePods(t *testing.T) {
 			ks := &ClusterCacheScraper{}
 			var scrapeResults []metric.Update
 			for _, s := range tt.scrapes {
-				res := ks.scrapePods(s.Pods)
+				res := ks.scrapePods(s.Pods, s.PVCs)
 				scrapeResults = append(scrapeResults, res...)
 			}
 

+ 0 - 4
modules/collector-source/pkg/scrape/opencost.go

@@ -33,10 +33,6 @@ func newOpencostTargetScraper(provider target.TargetProvider) *TargetScraper {
 			metric.NodeGPUHourlyCost,
 			metric.NodeGPUCount,
 			metric.KubecostNodeIsSpot,
-			metric.ContainerCPUAllocation,
-			metric.ContainerMemoryAllocationBytes,
-			metric.ContainerGPUAllocation,
-			metric.PodPVCAllocation,
 		},
 		true)
 }

+ 0 - 86
modules/collector-source/pkg/scrape/targetscraper_test.go

@@ -408,92 +408,6 @@ func TestTargetScraper_Scrape(t *testing.T) {
 					},
 					Value: 0,
 				},
-				{
-					Name: metric.ContainerCPUAllocation,
-					Labels: map[string]string{
-						"container": "container1",
-						"instance":  "node1",
-						"namespace": "namespace1",
-						"node":      "node1",
-						"pod":       "pod1",
-					},
-					Value: 0.02,
-				},
-				{
-					Name: metric.ContainerCPUAllocation,
-					Labels: map[string]string{
-						"container": "container2",
-						"instance":  "node2",
-						"namespace": "namespace1",
-						"node":      "node2",
-						"pod":       "pod2",
-					},
-					Value: 0.01,
-				},
-				{
-					Name: metric.ContainerMemoryAllocationBytes,
-					Labels: map[string]string{
-						"container": "container1",
-						"instance":  "node1",
-						"namespace": "namespace1",
-						"node":      "node1",
-						"pod":       "pod1",
-					},
-					Value: 1.1528192e+07,
-				},
-				{
-					Name: metric.ContainerMemoryAllocationBytes,
-					Labels: map[string]string{
-						"container": "container2",
-						"instance":  "node2",
-						"namespace": "namespace1",
-						"node":      "node2",
-						"pod":       "pod2",
-					},
-					Value: 1e+07,
-				},
-				{
-					Name: metric.ContainerGPUAllocation,
-					Labels: map[string]string{
-						"container": "container1",
-						"instance":  "node1",
-						"namespace": "namespace1",
-						"node":      "node1",
-						"pod":       "pod1",
-					},
-					Value: 0,
-				},
-				{
-					Name: metric.ContainerGPUAllocation,
-					Labels: map[string]string{
-						"container": "container2",
-						"instance":  "node2",
-						"namespace": "namespace1",
-						"node":      "node2",
-						"pod":       "pod2",
-					},
-					Value: 0,
-				},
-				{
-					Name: metric.PodPVCAllocation,
-					Labels: map[string]string{
-						"namespace":             "namespace1",
-						"persistentvolume":      "pvc-1",
-						"persistentvolumeclaim": "pvc1",
-						"pod":                   "pod1",
-					},
-					Value: 3.4359738368e+10,
-				},
-				{
-					Name: metric.PodPVCAllocation,
-					Labels: map[string]string{
-						"namespace":             "namespace1",
-						"persistentvolume":      "pvc-2",
-						"persistentvolumeclaim": "pvc2",
-						"pod":                   "pod2",
-					},
-					Value: 3.4359738368e+10,
-				},
 			},
 		},
 		{

+ 28 - 0
pkg/costmodel/allocation_helpers.go

@@ -317,6 +317,13 @@ func applyCPUCoresLimits(podMap map[podKey]*pod, resCPUCoresLimits []*source.CPU
 			}
 
 			thisPod.Allocations[container].CPUCoreLimitAverage = res.Data[0].Value
+
+			node := res.Node
+			if node == "" {
+				continue
+			}
+			thisPod.Allocations[container].Properties.Node = node
+			thisPod.Node = node
 		}
 	}
 }
@@ -362,6 +369,13 @@ func applyCPUCoresUsedAvg(podMap map[podKey]*pod, resCPUCoresUsedAvg []*source.C
 				log.Infof("[WARNING] Very large cpu USAGE, dropping outlier")
 				thisPod.Allocations[container].CPUCoreUsageAverage = 0.0
 			}
+
+			node := res.Node
+			if node == "" {
+				continue
+			}
+			thisPod.Allocations[container].Properties.Node = node
+			thisPod.Node = node
 		}
 	}
 }
@@ -556,6 +570,13 @@ func applyRAMBytesLimits(podMap map[podKey]*pod, resRAMBytesLimits []*source.RAM
 			}
 
 			pod.Allocations[container].RAMBytesLimitAverage = res.Data[0].Value
+
+			node := res.Node
+			if node == "" {
+				continue
+			}
+			pod.Allocations[container].Properties.Node = node
+			pod.Node = node
 		}
 	}
 }
@@ -597,6 +618,13 @@ func applyRAMBytesUsedAvg(podMap map[podKey]*pod, resRAMBytesUsedAvg []*source.R
 			}
 
 			thisPod.Allocations[container].RAMBytesUsageAverage = res.Data[0].Value
+
+			node := res.Node
+			if node == "" {
+				continue
+			}
+			thisPod.Allocations[container].Properties.Node = node
+			thisPod.Node = node
 		}
 	}
 }