Просмотр исходного кода

Merge commit 'f024bacc153e5a09f178c0320e67befff832d49f' into memory-tweaks-2

Alex Meijer 2 месяцев назад
Родитель
Сommit
4b4cc2c907

+ 321 - 0
core/pkg/util/monitor/memory/helpers.go

@@ -0,0 +1,321 @@
+package memory
+
+import (
+	"fmt"
+	"math"
+	"slices"
+)
+
+//--------------------------------------------------------------------------
+//  Helper Types
+//--------------------------------------------------------------------------
+
+// trackedValue maintains the state of an uninitialized value, a set value, and
+// the previous value. The previous value is always used when the current value
+// is unset. All values that are set become previous values when the current value
+// changes, or Reset() is called.
+type trackedValue struct {
+	current  *float64
+	previous float64
+}
+
+// newTrackedValue returns a new trackedValue instance for tracking unset, set, and previous
+// values for a float64.
+func newTrackedValue() *trackedValue {
+	return new(trackedValue)
+}
+
+// Value returns the current value if it has not been reset. Otherwise, it returns
+// the previous value
+func (tv *trackedValue) Value() float64 {
+	if tv.current == nil {
+		return tv.previous
+	}
+
+	return *tv.current
+}
+
+// IsSet returns `true` if the current value has been set.
+func (tv *trackedValue) IsSet() bool {
+	return tv.current != nil
+}
+
+// Set updates the current value if it is different. If the vaue is updated, `true` is returned.
+// Otherwise, `false` is returned.
+func (tv *trackedValue) Set(value float64) bool {
+	if tv.current == nil {
+		tv.current = &value
+		return true
+	}
+
+	curr := *tv.current
+	if value != curr {
+		tv.current = &value
+		tv.previous = curr
+		return true
+	}
+
+	return false
+}
+
+// Reset resets the current value to unset, moving it to the previous value if set.
+func (tv *trackedValue) Reset() {
+	if tv.current == nil {
+		return
+	}
+
+	tv.previous = *tv.current
+	tv.current = nil
+}
+
+// Clear resets the value and sets the previous to 0.
+func (tv *trackedValue) Clear() {
+	tv.current = nil
+	tv.previous = 0.0
+}
+
+// The Cumulative Sum (CUSUM) is a statistical process control tool that plots
+// the cumulative sums of deviations from a target mean to detect small, persistent
+// shifts (0.5 to 2 sigma) in process performance quickly.
+type cumulativeSum struct {
+	slack float64
+	sum   float64
+	base  float64
+}
+
+// newCumulativeSum creates a new cumulativeSum instance with the provided slack
+func newCumulativeSum(slack float64) *cumulativeSum {
+	return &cumulativeSum{
+		slack: slack,
+		sum:   0.0,
+		base:  0.0,
+	}
+}
+
+// Calibrate initializes the baseline for the CUSUM. This is generally the mean
+// of the samples once there are enough to consider the sample set as "stable."
+func (cs *cumulativeSum) Calibrate(mean float64) {
+	if cs.base != 0.0 {
+		return
+	}
+
+	cs.base = mean
+}
+
+// Update supplies a new sample to update the internal sum.
+func (cs *cumulativeSum) Update(value float64) {
+	if cs.base == 0.0 {
+		return
+	}
+
+	slack := cs.base * cs.slack
+	cs.sum = max(0, cs.sum+(value-cs.base)-slack)
+}
+
+// Sum returns the current CUSUM value.
+func (cs *cumulativeSum) Sum() float64 {
+	return cs.sum
+}
+
+// IsRecalibrationRequired tests the current CUSUM against the base * thresholdMagnitude.
+// If it has surpassed the magnitude provided, true is returned signalling a recalibration
+// should be performed.
+func (cs *cumulativeSum) IsRecalibrationRequired(thresholdMagnitude float64) bool {
+	if cs.base == 0.0 {
+		return false
+	}
+
+	threshold := cs.base * thresholdMagnitude
+	//fmt.Printf("Testing: %f > %f = %t\n", cs.sum, threshold, cs.sum > threshold)
+	return cs.sum > threshold
+}
+
+func (cs *cumulativeSum) Reset() {
+	cs.base = 0.0
+	cs.sum = 0.0
+}
+
+// exponentialMovingAverage is a helper type that tracks the current moving average
+// value using a providing smoothing factor.
+type exponentialMovingAverage struct {
+	smoothing float64
+	value     float64
+	set       bool
+}
+
+// creates a new exponential moving average instance using the provided smoothing factor
+func newExponentialMovingAverage(smoothing float64) *exponentialMovingAverage {
+	return &exponentialMovingAverage{
+		smoothing: smoothing,
+		set:       false,
+	}
+}
+
+// updates the moving average for the provided sample, and returns the updated
+// value
+func (ema *exponentialMovingAverage) Update(sample float64) float64 {
+	if !ema.set {
+		ema.set = true
+		ema.value = sample
+	} else {
+		ema.value = ema.smoothing*sample + (1.0-ema.smoothing)*ema.value
+	}
+	return ema.value
+}
+
+// The current moving average value
+func (ema *exponentialMovingAverage) Current() float64 {
+	return ema.value
+}
+
+// Resets the moving average calculation
+func (ema *exponentialMovingAverage) Reset() {
+	ema.set = false
+	ema.value = 0.0
+}
+
+// rollingWindow is a ring buffer helper type for tracking a set capacity number of
+// the most recent values. It also provides helper methods for calculating mean,
+// stddev, and percentiles of the contained data.
+type rollingWindow struct {
+	capacity int
+	window   []float64
+	index    int
+}
+
+// creates a new rolling window instance with the provided static capacity.
+func newRollingWindow(capacity int) *rollingWindow {
+	if capacity <= 0 || capacity > (math.MaxInt/2) {
+		panic(fmt.Sprintf("RollingWindow capacity limited to range 1-%d", math.MaxInt/2))
+	}
+
+	return &rollingWindow{
+		capacity: capacity,
+		window:   make([]float64, capacity),
+		index:    0,
+	}
+}
+
+// Pushes a new value into the rolling window, dropping the oldest value if
+// the total length surpasses the capacity.
+func (rw *rollingWindow) Push(value float64) {
+	// advance index, handle overflow
+	index := rw.index % rw.capacity
+	if index < 0 {
+		index = -index
+	}
+
+	rw.window[index] = value
+	rw.index++
+
+	// if we have wrapped all the way back around to 0, just advance the index
+	// by capacity to ensure our Len() algorithm continues to function correctly
+	if rw.index == 0 {
+		rw.index = rw.capacity
+	}
+}
+
+// Clears the rolling window values
+func (rw *rollingWindow) Clear() {
+	rw.window = make([]float64, rw.capacity)
+	rw.index = 0
+}
+
+// The length of the rolling window. Will never be greater that the `Cap()`.
+func (rw *rollingWindow) Len() int {
+	index := rw.index
+	if index < 0 {
+		index = -index
+	}
+
+	return min(index, rw.capacity)
+}
+
+// Cap returns the maximum capacity of the rolling window.
+func (rw *rollingWindow) Cap() int {
+	return rw.capacity
+}
+
+// Each iterates all values within the rolling window and calls `f` passing each value.
+// NOTE: Ordering is _not_ guaranteed!
+func (rw *rollingWindow) Each(f func(float64)) {
+	total := rw.Len()
+	for i := range total {
+		f(rw.window[i])
+	}
+}
+
+// Mean returns the average of the values in the window
+func (rw *rollingWindow) Mean() float64 {
+	length := rw.Len()
+	if length == 0 {
+		return 0.0
+	}
+
+	sum := 0.0
+	for i := range length {
+		sum += rw.window[i]
+	}
+	return sum / float64(length)
+}
+
+// MeanStdDev computes the mean and standard deviation of the window values.
+func (rw *rollingWindow) MeanStdDev() (mean float64, stddev float64) {
+	mean = rw.Mean()
+
+	length := rw.Len()
+	if length < 2 {
+		return mean, 0
+	}
+
+	variance := 0.0
+	for i := range length {
+		d := rw.window[i] - mean
+		variance += d * d
+	}
+
+	// sample variance (Bessel's correction)
+	variance /= float64(length - 1)
+	stddev = math.Sqrt(variance)
+	return
+}
+
+// Percentile computes the p-th percentile of the values currently stored in the
+// rolling window.
+func (rw *rollingWindow) Percentile(p float64) float64 {
+	length := rw.Len()
+	if length == 0 {
+		return 0
+	}
+
+	sorted := make([]float64, length)
+	for i := range length {
+		sorted[i] = rw.window[i]
+	}
+	slices.Sort(sorted)
+
+	if p <= 0 {
+		return sorted[0]
+	}
+	if p >= 100 {
+		return sorted[len(sorted)-1]
+	}
+
+	rank := (p / 100.0) * float64(len(sorted)-1)
+	lo := int(math.Floor(rank))
+	hi := int(math.Ceil(rank))
+	frac := rank - float64(lo)
+
+	return sorted[lo]*(1-frac) + sorted[hi]*frac
+}
+
+// IsConfidenceSatisfied checks the relative margin of error is within the provided
+// `marginPercent` threshold using the provided z-score.
+func (rw *rollingWindow) IsConfidenceSatisfied(z float64, marginPercent float64) bool {
+	mean, stddev := rw.MeanStdDev()
+	length := float64(rw.Len())
+	marginOfError := z * (stddev / math.Sqrt(length))
+	relative := marginOfError / mean
+
+	return relative <= marginPercent
+}

+ 420 - 0
core/pkg/util/monitor/memory/helpers_test.go

@@ -0,0 +1,420 @@
+package memory
+
+import (
+	"math"
+	"testing"
+)
+
+const epsilon = 1e-9
+
+type set[T comparable] struct {
+	m map[T]struct{}
+}
+
+func newSet[T comparable](values ...T) *set[T] {
+	m := make(map[T]struct{})
+	for _, v := range values {
+		m[v] = struct{}{}
+	}
+	return &set[T]{
+		m: m,
+	}
+}
+
+func (s *set[T]) add(value T) {
+	s.m[value] = struct{}{}
+}
+
+func (s *set[T]) has(value T) bool {
+	_, hasValue := s.m[value]
+	return hasValue
+}
+
+func (s *set[T]) remove(value T) {
+	delete(s.m, value)
+}
+
+// -------------------------------------------------------------------------
+//  exponentialMovingAverage tests
+// -------------------------------------------------------------------------
+
+func TestEMA_InitialState(t *testing.T) {
+	ema := newExponentialMovingAverage(0.5)
+	if ema.set {
+		t.Error("expected ema.set to be false on creation")
+	}
+	if ema.Current() != 0.0 {
+		t.Errorf("expected initial Current() = 0.0, got %f", ema.Current())
+	}
+}
+
+func TestEMA_FirstUpdateSetsValue(t *testing.T) {
+	ema := newExponentialMovingAverage(0.5)
+	got := ema.Update(42.0)
+	if got != 42.0 {
+		t.Errorf("expected first Update() = 42.0, got %f", got)
+	}
+	if !ema.set {
+		t.Error("expected ema.set to be true after first update")
+	}
+}
+
+func TestEMA_SubsequentUpdates(t *testing.T) {
+	// With smoothing=0.5: EMA(n) = 0.5*sample + 0.5*EMA(n-1)
+	ema := newExponentialMovingAverage(0.5)
+	ema.Update(10.0)        // value = 10
+	got := ema.Update(20.0) // value = 0.5*20 + 0.5*10 = 15
+	want := 15.0
+	if math.Abs(got-want) > epsilon {
+		t.Errorf("expected %f, got %f", want, got)
+	}
+}
+
+func TestEMA_SmoothingZero(t *testing.T) {
+	// smoothing=0 means the value never changes after the first sample
+	ema := newExponentialMovingAverage(0.0)
+	ema.Update(5.0)
+	ema.Update(100.0)
+	ema.Update(999.0)
+	if ema.Current() != 5.0 {
+		t.Errorf("expected Current() = 5.0, got %f", ema.Current())
+	}
+}
+
+func TestEMA_SmoothingOne(t *testing.T) {
+	// smoothing=1 means the value is always the latest sample
+	ema := newExponentialMovingAverage(1.0)
+	ema.Update(5.0)
+	ema.Update(99.0)
+	if ema.Current() != 99.0 {
+		t.Errorf("expected Current() = 99.0, got %f", ema.Current())
+	}
+}
+
+func TestEMA_Reset(t *testing.T) {
+	ema := newExponentialMovingAverage(0.5)
+	ema.Update(10.0)
+	ema.Reset()
+	if ema.set {
+		t.Error("expected ema.set to be false after Reset()")
+	}
+	if ema.Current() != 0.0 {
+		t.Errorf("expected Current() = 0.0 after Reset(), got %f", ema.Current())
+	}
+	// First update after reset should treat as a fresh start
+	got := ema.Update(7.0)
+	if got != 7.0 {
+		t.Errorf("expected first Update() after Reset() = 7.0, got %f", got)
+	}
+}
+
+func TestEMA_MultipleUpdates(t *testing.T) {
+	smoothing := 0.3
+	ema := newExponentialMovingAverage(smoothing)
+
+	samples := []float64{10, 20, 30, 40, 50}
+	want := samples[0]
+	for i, s := range samples {
+		got := ema.Update(s)
+		if i == 0 {
+			want = s
+		} else {
+			want = smoothing*s + (1-smoothing)*want
+		}
+		if math.Abs(got-want) > epsilon {
+			t.Errorf("step %d: expected %f, got %f", i, want, got)
+		}
+	}
+}
+
+// -------------------------------------------------------------------------
+//  rollingWindow tests
+// -------------------------------------------------------------------------
+
+func TestRollingWindow_NewPanicsOnBadCapacity(t *testing.T) {
+	cases := []int{0, -1, math.MaxInt}
+	for _, cap := range cases {
+		func() {
+			defer func() {
+				if r := recover(); r == nil {
+					t.Errorf("expected panic for capacity %d", cap)
+				}
+			}()
+			newRollingWindow(cap)
+		}()
+	}
+}
+
+func TestRollingWindow_InitialLen(t *testing.T) {
+	rw := newRollingWindow(5)
+	if rw.Len() != 0 {
+		t.Errorf("expected Len() = 0, got %d", rw.Len())
+	}
+	if rw.Cap() != 5 {
+		t.Errorf("expected Cap() = 5, got %d", rw.Cap())
+	}
+}
+
+func TestRollingWindow_LenGrowsUpToCapacity(t *testing.T) {
+	rw := newRollingWindow(3)
+	rw.Push(1)
+	if rw.Len() != 1 {
+		t.Errorf("expected Len()=1, got %d", rw.Len())
+	}
+	rw.Push(2)
+	if rw.Len() != 2 {
+		t.Errorf("expected Len()=2, got %d", rw.Len())
+	}
+	rw.Push(3)
+	if rw.Len() != 3 {
+		t.Errorf("expected Len()=3, got %d", rw.Len())
+	}
+	// Pushing beyond capacity should not grow Len() past Cap()
+	rw.Push(4)
+	if rw.Len() != 3 {
+		t.Errorf("expected Len()=3 after overflow push, got %d", rw.Len())
+	}
+}
+
+func TestRollingWindow_Clear(t *testing.T) {
+	rw := newRollingWindow(4)
+	rw.Push(1)
+	rw.Push(2)
+	rw.Clear()
+	if rw.Len() != 0 {
+		t.Errorf("expected Len()=0 after Clear(), got %d", rw.Len())
+	}
+}
+
+func TestRollingWindow_Mean_Empty(t *testing.T) {
+	rw := newRollingWindow(4)
+	if rw.Mean() != 0.0 {
+		t.Errorf("expected Mean()=0 for empty window, got %f", rw.Mean())
+	}
+}
+
+func TestRollingWindow_Mean_SingleValue(t *testing.T) {
+	rw := newRollingWindow(4)
+	rw.Push(7.0)
+	if rw.Mean() != 7.0 {
+		t.Errorf("expected Mean()=7.0, got %f", rw.Mean())
+	}
+}
+
+func TestRollingWindow_Mean_MultipleValues(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{1, 2, 3, 4, 5} {
+		rw.Push(v)
+	}
+	want := 3.0
+	if math.Abs(rw.Mean()-want) > epsilon {
+		t.Errorf("expected Mean()=%f, got %f", want, rw.Mean())
+	}
+}
+
+func TestRollingWindow_MeanStdDev_SingleValue(t *testing.T) {
+	rw := newRollingWindow(4)
+	rw.Push(10.0)
+	mean, stddev := rw.MeanStdDev()
+	if mean != 10.0 {
+		t.Errorf("expected mean=10.0, got %f", mean)
+	}
+	if stddev != 0.0 {
+		t.Errorf("expected stddev=0.0 for single value, got %f", stddev)
+	}
+}
+
+func TestRollingWindow_MeanStdDev_KnownValues(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
+		rw.Push(v)
+	}
+	// Window holds only the last 5: [5, 5, 7, 9, 9] — wait, cap=5.
+	// Pushes: index 0=2,1=4,2=4,3=4,4=5 -> wraps: index 0=5,1=7,2=9
+	// Use a simpler known case instead.
+	rw2 := newRollingWindow(4)
+	for _, v := range []float64{10, 20, 30, 40} {
+		rw2.Push(v)
+	}
+	mean, stddev := rw2.MeanStdDev()
+	wantMean := 25.0
+	// sample stddev of {10,20,30,40} = sqrt(((−15)²+(−5)²+(5)²+(15)²)/3) = sqrt(500/3)
+	wantStddev := math.Sqrt(500.0 / 3.0)
+	if math.Abs(mean-wantMean) > epsilon {
+		t.Errorf("expected mean=%f, got %f", wantMean, mean)
+	}
+	if math.Abs(stddev-wantStddev) > epsilon {
+		t.Errorf("expected stddev=%f, got %f", wantStddev, stddev)
+	}
+}
+
+func TestRollingWindow_Percentile_Empty(t *testing.T) {
+	rw := newRollingWindow(4)
+	if rw.Percentile(50) != 0.0 {
+		t.Errorf("expected 0.0 for empty window percentile, got %f", rw.Percentile(50))
+	}
+}
+
+func TestRollingWindow_Percentile_BoundaryValues(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{3, 1, 4, 1, 5} {
+		rw.Push(v)
+	}
+	if rw.Percentile(0) != 1.0 {
+		t.Errorf("expected p0=1.0, got %f", rw.Percentile(0))
+	}
+	if rw.Percentile(100) != 5.0 {
+		t.Errorf("expected p100=5.0, got %f", rw.Percentile(100))
+	}
+}
+
+func TestRollingWindow_Percentile_Median(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{1, 2, 3, 4, 5} {
+		rw.Push(v)
+	}
+	got := rw.Percentile(50)
+	want := 3.0
+	if math.Abs(got-want) > epsilon {
+		t.Errorf("expected p50=%f, got %f", want, got)
+	}
+}
+
+func TestRollingWindow_Percentile_Interpolation(t *testing.T) {
+	rw := newRollingWindow(4)
+	for _, v := range []float64{0, 10, 20, 30} {
+		rw.Push(v)
+	}
+	// rank = 0.25 * 3 = 0.75, lo=0(val=0), hi=1(val=10), frac=0.75 => 0*0.25 + 10*0.75 = 7.5
+	got := rw.Percentile(25)
+	want := 7.5
+	if math.Abs(got-want) > epsilon {
+		t.Errorf("expected p25=%f, got %f", want, got)
+	}
+}
+
+func TestRollingWindow_IsConfidenceSatisfied(t *testing.T) {
+	rw := newRollingWindow(100)
+	// All the same value — stddev=0, margin=0, should always be satisfied
+	for range 100 {
+		rw.Push(50.0)
+	}
+	if !rw.IsConfidenceSatisfied(1.96, 0.05) {
+		t.Error("expected confidence satisfied for zero-variance data")
+	}
+}
+
+func TestRollingWindow_IsConfidenceSatisfied_HighVariance(t *testing.T) {
+	rw := newRollingWindow(10)
+	// High variance: alternating 1 and 1000
+	for i := range 10 {
+		if i%2 == 0 {
+			rw.Push(1.0)
+		} else {
+			rw.Push(1000.0)
+		}
+	}
+	// With high variance and small n, a tight margin should not be satisfied
+	if rw.IsConfidenceSatisfied(1.96, 0.001) {
+		t.Error("expected confidence NOT satisfied for high-variance data with tight margin")
+	}
+}
+
+func TestRollingWindow_Each(t *testing.T) {
+	rw := newRollingWindow(4)
+	for _, v := range []float64{1, 2, 3, 4} {
+		rw.Push(v)
+	}
+	sum := 0.0
+	rw.Each(func(v float64) { sum += v })
+	if math.Abs(sum-10.0) > epsilon {
+		t.Errorf("expected Each() sum=10.0, got %f", sum)
+	}
+}
+
+func TestRollingWindow_OverwritesOldestOnOverflow(t *testing.T) {
+	rw := newRollingWindow(3)
+	rw.Push(1)
+	rw.Push(2)
+	rw.Push(3)
+	rw.Push(100) // Should evict 1, window = [2, 3, 100]
+	mean := rw.Mean()
+	want := (2.0 + 3.0 + 100.0) / 3.0
+	if math.Abs(mean-want) > epsilon {
+		t.Errorf("expected mean=%f after overflow, got %f", want, mean)
+	}
+}
+
+func TestRollingWindow_InternalIndexOverflowWrapToZero(t *testing.T) {
+	capacity := 3
+	rw := newRollingWindow(capacity)
+
+	// Assuming we have pushed ((2 * MaxInt) - 3) elements
+	rw.index = -3
+
+	// cycle back to index=0, ensure our length is still valid
+	rw.Push(1)
+	rw.Push(2)
+	rw.Push(3)
+
+	if rw.Len() != capacity {
+		t.Errorf("expected length=%d after overflowing internal index back to 0. Got %d\n", capacity, rw.Len())
+	}
+
+	// This is because we trick our length algorithm by adding back capacity if the next index == 0
+	if rw.index != capacity {
+		t.Errorf("expected internal index = %d after reaching 0. Got %d\n", capacity, rw.index)
+	}
+}
+
+func TestRollingWindow_InternalIndexOverflow(t *testing.T) {
+	capacity := 3
+	rw := newRollingWindow(capacity)
+
+	// set index to the 0 position relative to max int
+	rw.index = math.MaxInt - 1
+
+	// advance to MaxInt, advance to -MaxInt (overflow), andvance to -MaxInt + 1
+	rw.Push(1)
+	rw.Push(2)
+	rw.Push(3)
+
+	set := newSet(1, 2, 3)
+
+	rw.Each(func(value float64) {
+		v := int(value)
+		if !set.has(v) {
+			t.Errorf("Failed to find value: %d in set.\n", v)
+		}
+
+		set.remove(v)
+	})
+
+	// rewrite
+	rw.Push(4)
+	rw.Push(5)
+	rw.Push(6)
+
+	set = newSet(4, 5, 6)
+	rw.Each(func(value float64) {
+		v := int(value)
+		if !set.has(v) {
+			t.Errorf("Failed to find value: %d in set.\n", v)
+		}
+
+		set.remove(v)
+	})
+
+}
+
+func TestRollingWindow_PartialCapacityMean(t *testing.T) {
+	rw := newRollingWindow(10)
+	for range 5 {
+		rw.Push(5.0)
+	}
+
+	mean := rw.Mean()
+	if mean != 5.0 {
+		t.Errorf("Expected mean = 5.0. Got %f\n", mean)
+	}
+}

+ 227 - 0
core/pkg/util/monitor/memory/memorylimitstats.go

@@ -0,0 +1,227 @@
+package memory
+
+import "sync"
+
+// MemoryLimitConfig contains configuration values used to calculate the soft
+// memory limit based on heap usage over time.
+type MemoryLimitConfig struct {
+	// LimitRatio is the ratio applied to memory limit values calculated. This
+	// is generally set to 90% of the proposed limit. ie: 0.9
+	LimitRatio float64
+
+	// MinSamples is the required number of samples that must be collected before
+	// calculating a memory limit.
+	MinSamples int
+
+	// WindowSize is the total number of smoothed samples to maintain when calculating
+	// the proposed limit. This is generally set based on the timescale in which
+	// samples are added.
+	WindowSize int
+
+	// SmoothingFactor is a value between 0 and 1 which defines how weight importance
+	// should be distributed between a previous average value and the current observation.
+	SmoothingFactor float64
+
+	// BreachWindowSize is the total number of recent raw samples are maintained used for
+	// breach detection. This occurs when the number of samples geather than the current
+	// memory limit exceeds a threshold.
+	BreachWindowSize int
+
+	// BreachThreshold is a limit of raw samples allowed to exceed the memory limit. If this
+	// threshold is reached, the samples are recalibrated.
+	BreachThreshold int
+
+	// CumulativeSumSlack is also known as the K-Factor (drift tolerance) in cumulative sum control
+	// charts uses the allowable slack range in deviations. If the deviations exceed the allowable
+	// slack, then they're used to calculate the sum. This is generally set from 0.5 to 1.0 standard
+	// deviations to filter out process noise.
+	CumulativeSumSlack float64
+
+	// CumulativeSumThreshold is a scaler applied to the "baseline" mean (set once there are enough
+	// samples to be considered "stable"). If the cumulative sum ever surpasses this baseline * threshold,
+	// the samples will be recalibrated.
+	CumulativeSumThreshold float64
+}
+
+// DfaultMemoryLimitConfig creates the recommendded values to use for detecting soft memory limit updates
+func DefaultMemoryLimitConfig() *MemoryLimitConfig {
+	return &MemoryLimitConfig{
+		LimitRatio:             0.90,
+		MinSamples:             30,
+		WindowSize:             60,
+		SmoothingFactor:        0.30,
+		BreachWindowSize:       10,
+		BreachThreshold:        3,
+		CumulativeSumSlack:     0.05,
+		CumulativeSumThreshold: 5.0,
+	}
+}
+
+// MemoryLimitStats is a run-time memory statistics collector that maintains a soft memory limit
+// value based on configurable input parameters. It is designed to adjust the soft limit based on
+// meaningful changes to overall heap allocation, leveraging expontential moving average windows,
+// confidence interval gates, breach detection, and cumulative sum control chart to detect meaningful
+// deviations from the mean.
+type MemoryLimitStats struct {
+	lock   sync.Mutex
+	config *MemoryLimitConfig
+
+	// expontential moving average calculation
+	ema *exponentialMovingAverage
+
+	// ring buffers for tracking exponential moving averages and raw samples
+	window *rollingWindow
+	raw    *rollingWindow
+	breach *rollingWindow
+
+	// cusum calculation for detecting positive shifts in memory usage
+	cusum *cumulativeSum
+
+	// tracked value storage for the soft memory limit proposal which
+	// stores the previous limit as well as the current limit
+	softLimit *trackedValue
+}
+
+// NewMemoryLimitStats creates a new `MemoryLimitStats` instance with the provided
+// `MemoryLimitConfig`. If the provided config is `nil`, then the default configuration
+// values are used.
+func NewMemoryLimitStats(config *MemoryLimitConfig) *MemoryLimitStats {
+	if config == nil {
+		config = DefaultMemoryLimitConfig()
+	}
+
+	return &MemoryLimitStats{
+		config:    config,
+		ema:       newExponentialMovingAverage(config.SmoothingFactor),
+		window:    newRollingWindow(config.WindowSize),
+		raw:       newRollingWindow(config.MinSamples),
+		breach:    newRollingWindow(config.BreachWindowSize),
+		cusum:     newCumulativeSum(config.CumulativeSumSlack),
+		softLimit: newTrackedValue(),
+	}
+}
+
+// Record ingests the total heap memory usage (in bytes), and returns
+// (newSoftLimit, true) when the soft limit has been updated, or
+// (currentSoftLimit, false) when no change occurred.
+//
+// A return value of (0, false) means the monitor is still collecting samples
+// and no limit has been committed yet.
+func (mls *MemoryLimitStats) Record(heapBytes uint64) (softLimit uint64, updated bool) {
+	mls.lock.Lock()
+	defer mls.lock.Unlock()
+
+	sample := float64(heapBytes)
+	smoothed := mls.ema.Update(sample)
+
+	mls.window.Push(smoothed)
+	mls.raw.Push(sample)
+	mls.breach.Push(sample)
+
+	// Check that the minimum number of sammples exist in the window before
+	// calculating the memory limit
+	totalSamples := mls.window.Len()
+	if totalSamples < mls.config.MinSamples {
+		return uint64(mls.softLimit.Value()), false
+	}
+
+	// NOTE: We could calculate the mean and stddev here, and determine if the data
+	// NOTE: matches a confidence interval, but this might be too strict. See the
+	// NOTE: method: mls.window.IsConfidenceSatisfied(...) method.
+
+	// Pull the P99 sample from the smoothed sample window
+	p99 := mls.window.Percentile(99)
+	candidate := p99 * mls.config.LimitRatio
+
+	// Ensure we've already set a soft limit before running breach
+	// detection or CUSUM deviation tests.
+	if mls.softLimit.IsSet() {
+		if mls.isBreachDetected() {
+			mls.recalibrate()
+			return uint64(mls.softLimit.Value()), false
+		}
+
+		// update cumulative sum and check for recalibration
+		mls.cusum.Update(sample)
+		if mls.cusum.IsRecalibrationRequired(mls.config.CumulativeSumThreshold) {
+			mls.recalibrate()
+			return uint64(mls.softLimit.Value()), false
+		}
+
+		// this will only end up running once after the min samples threshold
+		// is passed, and sets the baseline mean for the cusum calculations
+		mean := mls.raw.Mean()
+		mls.cusum.Calibrate(mean)
+	}
+
+	// update the soft limit to the candidate sample
+	updated = mls.softLimit.Set(candidate)
+	softLimit = uint64(mls.softLimit.Value())
+	return
+}
+
+// SoftLimit returns the current soft limit without recording a sample.
+// Returns 0 if the monitor is still collecting data samples.
+func (mls *MemoryLimitStats) SoftMemoryLimit() uint64 {
+	mls.lock.Lock()
+	defer mls.lock.Unlock()
+
+	return uint64(mls.softLimit.Value())
+}
+
+// TotalSamples returns the total number of samples _currently_ being used to
+// calculate the memory limit. The samples will reset if a deviation threshold
+// was reached in order to re-establish stability in the data set.
+func (mls *MemoryLimitStats) TotalSamples() int {
+	mls.lock.Lock()
+	defer mls.lock.Unlock()
+
+	return mls.window.Len()
+}
+
+// Reset clears all state and samples collected.
+func (mls *MemoryLimitStats) Reset() {
+	mls.lock.Lock()
+	defer mls.lock.Unlock()
+
+	mls.window.Clear()
+	mls.raw.Clear()
+	mls.breach.Clear()
+	mls.softLimit.Clear()
+	mls.ema.Reset()
+	mls.cusum.Reset()
+}
+
+// isBreachDetected iterates through the breach sample window and tallies the
+// total number of samples that exceed the p99 smoothed memory usage sample.
+func (mls *MemoryLimitStats) isBreachDetected() bool {
+	if !mls.softLimit.IsSet() {
+		return false
+	}
+
+	// due to the nature of breach detection, we want to compare
+	// against the smoothed p99 sample, so unroll the ratio
+	p99 := mls.softLimit.Value() / mls.config.LimitRatio
+
+	// Tally the total number of recent raw samples that are
+	// greater than the p99 smoothed sample
+	count := 0
+	mls.breach.Each(func(value float64) {
+		if value > p99 {
+			count++
+		}
+	})
+
+	return count >= mls.config.BreachThreshold
+}
+
+// recalibrate dumps the existing samples and calculations, but will preserve
+// the previous softLimit value until a new soft limit is set.
+func (mls *MemoryLimitStats) recalibrate() {
+	mls.window.Clear()
+	mls.raw.Clear()
+	mls.breach.Clear()
+	mls.ema.Reset()
+	mls.cusum.Reset()
+	mls.softLimit.Reset()
+}

+ 146 - 0
core/pkg/util/monitor/memory/memorylimitstats_test.go

@@ -0,0 +1,146 @@
+package memory_test
+
+import (
+	"fmt"
+	"math/rand"
+	"testing"
+
+	"github.com/mbolt35/bingen-file-loader/core/util/monitor/memory"
+)
+
+func TestObservationMode(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	// Feed fewer than MinSamples — should never return updated=true.
+	for i := 0; i < config.MinSamples-1; i++ {
+		_, updated := m.Record(100 * 1024 * 1024) // 100 MiB
+		if updated {
+			t.Fatalf("sample %d: got updated=true before MinSamples reached", i)
+		}
+	}
+	if got := m.SoftMemoryLimit(); got != 0 {
+		t.Fatalf("expected SoftLimit 0 during observation, got %d", got)
+	}
+}
+
+func TestLimitCommittedAfterMinSamples(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	const alloc = 200 * 1024 * 1024 // 200 MiB, perfectly stable
+	var lastLimit uint64
+	var sawUpdate bool
+
+	for i := 0; i < config.MinSamples+10; i++ {
+		limit, updated := m.Record(alloc)
+		if updated {
+			sawUpdate = true
+			lastLimit = limit
+		}
+	}
+
+	if !sawUpdate {
+		t.Fatal("expected at least one limit update after MinSamples")
+	}
+
+	// Soft limit should be ~90% of the stable allocation.
+	expected := uint64(float64(alloc) * 0.90)
+	delta := int64(lastLimit) - int64(expected)
+	if delta < 0 {
+		delta = -delta
+	}
+	// Allow 1% tolerance.
+	if delta > int64(expected)/100 {
+		t.Fatalf("soft limit %d too far from expected %d (delta %d)", lastLimit, expected, delta)
+	}
+}
+
+func TestElasticRecalibrationOnGrowth(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	config.BreachWindowSize = 5
+	config.BreachThreshold = 3
+	m := memory.NewMemoryLimitStats(config)
+
+	// Phase 1: stable at 100 MiB — establish a limit.
+	for i := 0; i < config.MinSamples+20; i++ {
+		m.Record(100 * 1024 * 1024)
+	}
+	limitBefore := m.SoftMemoryLimit()
+	if limitBefore == 0 {
+		t.Fatal("expected a non-zero limit after phase 1")
+	}
+
+	fmt.Printf("Before Breach...\n")
+
+	// Phase 2: spike to 300 MiB repeatedly — should trigger recalibration.
+	for i := 0; i < config.BreachThreshold+1; i++ {
+		m.Record(500 * 1024 * 1024)
+	}
+
+	fmt.Printf("Crossed Threshold here\n")
+
+	// Phase 3: feed enough samples at new level to re-commit.
+	var recalibrated bool
+	for i := 0; i < config.MinSamples+20; i++ {
+		limit, updated := m.Record(800 * 1024 * 1024)
+		if updated && limit > limitBefore {
+			recalibrated = true
+			break
+		}
+	}
+	if !recalibrated {
+		t.Fatal("expected the soft limit to grow after sustained high usage")
+	}
+}
+
+func TestReset(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	for i := 0; i < config.MinSamples+5; i++ {
+		m.Record(128 * 1024 * 1024)
+	}
+	if m.SoftMemoryLimit() == 0 {
+		t.Fatal("expected non-zero soft limit before reset")
+	}
+
+	m.Reset()
+
+	if m.SoftMemoryLimit() != 0 {
+		t.Fatal("expected zero soft limit after reset")
+	}
+	if m.TotalSamples() != 0 {
+		t.Fatal("expected zero sample count after reset")
+	}
+}
+
+func TestNoisyInputStability(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	rng := rand.New(rand.NewSource(42))
+	base := float64(256 * 1024 * 1024) // 256 MiB
+
+	var limits []uint64
+	for i := 0; i < 200; i++ {
+		// ±10% noise around base
+		noise := (rng.Float64()*0.2 - 0.1) * base
+		_, _ = m.Record(uint64(base + noise))
+		if l := m.SoftMemoryLimit(); l > 0 {
+			limits = append(limits, l)
+		}
+	}
+
+	if len(limits) == 0 {
+		t.Fatal("expected at least one committed limit")
+	}
+
+	// The final limit should be in a sensible range: 75–95% of base.
+	last := limits[len(limits)-1]
+	lo := uint64(base * 0.75)
+	hi := uint64(base * 0.95)
+	if last < lo || last > hi {
+		t.Fatalf("final limit %d outside expected range [%d, %d]", last, lo, hi)
+	}
+}

+ 123 - 0
core/pkg/util/monitor/memorylimiter.go

@@ -0,0 +1,123 @@
+package monitor
+
+import (
+	"fmt"
+	"math"
+	"runtime"
+	"runtime/debug"
+	"sync"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/atomic"
+	"github.com/opencost/opencost/core/pkg/util/monitor/memory"
+)
+
+var (
+	once          sync.Once
+	memoryLimiter *MemoryLimiter
+)
+
+// MemoryLimiter is a heap usage monitor for the go runtime which will attempt to
+// dynamically set a GOMEMLIMIT value to best fit the heap usage. It will only
+// adjust the GOMEMLIMIT if the usage analysis results in an increase, and won't
+// try to "best fit" the current usage. It takes into account the initial GOMEMLIMIT
+// value as the baseline.
+type MemoryLimiter struct {
+	runState atomic.AtomicRunState
+	monitor  *memory.MemoryLimitStats
+}
+
+// Start begins collecting heap allocation samples for automatically adjusting the go soft memory limit
+// for heap usage.
+func (ml *MemoryLimiter) Start(interval time.Duration) error {
+	ml.runState.WaitForReset()
+
+	if !ml.runState.Start() {
+		return fmt.Errorf("memory limiter was already started")
+	}
+
+	// main limiter driver
+	go func() {
+		var memStats runtime.MemStats
+		var prevLimit uint64
+
+		// determine if mem limit was set prior by passing a negative
+		// value to SetMemoryLimit, which will return the current value
+		// without making any changes -- the default is MaxInt64
+		goMemLimit := debug.SetMemoryLimit(-1)
+		if goMemLimit == math.MaxInt64 {
+			prevLimit = 0
+		} else {
+			prevLimit = uint64(goMemLimit)
+		}
+
+		// take initial heap measurement
+		runtime.ReadMemStats(&memStats)
+		ml.monitor.Record(memStats.HeapAlloc)
+
+		for {
+			select {
+			case <-ml.runState.OnStop():
+				ml.runState.Reset()
+				return
+
+			case <-time.After(interval):
+			}
+
+			// in the event that someone updates the limit outside of this monitor
+			// we want to make sure that we synchronize the correct value
+			goMemLimit = debug.SetMemoryLimit(-1)
+			if goMemLimit != math.MaxInt64 && goMemLimit != int64(prevLimit) {
+				prevLimit = uint64(goMemLimit)
+			}
+
+			// record and determine if we should update the memory limit
+			runtime.ReadMemStats(&memStats)
+			if softLimit, updated := ml.monitor.Record(memStats.HeapAlloc); updated {
+				// we only allow the limit to increase for now, as this best reflects a
+				// max stable set of samples. Worth observation and potentially updating
+				// in the future
+				if softLimit != 0 && softLimit > prevLimit {
+					prevLimit = softLimit
+					log.Debugf("Updating Go Memory Limit: %dmb", int64(softLimit/1024.0/1024.0))
+					debug.SetMemoryLimit(int64(softLimit))
+				}
+			}
+		}
+	}()
+
+	return nil
+}
+
+// Stops automatically adjusting the memory limiter
+func (ml *MemoryLimiter) Stop() error {
+	if !ml.runState.Stop() {
+		return fmt.Errorf("could not stop memory limiter - in the state of stopping or already stopped")
+	}
+	return nil
+}
+
+// returns the singleton instance of the memory limiter
+func getMemoryLimiter() *MemoryLimiter {
+	once.Do(func() {
+		config := memory.DefaultMemoryLimitConfig()
+		memoryLimiter = &MemoryLimiter{
+			monitor: memory.NewMemoryLimitStats(config),
+		}
+	})
+
+	return memoryLimiter
+}
+
+// DefaultMemoryLimiterSampleInterval is the sample interval in which the auto limiter
+// gathers heap usage.
+const DefaultMemoryLimiterSampleInterval = time.Second
+
+func StartMemoryLimiter() error {
+	return getMemoryLimiter().Start(DefaultMemoryLimiterSampleInterval)
+}
+
+func StopMemoryLimiter() error {
+	return getMemoryLimiter().Stop()
+}

+ 159 - 0
core/pkg/util/stringutil/lrubank.go

@@ -0,0 +1,159 @@
+package stringutil
+
+import (
+	"container/heap"
+	"sync"
+	"time"
+)
+
+type lruEntry struct {
+	value string
+	used  int64
+}
+type maxHeap []*lruEntry
+
+func (h maxHeap) Len() int           { return len(h) }
+func (h maxHeap) Less(i, j int) bool { return h[i].used > h[j].used } // newer = "larger"
+func (h maxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+func (h *maxHeap) Push(x any) {
+	*h = append(*h, x.(*lruEntry))
+}
+
+func (h *maxHeap) Pop() any {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[:n-1]
+	return x
+}
+
+func nOldest(arr []*lruEntry, n int) []*lruEntry {
+	if n <= 0 {
+		return []*lruEntry{}
+	}
+
+	if n >= len(arr) {
+		return arr
+	}
+
+	h := maxHeap(arr[:n])
+	heap.Init(&h)
+
+	for _, entry := range arr[n:] {
+		// swap in oldest, re-heapify
+		if entry.used < h[0].used {
+			h[0] = entry
+			heap.Fix(&h, 0)
+		}
+	}
+
+	return []*lruEntry(h)
+}
+
+type lruStringBank struct {
+	lock     sync.Mutex
+	stop     chan struct{}
+	m        map[string]*lruEntry
+	capacity int
+}
+
+func NewLruStringBank(capacity int, evictionInterval time.Duration) StringBank {
+	stop := make(chan struct{})
+	bank := &lruStringBank{
+		m:        make(map[string]*lruEntry),
+		capacity: capacity,
+	}
+
+	go func() {
+		for {
+			select {
+			case <-stop:
+				return
+			case <-time.After(evictionInterval):
+			}
+
+			// need to take the lock during eviction
+			bank.lock.Lock()
+			evict(bank, capacity)
+			bank.lock.Unlock()
+		}
+	}()
+
+	return bank
+}
+
+func evict(bank *lruStringBank, capacity int) {
+	if len(bank.m) <= capacity {
+		return
+	}
+
+	// we collect a list of all lru entries so we can max heap the first n elements
+	arr := make([]*lruEntry, 0, len(bank.m))
+	for _, v := range bank.m {
+		arr = append(arr, v)
+	}
+
+	oldest := nOldest(arr, len(bank.m)-capacity)
+	for _, old := range oldest {
+		delete(bank.m, old.value)
+	}
+}
+
+func (sb *lruStringBank) Stop() {
+	sb.lock.Lock()
+	defer sb.lock.Unlock()
+
+	if sb.stop != nil {
+		close(sb.stop)
+		sb.stop = nil
+	}
+}
+
+func (sb *lruStringBank) LoadOrStore(key, value string) (string, bool) {
+	sb.lock.Lock()
+
+	if v, ok := sb.m[key]; ok {
+		v.used = time.Now().UnixMilli()
+		sb.lock.Unlock()
+		return v.value, ok
+	}
+
+	sb.m[key] = &lruEntry{
+		value: value,
+		used:  time.Now().UnixMilli(),
+	}
+	if len(sb.m) > (sb.capacity + (sb.capacity / 2)) {
+		evict(sb, sb.capacity)
+	}
+	sb.lock.Unlock()
+	return value, false
+}
+
+func (sb *lruStringBank) LoadOrStoreFunc(key string, f func() string) (string, bool) {
+	sb.lock.Lock()
+
+	if v, ok := sb.m[key]; ok {
+		v.used = time.Now().UnixMilli()
+		sb.lock.Unlock()
+		return v.value, ok
+	}
+
+	// create the key and value using the func (the key could be deallocated later)
+	value := f()
+	sb.m[value] = &lruEntry{
+		value: value,
+		used:  time.Now().UnixMilli(),
+	}
+	if len(sb.m) > (sb.capacity + (sb.capacity / 2)) {
+		evict(sb, sb.capacity)
+	}
+	sb.lock.Unlock()
+	return value, false
+}
+
+func (sb *lruStringBank) Clear() {
+	sb.lock.Lock()
+	sb.m = make(map[string]*lruEntry)
+	sb.lock.Unlock()
+}

+ 408 - 0
core/pkg/util/stringutil/lrubank_test.go

@@ -0,0 +1,408 @@
+package stringutil
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestBasicLruEvict(t *testing.T) {
+	lruBank := NewLruStringBank(3, 2*time.Second).(*lruStringBank)
+	defer lruBank.Stop()
+
+	lruBank.LoadOrStore("foo", "foo")
+	time.Sleep(500 * time.Millisecond)
+	lruBank.LoadOrStore("bar", "bar")
+	time.Sleep(500 * time.Millisecond)
+	lruBank.LoadOrStore("whaz", "whaz")
+	time.Sleep(500 * time.Millisecond)
+	// access foo, updating recency
+	lruBank.LoadOrStore("foo", "foo")
+	// should push bar out after eviction runs
+	lruBank.LoadOrStore("test", "test")
+	time.Sleep(time.Second)
+
+	lruBank.lock.Lock()
+	for _, v := range lruBank.m {
+		t.Logf("Value: %s\n", v.value)
+		if v.value == "bar" {
+			t.Errorf("The 'bar' entry should've been replaced by 'test'")
+		}
+	}
+	lruBank.lock.Unlock()
+}
+
+// ---------------------------------------------------------------------------
+// LoadOrStore
+// ---------------------------------------------------------------------------
+
+// A stored value must be retrievable and LoadOrStore must signal the hit/miss
+// correctly via the boolean return.
+func TestLoadOrStore_MissAndHit(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+
+	v, loaded := bank.LoadOrStore("hello", "hello")
+	if loaded {
+		t.Errorf("first LoadOrStore: expected loaded=false, got true")
+	}
+	if v != "hello" {
+		t.Errorf("first LoadOrStore: expected value %q, got %q", "hello", v)
+	}
+
+	v, loaded = bank.LoadOrStore("hello", "world")
+	if !loaded {
+		t.Errorf("second LoadOrStore: expected loaded=true, got false")
+	}
+	// The original value must be returned on a hit, not the new candidate.
+	if v != "hello" {
+		t.Errorf("second LoadOrStore: expected cached value %q, got %q", "hello", v)
+	}
+}
+
+// Hitting an existing entry must update its recency so it is not evicted ahead
+// of entries that were never touched again.
+func TestLoadOrStore_HitUpdateRecency(t *testing.T) {
+	bank := NewLruStringBank(2, 500*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	bank.LoadOrStore("old", "old")
+	time.Sleep(100 * time.Millisecond)
+	bank.LoadOrStore("keep", "keep")
+	time.Sleep(100 * time.Millisecond)
+
+	// Re-touch "old" so it becomes the most-recently-used.
+	bank.LoadOrStore("old", "old")
+	time.Sleep(100 * time.Millisecond)
+
+	// Adding a third entry exceeds capacity; "keep" should be the oldest now.
+	bank.LoadOrStore("new", "new")
+
+	// Wait for the eviction goroutine.
+	time.Sleep(600 * time.Millisecond)
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	if _, ok := bank.m["keep"]; ok {
+		t.Error("expected 'keep' to be evicted but it is still present")
+	}
+	if _, ok := bank.m["old"]; !ok {
+		t.Error("expected 'old' to survive eviction after its recency was refreshed")
+	}
+}
+
+// ---------------------------------------------------------------------------
+// LoadOrStoreFunc
+// ---------------------------------------------------------------------------
+
+// The factory function must only be called on a cache miss, not on a hit.
+func TestLoadOrStoreFunc_FactoryCalledOnMissOnly(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+	calls := 0
+
+	factory := func() string {
+		calls++
+		return "k"
+	}
+
+	bank.LoadOrStoreFunc("k", factory)
+	bank.LoadOrStoreFunc("k", factory)
+
+	if calls != 1 {
+		t.Errorf("factory should be called exactly once, got %d calls", calls)
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Capacity / eviction
+// ---------------------------------------------------------------------------
+
+// If the bank never exceeds capacity, nothing should be evicted.
+func TestEviction_BelowCapacityNoEviction(t *testing.T) {
+	const capacity = 5
+	bank := NewLruStringBank(capacity, 200*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	for i := 0; i < capacity; i++ {
+		bank.LoadOrStore(fmt.Sprintf("v%d", i), fmt.Sprintf("v%d", i))
+	}
+
+	// Wait several eviction cycles.
+	time.Sleep(600 * time.Millisecond)
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	if got := len(bank.m); got != capacity {
+		t.Errorf("expected %d entries, got %d", capacity, got)
+	}
+}
+
+// After eviction the map must be trimmed down to exactly capacity.
+func TestEviction_ExceedCapacityTrimsToCapacity(t *testing.T) {
+	const capacity = 3
+	bank := NewLruStringBank(capacity, 350*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	for i := 0; i < capacity+3; i++ {
+		bank.LoadOrStore(fmt.Sprintf("v%d", i), fmt.Sprintf("v%d", i))
+		time.Sleep(20 * time.Millisecond) // ensure distinct timestamps
+	}
+
+	// Wait for eviction.
+	time.Sleep(500 * time.Millisecond)
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	if got := len(bank.m); got > capacity {
+		t.Errorf("expected at most %d entries after eviction, got %d", capacity, got)
+	}
+}
+
+// The most-recently-used entries must survive eviction.
+func TestEviction_MRUSurvives(t *testing.T) {
+	const capacity = 2
+	bank := NewLruStringBank(capacity, 300*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	bank.LoadOrStore("evict1", "evict1")
+	time.Sleep(50 * time.Millisecond)
+	bank.LoadOrStore("evict2", "evict2")
+	time.Sleep(50 * time.Millisecond)
+
+	// These two are the most recent; they must survive.
+	bank.LoadOrStore("keep1", "keep1")
+	time.Sleep(50 * time.Millisecond)
+	bank.LoadOrStore("keep2", "keep2")
+
+	time.Sleep(500 * time.Millisecond)
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	for _, must := range []string{"keep1", "keep2"} {
+		if _, ok := bank.m[must]; !ok {
+			t.Errorf("expected %q to survive eviction", must)
+		}
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Clear
+// ---------------------------------------------------------------------------
+
+func TestClear_EmptiesMap(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+
+	for i := 0; i < 5; i++ {
+		bank.LoadOrStore(fmt.Sprintf("v%d", i), fmt.Sprintf("v%d", i))
+	}
+
+	bank.Clear()
+
+	bank.lock.Lock()
+	defer bank.lock.Unlock()
+
+	if len(bank.m) != 0 {
+		t.Errorf("expected empty map after Clear, got %d entries", len(bank.m))
+	}
+}
+
+// After a Clear, previously stored keys must not be found.
+func TestClear_PreviousKeysGone(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+
+	bank.LoadOrStore("hello", "world")
+	bank.Clear()
+
+	_, loaded := bank.LoadOrStore("hello", "new")
+	if loaded {
+		t.Error("expected key to be absent after Clear, but it was found")
+	}
+}
+
+// ---------------------------------------------------------------------------
+// nOldest helper
+// ---------------------------------------------------------------------------
+
+func TestNOldest_ReturnsCorrectCount(t *testing.T) {
+	now := time.Now()
+	entries := []*lruEntry{
+		{value: "a", used: now.Add(-4 * time.Second).UnixMilli()},
+		{value: "b", used: now.Add(-3 * time.Second).UnixMilli()},
+		{value: "c", used: now.Add(-2 * time.Second).UnixMilli()},
+		{value: "d", used: now.Add(-1 * time.Second).UnixMilli()},
+		{value: "e", used: now.UnixMilli()},
+	}
+
+	oldest := nOldest(entries, 2)
+	if len(oldest) != 2 {
+		t.Fatalf("expected 2 oldest entries, got %d", len(oldest))
+	}
+
+	values := map[string]bool{}
+	for _, e := range oldest {
+		values[e.value] = true
+	}
+	for _, must := range []string{"a", "b"} {
+		if !values[must] {
+			t.Errorf("expected %q in oldest set, got %v", must, values)
+		}
+	}
+}
+
+func TestNOldest_NGreaterThanLen(t *testing.T) {
+	now := time.Now()
+	entries := []*lruEntry{
+		{value: "x", used: now.UnixMilli()},
+		{value: "y", used: now.Add(-time.Second).UnixMilli()},
+	}
+
+	result := nOldest(entries, 10)
+	if len(result) != 2 {
+		t.Errorf("expected all %d entries when n >= len, got %d", 2, len(result))
+	}
+}
+
+func TestNOldest_NEqualsLen(t *testing.T) {
+	now := time.Now()
+	entries := []*lruEntry{
+		{value: "x", used: now.UnixMilli()},
+		{value: "y", used: now.Add(-time.Second).UnixMilli()},
+	}
+
+	result := nOldest(entries, 2)
+	if len(result) != 2 {
+		t.Errorf("expected 2 entries when n == len, got %d", len(result))
+	}
+}
+
+func TestNOldest_NIsZero(t *testing.T) {
+	now := time.Now()
+	entries := []*lruEntry{
+		{value: "x", used: now.UnixMilli()},
+	}
+
+	result := nOldest(entries, 0)
+	if len(result) != 0 {
+		t.Errorf("expected 0 entries when n=0, got %d", len(result))
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Concurrency
+// ---------------------------------------------------------------------------
+
+// Concurrent LoadOrStore calls must not race or panic.
+func TestConcurrentLoadOrStore(t *testing.T) {
+	bank := NewLruStringBank(50, 100*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	const goroutines = 20
+	const opsEach = 100
+
+	var wg sync.WaitGroup
+	for i := 0; i < goroutines; i++ {
+		g := i
+		wg.Go(func() {
+			for i := 0; i < opsEach; i++ {
+				key := fmt.Sprintf("k%d", (g*opsEach+i)%30)
+				bank.LoadOrStore(key, key)
+			}
+		})
+	}
+
+	waiter := func() chan struct{} {
+		st := make(chan struct{})
+
+		go func() {
+			wg.Wait()
+			close(st)
+		}()
+
+		return st
+	}
+
+	select {
+	case <-waiter():
+		t.Logf("Completed Successfully\n")
+	case <-time.After(10 * time.Second):
+		t.Logf("Timed out\n")
+	}
+}
+
+// Concurrent calls interleaved with eviction cycles must not deadlock or race.
+func TestConcurrentLoadOrStoreWithEviction(t *testing.T) {
+	bank := NewLruStringBank(5, 50*time.Millisecond).(*lruStringBank)
+	defer bank.Stop()
+
+	const goroutines = 10
+	const duration = 300 * time.Millisecond
+
+	var wg sync.WaitGroup
+
+	for i := 0; i < goroutines; i++ {
+		g := i
+		stop := time.After(duration)
+
+		wg.Go(func() {
+			for {
+				select {
+				case <-stop:
+					return
+				default:
+					key := fmt.Sprintf("g%d", g)
+					bank.LoadOrStore(key, key)
+				}
+			}
+		})
+	}
+
+	waiter := func() chan struct{} {
+		st := make(chan struct{})
+
+		go func() {
+			wg.Wait()
+			close(st)
+		}()
+
+		return st
+	}
+
+	select {
+	case <-waiter():
+		t.Logf("Completed Successfully\n")
+	case <-time.After(10 * time.Second):
+		t.Logf("Timed out\n")
+	}
+}
+
+// Concurrent Clear calls alongside reads/writes must not panic.
+func TestConcurrentClear(t *testing.T) {
+	bank := NewLruStringBank(10, time.Minute).(*lruStringBank)
+	defer bank.Stop()
+
+	var wg sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			bank.LoadOrStore(fmt.Sprintf("k%d", i), "v")
+		}(i)
+	}
+	for i := 0; i < 3; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			bank.Clear()
+		}()
+	}
+
+	wg.Wait()
+}

+ 48 - 0
core/pkg/util/stringutil/mapbank.go

@@ -0,0 +1,48 @@
+package stringutil
+
+import "sync"
+
+type stringBank struct {
+	lock sync.Mutex
+	m    map[string]string
+}
+
+func NewStringBank() StringBank {
+	return &stringBank{
+		m: make(map[string]string),
+	}
+}
+
+func (sb *stringBank) LoadOrStore(key, value string) (string, bool) {
+	sb.lock.Lock()
+
+	if v, ok := sb.m[key]; ok {
+		sb.lock.Unlock()
+		return v, ok
+	}
+
+	sb.m[value] = value
+	sb.lock.Unlock()
+	return value, false
+}
+
+func (sb *stringBank) LoadOrStoreFunc(key string, f func() string) (string, bool) {
+	sb.lock.Lock()
+
+	if v, ok := sb.m[key]; ok {
+		sb.lock.Unlock()
+		return v, ok
+	}
+
+	// create the key and value using the func (the key could be deallocated later)
+	value := f()
+	sb.m[value] = value
+	sb.lock.Unlock()
+	return value, false
+}
+
+func (sb *stringBank) Clear() {
+	sb.lock.Lock()
+	sb.m = make(map[string]string)
+	sb.lock.Unlock()
+}

+ 17 - 0
core/pkg/util/stringutil/noopbank.go

@@ -0,0 +1,17 @@
+package stringutil
+
+type noOpStringBank struct{}
+
+func NewNoOpStringBank() StringBank {
+	return new(noOpStringBank)
+}
+
+func (nsb *noOpStringBank) LoadOrStore(key, value string) (string, bool) {
+	return value, true
+}
+
+func (nsb *noOpStringBank) LoadOrStoreFunc(key string, f func() string) (string, bool) {
+	return f(), true
+}
+
+func (nsb *noOpStringBank) Clear() {}

+ 27 - 44
core/pkg/util/stringutil/stringutil.go

@@ -23,75 +23,58 @@ const (
 var alpha = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
 var alphanumeric = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
 
-type stringBank struct {
-	lock sync.Mutex
-	m    map[string]string
+type StringBank interface {
+	LoadOrStore(key, value string) (string, bool)
+	LoadOrStoreFunc(key string, f func() string) (string, bool)
+	Clear()
 }
 
-func newStringBank() *stringBank {
-	return &stringBank{
-		m: make(map[string]string),
-	}
-}
+var (
+	lock sync.RWMutex
 
-func (sb *stringBank) LoadOrStore(key, value string) (string, bool) {
-	sb.lock.Lock()
-
-	if v, ok := sb.m[key]; ok {
-		sb.lock.Unlock()
-		return v, ok
-	}
+	// stringBank is an unbounded string cache that is thread-safe. It is especially useful if
+	// storing a large frequency of dynamically allocated duplicate strings.
+	strings StringBank = NewStringBank()
+)
 
-	sb.m[key] = value
-	sb.lock.Unlock()
-	return value, false
+func init() {
+	rand.Seed(time.Now().UnixNano())
 }
 
-func (sb *stringBank) LoadOrStoreFunc(key string, f func() string) (string, bool) {
-	sb.lock.Lock()
+func UpdateStringBank(sb StringBank) {
+	lock.Lock()
+	defer lock.Unlock()
 
-	if v, ok := sb.m[key]; ok {
-		sb.lock.Unlock()
-		return v, ok
-	}
-
-	// create the key and value using the func (the key could be deallocated later)
-	value := f()
-	sb.m[value] = value
-	sb.lock.Unlock()
-	return value, false
-}
-
-func (sb *stringBank) Clear() {
-	sb.lock.Lock()
-	sb.m = make(map[string]string)
-	sb.lock.Unlock()
+	strings.Clear()
+	strings = sb
 }
 
-// stringBank is an unbounded string cache that is thread-safe. It is especially useful if
-// storing a large frequency of dynamically allocated duplicate strings.
-var strings = newStringBank() // sync.Map
+// GetStringBank returns the _current_ StringBank implementation. Note that the read-lock is
+// not held for the duration of usage, so the returned string bank could be swapped out
+// after being retrieved.
+func GetStringBank() StringBank {
+	lock.RLock()
+	defer lock.RUnlock()
 
-func init() {
-	rand.Seed(time.Now().UnixNano())
+	return strings
 }
 
 // Bank will return a non-copy of a string if it has been used before. Otherwise, it will store
 // the string as the unique instance.
 func Bank(s string) string {
-	ss, _ := strings.LoadOrStore(s, s)
+	ss, _ := GetStringBank().LoadOrStore(s, s)
 	return ss
 }
 
 // BankFunc will use the provided s string to check for an existing allocation of the string. However,
 // if no allocation exists, the f parameter will be used to create the string and store in the bank.
 func BankFunc(s string, f func() string) string {
-	ss, _ := strings.LoadOrStoreFunc(s, f)
+	ss, _ := GetStringBank().LoadOrStoreFunc(s, f)
 	return ss
 }
 
 func ClearBank() {
-	strings.Clear()
+	GetStringBank().Clear()
 }
 
 // RandSeq generates a pseudo-random alphabetic string of the given length

+ 79 - 4
core/pkg/util/stringutil/stringutil_test.go

@@ -6,6 +6,8 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"
+	"unsafe"
 
 	"github.com/opencost/opencost/core/pkg/util/stringutil"
 )
@@ -46,7 +48,7 @@ func copyString(s string) string {
 	return string([]byte(s))
 }
 
-func generateBenchData(totalStrings, totalUnique int) []string {
+func generateBenchData(totalStrings, totalUnique int) [][]byte {
 	randStrings := make([]string, 0, totalStrings)
 	r := rand.New(rand.NewSource(27644437))
 
@@ -69,7 +71,11 @@ func generateBenchData(totalStrings, totalUnique int) []string {
 	// shuffle the list of strings
 	r.Shuffle(totalStrings, func(i, j int) { randStrings[i], randStrings[j] = randStrings[j], randStrings[i] })
 
-	return randStrings
+	stringBytes := make([][]byte, 0, totalStrings)
+	for _, str := range randStrings {
+		stringBytes = append(stringBytes, []byte(str))
+	}
+	return stringBytes
 }
 
 func benchmarkStringBank(b *testing.B, bt bankTest, totalStrings, totalUnique int, useBankFunc bool) {
@@ -80,10 +86,16 @@ func benchmarkStringBank(b *testing.B, bt bankTest, totalStrings, totalUnique in
 		for i := 0; i < b.N; i++ {
 			b.StartTimer()
 			for bb := 0; bb < totalStrings; bb++ {
+				bytes := randStrings[bb]
+
 				if useBankFunc {
-					bt.BankFunc(randStrings[bb], func() string { return randStrings[bb] })
+					str := unsafe.String(unsafe.SliceData(bytes), len(bytes))
+
+					bt.BankFunc(str, func() string {
+						return string(bytes)
+					})
 				} else {
-					bt.Bank(randStrings[bb])
+					bt.Bank(string(bytes))
 				}
 			}
 			b.StopTimer()
@@ -153,3 +165,66 @@ func BenchmarkStringBankFunc25PercentDuplicate(b *testing.B) {
 func BenchmarkStringBankFuncNoDuplicate(b *testing.B) {
 	benchmarkStringBank(b, standardBankTest, 1_000_000, 1_000_000, true)
 }
+
+const LruCapacity = 500_000
+const LruEvictInterval = 5 * time.Second
+
+func BenchmarkLruStringBankFunc90PercentDuplicate(b *testing.B) {
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 100_000, true)
+}
+
+func BenchmarkLruStringBankFunc75PercentDuplicate(b *testing.B) {
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 250_000, true)
+}
+
+func BenchmarkLruStringBankFunc50PercentDuplicate(b *testing.B) {
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 100_000, true)
+}
+
+func BenchmarkLruStringBankFunc25PercentDuplicate(b *testing.B) {
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 750_000, true)
+}
+
+func BenchmarkLruStringBankFuncNoDuplicate(b *testing.B) {
+	sb := stringutil.NewLruStringBank(LruCapacity, LruEvictInterval)
+	defer func() {
+		if lruBank, ok := sb.(interface{ Stop() }); ok {
+			lruBank.Stop()
+		}
+	}()
+
+	stringutil.UpdateStringBank(sb)
+	benchmarkStringBank(b, standardBankTest, 1_000_000, 1_000_000, true)
+}