Преглед изворни кода

use cache size rather than entries

Alex Meijer пре 1 месец
родитељ
комит
90a5cac616
2 измењених фајлова са 65 додато и 25 уклоњено
  1. 37 20
      core/pkg/util/stringutil/filebank.go
  2. 28 5
      core/pkg/util/stringutil/filebank_test.go

+ 37 - 20
core/pkg/util/stringutil/filebank.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"sort"
 	"sync"
 	"time"
 )
@@ -18,11 +19,12 @@ type fileLruEntry struct {
 }
 
 type fileStringBank struct {
-	lock     sync.Mutex
-	stop     chan struct{}
-	capacity int
-	f        *os.File
-	m        map[string]*fileLruEntry
+	lock        sync.Mutex
+	stop        chan struct{}
+	maxBytes    int
+	currentSize int
+	f           *os.File
+	m           map[string]*fileLruEntry
 	// spill maps a lookup key to a byte offset in f for strings evicted from m.
 	spill map[string]int64
 }
@@ -30,7 +32,7 @@ type fileStringBank struct {
 // NewFileStringBank returns a StringBank that keeps an LRU in memory and persists
 // evicted strings in a length-prefixed record file at path. Lookups consult memory,
 // then the spill index, then allocate and cache.
-func NewFileStringBank(path string, capacity int, evictionInterval time.Duration) (StringBank, error) {
+func NewFileStringBank(path string, maxBytes int, evictionInterval time.Duration) (StringBank, error) {
 	ff, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o644)
 	if err != nil {
 		return nil, fmt.Errorf("stringutil: open file string bank: %w", err)
@@ -40,7 +42,7 @@ func NewFileStringBank(path string, capacity int, evictionInterval time.Duration
 		f:        ff,
 		m:        make(map[string]*fileLruEntry),
 		spill:    make(map[string]int64),
-		capacity: capacity,
+		maxBytes: maxBytes,
 		stop:     stop,
 	}
 
@@ -52,7 +54,7 @@ func NewFileStringBank(path string, capacity int, evictionInterval time.Duration
 			case <-time.After(evictionInterval):
 			}
 			bank.lock.Lock()
-			evictFileBank(bank, capacity)
+			evictFileBank(bank, maxBytes)
 			bank.lock.Unlock()
 		}
 	}()
@@ -60,8 +62,8 @@ func NewFileStringBank(path string, capacity int, evictionInterval time.Duration
 	return bank, nil
 }
 
-func evictFileBank(bank *fileStringBank, capacity int) {
-	if len(bank.m) <= capacity {
+func evictFileBank(bank *fileStringBank, maxBytes int) {
+	if bank.currentSize <= maxBytes {
 		return
 	}
 
@@ -70,16 +72,26 @@ func evictFileBank(bank *fileStringBank, capacity int) {
 		arr = append(arr, v)
 	}
 
-	oldest := nOldestFileEntries(arr, len(bank.m)-capacity)
-	for _, old := range oldest {
+	sort.Slice(arr, func(i, j int) bool {
+		return arr[i].used < arr[j].used
+	})
+	for _, old := range arr {
+		if bank.currentSize <= maxBytes {
+			return
+		}
 		if err := bank.persistSpill(old); err != nil {
 			// Best effort: leave entry in memory on I/O failure to avoid losing data.
 			continue
 		}
 		delete(bank.m, old.key)
+		bank.currentSize -= fileEntrySize(old)
 	}
 }
 
+// fileEntrySize returns the byte size attributed to an in-memory entry:
+// the length of its key plus the length of its value.
+func fileEntrySize(e *fileLruEntry) int {
+	size := len(e.key)
+	size += len(e.value)
+	return size
+}
+
 func (bank *fileStringBank) persistSpill(e *fileLruEntry) error {
 	off, err := bank.f.Seek(0, io.SeekEnd)
 	if err != nil {
@@ -190,9 +202,10 @@ func (bank *fileStringBank) LoadOrStore(key, value string) (string, bool) {
 			value: s,
 			used:  time.Now().UnixMilli(),
 		}
+		bank.currentSize += len(key) + len(s)
 		delete(bank.spill, key)
-		if len(bank.m) > (bank.capacity + (bank.capacity / 2)) {
-			evictFileBank(bank, bank.capacity)
+		if bank.currentSize > (bank.maxBytes + (bank.maxBytes / 2)) {
+			evictFileBank(bank, bank.maxBytes)
 		}
 		return s, true
 	}
@@ -202,8 +215,9 @@ func (bank *fileStringBank) LoadOrStore(key, value string) (string, bool) {
 		value: value,
 		used:  time.Now().UnixMilli(),
 	}
-	if len(bank.m) > (bank.capacity + (bank.capacity / 2)) {
-		evictFileBank(bank, bank.capacity)
+	bank.currentSize += len(key) + len(value)
+	if bank.currentSize > (bank.maxBytes + (bank.maxBytes / 2)) {
+		evictFileBank(bank, bank.maxBytes)
 	}
 	return value, false
 }
@@ -222,9 +236,10 @@ func (bank *fileStringBank) LoadOrStoreFunc(key string, f func() string) (string
 			value: s,
 			used:  time.Now().UnixMilli(),
 		}
+		bank.currentSize += len(key) + len(s)
 		delete(bank.spill, key)
-		if len(bank.m) > (bank.capacity + (bank.capacity / 2)) {
-			evictFileBank(bank, bank.capacity)
+		if bank.currentSize > (bank.maxBytes + (bank.maxBytes / 2)) {
+			evictFileBank(bank, bank.maxBytes)
 		}
 		return s, true
 	}
@@ -235,8 +250,9 @@ func (bank *fileStringBank) LoadOrStoreFunc(key string, f func() string) (string
 		value: value,
 		used:  time.Now().UnixMilli(),
 	}
-	if len(bank.m) > (bank.capacity + (bank.capacity / 2)) {
-		evictFileBank(bank, bank.capacity)
+	bank.currentSize += len(key) + len(value) // key+value, matching fileEntrySize, so eviction subtracts exactly what was added
+	if bank.currentSize > (bank.maxBytes + (bank.maxBytes / 2)) {
+		evictFileBank(bank, bank.maxBytes)
 	}
 	return value, false
 }
@@ -247,6 +263,7 @@ func (bank *fileStringBank) Clear() {
 
 	bank.m = make(map[string]*fileLruEntry)
 	bank.spill = make(map[string]int64)
+	bank.currentSize = 0
 	if bank.f != nil {
 		if err := bank.f.Truncate(0); err == nil {
 			_, _ = bank.f.Seek(0, io.SeekStart)

+ 28 - 5
core/pkg/util/stringutil/filebank_test.go

@@ -34,8 +34,8 @@ func TestFileBank_LoadOrStore_MissAndHit(t *testing.T) {
 
 func TestFileBank_EvictionPersistsAndReloads(t *testing.T) {
 	path := filepath.Join(t.TempDir(), "strings.dat")
-	capacity := 2
-	bank, err := NewFileStringBank(path, capacity, 200*time.Millisecond)
+	maxBytes := 4 // 2 entries of size len(key)+len(value)==2 each
+	bank, err := NewFileStringBank(path, maxBytes, 200*time.Millisecond)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -54,10 +54,10 @@ func TestFileBank_EvictionPersistsAndReloads(t *testing.T) {
 
 	fb := bank.(*fileStringBank)
 	fb.lock.Lock()
-	inMem := len(fb.m)
+	inMemBytes := fb.currentSize
 	fb.lock.Unlock()
-	if inMem > capacity {
-		t.Fatalf("expected in-memory size <= %d after eviction, got %d", capacity, inMem)
+	if inMemBytes > maxBytes {
+		t.Fatalf("expected in-memory bytes <= %d after eviction, got %d", maxBytes, inMemBytes)
 	}
 
 	// Oldest entries should be recoverable from file-backed spill.
@@ -77,6 +77,29 @@ func TestFileBank_EvictionPersistsAndReloads(t *testing.T) {
 	}
 }
 
+// TestFileBank_EvictionUsesByteSizeNotEntryCount verifies that the background
+// evictor is driven by the byte budget rather than the number of entries: two
+// entries totalling 12 bytes exceed a 10-byte budget, so the least recently
+// used one must leave memory while the newer one stays.
+func TestFileBank_EvictionUsesByteSizeNotEntryCount(t *testing.T) {
+	path := filepath.Join(t.TempDir(), "strings.dat")
+	maxBytes := 10
+	bank, err := NewFileStringBank(path, maxBytes, 150*time.Millisecond)
+	if err != nil {
+		t.Fatal(err)
+	}
+	fb := bank.(*fileStringBank)
+	defer func() { _ = fb.Close() }()
+
+	bank.LoadOrStore("k1", "aaaaaa") // size 8
+	// Separate the two inserts so their millisecond timestamps differ and k1
+	// is unambiguously the oldest entry.
+	time.Sleep(30 * time.Millisecond)
+	bank.LoadOrStore("k2", "bb") // size 4, total 12 > 10
+
+	// 12 bytes is below the 1.5x inline-eviction threshold, so wait for the
+	// periodic evictor to run at least once.
+	time.Sleep(250 * time.Millisecond)
+
+	fb.lock.Lock()
+	defer fb.lock.Unlock()
+	if fb.currentSize > maxBytes {
+		t.Fatalf("expected in-memory bytes <= %d, got %d", maxBytes, fb.currentSize)
+	}
+	if _, ok := fb.m["k1"]; ok {
+		t.Fatalf("expected oldest entry %q to be evicted from memory", "k1")
+	}
+	if _, ok := fb.m["k2"]; !ok {
+		t.Fatalf("expected newest entry %q to remain in memory", "k2")
+	}
+
+	// The running counter must agree with what is actually held in memory.
+	held := 0
+	for _, e := range fb.m {
+		held += len(e.key) + len(e.value)
+	}
+	if held != fb.currentSize {
+		t.Fatalf("currentSize %d does not match in-memory entries totalling %d bytes", fb.currentSize, held)
+	}
+}
+
 func TestFileBank_LoadOrStoreFunc_FactoryCalledOnMissOnly(t *testing.T) {
 	path := filepath.Join(t.TempDir(), "strings.dat")
 	bank, err := NewFileStringBank(path, 10, time.Minute)