Просмотр исходного кода

Add automated memory limitter to try and monitor and adjust the GOMEMLIMIT soft limit real time

Matt Bolt 1 месяц назад
Родитель
Сommit
f024bacc15

+ 321 - 0
core/pkg/util/monitor/memory/helpers.go

@@ -0,0 +1,321 @@
+package memory
+
+import (
+	"fmt"
+	"math"
+	"slices"
+)
+
+//--------------------------------------------------------------------------
+//  Helper Types
+//--------------------------------------------------------------------------
+
+// trackedValue maintains the state of an uninitialized value, a set value, and
+// the previous value. The previous value is always used when the current value
+// is unset. All values that are set become previous values when the current value
+// changes, or Reset() is called.
+type trackedValue struct {
+	current  *float64
+	previous float64
+}
+
+// newTrackedValue returns a new trackedValue instance for tracking unset, set, and previous
+// values for a float64.
+func newTrackedValue() *trackedValue {
+	return new(trackedValue)
+}
+
+// Value returns the current value if it has not been reset. Otherwise, it returns
+// the previous value
+func (tv *trackedValue) Value() float64 {
+	if tv.current == nil {
+		return tv.previous
+	}
+
+	return *tv.current
+}
+
+// IsSet returns `true` if the current value has been set.
+func (tv *trackedValue) IsSet() bool {
+	return tv.current != nil
+}
+
+// Set updates the current value if it is different. If the vaue is updated, `true` is returned.
+// Otherwise, `false` is returned.
+func (tv *trackedValue) Set(value float64) bool {
+	if tv.current == nil {
+		tv.current = &value
+		return true
+	}
+
+	curr := *tv.current
+	if value != curr {
+		tv.current = &value
+		tv.previous = curr
+		return true
+	}
+
+	return false
+}
+
+// Reset resets the current value to unset, moving it to the previous value if set.
+func (tv *trackedValue) Reset() {
+	if tv.current == nil {
+		return
+	}
+
+	tv.previous = *tv.current
+	tv.current = nil
+}
+
+// Clear resets the value and sets the previous to 0.
+func (tv *trackedValue) Clear() {
+	tv.current = nil
+	tv.previous = 0.0
+}
+
+// The Cumulative Sum (CUSUM) is a statistical process control tool that plots
+// the cumulative sums of deviations from a target mean to detect small, persistent
+// shifts (0.5 to 2 sigma) in process performance quickly.
+type cumulativeSum struct {
+	slack float64
+	sum   float64
+	base  float64
+}
+
+// newCumulativeSum creates a new cumulativeSum instance with the provided slack
+func newCumulativeSum(slack float64) *cumulativeSum {
+	return &cumulativeSum{
+		slack: slack,
+		sum:   0.0,
+		base:  0.0,
+	}
+}
+
+// Calibrate initializes the baseline for the CUSUM. This is generally the mean
+// of the samples once there are enough to consider the sample set as "stable."
+func (cs *cumulativeSum) Calibrate(mean float64) {
+	if cs.base != 0.0 {
+		return
+	}
+
+	cs.base = mean
+}
+
+// Update supplies a new sample to update the internal sum.
+func (cs *cumulativeSum) Update(value float64) {
+	if cs.base == 0.0 {
+		return
+	}
+
+	slack := cs.base * cs.slack
+	cs.sum = max(0, cs.sum+(value-cs.base)-slack)
+}
+
+// Sum returns the current CUSUM value.
+func (cs *cumulativeSum) Sum() float64 {
+	return cs.sum
+}
+
+// IsRecalibrationRequired tests the current CUSUM against the base * thresholdMagnitude.
+// If it has surpassed the magnitude provided, true is returned signalling a recalibration
+// should be performed.
+func (cs *cumulativeSum) IsRecalibrationRequired(thresholdMagnitude float64) bool {
+	if cs.base == 0.0 {
+		return false
+	}
+
+	threshold := cs.base * thresholdMagnitude
+	//fmt.Printf("Testing: %f > %f = %t\n", cs.sum, threshold, cs.sum > threshold)
+	return cs.sum > threshold
+}
+
+func (cs *cumulativeSum) Reset() {
+	cs.base = 0.0
+	cs.sum = 0.0
+}
+
+// exponentialMovingAverage is a helper type that tracks the current moving average
+// value using a providing smoothing factor.
+type exponentialMovingAverage struct {
+	smoothing float64
+	value     float64
+	set       bool
+}
+
+// creates a new exponential moving average instance using the provided smoothing factor
+func newExponentialMovingAverage(smoothing float64) *exponentialMovingAverage {
+	return &exponentialMovingAverage{
+		smoothing: smoothing,
+		set:       false,
+	}
+}
+
+// updates the moving average for the provided sample, and returns the updated
+// value
+func (ema *exponentialMovingAverage) Update(sample float64) float64 {
+	if !ema.set {
+		ema.set = true
+		ema.value = sample
+	} else {
+		ema.value = ema.smoothing*sample + (1.0-ema.smoothing)*ema.value
+	}
+	return ema.value
+}
+
+// The current moving average value
+func (ema *exponentialMovingAverage) Current() float64 {
+	return ema.value
+}
+
+// Resets the moving average calculation
+func (ema *exponentialMovingAverage) Reset() {
+	ema.set = false
+	ema.value = 0.0
+}
+
+// rollingWindow is a ring buffer helper type for tracking a set capacity number of
+// the most recent values. It also provides helper methods for calculating mean,
+// stddev, and percentiles of the contained data.
+type rollingWindow struct {
+	capacity int
+	window   []float64
+	index    int
+}
+
+// creates a new rolling window instance with the provided static capacity.
+func newRollingWindow(capacity int) *rollingWindow {
+	if capacity <= 0 || capacity > (math.MaxInt/2) {
+		panic(fmt.Sprintf("RollingWindow capacity limited to range 1-%d", math.MaxInt/2))
+	}
+
+	return &rollingWindow{
+		capacity: capacity,
+		window:   make([]float64, capacity),
+		index:    0,
+	}
+}
+
+// Pushes a new value into the rolling window, dropping the oldest value if
+// the total length surpasses the capacity.
+func (rw *rollingWindow) Push(value float64) {
+	// advance index, handle overflow
+	index := rw.index % rw.capacity
+	if index < 0 {
+		index = -index
+	}
+
+	rw.window[index] = value
+	rw.index++
+
+	// if we have wrapped all the way back around to 0, just advance the index
+	// by capacity to ensure our Len() algorithm continues to function correctly
+	if rw.index == 0 {
+		rw.index = rw.capacity
+	}
+}
+
+// Clears the rolling window values
+func (rw *rollingWindow) Clear() {
+	rw.window = make([]float64, rw.capacity)
+	rw.index = 0
+}
+
+// The length of the rolling window. Will never be greater that the `Cap()`.
+func (rw *rollingWindow) Len() int {
+	index := rw.index
+	if index < 0 {
+		index = -index
+	}
+
+	return min(index, rw.capacity)
+}
+
+// Cap returns the maximum capacity of the rolling window.
+func (rw *rollingWindow) Cap() int {
+	return rw.capacity
+}
+
+// Each iterates all values within the rolling window and calls `f` passing each value.
+// NOTE: Ordering is _not_ guaranteed!
+func (rw *rollingWindow) Each(f func(float64)) {
+	total := rw.Len()
+	for i := range total {
+		f(rw.window[i])
+	}
+}
+
+// Mean returns the average of the values in the window
+func (rw *rollingWindow) Mean() float64 {
+	length := rw.Len()
+	if length == 0 {
+		return 0.0
+	}
+
+	sum := 0.0
+	for i := range length {
+		sum += rw.window[i]
+	}
+	return sum / float64(length)
+}
+
+// MeanStdDev computes the mean and standard deviation of the window values.
+func (rw *rollingWindow) MeanStdDev() (mean float64, stddev float64) {
+	mean = rw.Mean()
+
+	length := rw.Len()
+	if length < 2 {
+		return mean, 0
+	}
+
+	variance := 0.0
+	for i := range length {
+		d := rw.window[i] - mean
+		variance += d * d
+	}
+
+	// sample variance (Bessel's correction)
+	variance /= float64(length - 1)
+	stddev = math.Sqrt(variance)
+	return
+}
+
+// Percentile computes the p-th percentile of the values currently stored in the
+// rolling window.
+func (rw *rollingWindow) Percentile(p float64) float64 {
+	length := rw.Len()
+	if length == 0 {
+		return 0
+	}
+
+	sorted := make([]float64, length)
+	for i := range length {
+		sorted[i] = rw.window[i]
+	}
+	slices.Sort(sorted)
+
+	if p <= 0 {
+		return sorted[0]
+	}
+	if p >= 100 {
+		return sorted[len(sorted)-1]
+	}
+
+	rank := (p / 100.0) * float64(len(sorted)-1)
+	lo := int(math.Floor(rank))
+	hi := int(math.Ceil(rank))
+	frac := rank - float64(lo)
+
+	return sorted[lo]*(1-frac) + sorted[hi]*frac
+}
+
+// IsConfidenceSatisfied checks the relative margin of error is within the provided
+// `marginPercent` threshold using the provided z-score.
+func (rw *rollingWindow) IsConfidenceSatisfied(z float64, marginPercent float64) bool {
+	mean, stddev := rw.MeanStdDev()
+	length := float64(rw.Len())
+	marginOfError := z * (stddev / math.Sqrt(length))
+	relative := marginOfError / mean
+
+	return relative <= marginPercent
+}

+ 420 - 0
core/pkg/util/monitor/memory/helpers_test.go

@@ -0,0 +1,420 @@
+package memory
+
+import (
+	"math"
+	"testing"
+)
+
+const epsilon = 1e-9
+
+type set[T comparable] struct {
+	m map[T]struct{}
+}
+
+func newSet[T comparable](values ...T) *set[T] {
+	m := make(map[T]struct{})
+	for _, v := range values {
+		m[v] = struct{}{}
+	}
+	return &set[T]{
+		m: m,
+	}
+}
+
+func (s *set[T]) add(value T) {
+	s.m[value] = struct{}{}
+}
+
+func (s *set[T]) has(value T) bool {
+	_, hasValue := s.m[value]
+	return hasValue
+}
+
+func (s *set[T]) remove(value T) {
+	delete(s.m, value)
+}
+
+// -------------------------------------------------------------------------
+//  exponentialMovingAverage tests
+// -------------------------------------------------------------------------
+
+func TestEMA_InitialState(t *testing.T) {
+	ema := newExponentialMovingAverage(0.5)
+	if ema.set {
+		t.Error("expected ema.set to be false on creation")
+	}
+	if ema.Current() != 0.0 {
+		t.Errorf("expected initial Current() = 0.0, got %f", ema.Current())
+	}
+}
+
+func TestEMA_FirstUpdateSetsValue(t *testing.T) {
+	ema := newExponentialMovingAverage(0.5)
+	got := ema.Update(42.0)
+	if got != 42.0 {
+		t.Errorf("expected first Update() = 42.0, got %f", got)
+	}
+	if !ema.set {
+		t.Error("expected ema.set to be true after first update")
+	}
+}
+
+func TestEMA_SubsequentUpdates(t *testing.T) {
+	// With smoothing=0.5: EMA(n) = 0.5*sample + 0.5*EMA(n-1)
+	ema := newExponentialMovingAverage(0.5)
+	ema.Update(10.0)        // value = 10
+	got := ema.Update(20.0) // value = 0.5*20 + 0.5*10 = 15
+	want := 15.0
+	if math.Abs(got-want) > epsilon {
+		t.Errorf("expected %f, got %f", want, got)
+	}
+}
+
+func TestEMA_SmoothingZero(t *testing.T) {
+	// smoothing=0 means the value never changes after the first sample
+	ema := newExponentialMovingAverage(0.0)
+	ema.Update(5.0)
+	ema.Update(100.0)
+	ema.Update(999.0)
+	if ema.Current() != 5.0 {
+		t.Errorf("expected Current() = 5.0, got %f", ema.Current())
+	}
+}
+
+func TestEMA_SmoothingOne(t *testing.T) {
+	// smoothing=1 means the value is always the latest sample
+	ema := newExponentialMovingAverage(1.0)
+	ema.Update(5.0)
+	ema.Update(99.0)
+	if ema.Current() != 99.0 {
+		t.Errorf("expected Current() = 99.0, got %f", ema.Current())
+	}
+}
+
+func TestEMA_Reset(t *testing.T) {
+	ema := newExponentialMovingAverage(0.5)
+	ema.Update(10.0)
+	ema.Reset()
+	if ema.set {
+		t.Error("expected ema.set to be false after Reset()")
+	}
+	if ema.Current() != 0.0 {
+		t.Errorf("expected Current() = 0.0 after Reset(), got %f", ema.Current())
+	}
+	// First update after reset should treat as a fresh start
+	got := ema.Update(7.0)
+	if got != 7.0 {
+		t.Errorf("expected first Update() after Reset() = 7.0, got %f", got)
+	}
+}
+
+func TestEMA_MultipleUpdates(t *testing.T) {
+	smoothing := 0.3
+	ema := newExponentialMovingAverage(smoothing)
+
+	samples := []float64{10, 20, 30, 40, 50}
+	want := samples[0]
+	for i, s := range samples {
+		got := ema.Update(s)
+		if i == 0 {
+			want = s
+		} else {
+			want = smoothing*s + (1-smoothing)*want
+		}
+		if math.Abs(got-want) > epsilon {
+			t.Errorf("step %d: expected %f, got %f", i, want, got)
+		}
+	}
+}
+
+// -------------------------------------------------------------------------
+//  rollingWindow tests
+// -------------------------------------------------------------------------
+
+func TestRollingWindow_NewPanicsOnBadCapacity(t *testing.T) {
+	cases := []int{0, -1, math.MaxInt}
+	for _, cap := range cases {
+		func() {
+			defer func() {
+				if r := recover(); r == nil {
+					t.Errorf("expected panic for capacity %d", cap)
+				}
+			}()
+			newRollingWindow(cap)
+		}()
+	}
+}
+
+func TestRollingWindow_InitialLen(t *testing.T) {
+	rw := newRollingWindow(5)
+	if rw.Len() != 0 {
+		t.Errorf("expected Len() = 0, got %d", rw.Len())
+	}
+	if rw.Cap() != 5 {
+		t.Errorf("expected Cap() = 5, got %d", rw.Cap())
+	}
+}
+
+func TestRollingWindow_LenGrowsUpToCapacity(t *testing.T) {
+	rw := newRollingWindow(3)
+	rw.Push(1)
+	if rw.Len() != 1 {
+		t.Errorf("expected Len()=1, got %d", rw.Len())
+	}
+	rw.Push(2)
+	if rw.Len() != 2 {
+		t.Errorf("expected Len()=2, got %d", rw.Len())
+	}
+	rw.Push(3)
+	if rw.Len() != 3 {
+		t.Errorf("expected Len()=3, got %d", rw.Len())
+	}
+	// Pushing beyond capacity should not grow Len() past Cap()
+	rw.Push(4)
+	if rw.Len() != 3 {
+		t.Errorf("expected Len()=3 after overflow push, got %d", rw.Len())
+	}
+}
+
+func TestRollingWindow_Clear(t *testing.T) {
+	rw := newRollingWindow(4)
+	rw.Push(1)
+	rw.Push(2)
+	rw.Clear()
+	if rw.Len() != 0 {
+		t.Errorf("expected Len()=0 after Clear(), got %d", rw.Len())
+	}
+}
+
+func TestRollingWindow_Mean_Empty(t *testing.T) {
+	rw := newRollingWindow(4)
+	if rw.Mean() != 0.0 {
+		t.Errorf("expected Mean()=0 for empty window, got %f", rw.Mean())
+	}
+}
+
+func TestRollingWindow_Mean_SingleValue(t *testing.T) {
+	rw := newRollingWindow(4)
+	rw.Push(7.0)
+	if rw.Mean() != 7.0 {
+		t.Errorf("expected Mean()=7.0, got %f", rw.Mean())
+	}
+}
+
+func TestRollingWindow_Mean_MultipleValues(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{1, 2, 3, 4, 5} {
+		rw.Push(v)
+	}
+	want := 3.0
+	if math.Abs(rw.Mean()-want) > epsilon {
+		t.Errorf("expected Mean()=%f, got %f", want, rw.Mean())
+	}
+}
+
+func TestRollingWindow_MeanStdDev_SingleValue(t *testing.T) {
+	rw := newRollingWindow(4)
+	rw.Push(10.0)
+	mean, stddev := rw.MeanStdDev()
+	if mean != 10.0 {
+		t.Errorf("expected mean=10.0, got %f", mean)
+	}
+	if stddev != 0.0 {
+		t.Errorf("expected stddev=0.0 for single value, got %f", stddev)
+	}
+}
+
+func TestRollingWindow_MeanStdDev_KnownValues(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
+		rw.Push(v)
+	}
+	// Window holds only the last 5: [5, 5, 7, 9, 9] — wait, cap=5.
+	// Pushes: index 0=2,1=4,2=4,3=4,4=5 -> wraps: index 0=5,1=7,2=9
+	// Use a simpler known case instead.
+	rw2 := newRollingWindow(4)
+	for _, v := range []float64{10, 20, 30, 40} {
+		rw2.Push(v)
+	}
+	mean, stddev := rw2.MeanStdDev()
+	wantMean := 25.0
+	// sample stddev of {10,20,30,40} = sqrt(((−15)²+(−5)²+(5)²+(15)²)/3) = sqrt(500/3)
+	wantStddev := math.Sqrt(500.0 / 3.0)
+	if math.Abs(mean-wantMean) > epsilon {
+		t.Errorf("expected mean=%f, got %f", wantMean, mean)
+	}
+	if math.Abs(stddev-wantStddev) > epsilon {
+		t.Errorf("expected stddev=%f, got %f", wantStddev, stddev)
+	}
+}
+
+func TestRollingWindow_Percentile_Empty(t *testing.T) {
+	rw := newRollingWindow(4)
+	if rw.Percentile(50) != 0.0 {
+		t.Errorf("expected 0.0 for empty window percentile, got %f", rw.Percentile(50))
+	}
+}
+
+func TestRollingWindow_Percentile_BoundaryValues(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{3, 1, 4, 1, 5} {
+		rw.Push(v)
+	}
+	if rw.Percentile(0) != 1.0 {
+		t.Errorf("expected p0=1.0, got %f", rw.Percentile(0))
+	}
+	if rw.Percentile(100) != 5.0 {
+		t.Errorf("expected p100=5.0, got %f", rw.Percentile(100))
+	}
+}
+
+func TestRollingWindow_Percentile_Median(t *testing.T) {
+	rw := newRollingWindow(5)
+	for _, v := range []float64{1, 2, 3, 4, 5} {
+		rw.Push(v)
+	}
+	got := rw.Percentile(50)
+	want := 3.0
+	if math.Abs(got-want) > epsilon {
+		t.Errorf("expected p50=%f, got %f", want, got)
+	}
+}
+
+func TestRollingWindow_Percentile_Interpolation(t *testing.T) {
+	rw := newRollingWindow(4)
+	for _, v := range []float64{0, 10, 20, 30} {
+		rw.Push(v)
+	}
+	// rank = 0.25 * 3 = 0.75, lo=0(val=0), hi=1(val=10), frac=0.75 => 0*0.25 + 10*0.75 = 7.5
+	got := rw.Percentile(25)
+	want := 7.5
+	if math.Abs(got-want) > epsilon {
+		t.Errorf("expected p25=%f, got %f", want, got)
+	}
+}
+
+func TestRollingWindow_IsConfidenceSatisfied(t *testing.T) {
+	rw := newRollingWindow(100)
+	// All the same value — stddev=0, margin=0, should always be satisfied
+	for range 100 {
+		rw.Push(50.0)
+	}
+	if !rw.IsConfidenceSatisfied(1.96, 0.05) {
+		t.Error("expected confidence satisfied for zero-variance data")
+	}
+}
+
+func TestRollingWindow_IsConfidenceSatisfied_HighVariance(t *testing.T) {
+	rw := newRollingWindow(10)
+	// High variance: alternating 1 and 1000
+	for i := range 10 {
+		if i%2 == 0 {
+			rw.Push(1.0)
+		} else {
+			rw.Push(1000.0)
+		}
+	}
+	// With high variance and small n, a tight margin should not be satisfied
+	if rw.IsConfidenceSatisfied(1.96, 0.001) {
+		t.Error("expected confidence NOT satisfied for high-variance data with tight margin")
+	}
+}
+
+func TestRollingWindow_Each(t *testing.T) {
+	rw := newRollingWindow(4)
+	for _, v := range []float64{1, 2, 3, 4} {
+		rw.Push(v)
+	}
+	sum := 0.0
+	rw.Each(func(v float64) { sum += v })
+	if math.Abs(sum-10.0) > epsilon {
+		t.Errorf("expected Each() sum=10.0, got %f", sum)
+	}
+}
+
+func TestRollingWindow_OverwritesOldestOnOverflow(t *testing.T) {
+	rw := newRollingWindow(3)
+	rw.Push(1)
+	rw.Push(2)
+	rw.Push(3)
+	rw.Push(100) // Should evict 1, window = [2, 3, 100]
+	mean := rw.Mean()
+	want := (2.0 + 3.0 + 100.0) / 3.0
+	if math.Abs(mean-want) > epsilon {
+		t.Errorf("expected mean=%f after overflow, got %f", want, mean)
+	}
+}
+
+func TestRollingWindow_InternalIndexOverflowWrapToZero(t *testing.T) {
+	capacity := 3
+	rw := newRollingWindow(capacity)
+
+	// Assuming we have pushed ((2 * MaxInt) - 3) elements
+	rw.index = -3
+
+	// cycle back to index=0, ensure our length is still valid
+	rw.Push(1)
+	rw.Push(2)
+	rw.Push(3)
+
+	if rw.Len() != capacity {
+		t.Errorf("expected length=%d after overflowing internal index back to 0. Got %d\n", capacity, rw.Len())
+	}
+
+	// This is because we trick our length algorithm by adding back capacity if the next index == 0
+	if rw.index != capacity {
+		t.Errorf("expected internal index = %d after reaching 0. Got %d\n", capacity, rw.index)
+	}
+}
+
+func TestRollingWindow_InternalIndexOverflow(t *testing.T) {
+	capacity := 3
+	rw := newRollingWindow(capacity)
+
+	// set index to the 0 position relative to max int
+	rw.index = math.MaxInt - 1
+
+	// advance to MaxInt, advance to -MaxInt (overflow), andvance to -MaxInt + 1
+	rw.Push(1)
+	rw.Push(2)
+	rw.Push(3)
+
+	set := newSet(1, 2, 3)
+
+	rw.Each(func(value float64) {
+		v := int(value)
+		if !set.has(v) {
+			t.Errorf("Failed to find value: %d in set.\n", v)
+		}
+
+		set.remove(v)
+	})
+
+	// rewrite
+	rw.Push(4)
+	rw.Push(5)
+	rw.Push(6)
+
+	set = newSet(4, 5, 6)
+	rw.Each(func(value float64) {
+		v := int(value)
+		if !set.has(v) {
+			t.Errorf("Failed to find value: %d in set.\n", v)
+		}
+
+		set.remove(v)
+	})
+
+}
+
+func TestRollingWindow_PartialCapacityMean(t *testing.T) {
+	rw := newRollingWindow(10)
+	for range 5 {
+		rw.Push(5.0)
+	}
+
+	mean := rw.Mean()
+	if mean != 5.0 {
+		t.Errorf("Expected mean = 5.0. Got %f\n", mean)
+	}
+}

+ 227 - 0
core/pkg/util/monitor/memory/memorylimitstats.go

@@ -0,0 +1,227 @@
+package memory
+
+import "sync"
+
+// MemoryLimitConfig contains configuration values used to calculate the soft
+// memory limit based on heap usage over time.
+type MemoryLimitConfig struct {
+	// LimitRatio is the ratio applied to memory limit values calculated. This
+	// is generally set to 90% of the proposed limit. ie: 0.9
+	LimitRatio float64
+
+	// MinSamples is the required number of samples that must be collected before
+	// calculating a memory limit.
+	MinSamples int
+
+	// WindowSize is the total number of smoothed samples to maintain when calculating
+	// the proposed limit. This is generally set based on the timescale in which
+	// samples are added.
+	WindowSize int
+
+	// SmoothingFactor is a value between 0 and 1 which defines how weight importance
+	// should be distributed between a previous average value and the current observation.
+	SmoothingFactor float64
+
+	// BreachWindowSize is the total number of recent raw samples are maintained used for
+	// breach detection. This occurs when the number of samples geather than the current
+	// memory limit exceeds a threshold.
+	BreachWindowSize int
+
+	// BreachThreshold is a limit of raw samples allowed to exceed the memory limit. If this
+	// threshold is reached, the samples are recalibrated.
+	BreachThreshold int
+
+	// CumulativeSumSlack is also known as the K-Factor (drift tolerance) in cumulative sum control
+	// charts uses the allowable slack range in deviations. If the deviations exceed the allowable
+	// slack, then they're used to calculate the sum. This is generally set from 0.5 to 1.0 standard
+	// deviations to filter out process noise.
+	CumulativeSumSlack float64
+
+	// CumulativeSumThreshold is a scaler applied to the "baseline" mean (set once there are enough
+	// samples to be considered "stable"). If the cumulative sum ever surpasses this baseline * threshold,
+	// the samples will be recalibrated.
+	CumulativeSumThreshold float64
+}
+
+// DfaultMemoryLimitConfig creates the recommendded values to use for detecting soft memory limit updates
+func DefaultMemoryLimitConfig() *MemoryLimitConfig {
+	return &MemoryLimitConfig{
+		LimitRatio:             0.90,
+		MinSamples:             30,
+		WindowSize:             60,
+		SmoothingFactor:        0.30,
+		BreachWindowSize:       10,
+		BreachThreshold:        3,
+		CumulativeSumSlack:     0.05,
+		CumulativeSumThreshold: 5.0,
+	}
+}
+
+// MemoryLimitStats is a run-time memory statistics collector that maintains a soft memory limit
+// value based on configurable input parameters. It is designed to adjust the soft limit based on
+// meaningful changes to overall heap allocation, leveraging expontential moving average windows,
+// confidence interval gates, breach detection, and cumulative sum control chart to detect meaningful
+// deviations from the mean.
+type MemoryLimitStats struct {
+	lock   sync.Mutex
+	config *MemoryLimitConfig
+
+	// expontential moving average calculation
+	ema *exponentialMovingAverage
+
+	// ring buffers for tracking exponential moving averages and raw samples
+	window *rollingWindow
+	raw    *rollingWindow
+	breach *rollingWindow
+
+	// cusum calculation for detecting positive shifts in memory usage
+	cusum *cumulativeSum
+
+	// tracked value storage for the soft memory limit proposal which
+	// stores the previous limit as well as the current limit
+	softLimit *trackedValue
+}
+
+// NewMemoryLimitStats creates a new `MemoryLimitStats` instance with the provided
+// `MemoryLimitConfig`. If the provided config is `nil`, then the default configuration
+// values are used.
+func NewMemoryLimitStats(config *MemoryLimitConfig) *MemoryLimitStats {
+	if config == nil {
+		config = DefaultMemoryLimitConfig()
+	}
+
+	return &MemoryLimitStats{
+		config:    config,
+		ema:       newExponentialMovingAverage(config.SmoothingFactor),
+		window:    newRollingWindow(config.WindowSize),
+		raw:       newRollingWindow(config.MinSamples),
+		breach:    newRollingWindow(config.BreachWindowSize),
+		cusum:     newCumulativeSum(config.CumulativeSumSlack),
+		softLimit: newTrackedValue(),
+	}
+}
+
+// Record ingests the total heap memory usage (in bytes), and returns
+// (newSoftLimit, true) when the soft limit has been updated, or
+// (currentSoftLimit, false) when no change occurred.
+//
+// A return value of (0, false) means the monitor is still collecting samples
+// and no limit has been committed yet.
+func (mls *MemoryLimitStats) Record(heapBytes uint64) (softLimit uint64, updated bool) {
+	mls.lock.Lock()
+	defer mls.lock.Unlock()
+
+	sample := float64(heapBytes)
+	smoothed := mls.ema.Update(sample)
+
+	mls.window.Push(smoothed)
+	mls.raw.Push(sample)
+	mls.breach.Push(sample)
+
+	// Check that the minimum number of sammples exist in the window before
+	// calculating the memory limit
+	totalSamples := mls.window.Len()
+	if totalSamples < mls.config.MinSamples {
+		return uint64(mls.softLimit.Value()), false
+	}
+
+	// NOTE: We could calculate the mean and stddev here, and determine if the data
+	// NOTE: matches a confidence interval, but this might be too strict. See the
+	// NOTE: method: mls.window.IsConfidenceSatisfied(...) method.
+
+	// Pull the P99 sample from the smoothed sample window
+	p99 := mls.window.Percentile(99)
+	candidate := p99 * mls.config.LimitRatio
+
+	// Ensure we've already set a soft limit before running breach
+	// detection or CUSUM deviation tests.
+	if mls.softLimit.IsSet() {
+		if mls.isBreachDetected() {
+			mls.recalibrate()
+			return uint64(mls.softLimit.Value()), false
+		}
+
+		// update cumulative sum and check for recalibration
+		mls.cusum.Update(sample)
+		if mls.cusum.IsRecalibrationRequired(mls.config.CumulativeSumThreshold) {
+			mls.recalibrate()
+			return uint64(mls.softLimit.Value()), false
+		}
+
+		// this will only end up running once after the min samples threshold
+		// is passed, and sets the baseline mean for the cusum calculations
+		mean := mls.raw.Mean()
+		mls.cusum.Calibrate(mean)
+	}
+
+	// update the soft limit to the candidate sample
+	updated = mls.softLimit.Set(candidate)
+	softLimit = uint64(mls.softLimit.Value())
+	return
+}
+
+// SoftLimit returns the current soft limit without recording a sample.
+// Returns 0 if the monitor is still collecting data samples.
+func (mls *MemoryLimitStats) SoftMemoryLimit() uint64 {
+	mls.lock.Lock()
+	defer mls.lock.Unlock()
+
+	return uint64(mls.softLimit.Value())
+}
+
+// TotalSamples returns the total number of samples _currently_ being used to
+// calculate the memory limit. The samples will reset if a deviation threshold
+// was reached in order to re-establish stability in the data set.
+func (mls *MemoryLimitStats) TotalSamples() int {
+	mls.lock.Lock()
+	defer mls.lock.Unlock()
+
+	return mls.window.Len()
+}
+
+// Reset clears all state and samples collected.
+func (mls *MemoryLimitStats) Reset() {
+	mls.lock.Lock()
+	defer mls.lock.Unlock()
+
+	mls.window.Clear()
+	mls.raw.Clear()
+	mls.breach.Clear()
+	mls.softLimit.Clear()
+	mls.ema.Reset()
+	mls.cusum.Reset()
+}
+
+// isBreachDetected iterates through the breach sample window and tallies the
+// total number of samples that exceed the p99 smoothed memory usage sample.
+func (mls *MemoryLimitStats) isBreachDetected() bool {
+	if !mls.softLimit.IsSet() {
+		return false
+	}
+
+	// due to the nature of breach detection, we want to compare
+	// against the smoothed p99 sample, so unroll the ratio
+	p99 := mls.softLimit.Value() / mls.config.LimitRatio
+
+	// Tally the total number of recent raw samples that are
+	// greater than the p99 smoothed sample
+	count := 0
+	mls.breach.Each(func(value float64) {
+		if value > p99 {
+			count++
+		}
+	})
+
+	return count >= mls.config.BreachThreshold
+}
+
+// recalibrate dumps the existing samples and calculations, but will preserve
+// the previous softLimit value until a new soft limit is set.
+func (mls *MemoryLimitStats) recalibrate() {
+	mls.window.Clear()
+	mls.raw.Clear()
+	mls.breach.Clear()
+	mls.ema.Reset()
+	mls.cusum.Reset()
+	mls.softLimit.Reset()
+}

+ 146 - 0
core/pkg/util/monitor/memory/memorylimitstats_test.go

@@ -0,0 +1,146 @@
+package memory_test
+
+import (
+	"fmt"
+	"math/rand"
+	"testing"
+
+	"github.com/mbolt35/bingen-file-loader/core/util/monitor/memory"
+)
+
+func TestObservationMode(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	// Feed fewer than MinSamples — should never return updated=true.
+	for i := 0; i < config.MinSamples-1; i++ {
+		_, updated := m.Record(100 * 1024 * 1024) // 100 MiB
+		if updated {
+			t.Fatalf("sample %d: got updated=true before MinSamples reached", i)
+		}
+	}
+	if got := m.SoftMemoryLimit(); got != 0 {
+		t.Fatalf("expected SoftLimit 0 during observation, got %d", got)
+	}
+}
+
+func TestLimitCommittedAfterMinSamples(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	const alloc = 200 * 1024 * 1024 // 200 MiB, perfectly stable
+	var lastLimit uint64
+	var sawUpdate bool
+
+	for i := 0; i < config.MinSamples+10; i++ {
+		limit, updated := m.Record(alloc)
+		if updated {
+			sawUpdate = true
+			lastLimit = limit
+		}
+	}
+
+	if !sawUpdate {
+		t.Fatal("expected at least one limit update after MinSamples")
+	}
+
+	// Soft limit should be ~90% of the stable allocation.
+	expected := uint64(float64(alloc) * 0.90)
+	delta := int64(lastLimit) - int64(expected)
+	if delta < 0 {
+		delta = -delta
+	}
+	// Allow 1% tolerance.
+	if delta > int64(expected)/100 {
+		t.Fatalf("soft limit %d too far from expected %d (delta %d)", lastLimit, expected, delta)
+	}
+}
+
+func TestElasticRecalibrationOnGrowth(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	config.BreachWindowSize = 5
+	config.BreachThreshold = 3
+	m := memory.NewMemoryLimitStats(config)
+
+	// Phase 1: stable at 100 MiB — establish a limit.
+	for i := 0; i < config.MinSamples+20; i++ {
+		m.Record(100 * 1024 * 1024)
+	}
+	limitBefore := m.SoftMemoryLimit()
+	if limitBefore == 0 {
+		t.Fatal("expected a non-zero limit after phase 1")
+	}
+
+	fmt.Printf("Before Breach...\n")
+
+	// Phase 2: spike to 300 MiB repeatedly — should trigger recalibration.
+	for i := 0; i < config.BreachThreshold+1; i++ {
+		m.Record(500 * 1024 * 1024)
+	}
+
+	fmt.Printf("Crossed Threshold here\n")
+
+	// Phase 3: feed enough samples at new level to re-commit.
+	var recalibrated bool
+	for i := 0; i < config.MinSamples+20; i++ {
+		limit, updated := m.Record(800 * 1024 * 1024)
+		if updated && limit > limitBefore {
+			recalibrated = true
+			break
+		}
+	}
+	if !recalibrated {
+		t.Fatal("expected the soft limit to grow after sustained high usage")
+	}
+}
+
+func TestReset(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	for i := 0; i < config.MinSamples+5; i++ {
+		m.Record(128 * 1024 * 1024)
+	}
+	if m.SoftMemoryLimit() == 0 {
+		t.Fatal("expected non-zero soft limit before reset")
+	}
+
+	m.Reset()
+
+	if m.SoftMemoryLimit() != 0 {
+		t.Fatal("expected zero soft limit after reset")
+	}
+	if m.TotalSamples() != 0 {
+		t.Fatal("expected zero sample count after reset")
+	}
+}
+
+func TestNoisyInputStability(t *testing.T) {
+	config := memory.DefaultMemoryLimitConfig()
+	m := memory.NewMemoryLimitStats(config)
+
+	rng := rand.New(rand.NewSource(42))
+	base := float64(256 * 1024 * 1024) // 256 MiB
+
+	var limits []uint64
+	for i := 0; i < 200; i++ {
+		// ±10% noise around base
+		noise := (rng.Float64()*0.2 - 0.1) * base
+		_, _ = m.Record(uint64(base + noise))
+		if l := m.SoftMemoryLimit(); l > 0 {
+			limits = append(limits, l)
+		}
+	}
+
+	if len(limits) == 0 {
+		t.Fatal("expected at least one committed limit")
+	}
+
+	// The final limit should be in a sensible range: 75–95% of base.
+	last := limits[len(limits)-1]
+	lo := uint64(base * 0.75)
+	hi := uint64(base * 0.95)
+	if last < lo || last > hi {
+		t.Fatalf("final limit %d outside expected range [%d, %d]", last, lo, hi)
+	}
+}

+ 123 - 0
core/pkg/util/monitor/memorylimiter.go

@@ -0,0 +1,123 @@
+package monitor
+
+import (
+	"fmt"
+	"math"
+	"runtime"
+	"runtime/debug"
+	"sync"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/util/atomic"
+	"github.com/opencost/opencost/core/pkg/util/monitor/memory"
+)
+
+var (
+	once          sync.Once
+	memoryLimiter *MemoryLimiter
+)
+
+// MemoryLimiter is a heap usage monitor for the go runtime which will attempt to
+// dynamically set a GOMEMLIMIT value to best fit the heap usage. It will only
+// adjust the GOMEMLIMIT if the usage analysis results in an increase, and won't
+// try to "best fit" the current usage. It takes into account the initial GOMEMLIMIT
+// value as the baseline.
+type MemoryLimiter struct {
+	runState atomic.AtomicRunState
+	monitor  *memory.MemoryLimitStats
+}
+
+// Start begins collecting heap allocation samples for automatically adjusting the go soft memory limit
+// for heap usage.
+func (ml *MemoryLimiter) Start(interval time.Duration) error {
+	ml.runState.WaitForReset()
+
+	if !ml.runState.Start() {
+		return fmt.Errorf("memory limiter was already started")
+	}
+
+	// main limiter driver
+	go func() {
+		var memStats runtime.MemStats
+		var prevLimit uint64
+
+		// determine if mem limit was set prior by passing a negative
+		// value to SetMemoryLimit, which will return the current value
+		// without making any changes -- the default is MaxInt64
+		goMemLimit := debug.SetMemoryLimit(-1)
+		if goMemLimit == math.MaxInt64 {
+			prevLimit = 0
+		} else {
+			prevLimit = uint64(goMemLimit)
+		}
+
+		// take initial heap measurement
+		runtime.ReadMemStats(&memStats)
+		ml.monitor.Record(memStats.HeapAlloc)
+
+		for {
+			select {
+			case <-ml.runState.OnStop():
+				ml.runState.Reset()
+				return
+
+			case <-time.After(interval):
+			}
+
+			// in the event that someone updates the limit outside of this monitor
+			// we want to make sure that we synchronize the correct value
+			goMemLimit = debug.SetMemoryLimit(-1)
+			if goMemLimit != math.MaxInt64 && goMemLimit != int64(prevLimit) {
+				prevLimit = uint64(goMemLimit)
+			}
+
+			// record and determine if we should update the memory limit
+			runtime.ReadMemStats(&memStats)
+			if softLimit, updated := ml.monitor.Record(memStats.HeapAlloc); updated {
+				// we only allow the limit to increase for now, as this best reflects a
+				// max stable set of samples. Worth observation and potentially updating
+				// in the future
+				if softLimit != 0 && softLimit > prevLimit {
+					prevLimit = softLimit
+					log.Debugf("Updating Go Memory Limit: %dmb", int64(softLimit/1024.0/1024.0))
+					debug.SetMemoryLimit(int64(softLimit))
+				}
+			}
+		}
+	}()
+
+	return nil
+}
+
+// Stops automatically adjusting the memory limiter
+func (ml *MemoryLimiter) Stop() error {
+	if !ml.runState.Stop() {
+		return fmt.Errorf("could not stop memory limiter - in the state of stopping or already stopped")
+	}
+	return nil
+}
+
+// returns the singleton instance of the memory limiter
+func getMemoryLimiter() *MemoryLimiter {
+	once.Do(func() {
+		config := memory.DefaultMemoryLimitConfig()
+		memoryLimiter = &MemoryLimiter{
+			monitor: memory.NewMemoryLimitStats(config),
+		}
+	})
+
+	return memoryLimiter
+}
+
+// DefaultMemoryLimiterSampleInterval is the sample interval in which the auto limiter
+// gathers heap usage.
+const DefaultMemoryLimiterSampleInterval = time.Second
+
+func StartMemoryLimiter() error {
+	return getMemoryLimiter().Start(DefaultMemoryLimiterSampleInterval)
+}
+
+func StopMemoryLimiter() error {
+	return getMemoryLimiter().Stop()
+}