Alex Meijer 3 недели назад
Родитель
Commit
83042eab95

+ 0 - 177
core/pkg/opencost/file_string_table.go

@@ -1,177 +0,0 @@
-package opencost
-
-import (
-	"fmt"
-	"io"
-	"os"
-
-	util "github.com/opencost/opencost/core/pkg/util"
-)
-
-// fileStringRef maps a bingen string-table index to a payload stored in a temp file.
-type fileStringRef struct {
-	off    int64
-	length int
-}
-
-// FileStringTable holds string-table payloads on disk and resolves indices on demand.
-type FileStringTable struct {
-	f    *os.File
-	refs []fileStringRef
-}
-
-// NewFileStringTableFromBuffer reads exactly tl length-prefixed (uint16) string payloads from buff
-// and appends each payload to a new temp file. It does not retain full strings in memory.
-func NewFileStringTableFromBuffer(buff *util.Buffer, tl int) (*FileStringTable, error) {
-	os.MkdirAll("/var/lib/clickhouse/tmp", 0755)
-	f, err := os.CreateTemp("/var/lib/clickhouse/tmp", "opencost-bgst-*")
-	if err != nil {
-		return nil, fmt.Errorf("opencost: create string table file: %w", err)
-	}
-
-	t := &FileStringTable{f: f, refs: make([]fileStringRef, tl)}
-	var writeErr error
-	defer func() {
-		if writeErr != nil {
-			_ = t.Close()
-		}
-	}()
-
-	for i := 0; i < tl; i++ {
-		payload, err := buff.ReadStringBytes()
-		if err != nil {
-			writeErr = err
-			return nil, fmt.Errorf("opencost: read string table entry %d: %w", i, err)
-		}
-		var off int64
-		if len(payload) > 0 {
-			off, err = f.Seek(0, io.SeekEnd)
-			if err != nil {
-				writeErr = err
-				return nil, fmt.Errorf("opencost: seek string table file: %w", err)
-			}
-			if _, err := f.Write(payload); err != nil {
-				writeErr = err
-				return nil, fmt.Errorf("opencost: write string table entry %d: %w", i, err)
-			}
-		}
-		t.refs[i] = fileStringRef{off: off, length: len(payload)}
-	}
-
-	return t, nil
-}
-
-// Len returns the number of strings in the table.
-func (t *FileStringTable) Len() int {
-	if t == nil {
-		return 0
-	}
-	return len(t.refs)
-}
-
-// StringAt returns the string for wire index i, reading from the backing file.
-func (t *FileStringTable) StringAt(i int) (string, error) {
-	if t == nil || t.f == nil {
-		return "", fmt.Errorf("opencost: closed or nil string table")
-	}
-	if i < 0 || i >= len(t.refs) {
-		return "", fmt.Errorf("opencost: string table index %d out of range [0,%d)", i, len(t.refs))
-	}
-	ref := t.refs[i]
-	if ref.length == 0 {
-		return "", nil
-	}
-	buf := make([]byte, ref.length)
-	n, err := t.f.ReadAt(buf, ref.off)
-	if err != nil {
-		return "", err
-	}
-	if n != ref.length {
-		return "", fmt.Errorf("opencost: short read in string table at %d", i)
-	}
-	return string(buf), nil
-}
-
-// Close releases the backing file (and removes the temp file).
-func (t *FileStringTable) Close() error {
-	if t == nil || t.f == nil {
-		return nil
-	}
-	path := t.f.Name()
-	err := t.f.Close()
-	t.f = nil
-	t.refs = nil
-	if path != "" {
-		_ = os.Remove(path)
-	}
-	return err
-}
-
-func newDecodingContextFromBytes(data []byte) (*DecodingContext, error) {
-	buff := util.NewBufferFromBytes(data)
-	if !isBinaryTag(data, BinaryTagStringTable) {
-		return &DecodingContext{Buffer: buff}, nil
-	}
-
-	buff.ReadBytes(len(BinaryTagStringTable))
-	tl := buff.ReadInt()
-	if tl <= 0 {
-		return &DecodingContext{Buffer: buff}, nil
-	}
-
-	ft, err := NewFileStringTableFromBuffer(buff, tl)
-	if err != nil {
-		return nil, err
-	}
-	return &DecodingContext{Buffer: buff, FileTable: ft}, nil
-}
-
-// IsStringTable returns true if a non-empty string table is present (in-memory and/or file-backed).
-func (dc *DecodingContext) IsStringTable() bool {
-	return dc != nil && (len(dc.Table) > 0 || (dc.FileTable != nil && dc.FileTable.Len() > 0))
-}
-
-// CloseFileTable closes and removes the backing file for the string table, if any.
-func (dc *DecodingContext) CloseFileTable() {
-	if dc == nil || dc.FileTable == nil {
-		return
-	}
-	_ = dc.FileTable.Close()
-	dc.FileTable = nil
-}
-
-func (dc *DecodingContext) tableString(i int) string {
-	if len(dc.Table) > 0 {
-		if i < 0 || i >= len(dc.Table) {
-			panic(fmt.Sprintf("opencost: string table index %d out of range [0,%d)", i, len(dc.Table)))
-		}
-		return dc.Table[i]
-	}
-	if dc.FileTable == nil {
-		panic(fmt.Sprintf("opencost: string table lookup with no file table (index %d)", i))
-	}
-	s, err := dc.FileTable.StringAt(i)
-	if err != nil {
-		panic(err)
-	}
-	return s
-}
-
-func newDecodingContextFromReader(reader io.Reader) (*DecodingContext, error) {
-	buff := util.NewBufferFromReader(reader)
-	if !isReaderBinaryTag(buff, BinaryTagStringTable) {
-		return &DecodingContext{Buffer: buff}, nil
-	}
-
-	buff.ReadBytes(len(BinaryTagStringTable))
-	tl := buff.ReadInt()
-	if tl <= 0 {
-		return &DecodingContext{Buffer: buff}, nil
-	}
-
-	ft, err := NewFileStringTableFromBuffer(buff, tl)
-	if err != nil {
-		return nil, err
-	}
-	return &DecodingContext{Buffer: buff, FileTable: ft}, nil
-}

+ 0 - 37
core/pkg/opencost/file_string_table_test.go

@@ -1,37 +0,0 @@
-package opencost
-
-import (
-	"testing"
-
-	util "github.com/opencost/opencost/core/pkg/util"
-)
-
-func TestFileStringTableFromBuffer_RoundTrip(t *testing.T) {
-	bw := util.NewBuffer()
-	bw.WriteString("hello")
-	bw.WriteString("")
-	bw.WriteString("world")
-
-	br := util.NewBufferFromBytes(bw.Bytes())
-	ft, err := NewFileStringTableFromBuffer(br, 3)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer ft.Close()
-
-	if ft.Len() != 3 {
-		t.Fatalf("len: %d", ft.Len())
-	}
-	s0, err := ft.StringAt(0)
-	if err != nil || s0 != "hello" {
-		t.Fatalf("0: %q %v", s0, err)
-	}
-	s1, err := ft.StringAt(1)
-	if err != nil || s1 != "" {
-		t.Fatalf("1: %q %v", s1, err)
-	}
-	s2, err := ft.StringAt(2)
-	if err != nil || s2 != "world" {
-		t.Fatalf("2: %q %v", s2, err)
-	}
-}

+ 0 - 272
core/pkg/util/stringutil/filebank.go

@@ -1,272 +0,0 @@
-package stringutil
-
-import (
-	"encoding/binary"
-	"fmt"
-	"io"
-	"os"
-	"sort"
-	"sync"
-	"time"
-)
-
-// fileLruEntry tracks the map key (lookup key) separately from the canonical value
-// so eviction can persist and rehydrate correctly when key != value.
-type fileLruEntry struct {
-	key   string
-	value string
-	used  int64
-}
-
-type fileStringBank struct {
-	lock        sync.Mutex
-	stop        chan struct{}
-	maxBytes    int
-	currentSize int
-	f           *os.File
-	m           map[string]*fileLruEntry
-	// spill maps a lookup key to a byte offset in f for strings evicted from m.
-	spill map[string]int64
-}
-
-// NewFileStringBank returns a StringBank that keeps an LRU in memory and persists
-// evicted strings in a length-prefixed record file at path. Lookups consult memory,
-// then the spill index, then allocate and cache.
-func NewFileStringBank(path string, maxBytes int, evictionInterval time.Duration) (StringBank, error) {
-	ff, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o644)
-	if err != nil {
-		return nil, fmt.Errorf("stringutil: open file string bank: %w", err)
-	}
-	stop := make(chan struct{})
-	bank := &fileStringBank{
-		f:        ff,
-		m:        make(map[string]*fileLruEntry),
-		spill:    make(map[string]int64),
-		maxBytes: maxBytes,
-		stop:     stop,
-	}
-
-	go func() {
-		for {
-			select {
-			case <-stop:
-				return
-			case <-time.After(evictionInterval):
-			}
-			bank.lock.Lock()
-			evictFileBank(bank, maxBytes)
-			bank.lock.Unlock()
-		}
-	}()
-
-	return bank, nil
-}
-
-func evictFileBank(bank *fileStringBank, maxBytes int) {
-	if bank.currentSize <= maxBytes {
-		return
-	}
-
-	arr := make([]*fileLruEntry, 0, len(bank.m))
-	for _, v := range bank.m {
-		arr = append(arr, v)
-	}
-
-	sort.Slice(arr, func(i, j int) bool {
-		return arr[i].used < arr[j].used
-	})
-	for _, old := range arr {
-		if bank.currentSize <= maxBytes {
-			return
-		}
-		if err := bank.persistSpill(old); err != nil {
-			// Best effort: leave entry in memory on I/O failure to avoid losing data.
-			continue
-		}
-		delete(bank.m, old.key)
-		bank.currentSize -= fileEntrySize(old)
-	}
-}
-
-func fileEntrySize(e *fileLruEntry) int {
-	return len(e.key) + len(e.value)
-}
-
-func (bank *fileStringBank) persistSpill(e *fileLruEntry) error {
-	off, err := bank.f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return err
-	}
-	payload := []byte(e.value)
-	var hdr [4]byte
-	binary.BigEndian.PutUint32(hdr[:], uint32(len(payload)))
-	if _, err := bank.f.Write(hdr[:]); err != nil {
-		return err
-	}
-
-	if _, err := bank.f.Write(payload); err != nil {
-		return err
-	}
-	// if err := bank.f.Sync(); err != nil {
-	// 	return err
-	// }
-	bank.spill[e.key] = off
-	return nil
-}
-
-func (bank *fileStringBank) readSpill(offset int64) (string, error) {
-	if _, err := bank.f.Seek(offset, io.SeekStart); err != nil {
-		return "", err
-	}
-	var hdr [4]byte
-	if _, err := io.ReadFull(bank.f, hdr[:]); err != nil {
-		return "", err
-	}
-	n := binary.BigEndian.Uint32(hdr[:])
-	buf := make([]byte, n)
-	if _, err := io.ReadFull(bank.f, buf); err != nil {
-		return "", err
-	}
-	return string(buf), nil
-}
-
-func (bank *fileStringBank) loadFromSpill(key string) (string, bool) {
-	off, ok := bank.spill[key]
-	if !ok {
-		return "", false
-	}
-	s, err := bank.readSpill(off)
-	if err != nil {
-		return "", false
-	}
-	return s, true
-}
-
-func nOldestFileEntries(arr []*fileLruEntry, n int) []*fileLruEntry {
-	if n <= 0 {
-		return []*fileLruEntry{}
-	}
-	if n >= len(arr) {
-		return arr
-	}
-	adapted := make([]*lruEntry, len(arr))
-	for i := range arr {
-		adapted[i] = &lruEntry{used: arr[i].used}
-	}
-	oldestIdx := make(map[*lruEntry]int, len(arr))
-	for i := range arr {
-		oldestIdx[adapted[i]] = i
-	}
-	oldest := nOldest(adapted, n)
-	out := make([]*fileLruEntry, 0, len(oldest))
-	for _, o := range oldest {
-		out = append(out, arr[oldestIdx[o]])
-	}
-	return out
-}
-
-// Stop ends the background eviction goroutine. Close closes the backing file.
-func (bank *fileStringBank) Stop() {
-	bank.lock.Lock()
-	defer bank.lock.Unlock()
-	if bank.stop != nil {
-		close(bank.stop)
-		bank.stop = nil
-	}
-}
-
-// Close releases the file handle; Stop is implied for the eviction loop.
-func (bank *fileStringBank) Close() error {
-	bank.Stop()
-	bank.lock.Lock()
-	defer bank.lock.Unlock()
-	var err error
-	if bank.f != nil {
-		err = bank.f.Close()
-		bank.f = nil
-	}
-	return err
-}
-
-func (bank *fileStringBank) LoadOrStore(key, value string) (string, bool) {
-	bank.lock.Lock()
-	defer bank.lock.Unlock()
-
-	if v, ok := bank.m[key]; ok {
-		v.used = time.Now().UnixMilli()
-		return v.value, true
-	}
-	if s, ok := bank.loadFromSpill(key); ok {
-		bank.m[key] = &fileLruEntry{
-			key:   key,
-			value: s,
-			used:  time.Now().UnixMilli(),
-		}
-		bank.currentSize += len(key) + len(s)
-		delete(bank.spill, key)
-		if bank.currentSize > (bank.maxBytes + (bank.maxBytes / 2)) {
-			evictFileBank(bank, bank.maxBytes)
-		}
-		return s, true
-	}
-
-	bank.m[key] = &fileLruEntry{
-		key:   key,
-		value: value,
-		used:  time.Now().UnixMilli(),
-	}
-	bank.currentSize += len(key) + len(value)
-	if bank.currentSize > (bank.maxBytes + (bank.maxBytes / 2)) {
-		evictFileBank(bank, bank.maxBytes)
-	}
-	return value, false
-}
-
-func (bank *fileStringBank) LoadOrStoreFunc(key string, f func() string) (string, bool) {
-	bank.lock.Lock()
-	defer bank.lock.Unlock()
-
-	if v, ok := bank.m[key]; ok {
-		v.used = time.Now().UnixMilli()
-		return v.value, true
-	}
-	if s, ok := bank.loadFromSpill(key); ok {
-		bank.m[key] = &fileLruEntry{
-			key:   key,
-			value: s,
-			used:  time.Now().UnixMilli(),
-		}
-		bank.currentSize += len(key) + len(s)
-		delete(bank.spill, key)
-		if bank.currentSize > (bank.maxBytes + (bank.maxBytes / 2)) {
-			evictFileBank(bank, bank.maxBytes)
-		}
-		return s, true
-	}
-
-	value := f()
-	bank.m[value] = &fileLruEntry{
-		key:   value,
-		value: value,
-		used:  time.Now().UnixMilli(),
-	}
-	bank.currentSize += len(value) + len(value)
-	if bank.currentSize > (bank.maxBytes + (bank.maxBytes / 2)) {
-		evictFileBank(bank, bank.maxBytes)
-	}
-	return value, false
-}
-
-func (bank *fileStringBank) Clear() {
-	bank.lock.Lock()
-	defer bank.lock.Unlock()
-
-	bank.m = make(map[string]*fileLruEntry)
-	bank.spill = make(map[string]int64)
-	bank.currentSize = 0
-	if bank.f != nil {
-		if err := bank.f.Truncate(0); err == nil {
-			_, _ = bank.f.Seek(0, io.SeekStart)
-		}
-	}
-}

+ 0 - 156
core/pkg/util/stringutil/filebank_test.go

@@ -1,156 +0,0 @@
-package stringutil
-
-import (
-	"os"
-	"path/filepath"
-	"testing"
-	"time"
-)
-
-func TestFileBank_LoadOrStore_MissAndHit(t *testing.T) {
-	path := filepath.Join(t.TempDir(), "strings.dat")
-	bank, err := NewFileStringBank(path, 10, time.Minute)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer func() { _ = bank.(*fileStringBank).Close() }()
-
-	v, loaded := bank.LoadOrStore("hello", "hello")
-	if loaded {
-		t.Errorf("first LoadOrStore: expected loaded=false, got true")
-	}
-	if v != "hello" {
-		t.Errorf("first LoadOrStore: expected value %q, got %q", "hello", v)
-	}
-
-	v, loaded = bank.LoadOrStore("hello", "world")
-	if !loaded {
-		t.Errorf("second LoadOrStore: expected loaded=true, got false")
-	}
-	if v != "hello" {
-		t.Errorf("second LoadOrStore: expected cached value %q, got %q", "hello", v)
-	}
-}
-
-func TestFileBank_EvictionPersistsAndReloads(t *testing.T) {
-	path := filepath.Join(t.TempDir(), "strings.dat")
-	maxBytes := 4 // 2 entries of size len(key)+len(value)==2 each
-	bank, err := NewFileStringBank(path, maxBytes, 200*time.Millisecond)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer func() { _ = bank.(*fileStringBank).Close() }()
-
-	bank.LoadOrStore("a", "a")
-	time.Sleep(50 * time.Millisecond)
-	bank.LoadOrStore("b", "b")
-	time.Sleep(50 * time.Millisecond)
-	bank.LoadOrStore("c", "c")
-	time.Sleep(50 * time.Millisecond)
-	// force eviction over capacity
-	bank.LoadOrStore("d", "d")
-
-	time.Sleep(400 * time.Millisecond)
-
-	fb := bank.(*fileStringBank)
-	fb.lock.Lock()
-	inMemBytes := fb.currentSize
-	fb.lock.Unlock()
-	if inMemBytes > maxBytes {
-		t.Fatalf("expected in-memory bytes <= %d after eviction, got %d", maxBytes, inMemBytes)
-	}
-
-	// Oldest entries should be recoverable from file-backed spill.
-	v, loaded := bank.LoadOrStore("a", "a")
-	if !loaded {
-		t.Error("expected hit for 'a' via spill after eviction")
-	}
-	if v != "a" {
-		t.Errorf("expected value %q from spill, got %q", "a", v)
-	}
-
-	fb.lock.Lock()
-	_, stillSpill := fb.spill["a"]
-	fb.lock.Unlock()
-	if stillSpill {
-		t.Error("after promoting 'a' into cache, it should be removed from spill index")
-	}
-}
-
-func TestFileBank_EvictionUsesByteSizeNotEntryCount(t *testing.T) {
-	path := filepath.Join(t.TempDir(), "strings.dat")
-	maxBytes := 10
-	bank, err := NewFileStringBank(path, maxBytes, 150*time.Millisecond)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer func() { _ = bank.(*fileStringBank).Close() }()
-
-	bank.LoadOrStore("k1", "aaaaaa") // size 8
-	time.Sleep(30 * time.Millisecond)
-	bank.LoadOrStore("k2", "bb") // size 4, total 12 > 10
-
-	time.Sleep(250 * time.Millisecond)
-
-	fb := bank.(*fileStringBank)
-	fb.lock.Lock()
-	defer fb.lock.Unlock()
-	if fb.currentSize > maxBytes {
-		t.Fatalf("expected in-memory bytes <= %d, got %d", maxBytes, fb.currentSize)
-	}
-}
-
-func TestFileBank_LoadOrStoreFunc_FactoryCalledOnMissOnly(t *testing.T) {
-	path := filepath.Join(t.TempDir(), "strings.dat")
-	bank, err := NewFileStringBank(path, 10, time.Minute)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer func() { _ = bank.(*fileStringBank).Close() }()
-
-	calls := 0
-	factory := func() string {
-		calls++
-		return "k"
-	}
-
-	bank.LoadOrStoreFunc("k", factory)
-	bank.LoadOrStoreFunc("k", factory)
-
-	if calls != 1 {
-		t.Errorf("factory should be called exactly once, got %d calls", calls)
-	}
-}
-
-func TestFileBank_Clear(t *testing.T) {
-	path := filepath.Join(t.TempDir(), "strings.dat")
-	bank, err := NewFileStringBank(path, 10, time.Minute)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer func() { _ = bank.(*fileStringBank).Close() }()
-
-	bank.LoadOrStore("x", "x")
-	bank.Clear()
-
-	st, err := os.Stat(path)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if st.Size() != 0 {
-		t.Errorf("expected truncated file after Clear, size=%d", st.Size())
-	}
-
-	_, loaded := bank.LoadOrStore("x", "y")
-	if loaded {
-		t.Error("expected miss after Clear")
-	}
-}
-
-func TestFileBank_NewOpenError(t *testing.T) {
-	// Non-directory path that cannot be created as a file parent.
-	_, err := NewFileStringBank("/nonexistent/dir/bank.dat", 3, time.Minute)
-	if err == nil {
-		t.Fatal("expected error for invalid path")
-	}
-}