Переглянути джерело

Streaming Bingen Support (#3671)

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Matt Bolt 3 тижнів тому
батько
коміт
50eb7e4355

+ 20 - 2
core/pkg/exporter/decoder.go

@@ -12,14 +12,22 @@ import (
 )
 
 type Decoder[T any] func([]byte) (*T, error)
+type StreamingDecoder[T any] func(io.Reader) (*T, error)
 
-// BinaryMarshalerPtr[T] is a generic constraint to ensure types passed to the encoder implement
-// encoding.BinaryMarshaler and are pointers to T.
+// BinaryMarshalerPtr[T] is a generic constraint to ensure types passed to the decoder implement
+// encoding.BinaryUnmarshaler and are pointers to T.
 type BinaryUnmarshalerPtr[T any] interface {
 	encoding.BinaryUnmarshaler
 	*T
 }
 
+// BinaryUnmarshalWithReaderPtr[T] is a generic constraint to ensure types passed to the decoder
+// implement an UnmarshalBinaryFromReader(io.Reader) method and are pointers to T.
+type BinaryUnmarshalWithReaderPtr[T any] interface {
+	UnmarshalBinaryFromReader(reader io.Reader) error
+	*T
+}
+
 func BingenDecoder[T any, U BinaryUnmarshalerPtr[T]](data []byte) (*T, error) {
 	var set U = new(T)
 
@@ -31,6 +39,16 @@ func BingenDecoder[T any, U BinaryUnmarshalerPtr[T]](data []byte) (*T, error) {
 	return set, nil
 }
 
+func StreamingBingenDecoder[T any, U BinaryUnmarshalWithReaderPtr[T]](reader io.Reader) (*T, error) {
+	var set U = new(T)
+
+	err := set.UnmarshalBinaryFromReader(reader)
+	if err != nil {
+		return nil, fmt.Errorf("failed to decode bingen: %w", err)
+	}
+	return set, nil
+}
+
 func JSONDecoder[T any](data []byte) (*T, error) {
 	var instance = new(T)
 	err := json.Unmarshal(data, instance)

+ 4 - 4
core/pkg/opencost/bingen.go

@@ -30,7 +30,7 @@ package opencost
 // @bingen:generate:AssetLabels
 // @bingen:generate:AssetProperties
 // @bingen:generate:AssetProperty
-// @bingen:generate[stringtable,preprocess,postprocess]:AssetSet
+// @bingen:generate[streamable,stringtable,preprocess,postprocess]:AssetSet
 // @bingen:generate:AssetSetRange
 // @bingen:generate:Breakdown
 // @bingen:generate:Cloud
@@ -46,7 +46,7 @@ package opencost
 // Allocation Version Set: Includes Allocation pipeline specific resources
 // @bingen:set[name=Allocation,version=25]
 // @bingen:generate[migrate]:Allocation
-// @bingen:generate[stringtable]:AllocationSet
+// @bingen:generate[streamable,stringtable]:AllocationSet
 // @bingen:generate:AllocationSetRange
 // @bingen:generate:AllocationProperties
 // @bingen:generate:AllocationProperty
@@ -64,14 +64,14 @@ package opencost
 // @bingen:set[name=CloudCost,version=3]
 // @bingen:generate:CloudCost
 // @bingen:generate:CostMetric
-// @bingen:generate[stringtable]:CloudCostSet
+// @bingen:generate[streamable,stringtable]:CloudCostSet
 // @bingen:generate:CloudCostSetRange
 // @bingen:generate:CloudCostProperties
 // @bingen:generate:CloudCostLabels
 // @bingen:end
 
 // @bingen:set[name=NetworkInsight,version=1]
-// @bingen:generate:NetworkInsightSet
+// @bingen:generate[streamable,stringtable]:NetworkInsightSet
 // @bingen:generate:NetworkInsight
 // @bingen:generate:NetworkTrafficDirection
 // @bingen:generate:NetworkTrafficType

Різницю між файлами не показано, бо вона завелика
+ 551 - 153
core/pkg/opencost/opencost_codecs.go


+ 219 - 41
core/pkg/opencost/opencost_codecs_test.go

@@ -1,16 +1,52 @@
 package opencost
 
 import (
+	"bytes"
+	"io"
 	"testing"
 	"time"
 )
 
-func TestAllocation_BinaryEncoding(t *testing.T) {
+type UnmarshalFunc func(BingenUnmarshalable, []byte) error
+
+func RunAllocation_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	// TODO niko
 }
 
-func TestAllocationSet_BinaryEncoding(t *testing.T) {
-	// TODO niko
+func RunAllocationSet_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
+	end := time.Now().UTC().Truncate(day)
+	start := end.Add(-day * 10)
+
+	for start.Before(end) {
+		set0 := GenerateMockAllocationSetClusterIdle(start)
+
+		bytes, err := set0.MarshalBinary()
+		if err != nil {
+			t.Fatalf("Failed to AllocationSet.MarshalBinary: %s", err)
+			return
+		}
+
+		set1 := new(AllocationSet)
+		err = unmarshal(set1, bytes)
+		if err != nil {
+			t.Fatalf("Failed to AllocationSet.UnmarshalBinary: %s", err)
+			return
+		}
+
+		for key, alloc := range set1.Allocations {
+			other, ok := set0.Allocations[key]
+			if !ok {
+				t.Fatalf("Failed to match Allocation for key: %s", key)
+				return
+			}
+
+			if !alloc.Equal(other) {
+				t.Fatalf("allocations for key: %s did not match", key)
+			}
+		}
+
+		start = start.Add(day)
+	}
 }
 
 func BenchmarkAllocationSetRange_BinaryEncoding(b *testing.B) {
@@ -78,7 +114,7 @@ func BenchmarkAllocationSetRange_BinaryEncoding(b *testing.B) {
 	}
 }
 
-func TestAllocationSetRange_BinaryEncoding(t *testing.T) {
+func RunAllocationSetRange_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	endYesterday := time.Now().UTC().Truncate(day)
 	startYesterday := endYesterday.Add(-day)
 	startD2 := startYesterday
@@ -86,7 +122,6 @@ func TestAllocationSetRange_BinaryEncoding(t *testing.T) {
 	startD0 := startD1.Add(-day)
 
 	var asr0, asr1 *AllocationSetRange
-	var bs []byte
 	var err error
 
 	asr0 = NewAllocationSetRange(
@@ -94,20 +129,29 @@ func TestAllocationSetRange_BinaryEncoding(t *testing.T) {
 		GenerateMockAllocationSetClusterIdle(startD1),
 		GenerateMockAllocationSetClusterIdle(startD2),
 	)
-
-	bs, err = asr0.MarshalBinary()
-	if err != nil {
-		t.Fatalf("AllocationSetRange.Binary: unexpected error: %s", err)
-		return
+	asrSets0 := [][]byte{}
+	for _, as := range asr0.Allocations {
+		bytes, err := as.MarshalBinary()
+		if err != nil {
+			t.Fatalf("Failed to marshal allocation set into []byte: %s", err)
+			return
+		}
+		asrSets0 = append(asrSets0, bytes)
 	}
 
-	asr1 = &AllocationSetRange{}
-	err = asr1.UnmarshalBinary(bs)
-	if err != nil {
-		t.Fatalf("AllocationSetRange.Binary: unexpected error: %s", err)
-		return
+	asrSets1 := []*AllocationSet{}
+	for _, bytes := range asrSets0 {
+		allocSet := new(AllocationSet)
+		err = unmarshal(allocSet, bytes)
+		if err != nil {
+			t.Fatalf("AllocationSet.Binary: unexpected error: %s", err)
+			return
+		}
+		asrSets1 = append(asrSets1, allocSet)
 	}
 
+	asr1 = NewAllocationSetRange(asrSets1...)
+
 	if asr0.Length() != asr1.Length() {
 		t.Fatalf("AllocationSetRange.Binary: expected %d; found %d", asr0.Length(), asr1.Length())
 	}
@@ -143,7 +187,7 @@ func TestAllocationSetRange_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestAny_BinaryEncoding(t *testing.T) {
+func RunAny_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	start := time.Date(2020, time.September, 16, 0, 0, 0, 0, time.UTC)
 	end := start.Add(24 * time.Hour)
 	window := NewWindow(&start, &end)
@@ -167,7 +211,7 @@ func TestAny_BinaryEncoding(t *testing.T) {
 	}
 
 	a1 = &Any{}
-	err = a1.UnmarshalBinary(bs)
+	err = unmarshal(a1, bs)
 	if err != nil {
 		t.Fatalf("Any.Binary: unexpected error: %s", err)
 	}
@@ -192,15 +236,15 @@ func TestAny_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestAsset_BinaryEncoding(t *testing.T) {
+func RunAsset_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	// TODO niko
 }
 
-func TestAssetSet_BinaryEncoding(t *testing.T) {
+func RunAssetSet_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	// TODO niko
 }
 
-func TestAssetSetRange_BinaryEncoding(t *testing.T) {
+func RunAssetSetRange_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	endYesterday := time.Now().UTC().Truncate(day)
 	startYesterday := endYesterday.Add(-day)
 	startD2 := startYesterday
@@ -224,7 +268,7 @@ func TestAssetSetRange_BinaryEncoding(t *testing.T) {
 	}
 
 	asr1 = &AssetSetRange{}
-	err = asr1.UnmarshalBinary(bs)
+	err = unmarshal(asr1, bs)
 	if err != nil {
 		t.Fatalf("AssetSetRange.Binary: unexpected error: %s", err)
 		return
@@ -263,7 +307,7 @@ func TestAssetSetRange_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestBreakdown_BinaryEncoding(t *testing.T) {
+func RunBreakdown_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	var b0, b1 *Breakdown
 	var bs []byte
 	var err error
@@ -281,7 +325,7 @@ func TestBreakdown_BinaryEncoding(t *testing.T) {
 	}
 
 	b1 = &Breakdown{}
-	err = b1.UnmarshalBinary(bs)
+	err = unmarshal(b1, bs)
 	if err != nil {
 		t.Fatalf("Breakdown.Binary: unexpected error: %s", err)
 	}
@@ -300,7 +344,7 @@ func TestBreakdown_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestCloudAny_BinaryEncoding(t *testing.T) {
+func RunCloudAny_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	ws := time.Date(2020, time.September, 16, 0, 0, 0, 0, time.UTC)
 	we := ws.Add(24 * time.Hour)
 	window := NewWindow(&ws, &we)
@@ -319,7 +363,7 @@ func TestCloudAny_BinaryEncoding(t *testing.T) {
 	}
 
 	a1 = &Cloud{}
-	err = a1.UnmarshalBinary(bs)
+	err = unmarshal(a1, bs)
 	if err != nil {
 		t.Fatalf("CloudAny.Binary: unexpected error: %s", err)
 	}
@@ -329,7 +373,7 @@ func TestCloudAny_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestClusterManagement_BinaryEncoding(t *testing.T) {
+func RunClusterManagement_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	ws := time.Date(2020, time.September, 16, 0, 0, 0, 0, time.UTC)
 	we := ws.Add(24 * time.Hour)
 	window := NewWindow(&ws, &we)
@@ -358,7 +402,7 @@ func TestClusterManagement_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestDisk_BinaryEncoding(t *testing.T) {
+func RunDisk_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	ws := time.Date(2020, time.September, 16, 0, 0, 0, 0, time.UTC)
 	we := ws.Add(24 * time.Hour)
 	window := NewWindow(&ws, &we)
@@ -389,7 +433,7 @@ func TestDisk_BinaryEncoding(t *testing.T) {
 	}
 
 	a1 = &Disk{}
-	err = a1.UnmarshalBinary(bs)
+	err = unmarshal(a1, bs)
 	if err != nil {
 		t.Fatalf("Disk.Binary: unexpected error: %s", err)
 	}
@@ -399,7 +443,7 @@ func TestDisk_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestNode_BinaryEncoding(t *testing.T) {
+func RunNode_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	ws := time.Date(2020, time.September, 16, 0, 0, 0, 0, time.UTC)
 	we := ws.Add(24 * time.Hour)
 	window := NewWindow(&ws, &we)
@@ -441,7 +485,7 @@ func TestNode_BinaryEncoding(t *testing.T) {
 	}
 
 	a1 = &Node{}
-	err = a1.UnmarshalBinary(bs)
+	err = unmarshal(a1, bs)
 	if err != nil {
 		t.Fatalf("Node.Binary: unexpected error: %s", err)
 	}
@@ -451,7 +495,7 @@ func TestNode_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestProperties_BinaryEncoding(t *testing.T) {
+func RunProperties_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	var p0, p1 *AllocationProperties
 	var bs []byte
 	var err error
@@ -464,7 +508,7 @@ func TestProperties_BinaryEncoding(t *testing.T) {
 	}
 
 	p1 = &AllocationProperties{}
-	err = p1.UnmarshalBinary(bs)
+	err = unmarshal(p1, bs)
 	if err != nil {
 		t.Fatalf("AllocationProperties.Binary: unexpected error: %s", err)
 	}
@@ -501,7 +545,7 @@ func TestProperties_BinaryEncoding(t *testing.T) {
 	}
 
 	p1 = &AllocationProperties{}
-	err = p1.UnmarshalBinary(bs)
+	err = unmarshal(p1, bs)
 	if err != nil {
 		t.Fatalf("AllocationProperties.Binary: unexpected error: %s", err)
 	}
@@ -527,7 +571,7 @@ func TestProperties_BinaryEncoding(t *testing.T) {
 	}
 
 	p1 = &AllocationProperties{}
-	err = p1.UnmarshalBinary(bs)
+	err = unmarshal(p1, bs)
 	if err != nil {
 		t.Fatalf("AllocationProperties.Binary: unexpected error: %s", err)
 	}
@@ -537,7 +581,7 @@ func TestProperties_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestShared_BinaryEncoding(t *testing.T) {
+func RunShared_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	ws := time.Date(2020, time.September, 16, 0, 0, 0, 0, time.UTC)
 	we := ws.Add(24 * time.Hour)
 	window := NewWindow(&ws, &we)
@@ -556,7 +600,7 @@ func TestShared_BinaryEncoding(t *testing.T) {
 	}
 
 	a1 = &SharedAsset{}
-	err = a1.UnmarshalBinary(bs)
+	err = unmarshal(a1, bs)
 	if err != nil {
 		t.Fatalf("SharedAsset.Binary: unexpected error: %s", err)
 	}
@@ -566,7 +610,7 @@ func TestShared_BinaryEncoding(t *testing.T) {
 	}
 }
 
-func TestWindow_BinaryEncoding(t *testing.T) {
+func RunWindow_BinaryEncodingTest(t *testing.T, unmarshal UnmarshalFunc) {
 	var w0, w1 Window
 	var bs []byte
 	var err error
@@ -578,7 +622,7 @@ func TestWindow_BinaryEncoding(t *testing.T) {
 		t.Fatalf("Window.Binary: unexpected error: %s", err)
 	}
 
-	err = w1.UnmarshalBinary(bs)
+	err = unmarshal(&w1, bs)
 	if err != nil {
 		t.Fatalf("Window.Binary: unexpected error: %s", err)
 	}
@@ -598,7 +642,7 @@ func TestWindow_BinaryEncoding(t *testing.T) {
 		t.Fatalf("Window.Binary: unexpected error: %s", err)
 	}
 
-	err = w1.UnmarshalBinary(bs)
+	err = unmarshal(&w1, bs)
 	if err != nil {
 		t.Fatalf("Window.Binary: unexpected error: %s", err)
 	}
@@ -618,7 +662,7 @@ func TestWindow_BinaryEncoding(t *testing.T) {
 		t.Fatalf("Window.Binary: unexpected error: %s", err)
 	}
 
-	err = w1.UnmarshalBinary(bs)
+	err = unmarshal(&w1, bs)
 	if err != nil {
 		t.Fatalf("Window.Binary: unexpected error: %s", err)
 	}
@@ -638,7 +682,7 @@ func TestWindow_BinaryEncoding(t *testing.T) {
 		t.Fatalf("Window.Binary: unexpected error: %s", err)
 	}
 
-	err = w1.UnmarshalBinary(bs)
+	err = unmarshal(&w1, bs)
 	if err != nil {
 		t.Fatalf("Window.Binary: unexpected error: %s", err)
 	}
@@ -650,3 +694,137 @@ func TestWindow_BinaryEncoding(t *testing.T) {
 		t.Fatalf("Window.Binary: expected %v; found %v", w0.End(), w1.End())
 	}
 }
+
+type BingenUnmarshalable interface {
+	UnmarshalBinary([]byte) error
+	UnmarshalBinaryFromReader(io.Reader) error
+}
+
+func UnmarshalBingenBytes(value BingenUnmarshalable, b []byte) error {
+	return value.UnmarshalBinary(b)
+}
+
+func UnmarshalBingenReader(value BingenUnmarshalable, b []byte) error {
+	// convert bytes to reader in order to leverage io.Reader string table
+	reader := bytes.NewReader(b)
+	return value.UnmarshalBinaryFromReader(reader)
+}
+
+func RunAllOpencostBingenCodecTests(t *testing.T, unmarshal UnmarshalFunc) {
+	tests := []struct {
+		name string
+		f    func(*testing.T, UnmarshalFunc)
+	}{
+		{
+			name: "RunAllocation_BinaryEncodingTest",
+			f:    RunAllocation_BinaryEncodingTest,
+		},
+		{
+			name: "RunAllocationSet_BinaryEncodingTest",
+			f:    RunAllocationSet_BinaryEncodingTest,
+		},
+		{
+			name: "RunAllocationSetRange_BinaryEncodingTest",
+			f:    RunAllocationSetRange_BinaryEncodingTest,
+		},
+		{
+			name: "RunAny_BinaryEncodingTest",
+			f:    RunAny_BinaryEncodingTest,
+		},
+		{
+			name: "RunAsset_BinaryEncodingTest",
+			f:    RunAsset_BinaryEncodingTest,
+		},
+		{
+			name: "RunAssetSet_BinaryEncodingTest",
+			f:    RunAssetSet_BinaryEncodingTest,
+		},
+		{
+			name: "RunAssetSetRange_BinaryEncodingTest",
+			f:    RunAssetSetRange_BinaryEncodingTest,
+		},
+		{
+			name: "RunBreakdown_BinaryEncodingTest",
+			f:    RunBreakdown_BinaryEncodingTest,
+		},
+		{
+			name: "RunCloudAny_BinaryEncodingTest",
+			f:    RunCloudAny_BinaryEncodingTest,
+		},
+		{
+			name: "RunClusterManagement_BinaryEncodingTest",
+			f:    RunClusterManagement_BinaryEncodingTest,
+		},
+		{
+			name: "RunDisk_BinaryEncodingTest",
+			f:    RunDisk_BinaryEncodingTest,
+		},
+		{
+			name: "RunNode_BinaryEncodingTest",
+			f:    RunNode_BinaryEncodingTest,
+		},
+		{
+			name: "RunProperties_BinaryEncodingTest",
+			f:    RunProperties_BinaryEncodingTest,
+		},
+		{
+			name: "RunShared_BinaryEncodingTest",
+			f:    RunShared_BinaryEncodingTest,
+		},
+		{
+			name: "RunWindow_BinaryEncodingTest",
+			f:    RunWindow_BinaryEncodingTest,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(tt *testing.T) {
+			test.f(tt, unmarshal)
+		})
+	}
+}
+
+func TestOpencostBingenDefaultsWithBytes(t *testing.T) {
+	config := DefaultBingenConfiguration()
+	ConfigureBingen(config)
+
+	RunAllOpencostBingenCodecTests(t, UnmarshalBingenBytes)
+}
+
+func TestOpencostBingenFileStringTableEnabledWithBytes(t *testing.T) {
+	// This test _should_ still run the slice based string table because raw []byte
+	// data always uses the string slice table
+	config := DefaultBingenConfiguration()
+	config.FileBackedStringTableEnabled = true
+	config.FileBackedStringTableDir = t.TempDir()
+	ConfigureBingen(config)
+
+	// reset configuration to default on completion
+	defer ConfigureBingen(DefaultBingenConfiguration())
+
+	RunAllOpencostBingenCodecTests(t, UnmarshalBingenBytes)
+}
+
+func TestOpencostBingenDefaultsWithReader(t *testing.T) {
+	// This test _should_ still run the slice based string table because we haven't configured
+	// bingen to use the file string table
+	config := DefaultBingenConfiguration()
+	ConfigureBingen(config)
+
+	// we use the reader to unmarshal instead of []bytes
+	RunAllOpencostBingenCodecTests(t, UnmarshalBingenReader)
+}
+
+func TestOpencostBingenFileStringTableEnabledWithReader(t *testing.T) {
+	// This test _should_ use the file backed string table because we have enabled it AND
+	// we're using a reader
+	config := DefaultBingenConfiguration()
+	config.FileBackedStringTableEnabled = true
+	config.FileBackedStringTableDir = t.TempDir()
+	ConfigureBingen(config)
+
+	// reset configuration to default on completion
+	defer ConfigureBingen(DefaultBingenConfiguration())
+
+	RunAllOpencostBingenCodecTests(t, UnmarshalBingenReader)
+}

+ 215 - 196
core/pkg/util/buffer.go

@@ -1,30 +1,36 @@
 package util
 
 import (
+	"bufio"
 	"bytes"
-	"encoding/binary"
 	"errors"
+	"fmt"
 	"io"
 	"math"
+	"os"
 	"unsafe"
 
 	"github.com/opencost/opencost/core/pkg/util/stringutil"
 )
 
+var bytePool *bufferPool = newBufferPool()
+
 // NonPrimitiveTypeError represents an error where the user provided a non-primitive data type for reading/writing
 var NonPrimitiveTypeError error = errors.New("Type provided to read/write does not fit inside 8 bytes.")
 
 // Buffer is a utility type which implements a very basic binary protocol for
 // writing core go types.
 type Buffer struct {
-	b *bytes.Buffer
+	b  *bufio.Reader
+	bw *bytes.Buffer
 }
 
 // NewBuffer creates a new Buffer instance using LittleEndian ByteOrder.
 func NewBuffer() *Buffer {
 	var b bytes.Buffer
 	return &Buffer{
-		b: &b,
+		b:  nil,
+		bw: &b,
 	}
 }
 
@@ -32,7 +38,7 @@ func NewBuffer() *Buffer {
 // The new buffer assumes ownership of the byte slice.
 func NewBufferFromBytes(b []byte) *Buffer {
 	return &Buffer{
-		b: bytes.NewBuffer(b),
+		bw: bytes.NewBuffer(b),
 	}
 }
 
@@ -41,180 +47,298 @@ func NewBufferFromBytes(b []byte) *Buffer {
 func NewBufferFrom(b *Buffer) *Buffer {
 	bb := b.Bytes()
 	return &Buffer{
-		b: bytes.NewBuffer(bb),
+		bw: bytes.NewBuffer(bb),
+	}
+}
+
+// NewBufferFromReader creates a new Buffer instance using the provided io.Reader. This
+// buffer is set to read-only.
+func NewBufferFromReader(reader io.Reader) *Buffer {
+	return &Buffer{
+		b:  bufio.NewReader(reader),
+		bw: nil,
 	}
 }
 
-// WriteBool writes a bool value to the buffer.
-func (b *Buffer) WriteBool(t bool) {
-	write(b.b, t)
+// WriteBool writes a bool value to the buffer
+func (b *Buffer) WriteBool(i bool) {
+	b.checkRO()
+	writeBool(b.bw, i)
 }
 
 // WriteInt writes an int value to the buffer.
 func (b *Buffer) WriteInt(i int) {
-	write(b.b, int32(i))
+	b.checkRO()
+	writeInt(b.bw, i)
 }
 
 // WriteInt8 writes an int8 value to the buffer.
 func (b *Buffer) WriteInt8(i int8) {
-	write(b.b, i)
+	b.checkRO()
+	writeInt8(b.bw, i)
 }
 
 // WriteInt16 writes an int16 value to the buffer.
 func (b *Buffer) WriteInt16(i int16) {
-	write(b.b, i)
+	b.checkRO()
+	writeInt16(b.bw, i)
 }
 
 // WriteInt32 writes an int32 value to the buffer.
 func (b *Buffer) WriteInt32(i int32) {
-	write(b.b, i)
+	b.checkRO()
+	writeInt32(b.bw, i)
 }
 
 // WriteInt64 writes an int64 value to the buffer.
 func (b *Buffer) WriteInt64(i int64) {
-	write(b.b, i)
+	b.checkRO()
+	writeInt64(b.bw, i)
 }
 
 // WriteUInt writes a uint value to the buffer.
 func (b *Buffer) WriteUInt(i uint) {
-	write(b.b, i)
+	b.checkRO()
+	writeUint(b.bw, i)
 }
 
 // WriteUInt8 writes a uint8 value to the buffer.
 func (b *Buffer) WriteUInt8(i uint8) {
-	write(b.b, i)
+	b.checkRO()
+	writeUint8(b.bw, i)
 }
 
 // WriteUInt16 writes a uint16 value to the buffer.
 func (b *Buffer) WriteUInt16(i uint16) {
-	write(b.b, i)
+	b.checkRO()
+	writeUint16(b.bw, i)
 }
 
 // WriteUInt32 writes a uint32 value to the buffer.
 func (b *Buffer) WriteUInt32(i uint32) {
-	write(b.b, i)
+	b.checkRO()
+	writeUint32(b.bw, i)
 }
 
 // WriteUInt64 writes a uint64 value to the buffer.
 func (b *Buffer) WriteUInt64(i uint64) {
-	write(b.b, i)
+	b.checkRO()
+	writeUint64(b.bw, i)
 }
 
 // WriteFloat32 writes a float32 value to the buffer.
 func (b *Buffer) WriteFloat32(i float32) {
-	write(b.b, i)
+	b.checkRO()
+	writeFloat32(b.bw, i)
 }
 
 // WriteFloat64 writes a float64 value to the buffer.
 func (b *Buffer) WriteFloat64(i float64) {
-	write(b.b, i)
+	b.checkRO()
+	writeFloat64(b.bw, i)
 }
 
 // WriteString writes the string's length as a uint16 followed by the string contents.
 func (b *Buffer) WriteString(i string) {
+	b.checkRO()
 	s := stringToBytes(i)
 
 	// string lengths are limited to uint16 - See ReadString()
 	if len(s) > math.MaxUint16 {
 		s = s[:math.MaxUint16]
 	}
-	write(b.b, uint16(len(s)))
-	b.b.Write(s)
+	writeUint16(b.bw, uint16(len(s)))
+	b.bw.Write(s)
 }
 
 // WriteBytes writes the contents of the byte slice to the buffer.
 func (b *Buffer) WriteBytes(bytes []byte) {
-	b.b.Write(bytes)
+	b.checkRO()
+	b.bw.Write(bytes)
+}
+
+// Bytes returns the unread portion of the underlying buffer storage. If the buffer was
+// created with an `io.Reader`, then the remaining unread bytes are drained into a byte
+// slice and returned.
+func (b *Buffer) Bytes() []byte {
+	if b.bw != nil {
+		return b.bw.Bytes()
+	}
+
+	bytes, err := io.ReadAll(b.b)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "failed to read remaining bytes from Buffer: %s\n", err)
+	}
+	return bytes
+}
+
+func (b *Buffer) Peek(length int) ([]byte, error) {
+	if b.bw != nil {
+		return nil, fmt.Errorf("unsupported Peek() operation on read/write buffer.")
+	}
+	return b.b.Peek(length)
+}
+
+// checkRO panics if the buffer is read-only (reader-backed). Kept small so the compiler can inline it.
+func (b *Buffer) checkRO() {
+	if b.bw == nil {
+		panic("Buffer is set to read-only")
+	}
 }
 
 // ReadBool reads a bool value from the buffer.
 func (b *Buffer) ReadBool() bool {
 	var i bool
-	read(b.b, &i)
+	if b.bw != nil {
+		readBool(b.bw, &i)
+		return i
+	}
+
+	readBuffBool(b.b, &i)
 	return i
 }
 
 // ReadInt reads an int value from the buffer.
 func (b *Buffer) ReadInt() int {
-	var i int32
-	read(b.b, &i)
-	return int(i)
+	var i int
+	if b.bw != nil {
+		readInt(b.bw, &i)
+		return i
+	}
+
+	readBuffInt(b.b, &i)
+	return i
 }
 
 // ReadInt8 reads an int8 value from the buffer.
 func (b *Buffer) ReadInt8() int8 {
 	var i int8
-	read(b.b, &i)
+	if b.bw != nil {
+		readInt8(b.bw, &i)
+		return i
+	}
+
+	readBuffInt8(b.b, &i)
 	return i
 }
 
 // ReadInt16 reads an int16 value from the buffer.
 func (b *Buffer) ReadInt16() int16 {
 	var i int16
-	read(b.b, &i)
+	if b.bw != nil {
+		readInt16(b.bw, &i)
+		return i
+	}
+
+	readBuffInt16(b.b, &i)
 	return i
 }
 
 // ReadInt32 reads an int32 value from the buffer.
 func (b *Buffer) ReadInt32() int32 {
 	var i int32
-	read(b.b, &i)
+	if b.bw != nil {
+		readInt32(b.bw, &i)
+		return i
+	}
+
+	readBuffInt32(b.b, &i)
 	return i
 }
 
 // ReadInt64 reads an int64 value from the buffer.
 func (b *Buffer) ReadInt64() int64 {
 	var i int64
-	read(b.b, &i)
+	if b.bw != nil {
+		readInt64(b.bw, &i)
+		return i
+	}
+
+	readBuffInt64(b.b, &i)
 	return i
 }
 
 // ReadUInt reads a uint value from the buffer.
 func (b *Buffer) ReadUInt() uint {
 	var i uint
-	read(b.b, &i)
+	if b.bw != nil {
+		readUint(b.bw, &i)
+		return i
+	}
+
+	readBuffUint(b.b, &i)
 	return i
 }
 
 // ReadUInt8 reads a uint8 value from the buffer.
 func (b *Buffer) ReadUInt8() uint8 {
 	var i uint8
-	read(b.b, &i)
+	if b.bw != nil {
+		readUint8(b.bw, &i)
+		return i
+	}
+
+	readBuffUint8(b.b, &i)
 	return i
 }
 
 // ReadUInt16 reads a uint16 value from the buffer.
 func (b *Buffer) ReadUInt16() uint16 {
 	var i uint16
-	read(b.b, &i)
+	if b.bw != nil {
+		readUint16(b.bw, &i)
+		return i
+	}
+
+	readBuffUint16(b.b, &i)
 	return i
 }
 
 // ReadUInt32 reads a uint32 value from the buffer.
 func (b *Buffer) ReadUInt32() uint32 {
 	var i uint32
-	read(b.b, &i)
+	if b.bw != nil {
+		readUint32(b.bw, &i)
+		return i
+	}
+
+	readBuffUint32(b.b, &i)
 	return i
 }
 
 // ReadUInt64 reads a uint64 value from the buffer.
 func (b *Buffer) ReadUInt64() uint64 {
 	var i uint64
-	read(b.b, &i)
+	if b.bw != nil {
+		readUint64(b.bw, &i)
+		return i
+	}
+
+	readBuffUint64(b.b, &i)
 	return i
 }
 
 // ReadFloat32 reads a float32 value from the buffer.
 func (b *Buffer) ReadFloat32() float32 {
 	var i float32
-	read(b.b, &i)
+	if b.bw != nil {
+		readFloat32(b.bw, &i)
+		return i
+	}
+
+	readBuffFloat32(b.b, &i)
 	return i
 }
 
 // ReadFloat64 reads a float64 value from the buffer.
 func (b *Buffer) ReadFloat64() float64 {
 	var i float64
-	read(b.b, &i)
+	if b.bw != nil {
+		readFloat64(b.bw, &i)
+		return i
+	}
+
+	readBuffFloat64(b.b, &i)
 	return i
 }
 
@@ -222,172 +346,67 @@ func (b *Buffer) ReadFloat64() float64 {
 // then uses the length to extract the exact length []byte representing the string.
 func (b *Buffer) ReadString() string {
 	var l uint16
-	read(b.b, &l)
-	return bytesToString(b.b.Next(int(l)))
-}
+	if b.bw != nil {
+		readUint16(b.bw, &l)
+		return bytesToString(b.bw.Next(int(l)))
+	}
 
-// ReadBytes reads the specified length from the buffer and returns the byte slice.
-func (b *Buffer) ReadBytes(length int) []byte {
-	return b.b.Next(length)
-}
+	readBuffUint16(b.b, &l)
 
-// Bytes returns the unread portion of the underlying buffer storage.
-func (b *Buffer) Bytes() []byte {
-	return b.b.Bytes()
-}
-
-// Read reads structured binary data from r into data.
-func read(r *bytes.Buffer, data interface{}) error {
-	order := binary.LittleEndian
-
-	var b [8]byte
-	if n := intDataSize(data); n != 0 {
-		bs := b[:n]
-
-		if _, err := readFull(r, bs); err != nil {
-			return err
-		}
-
-		switch data := data.(type) {
-		case *bool:
-			*data = bs[0] != 0
-		case *int8:
-			*data = int8(bs[0])
-		case *uint8:
-			*data = bs[0]
-		case *int16:
-			*data = int16(order.Uint16(bs))
-		case *uint16:
-			*data = order.Uint16(bs)
-		case *int32:
-			*data = int32(order.Uint32(bs))
-		case *uint32:
-			*data = order.Uint32(bs)
-		case *int64:
-			*data = int64(order.Uint64(bs))
-		case *uint64:
-			*data = order.Uint64(bs)
-		case *float32:
-			*data = math.Float32frombits(order.Uint32(bs))
-		case *float64:
-			*data = math.Float64frombits(order.Uint64(bs))
-		default:
-			n = 0 // fast path doesn't apply
-		}
-
-		if n != 0 {
-			return nil
-		}
+	bytes := bytePool.Get(int(l))
+	defer bytePool.Put(bytes)
+
+	_, err := readBuffFull(b.b, bytes)
+	if err != nil {
+		return ""
 	}
 
-	return NonPrimitiveTypeError
+	return bytesToString(bytes)
 }
 
-// read full is a bytes.Buffer specific implementation of ioutil.ReadFull() which
-// avoids escaping our stack allocated scratch bytes
-func readFull(r *bytes.Buffer, buf []byte) (n int, err error) {
-	min := len(buf)
-	for n < min && err == nil {
-		var nn int
-		nn, err = r.Read(buf[n:])
-		n += nn
-	}
-	if n >= min {
-		err = nil
-	} else if n > 0 && err == io.EOF {
-		err = io.ErrUnexpectedEOF
-	}
-	return
-}
-
-// Write writes the binary representation of data into w.
-func write(w *bytes.Buffer, data interface{}) error {
-	order := binary.LittleEndian
-
-	var b [8]byte
-	if n := intDataSize(data); n != 0 {
-		bs := b[:n]
-
-		switch v := data.(type) {
-		case *bool:
-			if *v {
-				bs[0] = 1
-			} else {
-				bs[0] = 0
-			}
-		case bool:
-			if v {
-				bs[0] = 1
-			} else {
-				bs[0] = 0
-			}
-		case *int8:
-			bs[0] = byte(*v)
-		case int8:
-			bs[0] = byte(v)
-		case *uint8:
-			bs[0] = *v
-		case uint8:
-			bs[0] = v
-		case *int16:
-			order.PutUint16(bs, uint16(*v))
-		case int16:
-			order.PutUint16(bs, uint16(v))
-		case *uint16:
-			order.PutUint16(bs, *v)
-		case uint16:
-			order.PutUint16(bs, v)
-		case *int32:
-			order.PutUint32(bs, uint32(*v))
-		case int32:
-			order.PutUint32(bs, uint32(v))
-		case *uint32:
-			order.PutUint32(bs, *v)
-		case uint32:
-			order.PutUint32(bs, v)
-		case *int64:
-			order.PutUint64(bs, uint64(*v))
-		case int64:
-			order.PutUint64(bs, uint64(v))
-		case *uint64:
-			order.PutUint64(bs, *v)
-		case uint64:
-			order.PutUint64(bs, v)
-		case *float32:
-			order.PutUint32(bs, math.Float32bits(*v))
-		case float32:
-			order.PutUint32(bs, math.Float32bits(v))
-		case *float64:
-			order.PutUint64(bs, math.Float64bits(*v))
-		case float64:
-			order.PutUint64(bs, math.Float64bits(v))
-		}
-
-		_, err := w.Write(bs)
-		return err
+// ReadBytes reads the specified length from the buffer and returns the byte slice.
+func (b *Buffer) ReadBytes(length int) []byte {
+	if b.bw != nil {
+		return b.bw.Next(length)
 	}
 
-	return NonPrimitiveTypeError
-}
-
-// intDataSize returns the size of the data required to represent the data when encoded.
-// It returns zero if the type cannot be implemented by the fast path in Read or Write.
-func intDataSize(data interface{}) int {
-	switch data.(type) {
-	case bool, int8, uint8, *bool, *int8, *uint8:
-		return 1
-	case int16, uint16, *int16, *uint16:
-		return 2
-	case int32, uint32, *int32, *uint32:
-		return 4
-	case int64, uint64, *int64, *uint64:
-		return 8
-	case float32, *float32:
-		return 4
-	case float64, *float64:
-		return 8
+	bytes := make([]byte, length)
+	_, err := readBuffFull(b.b, bytes)
+	if err != nil {
+		return bytes
 	}
-	return 0
+
+	return bytes
+}
+
+// bytesAsString converts a []byte into a string in place. Note that you should use this helper
+// when the []byte slice contains _only_ the string data and isn't part of a larger underlying array.
+// For example, a case where you should *not* use this helper:
+//
+//	func parseString(buffer *bytes.Buffer, length int) string {
+//	  bytes := buffer.Next(length)   // this extracts a sub-slice of the underlying byte array from pos->pos+length
+//
+//	  return bytesAsString(bytes)
+//	}
+//
+// Now both the []byte AND the value string are linked and neither can be GC'd until the other one is GC'd.
+// This is especially problematic if you drop the references to the byte array, as you're effectively requiring
+// 1024 bytes for an 11-byte string.
+//
+// An example where it _is_ ok, and recommended to drop the underlying []byte reference is the following:
+//
+//	func parseString(reader io.Reader, length int) string {
+//	  bytes := make([]byte, length)
+//	  io.ReadFull(reader, bytes)
+//
+//	  return bytesAsString(bytes)
+//	}
+//
+// In this case, we've created a byte slice just big enough for the string, we extract the string data from the
+// reader, cast the byte slice in place to a string, and finally drop the byte slice reference. This avoids the
+// additional allocation that string(bytes) would incur.
+func bytesAsString(b []byte) string {
+	return unsafe.String(unsafe.SliceData(b), len(b))
 }
 
 // Conversion from byte slice to string
@@ -401,7 +420,7 @@ func bytesToString(b []byte) string {
 	// cached string. If it does _not_ exist, then we use the passed func() string to allocate a new
 	// string and cache it. This will prevent us from allocating throw-away strings just to
 	// check our cache.
-	pinned := unsafe.String(unsafe.SliceData(b), len(b))
+	pinned := bytesAsString(b)
 
 	return stringutil.BankFunc(pinned, func() string {
 		return string(b)

+ 90 - 8
core/pkg/util/buffer_test.go

@@ -2,8 +2,9 @@ package util
 
 import (
 	"bytes"
+	"io"
 	"math"
-	"math/rand"
+	"math/rand/v2"
 	"runtime"
 	"strings"
 	"testing"
@@ -182,8 +183,8 @@ func TestBufferWriteReadInt64(t *testing.T) {
 func TestBufferBytes(t *testing.T) {
 	buf := NewBuffer()
 
-	buf.WriteInt(42)
-	buf.WriteFloat64(3.14)
+	buf.WriteInt(-42)
+	buf.WriteFloat64(-3.14)
 
 	unreadBytes := buf.Bytes()
 
@@ -192,11 +193,11 @@ func TestBufferBytes(t *testing.T) {
 	intVal := newBuf.ReadInt()
 	floatVal := newBuf.ReadFloat64()
 
-	if intVal != 42 {
-		t.Errorf("Expected int value to be 42, got %v", intVal)
+	if intVal != -42 {
+		t.Errorf("Expected int value to be -42, got %v", intVal)
 	}
-	if floatVal != 3.14 {
-		t.Errorf("Expected float value to be 3.14, got %v", floatVal)
+	if floatVal != -3.14 {
+		t.Errorf("Expected float value to be -3.14, got %v", floatVal)
 	}
 }
 
@@ -224,7 +225,7 @@ const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
 func generateRandomString(ln int) string {
 	b := make([]byte, ln)
 	for i := range b {
-		b[i] = letters[rand.Intn(len(letters))]
+		b[i] = letters[rand.IntN(len(letters))]
 	}
 	return string(b)
 }
@@ -283,6 +284,87 @@ func TestStringBytes(t *testing.T) {
 	}
 }
 
+type randomByteReader struct {
+	bytes []byte
+	pos   int
+}
+
+func newRandomByteReader(bytes []byte) *randomByteReader {
+	return &randomByteReader{
+		bytes: bytes,
+		pos:   0,
+	}
+}
+
+// reads a random number of bytes from 1-4 each time Read is called.
+// simulates partial buffered reads
+func (sbr *randomByteReader) Read(b []byte) (int, error) {
+	if sbr.pos >= len(sbr.bytes) {
+		return 0, io.EOF
+	}
+
+	toCopy := rand.IntN(4) + 1
+	if toCopy > len(b) {
+		toCopy = len(b)
+	}
+
+	var err error
+	remaining := len(sbr.bytes) - sbr.pos
+	if toCopy > remaining {
+		err = io.EOF
+		toCopy = remaining
+	}
+
+	bytesCopied := copy(b, sbr.bytes[sbr.pos:sbr.pos+toCopy])
+	sbr.pos += bytesCopied
+
+	return bytesCopied, err
+}
+
+func TestBufferReaderSupport(t *testing.T) {
+	buf := NewBuffer()
+	buf.WriteBool(true)
+	buf.WriteInt(42)
+	buf.WriteFloat64(3.14)
+	buf.WriteString("Testing, 1, 2, 3!")
+	buf.WriteUInt64(uint64(123456))
+	buf.WriteInt16(44)
+	buf.WriteFloat32(float32(5.0))
+
+	reader := newRandomByteReader(buf.Bytes())
+	readerBuff := NewBufferFromReader(reader)
+
+	b := readerBuff.ReadBool()
+	i := readerBuff.ReadInt()
+	f := readerBuff.ReadFloat64()
+	s := readerBuff.ReadString()
+	ui64 := readerBuff.ReadUInt64()
+	i16 := readerBuff.ReadInt16()
+	f32 := readerBuff.ReadFloat32()
+
+	if !b {
+		t.Errorf("expected true, got: false")
+	}
+	if i != 42 {
+		t.Errorf("expected 42, got: %d", i)
+	}
+	if f != 3.14 {
+		t.Errorf("expected 3.14, got: %f", f)
+	}
+	if s != "Testing, 1, 2, 3!" {
+		t.Errorf("expected 'Testing, 1, 2, 3!', got: '%s'", s)
+	}
+	if ui64 != uint64(123456) {
+		t.Errorf("expected 123456, got: %d", ui64)
+	}
+	if i16 != int16(44) {
+		t.Errorf("expected 44, got: %d", i16)
+	}
+	if f32 != float32(5.0) {
+		t.Errorf("expected 5.0, got: %f", f32)
+	}
+}
+
 func TestTooLargeStringTruncate(t *testing.T) {
 	normalStr := generateRandomString(100)
 	bigStr := generateRandomString(math.MaxUint16 + (math.MaxUint16 / 2))

+ 493 - 0
core/pkg/util/bufferhelper.go

@@ -0,0 +1,493 @@
+package util
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
+	"io"
+	"math"
+)
+
+func readBool(r *bytes.Buffer, data *bool) error {
+	b, err := r.ReadByte()
+	if err != nil {
+		return err
+	}
+
+	*data = b != 0
+	return nil
+}
+
+func readInt8(r *bytes.Buffer, data *int8) error {
+	b, err := r.ReadByte()
+	if err != nil {
+		return err
+	}
+
+	*data = int8(b)
+	return nil
+}
+
+func readUint8(r *bytes.Buffer, data *uint8) error {
+	b, err := r.ReadByte()
+	if err != nil {
+		return err
+	}
+
+	*data = uint8(b)
+	return nil
+}
+
// The multi-byte readers below share one pattern: fill a small stack-allocated
// scratch array via readFull (not io.ReadFull, which would make the scratch
// escape — see readFull's comment), then decode it little-endian.

// readInt16 decodes a little-endian int16 from r into *data.
func readInt16(r *bytes.Buffer, data *int16) error {
	order := binary.LittleEndian
	var b [2]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = int16(order.Uint16(bs))
	return nil
}

// readUint16 decodes a little-endian uint16 from r into *data.
func readUint16(r *bytes.Buffer, data *uint16) error {
	order := binary.LittleEndian
	var b [2]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = order.Uint16(bs)
	return nil
}

// readInt decodes 4 little-endian bytes from r into *data as an int,
// sign-extending from int32 (the wire format is always 32 bits — see writeInt).
func readInt(r *bytes.Buffer, data *int) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = int(int32(order.Uint32(bs)))
	return nil
}

// readInt32 decodes a little-endian int32 from r into *data.
func readInt32(r *bytes.Buffer, data *int32) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = int32(order.Uint32(bs))
	return nil
}

// readUint decodes 4 little-endian bytes from r into *data as a uint
// (the wire format is always 32 bits — see writeUint).
func readUint(r *bytes.Buffer, data *uint) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = uint(order.Uint32(bs))
	return nil
}

// readUint32 decodes a little-endian uint32 from r into *data.
func readUint32(r *bytes.Buffer, data *uint32) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = order.Uint32(bs)
	return nil
}

// readInt64 decodes a little-endian int64 from r into *data.
func readInt64(r *bytes.Buffer, data *int64) error {
	order := binary.LittleEndian
	var b [8]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = int64(order.Uint64(bs))
	return nil
}

// readUint64 decodes a little-endian uint64 from r into *data.
func readUint64(r *bytes.Buffer, data *uint64) error {
	order := binary.LittleEndian
	var b [8]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = order.Uint64(bs)
	return nil
}

// readFloat32 decodes a little-endian IEEE-754 float32 from r into *data.
func readFloat32(r *bytes.Buffer, data *float32) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = math.Float32frombits(order.Uint32(bs))
	return nil
}

// readFloat64 decodes a little-endian IEEE-754 float64 from r into *data.
func readFloat64(r *bytes.Buffer, data *float64) error {
	order := binary.LittleEndian
	var b [8]byte

	bs := b[:]
	_, err := readFull(r, bs)
	if err != nil {
		return err
	}

	*data = math.Float64frombits(order.Uint64(bs))
	return nil
}
+
// The readBuff* functions mirror the bytes.Buffer readers above for a
// bufio.Reader source. Multi-byte variants fill a stack-allocated scratch via
// readBuffFull (not io.ReadFull, which would make the scratch escape — see
// readBuffFull's comment), then decode it little-endian.

// readBuffBool decodes one byte from r into *data; any nonzero byte is true.
func readBuffBool(r *bufio.Reader, data *bool) error {
	b, err := r.ReadByte()
	if err != nil {
		return err
	}

	*data = b != 0
	return nil
}

// readBuffInt8 decodes one byte from r into *data as a signed 8-bit value.
func readBuffInt8(r *bufio.Reader, data *int8) error {
	b, err := r.ReadByte()
	if err != nil {
		return err
	}

	*data = int8(b)
	return nil
}

// readBuffUint8 decodes one byte from r into *data as an unsigned 8-bit value.
func readBuffUint8(r *bufio.Reader, data *uint8) error {
	b, err := r.ReadByte()
	if err != nil {
		return err
	}

	*data = uint8(b)
	return nil
}

// readBuffInt16 decodes a little-endian int16 from r into *data.
func readBuffInt16(r *bufio.Reader, data *int16) error {
	order := binary.LittleEndian
	var b [2]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = int16(order.Uint16(bs))
	return nil
}

// readBuffUint16 decodes a little-endian uint16 from r into *data.
func readBuffUint16(r *bufio.Reader, data *uint16) error {
	order := binary.LittleEndian
	var b [2]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = order.Uint16(bs)
	return nil
}

// readBuffInt decodes 4 little-endian bytes from r into *data as an int,
// sign-extending from int32 (the wire format is always 32 bits).
func readBuffInt(r *bufio.Reader, data *int) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = int(int32(order.Uint32(bs)))
	return nil
}

// readBuffInt32 decodes a little-endian int32 from r into *data.
func readBuffInt32(r *bufio.Reader, data *int32) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = int32(order.Uint32(bs))
	return nil
}

// readBuffUint decodes 4 little-endian bytes from r into *data as a uint
// (the wire format is always 32 bits).
func readBuffUint(r *bufio.Reader, data *uint) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = uint(order.Uint32(bs))
	return nil
}

// readBuffUint32 decodes a little-endian uint32 from r into *data.
func readBuffUint32(r *bufio.Reader, data *uint32) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = order.Uint32(bs)
	return nil
}

// readBuffInt64 decodes a little-endian int64 from r into *data.
func readBuffInt64(r *bufio.Reader, data *int64) error {
	order := binary.LittleEndian
	var b [8]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = int64(order.Uint64(bs))
	return nil
}

// readBuffUint64 decodes a little-endian uint64 from r into *data.
func readBuffUint64(r *bufio.Reader, data *uint64) error {
	order := binary.LittleEndian
	var b [8]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = order.Uint64(bs)
	return nil
}

// readBuffFloat32 decodes a little-endian IEEE-754 float32 from r into *data.
func readBuffFloat32(r *bufio.Reader, data *float32) error {
	order := binary.LittleEndian
	var b [4]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = math.Float32frombits(order.Uint32(bs))
	return nil
}

// readBuffFloat64 decodes a little-endian IEEE-754 float64 from r into *data.
func readBuffFloat64(r *bufio.Reader, data *float64) error {
	order := binary.LittleEndian
	var b [8]byte

	bs := b[:]
	_, err := readBuffFull(r, bs)
	if err != nil {
		return err
	}

	*data = math.Float64frombits(order.Uint64(bs))
	return nil
}
+
+// read full is a bufio.Reader specific implementation of io.ReadFull() which
+// avoids escaping our stack allocated scratch bytes
+func readBuffFull(r *bufio.Reader, buf []byte) (n int, err error) {
+	min := len(buf)
+	for n < min && err == nil {
+		var nn int
+		nn, err = r.Read(buf[n:])
+		n += nn
+	}
+	if n >= min {
+		err = nil
+	} else if n > 0 && err == io.EOF {
+		err = io.ErrUnexpectedEOF
+	}
+	return
+}
+
+// read full is a bytes.Buffer specific implementation of io.ReadFull() which
+// avoids escaping our stack allocated scratch bytes
+func readFull(r *bytes.Buffer, buf []byte) (n int, err error) {
+	min := len(buf)
+	for n < min && err == nil {
+		var nn int
+		nn, err = r.Read(buf[n:])
+		n += nn
+	}
+	if n >= min {
+		err = nil
+	} else if n > 0 && err == io.EOF {
+		err = io.ErrUnexpectedEOF
+	}
+	return
+}
+
+func writeBool(w *bytes.Buffer, data bool) error {
+	if data {
+		w.WriteByte(1)
+		return nil
+	}
+
+	w.WriteByte(0)
+	return nil
+}
+
+func writeInt8(w *bytes.Buffer, data int8) error {
+	w.WriteByte(byte(data))
+	return nil
+}
+
+func writeUint8(w *bytes.Buffer, data uint8) error {
+	w.WriteByte(byte(data))
+	return nil
+}
+
+func writeInt16(w *bytes.Buffer, data int16) error {
+	var b [2]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint16(bs, uint16(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeUint16(w *bytes.Buffer, data uint16) error {
+	var b [2]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint16(bs, data)
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeInt32(w *bytes.Buffer, data int32) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, uint32(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeUint32(w *bytes.Buffer, data uint32) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, data)
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeInt(w *bytes.Buffer, data int) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, uint32(int32(data)))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeUint(w *bytes.Buffer, data uint) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, uint32(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeInt64(w *bytes.Buffer, data int64) error {
+	var b [8]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint64(bs, uint64(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeUint64(w *bytes.Buffer, data uint64) error {
+	var b [8]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint64(bs, data)
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeFloat32(w *bytes.Buffer, data float32) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, math.Float32bits(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeFloat64(w *bytes.Buffer, data float64) error {
+	var b [8]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint64(bs, math.Float64bits(data))
+	_, err := w.Write(bs)
+	return err
+}

+ 68 - 0
core/pkg/util/bufferpool.go

@@ -0,0 +1,68 @@
+package util
+
+import (
+	"math"
+	"math/bits"
+	"sync"
+)
+
+// bufferPool holds "tiered" []byte `sync.Pool` instances by capacity up to math.MaxUint16
+type bufferPool struct {
+	pools [17]sync.Pool
+}
+
+func newBufferPool() *bufferPool {
+	bp := new(bufferPool)
+
+	for i := 0; i < 17; i++ {
+		length := 1 << i
+		bp.pools[i].New = func() any {
+			return make([]byte, length)
+		}
+	}
+	return bp
+}
+
+// poolIndex returns the pool index for a buffer of the given size.
+func poolIndex(length int) int {
+	return bits.Len32(uint32(length - 1))
+}
+
+// putIndex returns the pool index for returning a buffer with the given capacity.
+// It is the inverse of poolIndex: given a capacity that was originally handed out
+// by Get, it finds the pool that owns it.
+//
+// Because Get always returns buffers with capacity 1<<i, the capacity here will
+// always be a power of two. bits.Len32(1<<i) = i+1, so we subtract 1 to recover i.
+func putIndex(capacity int) int {
+	return bits.Len32(uint32(capacity)) - 1
+}
+
+func isPowerOfTwo(capacity int) bool {
+	return capacity&(capacity-1) == 0
+}
+
+func (bp *bufferPool) Get(length int) []byte {
+	if length <= 0 {
+		return nil
+	}
+
+	// Beyond our pool range: allocate directly
+	if length > math.MaxUint16 {
+		return make([]byte, length)
+	}
+
+	i := poolIndex(length)
+	buf := bp.pools[i].Get().([]byte)
+	return buf[:length]
+}
+
+func (bp *bufferPool) Put(buf []byte) {
+	capacity := cap(buf)
+	if capacity == 0 || capacity > math.MaxUint16 || !isPowerOfTwo(capacity) {
+		return
+	}
+
+	i := putIndex(capacity)
+	bp.pools[i].Put(buf[:cap(buf)])
+}

+ 311 - 0
core/pkg/util/bufferpool_test.go

@@ -0,0 +1,311 @@
package util

import (
	"math"
	"math/bits"
	"strconv"
	"sync"
	"testing"
)
+
+// --- poolIndex / putIndex unit tests ---
+
// TestPoolIndex pins the pool slot chosen for representative lengths,
// including values just below, at, and just above powers of two.
func TestPoolIndex(t *testing.T) {
	cases := []struct {
		length int
		want   int
	}{
		{1, 0},
		{2, 1},
		{3, 2},
		{4, 2},
		{5, 3},
		{7, 3},
		{8, 3},
		{255, 8},
		{256, 8},
		{1023, 10},
		{1024, 10},
		{math.MaxUint16 - 50, 16},
	}
	for _, c := range cases {
		got := poolIndex(c.length)
		if got != c.want {
			t.Errorf("poolIndex(%d) = %d, want %d", c.length, got, c.want)
		}
	}
}
+
+func TestAllocMinusOne(t *testing.T) {
+	bp := newBufferPool()
+	for i := 1; i <= 16; i++ {
+		capacity := 1 << i
+		length := capacity - 1
+		if length <= 0 {
+			continue
+		}
+
+		b := bp.Get(length)
+		c := cap(b)
+
+		pIndex := poolIndex(length)
+		rIndex := putIndex(c)
+
+		if pIndex != rIndex {
+			t.Errorf("pIndex: %d != rIndex: %d\n", pIndex, rIndex)
+		}
+
+	}
+}
+
+func TestPutIndex(t *testing.T) {
+	// putIndex must be the inverse of poolIndex for all power-of-two capacities
+	// that Get hands out.
+	for i := 1; i <= 16; i++ {
+		cap := 1 << i
+		got := putIndex(cap)
+		if got != i {
+			t.Errorf("putIndex(1<<%d = %d) = %d, want %d", i, cap, got, i)
+		}
+	}
+}
+
// TestPoolIndexPutIndexRoundTrip exhaustively verifies that poolIndex and
// putIndex agree for every pooled length.
func TestPoolIndexPutIndexRoundTrip(t *testing.T) {
	// For any requested length, the buffer Get returns has capacity 1<<poolIndex(length).
	// Confirm that putIndex maps that capacity back to the same pool slot.
	for length := 1; length <= math.MaxUint16; length++ {
		i := poolIndex(length)
		capacity := 1 << i
		j := putIndex(capacity)
		if i != j {
			t.Errorf("length=%d: poolIndex=%d, capacity=1<<%d=%d, putIndex=%d — round-trip broken",
				length, i, i, capacity, j)
		}
	}
}
+
+// --- Get ---
+
// TestGetNilOnZeroOrNegative verifies non-positive lengths yield nil.
func TestGetNilOnZeroOrNegative(t *testing.T) {
	bp := newBufferPool()
	for _, n := range []int{0, -1, -100} {
		if got := bp.Get(n); got != nil {
			t.Errorf("Get(%d) = %v, want nil", n, got)
		}
	}
}

// TestGetLengthIsExact verifies Get returns len == requested, both within
// and beyond the pooled range.
func TestGetLengthIsExact(t *testing.T) {
	bp := newBufferPool()
	for _, n := range []int{1, 2, 3, 7, 8, 100, 1000, 65535, 65536} {
		buf := bp.Get(n)
		if len(buf) != n {
			t.Errorf("Get(%d): len = %d, want %d", n, len(buf), n)
		}
	}
}

// TestGetCapacityIsPowerOfTwo verifies pooled buffers always carry a
// power-of-two capacity.
func TestGetCapacityIsPowerOfTwo(t *testing.T) {
	bp := newBufferPool()
	for _, n := range []int{1, 2, 3, 4, 5, 100, 1000, 550, math.MaxUint16 - 100, math.MaxUint16} {
		buf := bp.Get(n)
		c := cap(buf)
		if c == 0 || !isPowerOfTwo(c) {
			t.Errorf("Get(%d): cap = %d, not a power of two", n, c)
		}
	}
}

// TestGetCapacityIsSmallestFittingPowerOfTwo verifies the chosen tier is the
// tightest power-of-two fit for the requested length.
func TestGetCapacityIsSmallestFittingPowerOfTwo(t *testing.T) {
	bp := newBufferPool()
	cases := []struct {
		n       int
		wantCap int
	}{
		{1, 1},
		{2, 2},
		{3, 4},
		{4, 4},
		{5, 8},
		{8, 8},
		{9, 16},
		{255, 256},
		{256, 256},
		{1024, 1024},
	}
	for _, c := range cases {
		buf := bp.Get(c.n)
		if cap(buf) != c.wantCap {
			t.Errorf("Get(%d): cap = %d, want %d", c.n, cap(buf), c.wantCap)
		}
	}
}

// TestGetOversizeFallback verifies lengths beyond MaxUint16 bypass the pools.
func TestGetOversizeFallback(t *testing.T) {
	bp := newBufferPool()
	n := math.MaxUint16 + 1
	buf := bp.Get(n)
	if len(buf) != n {
		t.Errorf("Get(MaxUint16+1): len = %d, want %d", len(buf), n)
	}
}
+
+// --- Put ---
+
// TestPutDropsZeroCapBuffer verifies Put tolerates nil and zero-cap slices.
func TestPutDropsZeroCapBuffer(t *testing.T) {
	// Put on a nil or zero-cap slice must not panic.
	bp := newBufferPool()
	bp.Put(nil)
	bp.Put([]byte{})
}

// --- Get / Put round-trip ---

// TestGetPutSamePool verifies a returned buffer is eligible for reuse by the
// next Get of the same length.
func TestGetPutSamePool(t *testing.T) {
	// A buffer returned via Put must land in the same pool that Get draws from,
	// so the very next Get (with the same length) should reuse it.
	bp := newBufferPool()

	buf := bp.Get(100)
	ptr := &buf[0]
	bp.Put(buf)

	buf2 := bp.Get(100)
	if &buf2[0] != ptr {
		// sync.Pool may have GC'd the entry; this is not a hard failure but
		// we at minimum require length and capacity to be correct.
		if len(buf2) != 100 {
			t.Errorf("Get(100) after Put: len = %d, want 100", len(buf2))
		}
	}
}

// TestPutRestoresFullCapacity verifies Put re-slices back to full capacity
// rather than pooling a shortened view.
func TestPutRestoresFullCapacity(t *testing.T) {
	// After Put, the pooled slice should have full capacity, not the resliced length.
	// We verify this by inspecting what comes out of the pool on the next Get.
	bp := newBufferPool()

	buf := bp.Get(10)  // len=10, cap=16
	bp.Put(buf)        // must put back with cap=16
	buf2 := bp.Get(15) // asks for 15 — still fits in cap=16 pool
	if cap(buf2) < 15 {
		t.Errorf("After Put(cap=16), Get(15): cap = %d, too small", cap(buf2))
	}
}
+
+func TestIsPowerOfTwo(t *testing.T) {
+	for i := 0; i < 16; i++ {
+		cap := 1 << i
+
+		if !isPowerOfTwo(cap) {
+			t.Fatalf("Failed at: i=%d, cap=%d\n", i, cap)
+		}
+	}
+
+	for _, v := range []int{5, 17, 19, 31, 55} {
+		if isPowerOfTwo(v) {
+			t.Fatalf("Unexpected isPowerOfTwo: %d", v)
+		}
+	}
+}
+
// TestPutNonPowerOfTwoCapIsDiscarded verifies foreign buffers are dropped
// without corrupting the pool tiers.
func TestPutNonPowerOfTwoCapIsDiscarded(t *testing.T) {
	// Buffers with non-power-of-two capacities (e.g. from outside the pool)
	// get silently dropped. Confirm no panic and pool still works after.
	bp := newBufferPool()
	value := make([]byte, 0, 17)
	bp.Put(value)

	buf := bp.Get(24)
	if len(buf) != 24 {
		t.Errorf("Get(24) after spurious Put: len = %d, want 24", len(buf))
	}
	if cap(buf) != 32 {
		t.Errorf("Get(24) after spurious Put: cap = %d, want 32", cap(buf))
	}
}
+
+// --- Concurrency ---
+
// TestConcurrentGetPut hammers the pool from many goroutines to surface data
// races under -race. NOTE: t.Errorf (unlike t.Fatalf) is safe to call from
// spawned goroutines, which is why it is used here.
func TestConcurrentGetPut(t *testing.T) {
	bp := newBufferPool()
	var wg sync.WaitGroup
	const goroutines = 64
	const iters = 1000

	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < iters; i++ {
				n := (id*iters + i) % 4096
				if n == 0 {
					n = 1
				}
				buf := bp.Get(n)
				if len(buf) != n {
					t.Errorf("concurrent Get(%d): len = %d", n, len(buf))
				}
				// Write to every byte to catch races under -race.
				for j := range buf {
					buf[j] = byte(j)
				}
				bp.Put(buf)
			}
		}(g)
	}
	wg.Wait()
}
+
+// --- Edge cases at pool boundaries ---
+
// TestGetExactPowerOfTwo exercises the boundary lengths between adjacent pool
// tiers, including 1<<16 which falls through to the oversize direct allocation.
func TestGetExactPowerOfTwo(t *testing.T) {
	// Exact powers of two are the boundary between two pools; confirm correct
	// bucket selection and full round-trip for each.
	bp := newBufferPool()
	for i := 0; i < 17; i++ {
		n := 1 << i
		buf := bp.Get(n)
		if len(buf) != n {
			t.Errorf("Get(1<<%d=%d): len = %d", i, n, len(buf))
		}
		expectedCap := 1 << (bits.Len16(uint16(n - 1)))
		if cap(buf) != expectedCap {
			t.Errorf("Get(1<<%d=%d): cap = %d, want %d", i, n, cap(buf), expectedCap)
		}
		bp.Put(buf)
	}
}
+
+func TestGetMaxInt16(t *testing.T) {
+	i := poolIndex(math.MaxUint16)
+	if i >= 17 {
+		t.Errorf("poolIndex(MaxUint16) = %d, overflows pool array", i)
+	}
+}
+
+// --- Benchmarks ---
+
+func BenchmarkGetPut(b *testing.B) {
+	sizes := []int{64, 512, 4096, 65535}
+	for _, size := range sizes {
+		bp := newBufferPool()
+		b.Run("", func(b *testing.B) {
+			b.ReportAllocs()
+			for i := 0; i < b.N; i++ {
+				buf := bp.Get(size)
+				bp.Put(buf)
+			}
+		})
+	}
+}
+
// BenchmarkGetPutParallel measures Get/Put throughput under parallel load
// from multiple goroutines.
func BenchmarkGetPutParallel(b *testing.B) {
	bp := newBufferPool()
	b.ReportAllocs()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			buf := bp.Get(4096)
			bp.Put(buf)
		}
	})
}

+ 310 - 0
core/pkg/util/monitor/memory/helpers.go

@@ -0,0 +1,310 @@
+package memory
+
+import (
+	"fmt"
+	"math"
+	"slices"
+)
+
+//--------------------------------------------------------------------------
+//  Helper Types
+//--------------------------------------------------------------------------
+
// trackedValue tracks a float64 through three states: unset, set, and
// previous. Reads fall back to the previous value while the current value is
// unset; setting a different value (or calling Reset) rolls the old current
// value into previous.
type trackedValue struct {
	current  *float64
	previous float64
}

// newTrackedValue returns a tracker in its zero state (unset, previous == 0).
func newTrackedValue() *trackedValue {
	return &trackedValue{}
}

// Value returns the current value when set; otherwise the previous value.
func (tv *trackedValue) Value() float64 {
	if tv.IsSet() {
		return *tv.current
	}
	return tv.previous
}

// IsSet reports whether a current value is present.
func (tv *trackedValue) IsSet() bool {
	return tv.current != nil
}

// Set records value as the current value and reports whether anything
// changed; re-setting the identical value is a no-op.
func (tv *trackedValue) Set(value float64) bool {
	switch {
	case tv.current == nil:
		tv.current = &value
		return true
	case *tv.current != value:
		tv.previous = *tv.current
		tv.current = &value
		return true
	default:
		return false
	}
}

// Reset moves the current value (if any) into previous and marks it unset.
func (tv *trackedValue) Reset() {
	if tv.current != nil {
		tv.previous = *tv.current
		tv.current = nil
	}
}

// Clear returns the tracker to its zero state: unset with previous == 0.
func (tv *trackedValue) Clear() {
	tv.current = nil
	tv.previous = 0.0
}
+
// The Cumulative Sum (CUSUM) is a statistical process control tool that plots
// the cumulative sums of deviations from a target mean to detect small,
// persistent shifts (0.5 to 2 sigma) in process performance quickly.
type cumulativeSum struct {
	slack float64 // fraction of the baseline tolerated per sample before the sum grows
	sum   float64 // accumulated positive deviation
	base  float64 // calibrated baseline; 0 means "not yet calibrated"
}

// newCumulativeSum creates a new cumulativeSum instance with the provided slack.
func newCumulativeSum(slack float64) *cumulativeSum {
	return &cumulativeSum{
		slack: slack,
	}
}

// Calibrate initializes the baseline for the CUSUM. This is generally the mean
// of the samples once there are enough to consider the sample set "stable."
// Subsequent calls are ignored until Reset is called.
func (cs *cumulativeSum) Calibrate(mean float64) {
	if cs.base == 0.0 {
		cs.base = mean
	}
}

// Update supplies a new sample to update the internal sum. Samples are ignored
// until Calibrate has established a baseline.
func (cs *cumulativeSum) Update(value float64) {
	if cs.base == 0.0 {
		return
	}

	slack := cs.base * cs.slack
	cs.sum = max(0, cs.sum+(value-cs.base)-slack)
}

// Sum returns the current CUSUM value.
func (cs *cumulativeSum) Sum() float64 {
	return cs.sum
}

// IsRecalibrationRequired reports whether the CUSUM has surpassed
// base * thresholdMagnitude, signalling that a recalibration should be
// performed. It always reports false before calibration.
// (Removed a leftover commented-out debug Printf from the original.)
func (cs *cumulativeSum) IsRecalibrationRequired(thresholdMagnitude float64) bool {
	if cs.base == 0.0 {
		return false
	}

	return cs.sum > cs.base*thresholdMagnitude
}

// Reset clears the baseline and the accumulated sum.
func (cs *cumulativeSum) Reset() {
	cs.base = 0.0
	cs.sum = 0.0
}
+
// exponentialMovingAverage tracks a running EMA of samples using a fixed
// smoothing factor in [0, 1]; higher values weight recent samples more.
type exponentialMovingAverage struct {
	smoothing float64
	value     float64
	set       bool
}

// newExponentialMovingAverage creates an EMA with the given smoothing factor.
func newExponentialMovingAverage(smoothing float64) *exponentialMovingAverage {
	return &exponentialMovingAverage{smoothing: smoothing}
}

// Update folds sample into the moving average and returns the new value.
// The first sample seeds the average directly.
func (ema *exponentialMovingAverage) Update(sample float64) float64 {
	if ema.set {
		ema.value = ema.smoothing*sample + (1.0-ema.smoothing)*ema.value
	} else {
		ema.value = sample
		ema.set = true
	}
	return ema.value
}

// Current returns the present moving-average value (0 before any samples).
func (ema *exponentialMovingAverage) Current() float64 {
	return ema.value
}

// Reset discards all accumulated state.
func (ema *exponentialMovingAverage) Reset() {
	ema.set = false
	ema.value = 0.0
}
+
// rollingWindow is a fixed-capacity ring buffer of float64 samples with
// helpers for computing the mean, standard deviation, and percentiles of the
// contained data.
type rollingWindow struct {
	capacity int
	length   int
	window   []float64
	index    int
}

// newRollingWindow allocates a window holding up to capacity samples.
// It panics when capacity is outside 1..math.MaxInt/2.
func newRollingWindow(capacity int) *rollingWindow {
	if capacity <= 0 || capacity > (math.MaxInt/2) {
		panic(fmt.Sprintf("RollingWindow capacity limited to range 1-%d", math.MaxInt/2))
	}

	return &rollingWindow{
		capacity: capacity,
		window:   make([]float64, capacity),
	}
}

// Push appends value, overwriting the oldest sample once the window is full.
func (rw *rollingWindow) Push(value float64) {
	rw.window[rw.index%rw.capacity] = value
	rw.index = (rw.index + 1) % rw.capacity
	rw.length = min(rw.length+1, rw.capacity)
}

// Clear discards all stored samples.
func (rw *rollingWindow) Clear() {
	rw.window = make([]float64, rw.capacity)
	rw.index = 0
	rw.length = 0
}

// Len reports how many samples are currently stored; never more than Cap().
func (rw *rollingWindow) Len() int {
	return rw.length
}

// Cap reports the fixed maximum number of samples.
func (rw *rollingWindow) Cap() int {
	return rw.capacity
}

// Each invokes f once per stored sample. Ordering is NOT guaranteed.
func (rw *rollingWindow) Each(f func(float64)) {
	n := rw.Len()
	for i := 0; i < n; i++ {
		f(rw.window[i])
	}
}

// Mean returns the average of the stored samples (0 when empty).
func (rw *rollingWindow) Mean() float64 {
	n := rw.Len()
	if n == 0 {
		return 0.0
	}

	total := 0.0
	for i := 0; i < n; i++ {
		total += rw.window[i]
	}
	return total / float64(n)
}

// MeanStdDev returns the mean and the sample standard deviation (with
// Bessel's correction); stddev is 0 when fewer than two samples are stored.
func (rw *rollingWindow) MeanStdDev() (mean float64, stddev float64) {
	mean = rw.Mean()

	n := rw.Len()
	if n < 2 {
		return mean, 0
	}

	var sumSq float64
	for i := 0; i < n; i++ {
		d := rw.window[i] - mean
		sumSq += d * d
	}

	stddev = math.Sqrt(sumSq / float64(n-1))
	return mean, stddev
}

// Percentile computes the p-th percentile (0-100) of the stored samples using
// linear interpolation between closest ranks; 0 when empty.
func (rw *rollingWindow) Percentile(p float64) float64 {
	n := rw.Len()
	if n == 0 {
		return 0
	}

	sorted := make([]float64, n)
	copy(sorted, rw.window[:n])
	slices.Sort(sorted)

	switch {
	case p <= 0:
		return sorted[0]
	case p >= 100:
		return sorted[n-1]
	}

	rank := (p / 100.0) * float64(n-1)
	lo := int(math.Floor(rank))
	hi := int(math.Ceil(rank))
	frac := rank - float64(lo)

	return sorted[lo]*(1-frac) + sorted[hi]*frac
}

// IsConfidenceSatisfied reports whether the relative margin of error at
// z-score z is within marginPercent.
// NOTE(review): with an empty window or a zero mean this divides by zero and
// produces NaN, which compares false — confirm "not satisfied" is the
// intended outcome in those cases.
func (rw *rollingWindow) IsConfidenceSatisfied(z float64, marginPercent float64) bool {
	mean, stddev := rw.MeanStdDev()
	marginOfError := z * (stddev / math.Sqrt(float64(rw.Len())))
	return marginOfError/mean <= marginPercent
}

+ 418 - 0
core/pkg/util/monitor/memory/helpers_test.go

@@ -0,0 +1,418 @@
+package memory
+
+import (
+	"math"
+	"testing"
+)
+
+const epsilon = 1e-9
+
// set is a minimal generic hash-set used by the tests in this package.
type set[T comparable] struct {
	m map[T]struct{}
}

// newSet builds a set pre-populated with the provided values.
func newSet[T comparable](values ...T) *set[T] {
	members := make(map[T]struct{}, len(values))
	for _, value := range values {
		members[value] = struct{}{}
	}
	return &set[T]{m: members}
}

// add inserts value into the set (no-op when already present).
func (s *set[T]) add(value T) {
	s.m[value] = struct{}{}
}

// has reports whether value is a member of the set.
func (s *set[T]) has(value T) bool {
	_, ok := s.m[value]
	return ok
}

// remove deletes value from the set (no-op when absent).
func (s *set[T]) remove(value T) {
	delete(s.m, value)
}
+
+// -------------------------------------------------------------------------
+//  exponentialMovingAverage tests
+// -------------------------------------------------------------------------
+
+func TestEMA_InitialState(t *testing.T) {
+	ema := newExponentialMovingAverage(0.5)
+	if ema.set {
+		t.Error("expected ema.set to be false on creation")
+	}
+	if ema.Current() != 0.0 {
+		t.Errorf("expected initial Current() = 0.0, got %f", ema.Current())
+	}
+}
+
+func TestEMA_FirstUpdateSetsValue(t *testing.T) {
+	ema := newExponentialMovingAverage(0.5)
+	got := ema.Update(42.0)
+	if got != 42.0 {
+		t.Errorf("expected first Update() = 42.0, got %f", got)
+	}
+	if !ema.set {
+		t.Error("expected ema.set to be true after first update")
+	}
+}
+
+func TestEMA_SubsequentUpdates(t *testing.T) {
+	// With smoothing=0.5: EMA(n) = 0.5*sample + 0.5*EMA(n-1)
+	ema := newExponentialMovingAverage(0.5)
+	ema.Update(10.0)        // value = 10
+	got := ema.Update(20.0) // value = 0.5*20 + 0.5*10 = 15
+	want := 15.0
+	if math.Abs(got-want) > epsilon {
+		t.Errorf("expected %f, got %f", want, got)
+	}
+}
+
+func TestEMA_SmoothingZero(t *testing.T) {
+	// smoothing=0 means the value never changes after the first sample
+	ema := newExponentialMovingAverage(0.0)
+	ema.Update(5.0)
+	ema.Update(100.0)
+	ema.Update(999.0)
+	if ema.Current() != 5.0 {
+		t.Errorf("expected Current() = 5.0, got %f", ema.Current())
+	}
+}
+
+func TestEMA_SmoothingOne(t *testing.T) {
+	// smoothing=1 means the value is always the latest sample
+	ema := newExponentialMovingAverage(1.0)
+	ema.Update(5.0)
+	ema.Update(99.0)
+	if ema.Current() != 99.0 {
+		t.Errorf("expected Current() = 99.0, got %f", ema.Current())
+	}
+}
+
+func TestEMA_Reset(t *testing.T) {
+	ema := newExponentialMovingAverage(0.5)
+	ema.Update(10.0)
+	ema.Reset()
+	if ema.set {
+		t.Error("expected ema.set to be false after Reset()")
+	}
+	if ema.Current() != 0.0 {
+		t.Errorf("expected Current() = 0.0 after Reset(), got %f", ema.Current())
+	}
+	// First update after reset should treat as a fresh start
+	got := ema.Update(7.0)
+	if got != 7.0 {
+		t.Errorf("expected first Update() after Reset() = 7.0, got %f", got)
+	}
+}
+
+func TestEMA_MultipleUpdates(t *testing.T) {
+	smoothing := 0.3
+	ema := newExponentialMovingAverage(smoothing)
+
+	samples := []float64{10, 20, 30, 40, 50}
+	want := samples[0]
+	for i, s := range samples {
+		got := ema.Update(s)
+		if i == 0 {
+			want = s
+		} else {
+			want = smoothing*s + (1-smoothing)*want
+		}
+		if math.Abs(got-want) > epsilon {
+			t.Errorf("step %d: expected %f, got %f", i, want, got)
+		}
+	}
+}
+
+// -------------------------------------------------------------------------
+//  rollingWindow tests
+// -------------------------------------------------------------------------
+
+func TestRollingWindow_NewPanicsOnBadCapacity(t *testing.T) {
+	cases := []int{0, -1, math.MaxInt}
+	for _, cap := range cases {
+		func() {
+			defer func() {
+				if r := recover(); r == nil {
+					t.Errorf("expected panic for capacity %d", cap)
+				}
+			}()
+			newRollingWindow(cap)
+		}()
+	}
+}
+
+func TestRollingWindow_InitialLen(t *testing.T) {
+	rw := newRollingWindow(5)
+	if rw.Len() != 0 {
+		t.Errorf("expected Len() = 0, got %d", rw.Len())
+	}
+	if rw.Cap() != 5 {
+		t.Errorf("expected Cap() = 5, got %d", rw.Cap())
+	}
+}
+
+func TestRollingWindow_LenGrowsUpToCapacity(t *testing.T) {
+	rw := newRollingWindow(3)
+	rw.Push(1)
+	if rw.Len() != 1 {
+		t.Errorf("expected Len()=1, got %d", rw.Len())
+	}
+	rw.Push(2)
+	if rw.Len() != 2 {
+		t.Errorf("expected Len()=2, got %d", rw.Len())
+	}
+	rw.Push(3)
+	if rw.Len() != 3 {
+		t.Errorf("expected Len()=3, got %d", rw.Len())
+	}
+	// Pushing beyond capacity should not grow Len() past Cap()
+	rw.Push(4)
+	if rw.Len() != 3 {
+		t.Errorf("expected Len()=3 after overflow push, got %d", rw.Len())
+	}
+}
+
+func TestRollingWindow_Clear(t *testing.T) {
+	rw := newRollingWindow(4)
+	rw.Push(1)
+	rw.Push(2)
+	rw.Clear()
+	if rw.Len() != 0 {
+		t.Errorf("expected Len()=0 after Clear(), got %d", rw.Len())
+	}
+}
+
// Mean of an empty window is defined to be 0.
func TestRollingWindow_Mean_Empty(t *testing.T) {
	rw := newRollingWindow(4)
	if rw.Mean() != 0.0 {
		t.Errorf("expected Mean()=0 for empty window, got %f", rw.Mean())
	}
}
+
// A single sample is its own mean.
func TestRollingWindow_Mean_SingleValue(t *testing.T) {
	rw := newRollingWindow(4)
	rw.Push(7.0)
	if rw.Mean() != 7.0 {
		t.Errorf("expected Mean()=7.0, got %f", rw.Mean())
	}
}
+
+func TestRollingWindow_Mean_MultipleValues(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{1, 2, 3, 4, 5} {
+		rw.Push(v)
+	}
+	want := 3.0
+	if math.Abs(rw.Mean()-want) > epsilon {
+		t.Errorf("expected Mean()=%f, got %f", want, rw.Mean())
+	}
+}
+
// A single sample yields itself as the mean with zero spread.
func TestRollingWindow_MeanStdDev_SingleValue(t *testing.T) {
	rw := newRollingWindow(4)
	rw.Push(10.0)
	mean, stddev := rw.MeanStdDev()
	if mean != 10.0 {
		t.Errorf("expected mean=10.0, got %f", mean)
	}
	if stddev != 0.0 {
		t.Errorf("expected stddev=0.0 for single value, got %f", stddev)
	}
}
+
+func TestRollingWindow_MeanStdDev_KnownValues(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
+		rw.Push(v)
+	}
+	// Window holds only the last 5: [5, 5, 7, 9, 9] — wait, cap=5.
+	// Pushes: index 0=2,1=4,2=4,3=4,4=5 -> wraps: index 0=5,1=7,2=9
+	// Use a simpler known case instead.
+	rw2 := newRollingWindow(4)
+	for _, v := range []float64{10, 20, 30, 40} {
+		rw2.Push(v)
+	}
+	mean, stddev := rw2.MeanStdDev()
+	wantMean := 25.0
+	// sample stddev of {10,20,30,40} = sqrt(((−15)²+(−5)²+(5)²+(15)²)/3) = sqrt(500/3)
+	wantStddev := math.Sqrt(500.0 / 3.0)
+	if math.Abs(mean-wantMean) > epsilon {
+		t.Errorf("expected mean=%f, got %f", wantMean, mean)
+	}
+	if math.Abs(stddev-wantStddev) > epsilon {
+		t.Errorf("expected stddev=%f, got %f", wantStddev, stddev)
+	}
+}
+
// Any percentile of an empty window is defined to be 0.
func TestRollingWindow_Percentile_Empty(t *testing.T) {
	rw := newRollingWindow(4)
	if rw.Percentile(50) != 0.0 {
		t.Errorf("expected 0.0 for empty window percentile, got %f", rw.Percentile(50))
	}
}
+
+func TestRollingWindow_Percentile_BoundaryValues(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{3, 1, 4, 1, 5} {
+		rw.Push(v)
+	}
+	if rw.Percentile(0) != 1.0 {
+		t.Errorf("expected p0=1.0, got %f", rw.Percentile(0))
+	}
+	if rw.Percentile(100) != 5.0 {
+		t.Errorf("expected p100=5.0, got %f", rw.Percentile(100))
+	}
+}
+
+func TestRollingWindow_Percentile_Median(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{1, 2, 3, 4, 5} {
+		rw.Push(v)
+	}
+	got := rw.Percentile(50)
+	want := 3.0
+	if math.Abs(got-want) > epsilon {
+		t.Errorf("expected p50=%f, got %f", want, got)
+	}
+}
+
+func TestRollingWindow_Percentile_Interpolation(t *testing.T) {
+	rw := newRollingWindow(4)
+	for _, v := range []float64{0, 10, 20, 30} {
+		rw.Push(v)
+	}
+	// rank = 0.25 * 3 = 0.75, lo=0(val=0), hi=1(val=10), frac=0.75 => 0*0.25 + 10*0.75 = 7.5
+	got := rw.Percentile(25)
+	want := 7.5
+	if math.Abs(got-want) > epsilon {
+		t.Errorf("expected p25=%f, got %f", want, got)
+	}
+}
+
+func TestRollingWindow_IsConfidenceSatisfied(t *testing.T) {
+	rw := newRollingWindow(100)
+	// All the same value — stddev=0, margin=0, should always be satisfied
+	for range 100 {
+		rw.Push(50.0)
+	}
+	if !rw.IsConfidenceSatisfied(1.96, 0.05) {
+		t.Error("expected confidence satisfied for zero-variance data")
+	}
+}
+
+func TestRollingWindow_IsConfidenceSatisfied_HighVariance(t *testing.T) {
+	rw := newRollingWindow(10)
+	// High variance: alternating 1 and 1000
+	for i := range 10 {
+		if i%2 == 0 {
+			rw.Push(1.0)
+		} else {
+			rw.Push(1000.0)
+		}
+	}
+	// With high variance and small n, a tight margin should not be satisfied
+	if rw.IsConfidenceSatisfied(1.96, 0.001) {
+		t.Error("expected confidence NOT satisfied for high-variance data with tight margin")
+	}
+}
+
+func TestRollingWindow_Each(t *testing.T) {
+	rw := newRollingWindow(4)
+	for _, v := range []float64{1, 2, 3, 4} {
+		rw.Push(v)
+	}
+	sum := 0.0
+	rw.Each(func(v float64) { sum += v })
+	if math.Abs(sum-10.0) > epsilon {
+		t.Errorf("expected Each() sum=10.0, got %f", sum)
+	}
+}
+
+func TestRollingWindow_OverwritesOldestOnOverflow(t *testing.T) {
+	rw := newRollingWindow(3)
+	rw.Push(1)
+	rw.Push(2)
+	rw.Push(3)
+	rw.Push(100) // Should evict 1, window = [2, 3, 100]
+	mean := rw.Mean()
+	want := (2.0 + 3.0 + 100.0) / 3.0
+	if math.Abs(mean-want) > epsilon {
+		t.Errorf("expected mean=%f after overflow, got %f", want, mean)
+	}
+}
+
+func assertIndexLengthCap(t *testing.T, rw *rollingWindow, index int, length int, cap int) {
+	t.Helper()
+
+	if rw.index != index {
+		t.Errorf("RollingWindow Index: %d. Expected %d", rw.index, index)
+	}
+	if rw.Len() != length {
+		t.Errorf("RollingWindow Length: %d. Expected %d", rw.Len(), length)
+	}
+	if rw.Cap() != cap {
+		t.Errorf("RollingWindow Capacity: %d. Expected %d", rw.Cap(), cap)
+	}
+}
+
// TestRollingWindow_BasicIndexingLengthCap walks the internal write index
// through two full wraps of a cap-3 window and checks that the stored
// contents match the most recent pushes, regardless of iteration order.
func TestRollingWindow_BasicIndexingLengthCap(t *testing.T) {
	capacity := 3
	rw := newRollingWindow(capacity)

	// index advances with each push and wraps to 0 once the window is full
	assertIndexLengthCap(t, rw, 0, 0, 3)
	rw.Push(1)
	assertIndexLengthCap(t, rw, 1, 1, 3)
	rw.Push(2)
	assertIndexLengthCap(t, rw, 2, 2, 3)
	rw.Push(3)
	assertIndexLengthCap(t, rw, 0, 3, 3)

	// second pass: length stays saturated at capacity while the index wraps
	rw.Push(1)
	assertIndexLengthCap(t, rw, 1, 3, 3)
	rw.Push(2)
	assertIndexLengthCap(t, rw, 2, 3, 3)
	rw.Push(3)
	assertIndexLengthCap(t, rw, 0, 3, 3)

	set := newSet(1, 2, 3)

	// Each must yield exactly the values {1,2,3}, order-independent
	rw.Each(func(value float64) {
		v := int(value)
		if !set.has(v) {
			t.Errorf("Failed to find value: %d in set.\n", v)
		}

		set.remove(v)
	})

	// rewrite
	rw.Push(4)
	rw.Push(5)
	rw.Push(6)

	set = newSet(4, 5, 6)
	rw.Each(func(value float64) {
		v := int(value)
		if !set.has(v) {
			t.Errorf("Failed to find value: %d in set.\n", v)
		}

		set.remove(v)
	})
}
+
+func TestRollingWindow_PartialCapacityMean(t *testing.T) {
+	rw := newRollingWindow(10)
+	for range 5 {
+		rw.Push(5.0)
+	}
+
+	mean := rw.Mean()
+	if mean != 5.0 {
+		t.Errorf("Expected mean = 5.0. Got %f\n", mean)
+	}
+}

+ 226 - 0
core/pkg/util/monitor/memory/memorylimitstats.go

@@ -0,0 +1,226 @@
+package memory
+
+import "sync"
+
// MemoryLimitConfig contains configuration values used to calculate the soft
// memory limit based on heap usage over time.
type MemoryLimitConfig struct {
	// LimitRatio is the ratio applied to memory limit values calculated. This
	// is generally set to 90% of the proposed limit. ie: 0.9
	LimitRatio float64

	// MinSamples is the required number of samples that must be collected before
	// calculating a memory limit.
	MinSamples int

	// WindowSize is the total number of smoothed samples to maintain when calculating
	// the proposed limit. This is generally set based on the timescale in which
	// samples are added.
	WindowSize int

	// SmoothingFactor is a value between 0 and 1 which defines how weight importance
	// should be distributed between a previous average value and the current observation.
	SmoothingFactor float64

	// BreachWindowSize is the total number of recent raw samples that are
	// maintained/used for breach detection.
	BreachWindowSize int

	// BreachThreshold is a limit of raw samples, within the `BreachWindowSize`, allowed to
	// exceed the memory limit. If this threshold is reached, the samples are recalibrated.
	BreachThreshold int

	// CumulativeSumSlack (also known as the K-Factor, or drift tolerance, in
	// cumulative sum control charts) is the allowable slack range in deviations.
	// Only deviations that exceed this slack contribute to the cumulative sum.
	// This is generally set from 0.5 to 1.0 standard deviations to filter out
	// process noise.
	CumulativeSumSlack float64

	// CumulativeSumThreshold is a scaler applied to the "baseline" mean (set once there are enough
	// samples to be considered "stable"). If the cumulative sum ever surpasses this baseline * threshold,
	// the samples will be recalibrated.
	CumulativeSumThreshold float64
}
+
+// DefaultMemoryLimitConfig creates the recommended values to use for detecting soft memory limit updates
+func DefaultMemoryLimitConfig() *MemoryLimitConfig {
+	return &MemoryLimitConfig{
+		LimitRatio:             0.90,
+		MinSamples:             30,
+		WindowSize:             60,
+		SmoothingFactor:        0.30,
+		BreachWindowSize:       10,
+		BreachThreshold:        3,
+		CumulativeSumSlack:     0.05,
+		CumulativeSumThreshold: 5.0,
+	}
+}
+
// MemoryLimitStats is a run-time memory statistics collector that maintains a soft memory limit
// value based on configurable input parameters. It is designed to adjust the soft limit based on
// meaningful changes to overall heap allocation, leveraging exponential moving average windows,
// confidence interval gates, breach detection, and cumulative sum control chart to detect meaningful
// deviations from the mean.
type MemoryLimitStats struct {
	lock   sync.Mutex
	config *MemoryLimitConfig

	// exponential moving average calculation
	ema *exponentialMovingAverage

	// ring buffers for tracking exponential moving averages and raw samples:
	// window holds smoothed samples, raw holds unsmoothed samples, and breach
	// holds the most recent raw samples used for breach detection
	window *rollingWindow
	raw    *rollingWindow
	breach *rollingWindow

	// cusum calculation for detecting positive shifts in memory usage
	cusum *cumulativeSum

	// tracked value storage for the soft memory limit proposal which
	// stores the previous limit as well as the current limit
	softLimit *trackedValue
}
+
+// NewMemoryLimitStats creates a new `MemoryLimitStats` instance with the provided
+// `MemoryLimitConfig`. If the provided config is `nil`, then the default configuration
+// values are used.
+func NewMemoryLimitStats(config *MemoryLimitConfig) *MemoryLimitStats {
+	if config == nil {
+		config = DefaultMemoryLimitConfig()
+	}
+
+	return &MemoryLimitStats{
+		config:    config,
+		ema:       newExponentialMovingAverage(config.SmoothingFactor),
+		window:    newRollingWindow(config.WindowSize),
+		raw:       newRollingWindow(config.MinSamples),
+		breach:    newRollingWindow(config.BreachWindowSize),
+		cusum:     newCumulativeSum(config.CumulativeSumSlack),
+		softLimit: newTrackedValue(),
+	}
+}
+
// Record ingests the total heap memory usage (in bytes), and returns
// (newSoftLimit, true) when the soft limit has been updated, or
// (currentSoftLimit, false) when no change occurred.
//
// A return value of (0, false) means the monitor is still collecting samples
// and no limit has been committed yet.
func (mls *MemoryLimitStats) Record(heapBytes uint64) (softLimit uint64, updated bool) {
	mls.lock.Lock()
	defer mls.lock.Unlock()

	// feed all three sample buffers: smoothed (EMA) for the limit window,
	// raw for the CUSUM baseline, and raw again for breach detection
	sample := float64(heapBytes)
	smoothed := mls.ema.Update(sample)

	mls.window.Push(smoothed)
	mls.raw.Push(sample)
	mls.breach.Push(sample)

	// Check that the minimum number of samples exist in the window before
	// calculating the memory limit
	totalSamples := mls.window.Len()
	if totalSamples < mls.config.MinSamples {
		return uint64(mls.softLimit.Value()), false
	}

	// NOTE: We could calculate the mean and stddev here, and determine if the data
	// NOTE: matches a confidence interval, but this might be too strict. See the
	// NOTE: method: mls.window.IsConfidenceSatisfied(...) method.

	// Pull the P99 sample from the smoothed sample window
	p99 := mls.window.Percentile(99)
	candidate := p99 * mls.config.LimitRatio

	// Ensure we've already set a soft limit before running breach
	// detection or CUSUM deviation tests.
	if mls.softLimit.IsSet() {
		// breach: too many recent raw samples above the current limit's p99
		if mls.isBreachDetected() {
			mls.recalibrate()
			return uint64(mls.softLimit.Value()), false
		}

		// update cumulative sum and check for recalibration
		mls.cusum.Update(sample)
		if mls.cusum.IsRecalibrationRequired(mls.config.CumulativeSumThreshold) {
			mls.recalibrate()
			return uint64(mls.softLimit.Value()), false
		}

		// this will only end up running once after the min samples threshold
		// is passed, and sets the baseline mean for the cusum calculations
		// NOTE(review): Calibrate is invoked on every Record once a limit is
		// set — presumably it no-ops after the baseline is established; confirm
		// against the cumulativeSum implementation.
		mean := mls.raw.Mean()
		mls.cusum.Calibrate(mean)
	}

	// update the soft limit to the candidate sample
	updated = mls.softLimit.Set(candidate)
	softLimit = uint64(mls.softLimit.Value())
	return
}
+
// SoftMemoryLimit returns the current soft limit without recording a sample.
// Returns 0 if the monitor is still collecting data samples.
func (mls *MemoryLimitStats) SoftMemoryLimit() uint64 {
	mls.lock.Lock()
	defer mls.lock.Unlock()

	return uint64(mls.softLimit.Value())
}
+
// TotalSamples returns the total number of samples _currently_ being used to
// calculate the memory limit. The samples will reset if a deviation threshold
// was reached in order to re-establish stability in the data set.
func (mls *MemoryLimitStats) TotalSamples() int {
	mls.lock.Lock()
	defer mls.lock.Unlock()

	// window holds the smoothed samples that drive the limit calculation
	return mls.window.Len()
}
+
+// Reset clears all state and samples collected.
+func (mls *MemoryLimitStats) Reset() {
+	mls.lock.Lock()
+	defer mls.lock.Unlock()
+
+	mls.window.Clear()
+	mls.raw.Clear()
+	mls.breach.Clear()
+	mls.softLimit.Clear()
+	mls.ema.Reset()
+	mls.cusum.Reset()
+}
+
+// isBreachDetected iterates through the breach sample window and tallies the
+// total number of samples that exceed the p99 smoothed memory usage sample.
+func (mls *MemoryLimitStats) isBreachDetected() bool {
+	if !mls.softLimit.IsSet() {
+		return false
+	}
+
+	// due to the nature of breach detection, we want to compare
+	// against the smoothed p99 sample, so unroll the ratio
+	p99 := mls.softLimit.Value() / mls.config.LimitRatio
+
+	// Tally the total number of recent raw samples that are
+	// greater than the p99 smoothed sample
+	count := 0
+	mls.breach.Each(func(value float64) {
+		if value > p99 {
+			count++
+		}
+	})
+
+	return count >= mls.config.BreachThreshold
+}
+
// recalibrate dumps the existing samples and calculations, but will preserve
// the previous softLimit value until a new soft limit is set.
func (mls *MemoryLimitStats) recalibrate() {
	mls.window.Clear()
	mls.raw.Clear()
	mls.breach.Clear()
	mls.ema.Reset()
	mls.cusum.Reset()
	// NOTE(review): softLimit.Reset() (unlike Clear in Reset()) presumably
	// keeps the last committed value available — confirm against trackedValue.
	mls.softLimit.Reset()
}

+ 141 - 0
core/pkg/util/monitor/memory/memorylimitstats_test.go

@@ -0,0 +1,141 @@
+package memory_test
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/opencost/opencost/core/pkg/util/monitor/memory"
+)
+
+func TestObservationMode(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	// Feed fewer than MinSamples — should never return updated=true.
+	for i := 0; i < config.MinSamples-1; i++ {
+		_, updated := m.Record(100 * 1024 * 1024) // 100 MiB
+		if updated {
+			t.Fatalf("sample %d: got updated=true before MinSamples reached", i)
+		}
+	}
+	if got := m.SoftMemoryLimit(); got != 0 {
+		t.Fatalf("expected SoftLimit 0 during observation, got %d", got)
+	}
+}
+
+func TestLimitCommittedAfterMinSamples(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	const alloc = 200 * 1024 * 1024 // 200 MiB, perfectly stable
+	var lastLimit uint64
+	var sawUpdate bool
+
+	for i := 0; i < config.MinSamples+10; i++ {
+		limit, updated := m.Record(alloc)
+		if updated {
+			sawUpdate = true
+			lastLimit = limit
+		}
+	}
+
+	if !sawUpdate {
+		t.Fatal("expected at least one limit update after MinSamples")
+	}
+
+	// Soft limit should be ~90% of the stable allocation.
+	expected := uint64(float64(alloc) * 0.90)
+	delta := int64(lastLimit) - int64(expected)
+	if delta < 0 {
+		delta = -delta
+	}
+	// Allow 1% tolerance.
+	if delta > int64(expected)/100 {
+		t.Fatalf("soft limit %d too far from expected %d (delta %d)", lastLimit, expected, delta)
+	}
+}
+
// TestElasticRecalibrationOnGrowth verifies that sustained usage above an
// established limit triggers recalibration, and that the limit grows once
// enough samples arrive at the new, higher level.
func TestElasticRecalibrationOnGrowth(t *testing.T) {
	config := memory.DefaultMemoryLimitConfig()
	config.BreachWindowSize = 5
	config.BreachThreshold = 3
	m := memory.NewMemoryLimitStats(config)

	// Phase 1: stable at 100 MiB — establish a limit.
	for i := 0; i < config.MinSamples+20; i++ {
		m.Record(100 * 1024 * 1024)
	}
	limitBefore := m.SoftMemoryLimit()
	if limitBefore == 0 {
		t.Fatal("expected a non-zero limit after phase 1")
	}

	// Phase 2: spike to 500 MiB repeatedly — should trigger recalibration.
	for i := 0; i < config.BreachThreshold+1; i++ {
		m.Record(500 * 1024 * 1024)
	}

	// Phase 3: feed enough 800 MiB samples at the new level to re-commit.
	var recalibrated bool
	for i := 0; i < config.MinSamples+20; i++ {
		limit, updated := m.Record(800 * 1024 * 1024)
		if updated && limit > limitBefore {
			recalibrated = true
			break
		}
	}
	if !recalibrated {
		t.Fatal("expected the soft limit to grow after sustained high usage")
	}
}
+
+func TestReset(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	for i := 0; i < config.MinSamples+5; i++ {
+		m.Record(128 * 1024 * 1024)
+	}
+	if m.SoftMemoryLimit() == 0 {
+		t.Fatal("expected non-zero soft limit before reset")
+	}
+
+	m.Reset()
+
+	if m.SoftMemoryLimit() != 0 {
+		t.Fatal("expected zero soft limit after reset")
+	}
+	if m.TotalSamples() != 0 {
+		t.Fatal("expected zero sample count after reset")
+	}
+}
+
+func TestNoisyInputStability(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	rng := rand.New(rand.NewSource(42))
+	base := float64(256 * 1024 * 1024) // 256 MiB
+
+	var limits []uint64
+	for i := 0; i < 200; i++ {
+		// ±10% noise around base
+		noise := (rng.Float64()*0.2 - 0.1) * base
+		_, _ = m.Record(uint64(base + noise))
+		if l := m.SoftMemoryLimit(); l > 0 {
+			limits = append(limits, l)
+		}
+	}
+
+	if len(limits) == 0 {
+		t.Fatal("expected at least one committed limit")
+	}
+
+	// The final limit should be in a sensible range: 75–95% of base.
+	last := limits[len(limits)-1]
+	lo := uint64(base * 0.75)
+	hi := uint64(base * 0.95)
+	if last < lo || last > hi {
+		t.Fatalf("final limit %d outside expected range [%d, %d]", last, lo, hi)
+	}
+}

+ 123 - 0
core/pkg/util/monitor/memorylimiter.go

@@ -0,0 +1,123 @@
+package monitor
+
+import (
+	"fmt"
+	"math"
+	"runtime"
+	"runtime/debug"
+	"sync"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/atomic"
+	"github.com/opencost/opencost/core/pkg/util/monitor/memory"
+)
+
+var (
+	once          sync.Once
+	memoryLimiter *MemoryLimiter
+)
+
// MemoryLimiter is a heap usage monitor for the go runtime which will attempt to
// dynamically set a GOMEMLIMIT value to best fit the heap usage. It will only
// adjust the GOMEMLIMIT if the usage analysis results in an increase, and won't
// try to "best fit" the current usage. It takes into account the initial GOMEMLIMIT
// value as the baseline.
type MemoryLimiter struct {
	// runState coordinates start/stop lifecycle of the sampling goroutine
	runState atomic.AtomicRunState
	// monitor accumulates heap samples and proposes soft-limit updates
	monitor  *memory.MemoryLimitStats
}
+
// Start begins collecting heap allocation samples for automatically adjusting the go soft memory limit
// for heap usage. It spawns a background goroutine that samples HeapAlloc every
// `interval` until Stop is called. Returns an error if already started.
func (ml *MemoryLimiter) Start(interval time.Duration) error {
	ml.runState.WaitForReset()

	if !ml.runState.Start() {
		return fmt.Errorf("memory limiter was already started")
	}

	// main limiter driver
	go func() {
		var memStats runtime.MemStats
		var prevLimit uint64

		// determine if mem limit was set prior by passing a negative
		// value to SetMemoryLimit, which will return the current value
		// without making any changes -- the default is MaxInt64
		goMemLimit := debug.SetMemoryLimit(-1)
		if goMemLimit == math.MaxInt64 {
			prevLimit = 0
		} else {
			prevLimit = uint64(goMemLimit)
		}

		// take initial heap measurement
		runtime.ReadMemStats(&memStats)
		ml.monitor.Record(memStats.HeapAlloc)

		for {
			select {
			case <-ml.runState.OnStop():
				// reset the run state so a later Start can proceed
				ml.runState.Reset()
				return

			case <-time.After(interval):
			}

			// in the event that someone updates the limit outside of this monitor
			// we want to make sure that we synchronize the correct value
			goMemLimit = debug.SetMemoryLimit(-1)
			if goMemLimit != math.MaxInt64 && goMemLimit != int64(prevLimit) {
				prevLimit = uint64(goMemLimit)
			}

			// record and determine if we should update the memory limit
			runtime.ReadMemStats(&memStats)
			if softLimit, updated := ml.monitor.Record(memStats.HeapAlloc); updated {
				// we only allow the limit to increase for now, as this best reflects a
				// max stable set of samples. Worth observation and potentially updating
				// in the future
				if softLimit != 0 && softLimit > prevLimit {
					prevLimit = softLimit
					log.Debugf("Updating Go Memory Limit: %dmb", int64(softLimit/1024.0/1024.0))
					debug.SetMemoryLimit(int64(softLimit))
				}
			}
		}
	}()

	return nil
}
+
+// Stops automatically adjusting the memory limiter
+func (ml *MemoryLimiter) Stop() error {
+	if !ml.runState.Stop() {
+		return fmt.Errorf("could not stop memory limiter - in the state of stopping or already stopped")
+	}
+	return nil
+}
+
+// returns the singleton instance of the memory limiter
+func getMemoryLimiter() *MemoryLimiter {
+	once.Do(func() {
+		config := memory.DefaultMemoryLimitConfig()
+		memoryLimiter = &MemoryLimiter{
+			monitor: memory.NewMemoryLimitStats(config),
+		}
+	})
+
+	return memoryLimiter
+}
+
// DefaultMemoryLimiterSampleInterval is the sample interval in which the auto limiter
// gathers heap usage.
const DefaultMemoryLimiterSampleInterval = time.Second

// StartMemoryLimiter starts the singleton memory limiter at the default
// sampling interval. Returns an error if it is already running.
func StartMemoryLimiter() error {
	return getMemoryLimiter().Start(DefaultMemoryLimiterSampleInterval)
}

// StopMemoryLimiter stops the singleton memory limiter. Returns an error if
// it is not currently running.
func StopMemoryLimiter() error {
	return getMemoryLimiter().Stop()
}

+ 160 - 0
core/pkg/util/stringutil/lrubank.go

@@ -0,0 +1,160 @@
+package stringutil
+
+import (
+	"container/heap"
+	"sync"
+	"time"
+)
+
+type lruEntry struct {
+	value string
+	used  int64
+}
+type maxHeap []*lruEntry
+
+func (h maxHeap) Len() int           { return len(h) }
+func (h maxHeap) Less(i, j int) bool { return h[i].used > h[j].used } // newer = "larger"
+func (h maxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+func (h *maxHeap) Push(x any) {
+	*h = append(*h, x.(*lruEntry))
+}
+
+func (h *maxHeap) Pop() any {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[:n-1]
+	return x
+}
+
+func nOldest(arr []*lruEntry, n int) []*lruEntry {
+	if n <= 0 {
+		return []*lruEntry{}
+	}
+
+	if n >= len(arr) {
+		return arr
+	}
+
+	h := maxHeap(arr[:n])
+	heap.Init(&h)
+
+	for _, entry := range arr[n:] {
+		// swap in oldest, re-heapify
+		if entry.used < h[0].used {
+			h[0] = entry
+			heap.Fix(&h, 0)
+		}
+	}
+
+	return []*lruEntry(h)
+}
+
// lruStringBank is a StringBank implementation that caps the number of
// interned strings, evicting the least-recently-used entries both
// periodically (background goroutine) and inline on overflow.
type lruStringBank struct {
	lock     sync.Mutex
	// stop signals the background evictor to exit; nil once stopped
	stop     chan struct{}
	m        map[string]*lruEntry
	capacity int
}
+
// NewLruStringBank creates a StringBank backed by an LRU map with the given
// capacity. A background goroutine evicts least-recently-used entries every
// evictionInterval until Stop is called.
func NewLruStringBank(capacity int, evictionInterval time.Duration) StringBank {
	stop := make(chan struct{})
	bank := &lruStringBank{
		stop:     stop,
		m:        make(map[string]*lruEntry),
		capacity: capacity,
	}

	// periodic evictor: exits when the captured stop channel is closed
	go func() {
		for {
			select {
			case <-stop:
				return
			case <-time.After(evictionInterval):
			}

			// need to take the lock during eviction
			bank.lock.Lock()
			evict(bank, capacity)
			bank.lock.Unlock()
		}
	}()

	return bank
}
+
+func evict(bank *lruStringBank, capacity int) {
+	if len(bank.m) <= capacity {
+		return
+	}
+
+	// we collect a list of all lru entries so we can max heap the first n elements
+	arr := make([]*lruEntry, 0, len(bank.m))
+	for _, v := range bank.m {
+		arr = append(arr, v)
+	}
+
+	oldest := nOldest(arr, len(bank.m)-capacity)
+	for _, old := range oldest {
+		delete(bank.m, old.value)
+	}
+}
+
+func (sb *lruStringBank) Stop() {
+	sb.lock.Lock()
+	defer sb.lock.Unlock()
+
+	if sb.stop != nil {
+		close(sb.stop)
+		sb.stop = nil
+	}
+}
+
+func (sb *lruStringBank) LoadOrStore(key, value string) (string, bool) {
+	sb.lock.Lock()
+
+	if v, ok := sb.m[key]; ok {
+		v.used = time.Now().UnixMilli()
+		sb.lock.Unlock()
+		return v.value, ok
+	}
+
+	sb.m[key] = &lruEntry{
+		value: value,
+		used:  time.Now().UnixMilli(),
+	}
+	if len(sb.m) > (sb.capacity + (sb.capacity / 2)) {
+		evict(sb, sb.capacity)
+	}
+	sb.lock.Unlock()
+	return value, false
+}
+
// LoadOrStoreFunc returns the cached string for key when present (refreshing
// its recency). Otherwise it invokes f to produce the value, caches it, and
// returns it. The boolean reports whether the entry already existed.
//
// NOTE(review): the new entry is stored under `value` rather than `key` —
// consistent with the comment below about not retaining a possibly transient
// key string, but it means future lookups (which use key) only hit when f()
// returns a string equal to key. Confirm callers uphold key == f()
// (string-interning usage); otherwise this should index by key as
// LoadOrStore does.
func (sb *lruStringBank) LoadOrStoreFunc(key string, f func() string) (string, bool) {
	sb.lock.Lock()

	if v, ok := sb.m[key]; ok {
		v.used = time.Now().UnixMilli()
		sb.lock.Unlock()
		return v.value, ok
	}

	// create the key and value using the func (the key could be deallocated later)
	value := f()
	sb.m[value] = &lruEntry{
		value: value,
		used:  time.Now().UnixMilli(),
	}
	// allow 50% headroom over capacity before forcing an inline eviction
	if len(sb.m) > (sb.capacity + (sb.capacity / 2)) {
		evict(sb, sb.capacity)
	}
	sb.lock.Unlock()
	return value, false
}
+
+func (sb *lruStringBank) Clear() {
+	sb.lock.Lock()
+	sb.m = make(map[string]*lruEntry)
+	sb.lock.Unlock()
+}

+ 408 - 0
core/pkg/util/stringutil/lrubank_test.go

@@ -0,0 +1,408 @@
+package stringutil
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+)
+
+// TestBasicLruEvict exercises end-to-end eviction: with capacity 3 and a 2s
+// eviction interval, refreshing "foo" leaves "bar" as the least-recently-used
+// entry, so the fourth insert should cause "bar" to be evicted once the
+// background timer fires. The sleeps keep the millisecond-resolution recency
+// timestamps distinct.
+func TestBasicLruEvict(t *testing.T) {
+	lruBank := NewLruStringBank(3, 2*time.Second).(*lruStringBank)
+	defer lruBank.Stop()
+
+	lruBank.LoadOrStore("foo", "foo")
+	time.Sleep(500 * time.Millisecond)
+	lruBank.LoadOrStore("bar", "bar")
+	time.Sleep(500 * time.Millisecond)
+	lruBank.LoadOrStore("whaz", "whaz")
+	time.Sleep(500 * time.Millisecond)
+	// access foo, updating recency
+	lruBank.LoadOrStore("foo", "foo")
+	// should push bar out after eviction runs
+	lruBank.LoadOrStore("test", "test")
+	time.Sleep(time.Second)
+
+	// inspect internals directly; the lock guards the map against the evictor
+	lruBank.lock.Lock()
+	for _, v := range lruBank.m {
+		t.Logf("Value: %s\n", v.value)
+		if v.value == "bar" {
+			t.Errorf("The 'bar' entry should've been replaced by 'test'")
+		}
+	}
+	lruBank.lock.Unlock()
+}
+
+// ---------------------------------------------------------------------------
+// LoadOrStore
+// ---------------------------------------------------------------------------
+
+// A stored value must be retrievable and LoadOrStore must signal the hit/miss
+// correctly via the boolean return.
+func TestLoadOrStore_MissAndHit(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+
+	v, loaded := bank.LoadOrStore("hello", "hello")
+	if loaded {
+		t.Errorf("first LoadOrStore: expected loaded=false, got true")
+	}
+	if v != "hello" {
+		t.Errorf("first LoadOrStore: expected value %q, got %q", "hello", v)
+	}
+
+	v, loaded = bank.LoadOrStore("hello", "world")
+	if !loaded {
+		t.Errorf("second LoadOrStore: expected loaded=true, got false")
+	}
+	// The original value must be returned on a hit, not the new candidate.
+	if v != "hello" {
+		t.Errorf("second LoadOrStore: expected cached value %q, got %q", "hello", v)
+	}
+}
+
+// Hitting an existing entry must update its recency so it is not evicted ahead
+// of entries that were never touched again. The 100ms sleeps keep the
+// millisecond-resolution recency timestamps distinct between entries.
+func TestLoadOrStore_HitUpdateRecency(t *testing.T) {
+	bank := NewLruStringBank(2, 500*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	bank.LoadOrStore("old", "old")
+	time.Sleep(100 * time.Millisecond)
+	bank.LoadOrStore("keep", "keep")
+	time.Sleep(100 * time.Millisecond)
+
+	// Re-touch "old" so it becomes the most-recently-used.
+	bank.LoadOrStore("old", "old")
+	time.Sleep(100 * time.Millisecond)
+
+	// Adding a third entry exceeds capacity; "keep" should be the oldest now.
+	bank.LoadOrStore("new", "new")
+
+	// Wait for the eviction goroutine.
+	time.Sleep(600 * time.Millisecond)
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	if _, ok := bank.m["keep"]; ok {
+		t.Error("expected 'keep' to be evicted but it is still present")
+	}
+	if _, ok := bank.m["old"]; !ok {
+		t.Error("expected 'old' to survive eviction after its recency was refreshed")
+	}
+}
+
+// ---------------------------------------------------------------------------
+// LoadOrStoreFunc
+// ---------------------------------------------------------------------------
+
+// The factory function must only be called on a cache miss, not on a hit.
+// The counter needs no synchronization: both calls run on the test goroutine.
+func TestLoadOrStoreFunc_FactoryCalledOnMissOnly(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+	calls := 0
+
+	factory := func() string {
+		calls++
+		return "k"
+	}
+
+	bank.LoadOrStoreFunc("k", factory)
+	bank.LoadOrStoreFunc("k", factory)
+
+	if calls != 1 {
+		t.Errorf("factory should be called exactly once, got %d calls", calls)
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Capacity / eviction
+// ---------------------------------------------------------------------------
+
+// If the bank never exceeds capacity, nothing should be evicted.
+func TestEviction_BelowCapacityNoEviction(t *testing.T) {
+	const capacity = 5
+	bank := NewLruStringBank(capacity, 200*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	for i := 0; i < capacity; i++ {
+		bank.LoadOrStore(fmt.Sprintf("v%d", i), fmt.Sprintf("v%d", i))
+	}
+
+	// Wait several eviction cycles.
+	time.Sleep(600 * time.Millisecond)
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	if got := len(bank.m); got != capacity {
+		t.Errorf("expected %d entries, got %d", capacity, got)
+	}
+}
+
+// After eviction the map must be trimmed down to exactly capacity.
+func TestEviction_ExceedCapacityTrimsToCapacity(t *testing.T) {
+	const capacity = 3
+	bank := NewLruStringBank(capacity, 350*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	for i := 0; i < capacity+3; i++ {
+		bank.LoadOrStore(fmt.Sprintf("v%d", i), fmt.Sprintf("v%d", i))
+		time.Sleep(20 * time.Millisecond) // ensure distinct timestamps
+	}
+
+	// Wait for eviction.
+	time.Sleep(500 * time.Millisecond)
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	// eviction should leave no more than capacity entries
+	if got := len(bank.m); got > capacity {
+		t.Errorf("expected at most %d entries after eviction, got %d", capacity, got)
+	}
+}
+
+// The most-recently-used entries must survive eviction.
+func TestEviction_MRUSurvives(t *testing.T) {
+	const capacity = 2
+	bank := NewLruStringBank(capacity, 300*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	bank.LoadOrStore("evict1", "evict1")
+	time.Sleep(50 * time.Millisecond)
+	bank.LoadOrStore("evict2", "evict2")
+	time.Sleep(50 * time.Millisecond)
+
+	// These two are the most recent; they must survive.
+	bank.LoadOrStore("keep1", "keep1")
+	time.Sleep(50 * time.Millisecond)
+	bank.LoadOrStore("keep2", "keep2")
+
+	time.Sleep(500 * time.Millisecond)
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	for _, must := range []string{"keep1", "keep2"} {
+		if _, ok := bank.m[must]; !ok {
+			t.Errorf("expected %q to survive eviction", must)
+		}
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Clear
+// ---------------------------------------------------------------------------
+
+// Clear must leave the underlying map empty.
+func TestClear_EmptiesMap(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+
+	for i := 0; i < 5; i++ {
+		bank.LoadOrStore(fmt.Sprintf("v%d", i), fmt.Sprintf("v%d", i))
+	}
+
+	bank.Clear()
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	if len(bank.m) != 0 {
+		t.Errorf("expected empty map after Clear, got %d entries", len(bank.m))
+	}
+}
+
+// After a Clear, previously stored keys must not be found.
+func TestClear_PreviousKeysGone(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+
+	bank.LoadOrStore("hello", "world")
+	bank.Clear()
+
+	// loaded=false proves the old entry is gone
+	_, loaded := bank.LoadOrStore("hello", "new")
+	if loaded {
+		t.Error("expected key to be absent after Clear, but it was found")
+	}
+}
+
+// ---------------------------------------------------------------------------
+// nOldest helper
+// ---------------------------------------------------------------------------
+
+// The n entries with the smallest used timestamps must be returned.
+func TestNOldest_ReturnsCorrectCount(t *testing.T) {
+	now := time.Now()
+	entries := []*lruEntry{
+		{value: "a", used: now.Add(-4 * time.Second).UnixMilli()},
+		{value: "b", used: now.Add(-3 * time.Second).UnixMilli()},
+		{value: "c", used: now.Add(-2 * time.Second).UnixMilli()},
+		{value: "d", used: now.Add(-1 * time.Second).UnixMilli()},
+		{value: "e", used: now.UnixMilli()},
+	}
+
+	oldest := nOldest(entries, 2)
+	if len(oldest) != 2 {
+		t.Fatalf("expected 2 oldest entries, got %d", len(oldest))
+	}
+
+	// order is not asserted, only membership of the two oldest
+	values := map[string]bool{}
+	for _, e := range oldest {
+		values[e.value] = true
+	}
+	for _, must := range []string{"a", "b"} {
+		if !values[must] {
+			t.Errorf("expected %q in oldest set, got %v", must, values)
+		}
+	}
+}
+
+// Requesting more than available must return everything without panicking.
+func TestNOldest_NGreaterThanLen(t *testing.T) {
+	now := time.Now()
+	entries := []*lruEntry{
+		{value: "x", used: now.UnixMilli()},
+		{value: "y", used: now.Add(-time.Second).UnixMilli()},
+	}
+
+	result := nOldest(entries, 10)
+	if len(result) != 2 {
+		t.Errorf("expected all %d entries when n >= len, got %d", 2, len(result))
+	}
+}
+
+// Requesting exactly len(entries) must return them all.
+func TestNOldest_NEqualsLen(t *testing.T) {
+	now := time.Now()
+	entries := []*lruEntry{
+		{value: "x", used: now.UnixMilli()},
+		{value: "y", used: now.Add(-time.Second).UnixMilli()},
+	}
+
+	result := nOldest(entries, 2)
+	if len(result) != 2 {
+		t.Errorf("expected 2 entries when n == len, got %d", len(result))
+	}
+}
+
+// n == 0 must yield an empty result.
+func TestNOldest_NIsZero(t *testing.T) {
+	now := time.Now()
+	entries := []*lruEntry{
+		{value: "x", used: now.UnixMilli()},
+	}
+
+	result := nOldest(entries, 0)
+	if len(result) != 0 {
+		t.Errorf("expected 0 entries when n=0, got %d", len(result))
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Concurrency
+// ---------------------------------------------------------------------------
+
+// Concurrent LoadOrStore calls must not race or panic.
+func TestConcurrentLoadOrStore(t *testing.T) {
+	bank := NewLruStringBank(50, 100*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	const goroutines = 20
+	const opsEach = 100
+
+	// 20 writers hammer a shared space of 30 keys so hits, misses, and the
+	// 100ms background eviction cycles interleave.
+	var wg sync.WaitGroup
+	for i := 0; i < goroutines; i++ {
+		g := i
+		wg.Go(func() {
+			for i := 0; i < opsEach; i++ {
+				key := fmt.Sprintf("k%d", (g*opsEach+i)%30)
+				bank.LoadOrStore(key, key)
+			}
+		})
+	}
+
+	// waiter converts wg.Wait into a channel so it can be raced against a timeout.
+	waiter := func() chan struct{} {
+		st := make(chan struct{})
+
+		go func() {
+			wg.Wait()
+			close(st)
+		}()
+
+		return st
+	}
+
+	select {
+	case <-waiter():
+		t.Logf("Completed Successfully\n")
+	case <-time.After(10 * time.Second):
+		// A timeout means the goroutines deadlocked; that must fail the test
+		// instead of merely logging (previously it passed silently).
+		t.Fatal("timed out waiting for concurrent LoadOrStore goroutines")
+	}
+}
+
+// Concurrent calls interleaved with eviction cycles must not deadlock or race.
+func TestConcurrentLoadOrStoreWithEviction(t *testing.T) {
+	bank := NewLruStringBank(5, 50*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	const goroutines = 10
+	const duration = 300 * time.Millisecond
+
+	var wg sync.WaitGroup
+
+	// Each goroutine spins on its own key until the deadline, repeatedly
+	// colliding with the 50ms eviction cycles (capacity 5 < 10 distinct keys).
+	for i := 0; i < goroutines; i++ {
+		g := i
+		stop := time.After(duration)
+
+		wg.Go(func() {
+			for {
+				select {
+				case <-stop:
+					return
+				default:
+					key := fmt.Sprintf("g%d", g)
+					bank.LoadOrStore(key, key)
+				}
+			}
+		})
+	}
+
+	// waiter converts wg.Wait into a channel so it can be raced against a timeout.
+	waiter := func() chan struct{} {
+		st := make(chan struct{})
+
+		go func() {
+			wg.Wait()
+			close(st)
+		}()
+
+		return st
+	}
+
+	select {
+	case <-waiter():
+		t.Logf("Completed Successfully\n")
+	case <-time.After(10 * time.Second):
+		// A hang here indicates a deadlock between writers and the eviction
+		// loop; fail loudly instead of logging and passing (the old behavior).
+		t.Fatal("timed out waiting for concurrent LoadOrStore/eviction goroutines")
+	}
+}
+
+// Concurrent Clear calls alongside reads/writes must not panic.
+func TestConcurrentClear(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+
+	// five writers...
+	var wg sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			bank.LoadOrStore(fmt.Sprintf("k%d", i), "v")
+		}(i)
+	}
+	// ...racing three clearers; interleaving order is intentionally unconstrained
+	for i := 0; i < 3; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			bank.Clear()
+		}()
+	}
+
+	wg.Wait()
+}

+ 48 - 0
core/pkg/util/stringutil/mapbank.go

@@ -0,0 +1,48 @@
+package stringutil
+
+import "sync"
+
+// stringBank is the default StringBank: an unbounded, mutex-guarded map of
+// canonical string instances. It never evicts; use Clear (or an LRU bank) to
+// bound memory.
+type stringBank struct {
+	lock sync.Mutex
+	m    map[string]*lruEntry // placeholder comment removed below; see actual field
+}
+
+// NewStringBank returns an empty unbounded StringBank.
+func NewStringBank() StringBank {
+	return &stringBank{
+		m: make(map[string]string),
+	}
+}
+
+// LoadOrStore returns the cached copy for key if present (loaded=true);
+// otherwise it stores value and returns it (loaded=false).
+//
+// NOTE(review): on a miss the entry is stored under value, not key, so a
+// later lookup only hits when key and value are textually equal. This looks
+// deliberate — the bank avoids retaining the caller-supplied key, mirroring
+// LoadOrStoreFunc — but confirm callers always pass key == value (Bank does).
+func (sb *stringBank) LoadOrStore(key, value string) (string, bool) {
+	sb.lock.Lock()
+
+	if v, ok := sb.m[key]; ok {
+		sb.lock.Unlock()
+		return v, ok
+	}
+
+	sb.m[value] = value
+	sb.lock.Unlock()
+	return value, false
+}
+
+// LoadOrStoreFunc behaves like LoadOrStore but builds the value lazily via f,
+// which runs only on a cache miss while the lock is held. The entry is stored
+// under the freshly allocated value because the key may alias memory the
+// caller will reuse.
+func (sb *stringBank) LoadOrStoreFunc(key string, f func() string) (string, bool) {
+	sb.lock.Lock()
+
+	if v, ok := sb.m[key]; ok {
+		sb.lock.Unlock()
+		return v, ok
+	}
+
+	// create the key and value using the func (the key could be deallocated later)
+	value := f()
+	sb.m[value] = value
+	sb.lock.Unlock()
+	return value, false
+}
+
+// Clear resets the bank to empty by allocating a fresh map, releasing all
+// previously banked strings to the garbage collector.
+func (sb *stringBank) Clear() {
+	sb.lock.Lock()
+	sb.m = make(map[string]string)
+	sb.lock.Unlock()
+}

+ 17 - 0
core/pkg/util/stringutil/noopbank.go

@@ -0,0 +1,17 @@
+package stringutil
+
+// noOpStringBank is a StringBank that performs no caching at all: every call
+// passes its value straight through. Useful for disabling string banking
+// without changing call sites.
+type noOpStringBank struct{}
+
+// NewNoOpStringBank returns a StringBank that never stores anything.
+func NewNoOpStringBank() StringBank {
+	return new(noOpStringBank)
+}
+
+// LoadOrStore returns value unchanged.
+// NOTE(review): it reports loaded=true, whereas the real banks report false
+// on a miss — confirm no caller branches on this boolean.
+func (nsb *noOpStringBank) LoadOrStore(key, value string) (string, bool) {
+	return value, true
+}
+
+// LoadOrStoreFunc always invokes f and returns its result; nothing is cached.
+func (nsb *noOpStringBank) LoadOrStoreFunc(key string, f func() string) (string, bool) {
+	return f(), true
+}
+
+// Clear is a no-op; there is no state to drop.
+func (nsb *noOpStringBank) Clear() {}

+ 27 - 44
core/pkg/util/stringutil/stringutil.go

@@ -23,75 +23,58 @@ const (
 var alpha = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
 var alphanumeric = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
 
-type stringBank struct {
-	lock sync.Mutex
-	m    map[string]string
+type StringBank interface {
+	LoadOrStore(key, value string) (string, bool)
+	LoadOrStoreFunc(key string, f func() string) (string, bool)
+	Clear()
 }
 
-func newStringBank() *stringBank {
-	return &stringBank{
-		m: make(map[string]string),
-	}
-}
+var (
+	lock sync.RWMutex
 
-func (sb *stringBank) LoadOrStore(key, value string) (string, bool) {
-	sb.lock.Lock()
-
-	if v, ok := sb.m[key]; ok {
-		sb.lock.Unlock()
-		return v, ok
-	}
+	// strings is the package-wide StringBank: an unbounded, thread-safe string cache. It is
+	// especially useful when storing a high frequency of dynamically allocated duplicate strings.
+	strings StringBank = NewStringBank()
+)
 
-	sb.m[key] = value
-	sb.lock.Unlock()
-	return value, false
+func init() {
+	rand.Seed(time.Now().UnixNano())
 }
 
-func (sb *stringBank) LoadOrStoreFunc(key string, f func() string) (string, bool) {
-	sb.lock.Lock()
+func UpdateStringBank(sb StringBank) {
+	lock.Lock()
+	defer lock.Unlock()
 
-	if v, ok := sb.m[key]; ok {
-		sb.lock.Unlock()
-		return v, ok
-	}
-
-	// create the key and value using the func (the key could be deallocated later)
-	value := f()
-	sb.m[value] = value
-	sb.lock.Unlock()
-	return value, false
-}
-
-func (sb *stringBank) Clear() {
-	sb.lock.Lock()
-	sb.m = make(map[string]string)
-	sb.lock.Unlock()
+	strings.Clear()
+	strings = sb
 }
 
-// stringBank is an unbounded string cache that is thread-safe. It is especially useful if
-// storing a large frequency of dynamically allocated duplicate strings.
-var strings = newStringBank() // sync.Map
+// GetStringBank returns the _current_ StringBank implementation. Note that the read-lock is
+// not held for the duration of usage, so the returned string bank could be swapped out
+// after being retrieved.
+func GetStringBank() StringBank {
+	lock.RLock()
+	defer lock.RUnlock()
 
-func init() {
-	rand.Seed(time.Now().UnixNano())
+	return strings
 }
 
 // Bank will return a non-copy of a string if it has been used before. Otherwise, it will store
 // the string as the unique instance.
 func Bank(s string) string {
-	ss, _ := strings.LoadOrStore(s, s)
+	ss, _ := GetStringBank().LoadOrStore(s, s)
 	return ss
 }
 
 // BankFunc will use the provided s string to check for an existing allocation of the string. However,
 // if no allocation exists, the f parameter will be used to create the string and store in the bank.
 func BankFunc(s string, f func() string) string {
-	ss, _ := strings.LoadOrStoreFunc(s, f)
+	ss, _ := GetStringBank().LoadOrStoreFunc(s, f)
 	return ss
 }
 
 func ClearBank() {
-	strings.Clear()
+	GetStringBank().Clear()
 }
 
 // RandSeq generates a pseudo-random alphabetic string of the given length

+ 105 - 4
core/pkg/util/stringutil/stringutil_test.go

@@ -6,6 +6,8 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"
+	"unsafe"
 
 	"github.com/opencost/opencost/core/pkg/util/stringutil"
 )
@@ -46,7 +48,7 @@ func copyString(s string) string {
 	return string([]byte(s))
 }
 
-func generateBenchData(totalStrings, totalUnique int) []string {
+func generateBenchData(totalStrings, totalUnique int) [][]byte {
 	randStrings := make([]string, 0, totalStrings)
 	r := rand.New(rand.NewSource(27644437))
 
@@ -69,7 +71,11 @@ func generateBenchData(totalStrings, totalUnique int) []string {
 	// shuffle the list of strings
 	r.Shuffle(totalStrings, func(i, j int) { randStrings[i], randStrings[j] = randStrings[j], randStrings[i] })
 
-	return randStrings
+	stringBytes := make([][]byte, 0, totalStrings)
+	for _, str := range randStrings {
+		stringBytes = append(stringBytes, []byte(str))
+	}
+	return stringBytes
 }
 
 func benchmarkStringBank(b *testing.B, bt bankTest, totalStrings, totalUnique int, useBankFunc bool) {
@@ -80,10 +86,16 @@ func benchmarkStringBank(b *testing.B, bt bankTest, totalStrings, totalUnique in
 		for i := 0; i < b.N; i++ {
 			b.StartTimer()
 			for bb := 0; bb < totalStrings; bb++ {
+				bytes := randStrings[bb]
+
 				if useBankFunc {
-					bt.BankFunc(randStrings[bb], func() string { return randStrings[bb] })
+					str := unsafe.String(unsafe.SliceData(bytes), len(bytes))
+
+					bt.BankFunc(str, func() string {
+						return string(bytes)
+					})
 				} else {
-					bt.Bank(randStrings[bb])
+					bt.Bank(string(bytes))
 				}
 			}
 			b.StopTimer()
@@ -153,3 +165,92 @@ func BenchmarkStringBankFunc25PercentDuplicate(b *testing.B) {
 func BenchmarkStringBankFuncNoDuplicate(b *testing.B) {
 	benchmarkStringBank(b, standardBankTest, 1_000_000, 1_000_000, true)
 }
+
+// Shared LRU benchmark configuration: a generous capacity and a long eviction
+// interval so timing measures banking, not eviction churn.
+const LruCapacity = 500_000
+const LruEvictInterval = 5 * time.Second
+
+// 100_000 unique of 1_000_000 total strings -> 90% duplicates.
+func BenchmarkLruStringBankFunc90PercentDuplicate(b *testing.B) {
+	// swap in the LRU bank for the global, restoring the previous bank afterwards
+	prevBank := stringutil.GetStringBank()
+	defer func() {
+		stringutil.UpdateStringBank(prevBank)
+	}()
+
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 100_000, true)
+}
+
+// 250_000 unique of 1_000_000 total strings -> 75% duplicates.
+func BenchmarkLruStringBankFunc75PercentDuplicate(b *testing.B) {
+	// swap in the LRU bank for the global, restoring the previous bank afterwards
+	prevBank := stringutil.GetStringBank()
+	defer func() {
+		stringutil.UpdateStringBank(prevBank)
+	}()
+
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 250_000, true)
+}
+
+// 500_000 unique of 1_000_000 total strings -> 50% duplicates.
+func BenchmarkLruStringBankFunc50PercentDuplicate(b *testing.B) {
+	// swap in the LRU bank for the global, restoring the previous bank afterwards
+	prevBank := stringutil.GetStringBank()
+	defer func() {
+		stringutil.UpdateStringBank(prevBank)
+	}()
+
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	// was 1_000_000/100_000 (copied from the 90% variant); 500_000 unique
+	// matches the benchmark's name, following the 90%->100k, 75%->250k,
+	// 25%->750k, 0%->1M pattern.
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 500_000, true)
+}
+
+// 750_000 unique of 1_000_000 total strings -> 25% duplicates.
+func BenchmarkLruStringBankFunc25PercentDuplicate(b *testing.B) {
+	// swap in the LRU bank for the global, restoring the previous bank afterwards
+	prevBank := stringutil.GetStringBank()
+	defer func() {
+		stringutil.UpdateStringBank(prevBank)
+	}()
+
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 750_000, true)
+}
+
+// Every string unique: the worst case, where banking can never deduplicate.
+func BenchmarkLruStringBankFuncNoDuplicate(b *testing.B) {
+	// swap in the LRU bank for the global, restoring the previous bank afterwards
+	prevBank := stringutil.GetStringBank()
+	defer func() {
+		stringutil.UpdateStringBank(prevBank)
+	}()
+
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 1_000_000, true)
+}

Деякі файли не було показано, через те що забагато файлів було змінено