Просмотр исходного кода

Bingen 0.1.1 Streaming Writer Support (#3836)

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Matt Bolt 2 дней назад
Родитель
Сommit
e4bd3e6392
39 измененных файлов с 2702 добавлено и 464 удалено
  1. 1 1
      core/pkg/exporter/controller.go
  2. 100 2
      core/pkg/exporter/decoder_test.go
  3. 90 6
      core/pkg/exporter/encoder.go
  4. 30 0
      core/pkg/exporter/exporter.go
  5. 106 0
      core/pkg/exporter/exporter_test.go
  6. 0 1
      core/pkg/model/kubemodel/device.go
  7. 0 1
      core/pkg/model/kubemodel/device_usage.go
  8. 1 1
      core/pkg/model/kubemodel/kubemodel.go
  9. 676 118
      core/pkg/model/kubemodel/kubemodel_codecs.go
  10. 0 1
      core/pkg/model/kubemodel/node.go
  11. 0 1
      core/pkg/model/kubemodel/owner.go
  12. 0 1
      core/pkg/model/kubemodel/pv.go
  13. 0 1
      core/pkg/model/kubemodel/pvc.go
  14. 0 1
      core/pkg/model/kubemodel/service.go
  15. 32 4
      core/pkg/opencost/exporter/controllers.go
  16. 5 0
      core/pkg/opencost/exporter/exporter_test.go
  17. 79 0
      core/pkg/opencost/exporter/exporters.go
  18. 355 133
      core/pkg/opencost/opencost_codecs.go
  19. 26 0
      core/pkg/storage/azurestorage.go
  20. 25 0
      core/pkg/storage/bucketstorage.go
  21. 32 0
      core/pkg/storage/clusterstorage.go
  22. 112 0
      core/pkg/storage/clusterstorage_test.go
  23. 19 0
      core/pkg/storage/filestorage.go
  24. 6 0
      core/pkg/storage/filestorage_test.go
  25. 10 0
      core/pkg/storage/gcsstorage.go
  26. 44 0
      core/pkg/storage/memorystorage.go
  27. 5 0
      core/pkg/storage/memorystorage_test.go
  28. 5 0
      core/pkg/storage/prefixedbucketstorage.go
  29. 10 0
      core/pkg/storage/prefixedbucketstorage_test.go
  30. 29 1
      core/pkg/storage/s3storage.go
  31. 4 0
      core/pkg/storage/storage.go
  32. 84 0
      core/pkg/storage/test.go
  33. 273 82
      core/pkg/util/buffer.go
  34. 45 0
      core/pkg/util/buffer_test.go
  35. 110 8
      core/pkg/util/bufferhelper.go
  36. 24 0
      core/pkg/util/fileutil/locks_unix.go
  37. 15 0
      core/pkg/util/fileutil/locks_windows.go
  38. 54 0
      core/pkg/util/fileutil/writer.go
  39. 295 101
      modules/collector-source/pkg/metric/metric_codecs.go

+ 1 - 1
core/pkg/exporter/controller.go

@@ -115,7 +115,7 @@ func NewComputeExportController[T any](
 		source:     source,
 		resolution: resolution,
 		exporter:   exporter,
-		typeName:   reflect.TypeOf((*T)(nil)).Elem().String(),
+		typeName:   reflect.TypeFor[T]().String(),
 	}
 }
 

+ 100 - 2
core/pkg/exporter/decoder_test.go

@@ -1,6 +1,7 @@
 package exporter
 
 import (
+	"compress/gzip"
 	"reflect"
 	"testing"
 	"time"
@@ -10,6 +11,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/model"
 	"github.com/opencost/opencost/core/pkg/model/pb"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/storage"
 	"github.com/opencost/opencost/core/pkg/util"
 	"github.com/opencost/opencost/core/pkg/util/json"
 	"google.golang.org/protobuf/proto"
@@ -196,12 +198,12 @@ func TestGzipDecoder(t *testing.T) {
 	if err != nil {
 		t.Errorf("failed to marshal diagnostic: %s", err.Error())
 	}
-	diagCompressed, err := gZipEncode(diagRaw)
+	diagCompressed, err := gZipEncode(diagRaw, gzip.BestSpeed)
 	if err != nil {
 		t.Errorf("failed to compress diagnostic: %s", err.Error())
 	}
 
-	badCompressed, err := gZipEncode(badBytes)
+	badCompressed, err := gZipEncode(badBytes, gzip.BestSpeed)
 	if err != nil {
 		t.Errorf("failed to compress bad bytes: %s", err.Error())
 	}
@@ -289,6 +291,102 @@ func TestProtobufDecoder(t *testing.T) {
 	testProtoBufDecoder(t, ProtobufDecoder, labelsResponseTests)
 }
 
+func TestProtobufEncoderDecoderRoundTrip(t *testing.T) {
+	badBytes := generateBadBytes()
+
+	now := time.Now().UTC().Truncate(24 * time.Hour)
+	start := now.Add(-(24 * 5) * time.Hour)
+
+	store := storage.NewMemoryStorage()
+	writer, err := store.WriteStream("test.pb")
+	if err != nil {
+		t.Fatalf("failed to open writer: %s", err)
+		return
+	}
+
+	customCostSet := model.GenerateMockCustomCostSet(start, start.Add(24*time.Hour))
+
+	enc := NewProtobufEncoder[pb.CustomCostResponse]()
+	err = enc.EncodeTo(writer, customCostSet)
+	if err != nil {
+		_ = writer.Close()
+		t.Fatalf("Failed to encode to writer: %s", err)
+		return
+	}
+
+	if err = writer.Close(); err != nil {
+		t.Fatalf("failed to flush/close the writer; %s", err)
+		return
+	}
+
+	// load raw bytes from memory file system
+	customCostSetRaw, err := store.Read("test.pb")
+	if err != nil {
+		t.Errorf("failed to load custom cost set raw binary from memory disk: %s", err)
+		return
+	}
+
+	customCostTests := []decoderTestCase[pb.CustomCostResponse]{
+		{
+			name:    "custom cost valid",
+			data:    customCostSetRaw,
+			want:    customCostSet,
+			wantErr: false,
+		},
+		{
+			name:    "custom cost invalid",
+			data:    badBytes,
+			want:    nil,
+			wantErr: true,
+		},
+	}
+
+	testProtoBufDecoder(t, ProtobufDecoder, customCostTests)
+
+	labelsResponse := model.GenerateMockLabelResponse(start, pb.Resolution_RESOLUTION_1D)
+	labelsEnc := NewProtobufEncoder[pb.LabelsResponse]()
+	labelsWriter, err := store.WriteStream("test-labels.pb")
+	if err != nil {
+		t.Fatalf("failed to open labels writer: %s", err)
+		return
+	}
+
+	err = labelsEnc.EncodeTo(labelsWriter, labelsResponse)
+	if err != nil {
+		_ = labelsWriter.Close()
+		t.Fatalf("Failed to encode to labels writer: %s", err)
+		return
+	}
+
+	if err = labelsWriter.Close(); err != nil {
+		t.Fatalf("failed to flush/close the labels writer; %s", err)
+		return
+	}
+
+	labelsResponseRaw, err := store.Read("test-labels.pb")
+	if err != nil {
+		t.Fatalf("failed to marshal labels response: %s", err)
+		return
+	}
+
+	labelsResponseTests := []decoderTestCase[pb.LabelsResponse]{
+		{
+			name:    "labels response valid",
+			data:    labelsResponseRaw,
+			want:    labelsResponse,
+			wantErr: false,
+		},
+		{
+			name:    "labels response invalid",
+			data:    badBytes,
+			want:    nil,
+			wantErr: true,
+		},
+	}
+
+	testProtoBufDecoder(t, ProtobufDecoder, labelsResponseTests)
+}
+
 func testProtoBufDecoder[T any, U ProtoMessagePtr[T]](t *testing.T, decoder Decoder[T], testCases []decoderTestCase[T]) {
 	for _, tt := range testCases {
 		t.Run(tt.name, func(t *testing.T) {

+ 90 - 6
core/pkg/exporter/encoder.go

@@ -5,6 +5,7 @@ import (
 	"compress/gzip"
 	"encoding"
 	"fmt"
+	"io"
 
 	"github.com/opencost/opencost/core/pkg/util/json"
 	"google.golang.org/protobuf/encoding/protojson"
@@ -15,6 +16,10 @@ import (
 type Encoder[T any] interface {
 	Encode(*T) ([]byte, error)
 
+	// EncodeTo performs a streaming write to an io.Writer instead of writing and returning the full
+	// binary encoding.
+	EncodeTo(io.Writer, *T) error
+
 	// FileExt returns the file extension for the encoded data. This can be used by a pathing strategy
 	// to append the file extension when exporting the data. Returning an empty string will typically
 	// omit the file extension completely.
@@ -25,6 +30,7 @@ type Encoder[T any] interface {
 // encoding.BinaryMarshaler and are pointers to T.
 type BinaryMarshalerPtr[T any] interface {
 	encoding.BinaryMarshaler
+	MarshalBinaryTo(io.Writer) error
 	*T
 }
 
@@ -56,6 +62,13 @@ func (b *BingenEncoder[T, U]) Encode(data *T) ([]byte, error) {
 	return bingenData.MarshalBinary()
 }
 
+// EncodeTo performs a streaming write to an io.Writer instead of writing and returning the full
+// binary encoding.
+func (b *BingenEncoder[T, U]) EncodeTo(writer io.Writer, data *T) error {
+	var bingenData U = data
+	return bingenData.MarshalBinaryTo(writer)
+}
+
 // FileExt returns the configured file extension for the encoded data. This may be an empty
 // string when no file extension is configured, or a non-empty value such as "bingen".
 func (b *BingenEncoder[T, U]) FileExt() string {
@@ -76,6 +89,13 @@ func (j *JSONEncoder[T]) Encode(data *T) ([]byte, error) {
 	return json.Marshal(data)
 }
 
+// EncodeTo performs a streaming write to an io.Writer instead of writing and returning the full
+// binary encoding.
+func (j *JSONEncoder[T]) EncodeTo(writer io.Writer, data *T) error {
+	jsonWriter := json.NewEncoder(writer)
+	return jsonWriter.Encode(data)
+}
+
 // FileExt returns the file extension for the encoded data. In this case, it returns "json" to indicate
 // that the data is in JSON format.
 func (j *JSONEncoder[T]) FileExt() string {
@@ -84,13 +104,21 @@ func (j *JSONEncoder[T]) FileExt() string {
 
 type GZipEncoder[T any] struct {
 	encoder Encoder[T]
+	level   int
 }
 
 // NewGZipEncoder creates a new GZip encoder which wraps the provided encoder.
 // The encoder is used to encode the data before compressing it with GZip.
 func NewGZipEncoder[T any](encoder Encoder[T]) Encoder[T] {
+	return NewGZipEncoderWithLevel(encoder, gzip.DefaultCompression)
+}
+
+// NewGZipEncoderWithLevel creates a new GZip encoder which wraps the provided encoder,
+// and uses the specified encoding level when gzipping.
+func NewGZipEncoderWithLevel[T any](encoder Encoder[T], level int) Encoder[T] {
 	return &GZipEncoder[T]{
 		encoder: encoder,
+		level:   level,
 	}
 }
 
@@ -101,23 +129,43 @@ func (gz *GZipEncoder[T]) Encode(data *T) ([]byte, error) {
 		return nil, fmt.Errorf("GZipEncoder: nested encode failure: %w", err)
 	}
 
-	compressed, err := gZipEncode(encoded)
+	compressed, err := gZipEncode(encoded, gz.level)
 	if err != nil {
 		return nil, fmt.Errorf("GZipEncoder: failed to compress encoded data: %w", err)
 	}
 	return compressed, nil
 }
 
-func gZipEncode(data []byte) ([]byte, error) {
+// EncodeTo performs a streaming write to an io.Writer instead of writing and returning the full
+// binary encoding.
+func (gz *GZipEncoder[T]) EncodeTo(writer io.Writer, data *T) error {
+	gzWriter, err := gzip.NewWriterLevel(writer, gz.level)
+	if err != nil {
+		return fmt.Errorf("failed to create gzip writer: %w", err)
+	}
+	if err := gz.encoder.EncodeTo(gzWriter, data); err != nil {
+		_ = gzWriter.Close()
+		return fmt.Errorf("failed to encode to gzip writer: %w", err)
+	}
+
+	return gzWriter.Close()
+}
+
+func gZipEncode(data []byte, level int) ([]byte, error) {
 	var buf bytes.Buffer
 
-	gzWriter, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
+	gzWriter, err := gzip.NewWriterLevel(&buf, level)
 	if err != nil {
 		return nil, err
 	}
 
-	gzWriter.Write(data)
-	gzWriter.Close()
+	if _, err := gzWriter.Write(data); err != nil {
+		_ = gzWriter.Close()
+		return nil, err
+	}
+	if err := gzWriter.Close(); err != nil {
+		return nil, err
+	}
 
 	return buf.Bytes(), nil
 }
@@ -125,7 +173,11 @@ func gZipEncode(data []byte) ([]byte, error) {
 // FileExt returns the file extension for the encoded data. In this case, it returns the wrapped encoder's
 // file extension with ".gz" appended to indicate that the data is compressed with GZip.
 func (gz *GZipEncoder[T]) FileExt() string {
-	return gz.encoder.FileExt() + ".gz"
+	prev := gz.encoder.FileExt()
+	if prev == "" {
+		return "gz"
+	}
+	return prev + ".gz"
 }
 
 // ProtoMessagePtr [T] is a generic constraint to ensure types passed to the encoder implement
@@ -155,6 +207,21 @@ func (p *ProtobufEncoder[T, U]) Encode(data *T) ([]byte, error) {
 	return raw, nil
 }
 
+// EncodeTo performs a streaming write to an io.Writer instead of writing and returning the full
+// binary encoding.
+func (p *ProtobufEncoder[T, U]) EncodeTo(writer io.Writer, data *T) error {
+	var message U = data
+	bytes, err := proto.Marshal(message)
+	if err != nil {
+		return fmt.Errorf("failed to encode protobuf message: %w", err)
+	}
+	if _, err = writer.Write(bytes); err != nil {
+		return fmt.Errorf("failed to write encoded message to writer: %w", err)
+	}
+
+	return nil
+}
+
 // FileExt returns the file extension for the encoded data. In this case, it returns an empty string
 // to indicate that there is no specific file extension for the binary encoded data.
 func (p *ProtobufEncoder[T, U]) FileExt() string {
@@ -181,6 +248,23 @@ func (p *ProtoJsonEncoder[T, U]) Encode(data *T) ([]byte, error) {
 	return raw, nil
 }
 
+// EncodeTo performs a streaming write to an io.Writer instead of writing and returning the full
+// binary encoding.
+func (p *ProtoJsonEncoder[T, U]) EncodeTo(writer io.Writer, data *T) error {
+	var message U = data
+	// protojson doesn't have a way to marshal directly to an io.Writer, so we'll encode as normal,
+	// and write the resulting data out to the writer
+	bytes, err := protojson.Marshal(message)
+	if err != nil {
+		return fmt.Errorf("failed to marshal protojson: %w", err)
+	}
+	_, err = writer.Write(bytes)
+	if err != nil {
+		return fmt.Errorf("failed to write encoded protojson to writer: %w", err)
+	}
+	return nil
+}
+
 // FileExt returns the file extension for the encoded data. In this case, it returns an empty string
 // to indicate that there is no specific file extension for the binary encoded data.
 func (p *ProtoJsonEncoder[T, U]) FileExt() string {

+ 30 - 0
core/pkg/exporter/exporter.go

@@ -73,6 +73,7 @@ type ComputeStorageExporter[T any] struct {
 	encoder    Encoder[T]
 	storage    storage.Storage
 	validator  validator.ExportValidator[T]
+	streaming  bool
 }
 
 // NewComputeStorageExporter creates a new ComputeStorageExporter instance, which is responsible for exporting
@@ -84,12 +85,14 @@ func NewComputeStorageExporter[T any](
 	encoder Encoder[T],
 	storage storage.Storage,
 	validator validator.ExportValidator[T],
+	streaming bool,
 ) ComputeExporter[T] {
 	return &ComputeStorageExporter[T]{
 		paths:     paths,
 		encoder:   encoder,
 		storage:   storage,
 		validator: validator,
+		streaming: streaming,
 	}
 }
 
@@ -115,6 +118,16 @@ func (se *ComputeStorageExporter[T]) Export(window opencost.Window, data *T) err
 		return nil
 	}
 
+	// stream the data structure to the storage path if we select streaming
+	if se.streaming {
+		return se.streamingUpload(path, data)
+	}
+
+	// otherwise, just encode and write the encoded result directly
+	return se.encodeAndUpload(path, data)
+}
+
+func (se *ComputeStorageExporter[T]) encodeAndUpload(path string, data *T) error {
 	bin, err := se.encoder.Encode(data)
 	if err != nil {
 		return fmt.Errorf("failed to encode data: %w", err)
@@ -128,3 +141,20 @@ func (se *ComputeStorageExporter[T]) Export(window opencost.Window, data *T) err
 
 	return nil
 }
+
+func (se *ComputeStorageExporter[T]) streamingUpload(path string, data *T) error {
+	writer, err := se.storage.WriteStream(path)
+	if err != nil {
+		return fmt.Errorf("failed to create streaming storage writer: %w", err)
+	}
+
+	if err = se.encoder.EncodeTo(writer, data); err != nil {
+		_ = writer.Close()
+		return fmt.Errorf("failed to stream encoding for exporter: %w", err)
+	}
+
+	if err = writer.Close(); err != nil {
+		return fmt.Errorf("failed to flush and close writer after write: %w", err)
+	}
+	return nil
+}

+ 106 - 0
core/pkg/exporter/exporter_test.go

@@ -1,6 +1,8 @@
 package exporter
 
 import (
+	"bytes"
+	"compress/gzip"
 	"testing"
 	"time"
 
@@ -70,12 +72,61 @@ func TestStorageExporters(t *testing.T) {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
 
+		encoder := NewBingenEncoder[opencost.AllocationSet]()
+		export := NewComputeStorageExporter(
+			p,
+			encoder,
+			store,
+			validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
+			false,
+		)
+
+		start := time.Now().UTC().Truncate(res)
+		end := start.Add(res)
+
+		toExport := opencost.GenerateMockAllocationSet(start)
+		err = export.Export(opencost.NewClosedWindow(start, end), toExport)
+		if err != nil {
+			t.Fatalf("failed to export data: %v", err)
+		}
+
+		expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "")
+
+		data, err := store.Read(expectedPath)
+		if err != nil {
+			t.Fatalf("failed to read data from store: %v", err)
+		}
+
+		if len(data) == 0 {
+			t.Fatalf("expected data to be non-empty, got empty")
+		}
+
+		var as *opencost.AllocationSet = new(opencost.AllocationSet)
+		err = as.UnmarshalBinary(data)
+		if err != nil {
+			t.Fatalf("failed to unmarshal data: %v", err)
+		}
+
+		if as.IsEmpty() {
+			t.Fatalf("expected allocation set to be non-empty, got empty")
+		}
+	})
+
+	t.Run("test streaming compute storage exporter", func(t *testing.T) {
+		res := 24 * time.Hour
+		store := storage.NewMemoryStorage()
+		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, &res)
+		if err != nil {
+			t.Fatalf("failed to create path formatter: %v", err)
+		}
+
 		encoder := NewBingenEncoder[opencost.AllocationSet]()
 		export := NewComputeStorageExporter[opencost.AllocationSet](
 			p,
 			encoder,
 			store,
 			validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
+			true,
 		)
 
 		start := time.Now().UTC().Truncate(res)
@@ -108,4 +159,59 @@ func TestStorageExporters(t *testing.T) {
 			t.Fatalf("expected allocation set to be non-empty, got empty")
 		}
 	})
+
+	t.Run("test compressed streaming compute storage exporter", func(t *testing.T) {
+		res := 24 * time.Hour
+		store := storage.NewMemoryStorage()
+		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, &res)
+		if err != nil {
+			t.Fatalf("failed to create path formatter: %v", err)
+		}
+
+		encoder := NewGZipEncoderWithLevel(NewBingenEncoder[opencost.AllocationSet](), gzip.BestSpeed)
+		export := NewComputeStorageExporter(
+			p,
+			encoder,
+			store,
+			validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
+			true,
+		)
+
+		start := time.Now().UTC().Truncate(res)
+		end := start.Add(res)
+
+		toExport := opencost.GenerateMockAllocationSet(start)
+		err = export.Export(opencost.NewClosedWindow(start, end), toExport)
+		if err != nil {
+			t.Fatalf("failed to export data: %v", err)
+		}
+
+		expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "gz")
+		t.Logf("Reading from path: %s\n", expectedPath)
+
+		data, err := store.Read(expectedPath)
+		if err != nil {
+			t.Fatalf("failed to read data from store: %v", err)
+		}
+
+		if len(data) == 0 {
+			t.Fatalf("expected data to be non-empty, got empty")
+		}
+
+		reader, err := gzip.NewReader(bytes.NewReader(data))
+		if err != nil {
+			t.Fatalf("failed to create gzip reader")
+		}
+		defer reader.Close()
+
+		var as *opencost.AllocationSet = new(opencost.AllocationSet)
+		err = as.UnmarshalBinaryFromReader(reader)
+		if err != nil {
+			t.Fatalf("failed to unmarshal data: %v", err)
+		}
+
+		if as.IsEmpty() {
+			t.Fatalf("expected allocation set to be non-empty, got empty")
+		}
+	})
 }

+ 0 - 1
core/pkg/model/kubemodel/device.go

@@ -6,7 +6,6 @@ import (
 	"time"
 )
 
-// @bingen:generate:Device
 type Device struct {
 	UID               string      `json:"uid"`            // Device UUID (hardware identifier)
 	Type              string      `json:"type,omitempty"` // Device type (e.g., "device", "tpu")

+ 0 - 1
core/pkg/model/kubemodel/device_usage.go

@@ -6,7 +6,6 @@ import (
 	"time"
 )
 
-// @bingen:generate:DeviceUsage
 type DeviceUsage struct {
 	ContainerUID          string      `json:"containerUid"`
 	DeviceUID             string      `json:"deviceUid"`

+ 1 - 1
core/pkg/model/kubemodel/kubemodel.go

@@ -6,7 +6,7 @@ import (
 
 // TODO: should we add a lock so that we can safely modify KubeModelSet in parallel?
 
-// @bingen:generate[stringtable]:KubeModelSet
+// @bingen:generate[streamable,stringtable]:KubeModelSet
 type KubeModelSet struct {
 	Metadata               *Metadata                         `json:"meta"`                   // @bingen:field[version=1]
 	Window                 Window                            `json:"window"`                 // @bingen:field[version=1]

Разница между файлами не показана из-за своего большого размера
+ 676 - 118
core/pkg/model/kubemodel/kubemodel_codecs.go


+ 0 - 1
core/pkg/model/kubemodel/node.go

@@ -5,7 +5,6 @@ import (
 	"time"
 )
 
-// @bingen:generate:Node
 // Node represents a Kubernetes node with capacity-based resource tracking.
 // All resource measures (CPU, RAM) represent node capacity, not requests or limits.
 // This aligns with the principle that cost allocation should be based on provisioned capacity.

+ 0 - 1
core/pkg/model/kubemodel/owner.go

@@ -17,7 +17,6 @@ const (
 )
 
 // Owner represents a Kubernetes resource owner (workload controller)
-// @bingen:generate:Owner
 type Owner struct {
 	UID          string            `json:"uid"`
 	NamespaceUID string            `json:"namespaceUid"`

+ 0 - 1
core/pkg/model/kubemodel/pv.go

@@ -5,7 +5,6 @@ import (
 	"time"
 )
 
-// @bingen:generate:PersistentVolume
 type PersistentVolume struct {
 	// Version 1 fields
 	UID          string            `json:"uid"`

+ 0 - 1
core/pkg/model/kubemodel/pvc.go

@@ -5,7 +5,6 @@ import (
 	"time"
 )
 
-// @bingen:generate:PersistentVolumeClaim
 type PersistentVolumeClaim struct {
 	// Version 1 fields
 	UID                string            `json:"uid"`

+ 0 - 1
core/pkg/model/kubemodel/service.go

@@ -19,7 +19,6 @@ type ServicePort struct {
 	Protocol   string `json:"protocol"`
 }
 
-// @bingen:generate:Service
 // Service represents a Kubernetes Service with network traffic tracking for cost allocation.
 //
 // Network Cost Allocation Strategy:

+ 32 - 4
core/pkg/opencost/exporter/controllers.go

@@ -37,6 +37,8 @@ type PipelinesExportConfig struct {
 	AssetPipelineResolutons           []time.Duration
 	NetworkInsightPipelineResolutions []time.Duration
 	KubeModelPipelineResolutions      []time.Duration
+	Streaming                         bool
+	Compression                       ExportCompressionLevel
 }
 
 // defaultPipelineExportResolutions returns the default export configuration for the pipeline
@@ -58,6 +60,8 @@ func NewPipelinesExportConfig(clusterUID, clusterName string) PipelinesExportCon
 		AssetPipelineResolutons:           defaultPipelineExportResolutions(),
 		NetworkInsightPipelineResolutions: defaultPipelineExportResolutions(),
 		KubeModelPipelineResolutions:      defaultPipelineExportResolutions(),
+		Streaming:                         false,
+		Compression:                       ExportCompressionLevelNone,
 	}
 }
 
@@ -91,7 +95,13 @@ func NewPipelineExportControllers(store storage.Storage, cm ComputePipelineSourc
 		}
 
 		// Use ClusterName for "clusterId" here to maintain legacy pattern
-		allocController, err := NewComputePipelineExportController(config.ClusterName, store, allocSource, res)
+		var allocController *export.ComputeExportController[opencost.AllocationSet]
+		var err error
+		if config.Streaming {
+			allocController, err = NewStreamingComputePipelineExportController(config.ClusterName, store, allocSource, res, config.Compression)
+		} else {
+			allocController, err = NewComputePipelineExportController(config.ClusterName, store, allocSource, res)
+		}
 		if err != nil {
 			log.Errorf("Failed to create allocation export controller for resolution: %s - %v", timeutil.DurationString(res), err)
 			continue
@@ -111,7 +121,13 @@ func NewPipelineExportControllers(store storage.Storage, cm ComputePipelineSourc
 		}
 
 		// Use ClusterName for "clusterId" here to maintain legacy pattern
-		assetController, err := NewComputePipelineExportController(config.ClusterName, store, assetSource, res)
+		var assetController *export.ComputeExportController[opencost.AssetSet]
+		var err error
+		if config.Streaming {
+			assetController, err = NewStreamingComputePipelineExportController(config.ClusterName, store, assetSource, res, config.Compression)
+		} else {
+			assetController, err = NewComputePipelineExportController(config.ClusterName, store, assetSource, res)
+		}
 		if err != nil {
 			log.Errorf("Failed to create asset export controller for resolution: %s - %v", timeutil.DurationString(res), err)
 			continue
@@ -131,7 +147,13 @@ func NewPipelineExportControllers(store storage.Storage, cm ComputePipelineSourc
 		}
 
 		// Use ClusterName for "clusterId" here to maintain legacy pattern
-		networkInsightController, err := NewComputePipelineExportController(config.ClusterName, store, networkInsightSource, res)
+		var networkInsightController *export.ComputeExportController[opencost.NetworkInsightSet]
+		var err error
+		if config.Streaming {
+			networkInsightController, err = NewStreamingComputePipelineExportController(config.ClusterName, store, networkInsightSource, res, config.Compression)
+		} else {
+			networkInsightController, err = NewComputePipelineExportController(config.ClusterName, store, networkInsightSource, res)
+		}
 		if err != nil {
 			log.Errorf("Failed to create network insight export controller for resolution: %s - %v", timeutil.DurationString(res), err)
 			continue
@@ -150,7 +172,13 @@ func NewPipelineExportControllers(store storage.Storage, cm ComputePipelineSourc
 			continue
 		}
 
-		kubeModelController, err := NewComputePipelineExportController(config.ClusterUID, store, kubeModelSource, res)
+		var kubeModelController *export.ComputeExportController[kubemodel.KubeModelSet]
+		var err error
+		if config.Streaming {
+			kubeModelController, err = NewStreamingComputePipelineExportController(config.ClusterUID, store, kubeModelSource, res, config.Compression)
+		} else {
+			kubeModelController, err = NewComputePipelineExportController(config.ClusterUID, store, kubeModelSource, res)
+		}
 		if err != nil {
 			log.Errorf("Failed to create KubeModel export controller for resolution: %s - %v", timeutil.DurationString(res), err)
 			continue

+ 5 - 0
core/pkg/opencost/exporter/exporter_test.go

@@ -1,6 +1,7 @@
 package exporter
 
 import (
+	"io"
 	"testing"
 	"time"
 
@@ -138,6 +139,10 @@ type UnknownSet struct{}
 func (u *UnknownSet) MarshalBinary() ([]byte, error) {
 	return []byte{}, nil
 }
+func (u *UnknownSet) MarshalBinaryTo(writer io.Writer) error {
+	return nil
+}
+
 func (u *UnknownSet) UnmarshalBinary(data []byte) error {
 	return nil
 }

+ 79 - 0
core/pkg/opencost/exporter/exporters.go

@@ -1,6 +1,7 @@
 package exporter
 
 import (
+	"compress/gzip"
 	"fmt"
 	"time"
 
@@ -12,6 +13,28 @@ import (
 	"github.com/opencost/opencost/core/pkg/util/typeutil"
 )
 
+// ExportCompressionLevel is an enumeration value for allowing a streaming compute exporter to enable
+// compression at specific gzip levels.
+type ExportCompressionLevel int
+
+// IsValid returns false when the integer value of the `ExportCompressionLevel` isn't a valid input.
+func (ecl ExportCompressionLevel) IsValid() bool {
+	// level is default or none
+	if ecl == ExportCompressionLevelNone || ecl == ExportCompressionLevelDefault {
+		return true
+	}
+
+	// level is within 1-9 bounds
+	return ecl >= ExportCompressionLevelBestSpeed && ecl <= ExportCompressionLevelBestCompression
+}
+
+const (
+	ExportCompressionLevelNone            ExportCompressionLevel = gzip.NoCompression
+	ExportCompressionLevelBestSpeed       ExportCompressionLevel = gzip.BestSpeed
+	ExportCompressionLevelBestCompression ExportCompressionLevel = gzip.BestCompression
+	ExportCompressionLevelDefault         ExportCompressionLevel = gzip.DefaultCompression
+)
+
 // NewComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
 // by window for a specific pipeline.
 func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
@@ -34,6 +57,45 @@ func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validat
 		export.NewBingenEncoder[T, U](),
 		store,
 		validator.NewSetValidator[T, S](resolution),
+		false,
+	), nil
+}
+
+// NewStreamingComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
+// by window for a specific pipeline.
+func NewStreamingComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
+	clusterId string,
+	resolution time.Duration,
+	store storage.Storage,
+	compressionLevel ExportCompressionLevel,
+) (export.ComputeExporter[T], error) {
+	pipelineName := pipelines.NameFor[T]()
+	if pipelineName == "" {
+		return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
+	}
+
+	pathing, err := pathing.NewDefaultStoragePathFormatter(clusterId, pipelineName, &resolution)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create path formatter: %w", err)
+	}
+
+	if !compressionLevel.IsValid() {
+		return nil, fmt.Errorf("invalid compression level passed: %d is not a valid compression level", int(compressionLevel))
+	}
+
+	var encoder export.Encoder[T]
+	if compressionLevel != ExportCompressionLevelNone {
+		encoder = export.NewGZipEncoderWithLevel(export.NewBingenEncoder[T, U](), int(compressionLevel))
+	} else {
+		encoder = export.NewBingenEncoder[T, U]()
+	}
+
+	return export.NewComputeStorageExporter(
+		pathing,
+		encoder,
+		store,
+		validator.NewSetValidator[T, S](resolution),
+		true,
 	), nil
 }
 
@@ -52,3 +114,20 @@ func NewComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S
 
 	return export.NewComputeExportController(source, exporter, resolution), nil
 }
+
+// NewStreamingComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to stream/export the
+// computed data using the provided source, storage, resolution, and source resolution.
+func NewStreamingComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
+	clusterId string,
+	store storage.Storage,
+	source export.ComputeSource[T],
+	resolution time.Duration,
+	compressionLevel ExportCompressionLevel,
+) (*export.ComputeExportController[T], error) {
+	exporter, err := NewStreamingComputePipelineExporter[T, U, S](clusterId, resolution, store, compressionLevel)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create compute exporter: %w", err)
+	}
+
+	return export.NewComputeExportController(source, exporter, resolution), nil
+}

Разница между файлами не показана из-за своего большого размера
+ 355 - 133
core/pkg/opencost/opencost_codecs.go


+ 26 - 0
core/pkg/storage/azurestorage.go

@@ -337,6 +337,32 @@ func (b *AzureStorage) Write(name string, data []byte) error {
 	return nil
 }
 
+// WriteStream uses the relative path of the storage combined with the provided path
+// to write a new file or overwrite an existing file. The returned `io.WriteCloser` _must_
+// be closed to complete the write.
+func (b *AzureStorage) WriteStream(name string) (io.WriteCloser, error) {
+	name = trimLeading(name)
+	ctx := context.Background()
+
+	log.Debugf("AzureStorage::WriteStream::HTTPS(%s)", name)
+
+	r, w := io.Pipe()
+	blobClient := b.containerClient.NewBlockBlobClient(name)
+	doneCh := make(chan error, 1)
+
+	go func() {
+		_, err := blobClient.UploadStream(ctx, r, &blockblob.UploadStreamOptions{
+			BlockSize:   4 * 1024 * 1024,
+			Concurrency: 4,
+		})
+		wrapped := errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
+		r.CloseWithError(wrapped)
+		doneCh <- wrapped
+	}()
+
+	return newAsyncPipeWriter(w, doneCh), nil
+}
+
 // Remove uses the relative path of the storage combined with the provided path to
 // remove a file from storage permanently.
 func (b *AzureStorage) Remove(name string) error {

+ 25 - 0
core/pkg/storage/bucketstorage.go

@@ -2,6 +2,7 @@ package storage
 
 import (
 	"fmt"
+	"io"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -65,6 +66,30 @@ func NewBucketStorage(config []byte) (Storage, error) {
 	return storage, nil
 }
 
+// asyncPipeWriter wraps *io.PipeWriter so that Close() blocks until the
+// background upload goroutine finishes and surfaces any upload error to the caller.
+type asyncPipeWriter struct {
+	*io.PipeWriter
+	done <-chan error
+}
+
+// newAsyncPipeWriter creates a new async pipe writer that implements io.WriteCloser that
+// handles asynchronous closing of an io.PipeWriter
+func newAsyncPipeWriter(writer *io.PipeWriter, done <-chan error) *asyncPipeWriter {
+	return &asyncPipeWriter{
+		PipeWriter: writer,
+		done:       done,
+	}
+}
+
+// Close propagates any errors that were received on the pipewriter or reader.
+func (apw *asyncPipeWriter) Close() error {
+	if err := apw.PipeWriter.Close(); err != nil {
+		return err
+	}
+	return <-apw.done
+}
+
 // trimLeading removes a leading / from the file name
 func trimLeading(file string) string {
 	if len(file) == 0 {

+ 32 - 0
core/pkg/storage/clusterstorage.go

@@ -339,6 +339,38 @@ func (c *ClusterStorage) Write(path string, data []byte) error {
 	return nil
 }
 
+func (c *ClusterStorage) WriteStream(path string) (io.WriteCloser, error) {
+	log.Debugf("ClusterStorage::WriteStream::%s(%s)", strings.ToUpper(c.scheme()), path)
+
+	fn := func(resp *http.Response) error {
+		return nil
+	}
+
+	args := map[string]string{
+		"path": path,
+	}
+
+	r, w := io.Pipe()
+	doneCh := make(chan error, 1)
+
+	go func() {
+		err := c.makeRequest(
+			http.MethodPut,
+			c.getURL("clusterStorage/write", args),
+			r,
+			fn,
+		)
+		var uploadErr error
+		if err != nil {
+			uploadErr = fmt.Errorf("ClusterStorage: WriteStream: %w", err)
+			r.CloseWithError(uploadErr)
+		}
+		doneCh <- uploadErr
+	}()
+
+	return newAsyncPipeWriter(w, doneCh), nil
+}
+
 func (c *ClusterStorage) Remove(path string) error {
 	log.Debugf("ClusterStorage::Remove::%s(%s)", strings.ToUpper(c.scheme()), path)
 

+ 112 - 0
core/pkg/storage/clusterstorage_test.go

@@ -1,6 +1,7 @@
 package storage
 
 import (
+	"bytes"
 	"crypto/tls"
 	"encoding/json"
 	"io"
@@ -171,3 +172,114 @@ func TestClusterStorage_ReadStream(t *testing.T) {
 		t.Fatalf("stream contents mismatch: got %q want %q", string(data), string(expected))
 	}
 }
+
+func TestClusterStorage_WriteStream(t *testing.T) {
+	writeHandler := func(captured *[]byte) http.HandlerFunc {
+		return func(w http.ResponseWriter, r *http.Request) {
+			if r.Method != http.MethodPut || r.URL.Path != "/clusterStorage/write" {
+				w.WriteHeader(http.StatusNotFound)
+				return
+			}
+			var err error
+			*captured, err = io.ReadAll(r.Body)
+			if err != nil {
+				w.WriteHeader(http.StatusInternalServerError)
+				return
+			}
+			w.WriteHeader(http.StatusOK)
+		}
+	}
+
+	t.Run("single chunk reaches server and Close returns nil", func(t *testing.T) {
+		want := []byte("hello from stream")
+		var received []byte
+
+		srv := httptest.NewServer(writeHandler(&received))
+		defer srv.Close()
+
+		cs := newClusterStorageFromURL(t, srv.URL)
+
+		w, err := cs.WriteStream("some/path")
+		if err != nil {
+			t.Fatalf("WriteStream: %s", err)
+		}
+		if _, err = w.Write(want); err != nil {
+			_ = w.Close()
+			t.Fatalf("Write: %s", err)
+		}
+		if err = w.Close(); err != nil {
+			t.Fatalf("Close: %s", err)
+		}
+
+		// Checking received immediately after Close() proves it is synchronous.
+		if !bytes.Equal(received, want) {
+			t.Errorf("body mismatch: got %q, want %q", received, want)
+		}
+	})
+
+	t.Run("multi-chunk write concatenates correctly", func(t *testing.T) {
+		chunks := [][]byte{[]byte("alpha"), []byte("beta"), []byte("gamma")}
+		want := bytes.Join(chunks, nil)
+		var received []byte
+
+		srv := httptest.NewServer(writeHandler(&received))
+		defer srv.Close()
+
+		cs := newClusterStorageFromURL(t, srv.URL)
+
+		w, err := cs.WriteStream("some/path")
+		if err != nil {
+			t.Fatalf("WriteStream: %s", err)
+		}
+		for _, chunk := range chunks {
+			if _, err = w.Write(chunk); err != nil {
+				_ = w.Close()
+				t.Fatalf("Write: %s", err)
+			}
+		}
+		if err = w.Close(); err != nil {
+			t.Fatalf("Close: %s", err)
+		}
+
+		if !bytes.Equal(received, want) {
+			t.Errorf("body mismatch: got %q, want %q", received, want)
+		}
+	})
+
+	t.Run("server error propagates through Close", func(t *testing.T) {
+		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			_, _ = io.Copy(io.Discard, r.Body)
+			w.WriteHeader(http.StatusInternalServerError)
+		}))
+		defer srv.Close()
+
+		cs := newClusterStorageFromURL(t, srv.URL)
+
+		w, err := cs.WriteStream("some/path")
+		if err != nil {
+			t.Fatalf("WriteStream: %s", err)
+		}
+		_, _ = w.Write([]byte("data"))
+		if err = w.Close(); err == nil {
+			t.Fatalf("expected error from Close on server error, got nil")
+		}
+	})
+}
+
+// newClusterStorageFromURL constructs a ClusterStorage pointed at the given test server URL.
+func newClusterStorageFromURL(t *testing.T, rawURL string) *ClusterStorage {
+	t.Helper()
+	u, err := url.Parse(rawURL)
+	if err != nil {
+		t.Fatalf("parsing test server URL: %s", err)
+	}
+	port, err := strconv.Atoi(u.Port())
+	if err != nil {
+		t.Fatalf("parsing test server port: %s", err)
+	}
+	return &ClusterStorage{
+		client: &http.Client{},
+		host:   u.Hostname(),
+		port:   port,
+	}
+}

+ 19 - 0
core/pkg/storage/filestorage.go

@@ -184,6 +184,25 @@ func (fs *FileStorage) Write(path string, data []byte) error {
 	return nil
 }
 
+// WriteStream uses the relative path of the storage combined with the provided path
+// to write a new file or overwrite an existing file.
+//
+// It takes advantage of flock() based locking to improve safety. The returned `io.WriteCloser`
+// must be closed.
+func (fs *FileStorage) WriteStream(path string) (io.WriteCloser, error) {
+	f, err := fs.prepare(path)
+	if err != nil {
+		return nil, errors.Wrap(err, "Failed to prepare path")
+	}
+
+	w, err := fileutil.NewLockedFileWriter(f)
+	if err != nil {
+		return nil, fmt.Errorf("failed to load file writer for: %s - %w", f, err)
+	}
+
+	return w, nil
+}
+
 // Remove uses the relative path of the storage combined with the provided path to
 // remove a file from storage permanently.
 func (fs *FileStorage) Remove(path string) error {

+ 6 - 0
core/pkg/storage/filestorage_test.go

@@ -47,3 +47,9 @@ func TestFileStorage_ReadStream(t *testing.T) {
 	store := NewFileStorage(storeBaseDir)
 	TestStorageReadStream(t, store)
 }
+
+func TestFileStorage_WriteStream(t *testing.T) {
+	storeBaseDir := t.TempDir()
+	store := NewFileStorage(storeBaseDir)
+	TestStorageWriteStream(t, store)
+}

+ 10 - 0
core/pkg/storage/gcsstorage.go

@@ -236,6 +236,16 @@ func (gs *GCSStorage) Write(name string, data []byte) error {
 	return nil
 }
 
+// WriteStream uses the relative path of the storage combined with the provided path
+// to write a new file or overwrite an existing file.
+func (gs *GCSStorage) WriteStream(name string) (io.WriteCloser, error) {
+	name = trimLeading(name)
+	log.Debugf("GCSStorage::WriteStream::HTTPS(%s)", name)
+
+	ctx := context.Background()
+	return gs.bucket.Object(name).NewWriter(ctx), nil
+}
+
 // Remove uses the relative path of the storage combined with the provided path to
 // remove a file from storage permanently.
 func (gs *GCSStorage) Remove(name string) error {

+ 44 - 0
core/pkg/storage/memorystorage.go

@@ -132,6 +132,50 @@ func (ms *MemoryStorage) Write(path string, data []byte) error {
 	return nil
 }
 
+// WriteStream creates a new relative path and returns the io.WriteCloser that can be used to
+// write into the storage path. Close() blocks until all data has been committed to storage.
+func (ms *MemoryStorage) WriteStream(path string) (io.WriteCloser, error) {
+	r, w := io.Pipe()
+	var wg sync.WaitGroup
+
+	wg.Go(func() {
+		data, err := io.ReadAll(r)
+		if err != nil {
+			r.CloseWithError(err)
+			return
+		}
+
+		ms.lock.Lock()
+		defer ms.lock.Unlock()
+
+		paths, pFile := memfile.Split(path)
+
+		f := memfile.NewMemoryFile(pFile, data)
+		currentDir := memfile.CreateSubdirectory(ms.fileTree, paths)
+
+		currentDir.AddFile(f)
+		ms.directPaths[path] = f
+	})
+
+	return &memWriteCloser{
+		PipeWriter: w,
+		wg:         &wg,
+	}, nil
+}
+
+// memWriteCloser wraps *io.PipeWriter so that Close() blocks until the drain goroutine
+// has finished committing data to the in-memory store.
+type memWriteCloser struct {
+	*io.PipeWriter
+	wg *sync.WaitGroup
+}
+
+func (m *memWriteCloser) Close() error {
+	err := m.PipeWriter.Close()
+	m.wg.Wait()
+	return err
+}
+
 // Remove uses the relative path of the storage combined with the provided path to
 // remove a file from storage permanently.
 func (ms *MemoryStorage) Remove(path string) error {

+ 5 - 0
core/pkg/storage/memorystorage_test.go

@@ -390,3 +390,8 @@ func TestMemoryStorage_ReadStream(t *testing.T) {
 	store := NewMemoryStorage()
 	TestStorageReadStream(t, store)
 }
+
+func TestMemoryStorage_WriteStream(t *testing.T) {
+	store := NewMemoryStorage()
+	TestStorageWriteStream(t, store)
+}

+ 5 - 0
core/pkg/storage/prefixedbucketstorage.go

@@ -106,3 +106,8 @@ func (pbs *PrefixedBucketStorage) Stat(name string) (*StorageInfo, error) {
 func (pbs *PrefixedBucketStorage) Write(name string, data []byte) error {
 	return pbs.storage.Write(conditionalPrefix(pbs.prefix, name), data)
 }
+
+// WriteStream uploads the contents written to the returned writer as an object into the bucket.
+func (pbs *PrefixedBucketStorage) WriteStream(name string) (io.WriteCloser, error) {
+	return pbs.storage.WriteStream(conditionalPrefix(pbs.prefix, name))
+}

+ 10 - 0
core/pkg/storage/prefixedbucketstorage_test.go

@@ -21,3 +21,13 @@ func TestPrefixedBucketStorage_ReadStream(t *testing.T) {
 
 	TestStorageReadStream(t, store)
 }
+
+func TestPrefixedBucketStorage_WriteStream(t *testing.T) {
+	base := NewMemoryStorage()
+	store, err := NewPrefixedBucketStorage(base, "myprefix")
+	if err != nil {
+		t.Fatalf("failed to create prefixed storage: %s", err)
+	}
+
+	TestStorageWriteStream(t, store)
+}

+ 29 - 1
core/pkg/storage/s3storage.go

@@ -448,7 +448,6 @@ func (s3 *S3Storage) Write(name string, data []byte) error {
 	// the sub-parts. To remain consistent with other storage implementations,
 	// we would rather attempt to lower cost fast upload and fast-fail.
 	var partSize uint64 = 0
-
 	r := bytes.NewReader(data)
 	_, err = s3.client.PutObject(ctx, s3.name, name, r, int64(size), minio.PutObjectOptions{
 		PartSize:             partSize,
@@ -463,6 +462,35 @@ func (s3 *S3Storage) Write(name string, data []byte) error {
 	return nil
 }
 
+// Upload the contents of the reader as an object into the bucket. The returned `io.WriteCloser` must
+// be closed to finalize the write.
+func (s3 *S3Storage) WriteStream(name string) (io.WriteCloser, error) {
+	name = trimLeading(name)
+
+	log.Debugf("S3Storage::WriteStream::%s(%s)", s3.protocol(), name)
+
+	ctx := context.Background()
+	sse, err := s3.getServerSideEncryption(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	r, w := io.Pipe()
+	doneCh := make(chan error, 1)
+
+	go func() {
+		_, err = s3.client.PutObject(ctx, s3.name, name, r, -1, minio.PutObjectOptions{
+			ServerSideEncryption: sse,
+			UserMetadata:         s3.putUserMetadata,
+		})
+		wrapped := errors.Wrap(err, "upload s3 object")
+		r.CloseWithError(wrapped)
+		doneCh <- wrapped
+	}()
+
+	return newAsyncPipeWriter(w, doneCh), nil
+}
+
 // Attributes returns information about the specified object.
 func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
 	name = trimLeading(name)

+ 4 - 0
core/pkg/storage/storage.go

@@ -53,6 +53,10 @@ type Storage interface {
 	// to write a new file or overwrite an existing file.
 	Write(path string, data []byte) error
 
+	// WriteStream creates a new relative path and returns the `io.WriteCloser` that can be used to
+	// write into the storage path. Ensure that Close() is run on the returned writer.
+	WriteStream(path string) (io.WriteCloser, error)
+
 	// Remove uses the relative path of the storage combined with the provided path to
 	// remove a file from storage permanently.
 	Remove(path string) error

+ 84 - 0
core/pkg/storage/test.go

@@ -1,6 +1,7 @@
 package storage
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"os"
@@ -501,6 +502,89 @@ func TestStorageReadToLocalFile(t *testing.T, store Storage) {
 	}
 }
 
+func TestStorageWriteStream(t *testing.T, store Storage) {
+	testName := "write_stream"
+
+	testCases := map[string]struct {
+		path     string
+		chunks   [][]byte
+		prewrite bool
+	}{
+		"single chunk": {
+			path:   path.Join(testpath, testName, "single.bin"),
+			chunks: [][]byte{[]byte("single chunk data")},
+		},
+		"multi-chunk": {
+			path:   path.Join(testpath, testName, "multi.bin"),
+			chunks: [][]byte{[]byte("alpha"), []byte("beta"), []byte("gamma")},
+		},
+		"nested path": {
+			path:   path.Join(testpath, testName, "sub/dir/data.bin"),
+			chunks: [][]byte{[]byte("nested data")},
+		},
+		"empty content": {
+			path:   path.Join(testpath, testName, "empty.bin"),
+			chunks: [][]byte{},
+		},
+		"overwrite existing": {
+			path:     path.Join(testpath, testName, "overwrite.bin"),
+			chunks:   [][]byte{[]byte("replaced content")},
+			prewrite: true,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			defer store.Remove(tc.path)
+
+			if tc.prewrite {
+				b, err := json.Marshal(tfc)
+				if err != nil {
+					t.Fatalf("marshal fixture: %s", err)
+				}
+				if err = store.Write(tc.path, b); err != nil {
+					t.Fatalf("pre-write: %s", err)
+				}
+			}
+
+			var want []byte
+			for _, chunk := range tc.chunks {
+				want = append(want, chunk...)
+			}
+
+			w, err := store.WriteStream(tc.path)
+			if err != nil {
+				t.Fatalf("WriteStream: %s", err)
+			}
+
+			for _, chunk := range tc.chunks {
+				n, writeErr := w.Write(chunk)
+				if writeErr != nil {
+					_ = w.Close()
+					t.Fatalf("Write: %s", writeErr)
+				}
+				if n != len(chunk) {
+					_ = w.Close()
+					t.Fatalf("short write: wrote %d of %d bytes", n, len(chunk))
+				}
+			}
+
+			if err = w.Close(); err != nil {
+				t.Fatalf("Close: %s", err)
+			}
+
+			got, err := store.Read(tc.path)
+			if err != nil {
+				t.Fatalf("Read after WriteStream: %s", err)
+			}
+
+			if !bytes.Equal(got, want) {
+				t.Errorf("content mismatch: got %q, want %q", got, want)
+			}
+		})
+	}
+}
+
 func TestStorageReadStream(t *testing.T, store Storage) {
 	testName := "read_stream"
 

+ 273 - 82
core/pkg/util/buffer.go

@@ -18,27 +18,43 @@ var bytePool *bufferPool = newBufferPool()
 // NonPrimitiveTypeError represents an error where the user provided a non-primitive data type for reading/writing
 var NonPrimitiveTypeError error = errors.New("Type provided to read/write does not fit inside 8 bytes.")
 
+// Mode is used to represent the 3 possible states of the buffer. note there is
+// no overlapping between states, as each Mode is handled exclusively.
+type Mode uint8
+
+const (
+	ReadWrite Mode = iota
+	ReadOnly
+	WriteOnly
+)
+
 // Buffer is a utility type which implements a very basic binary protocol for
-// writing core go types.
+// writing core go types. It can run as read-only, write-only, or read-write.
 type Buffer struct {
-	b  *bufio.Reader
-	bw *bytes.Buffer
+	r  *bufio.Reader
+	w  *bufio.Writer
+	rw *bytes.Buffer
+	m  Mode
 }
 
 // NewBuffer creates a new Buffer instance using LittleEndian ByteOrder.
 func NewBuffer() *Buffer {
-	var b bytes.Buffer
 	return &Buffer{
-		b:  nil,
-		bw: &b,
+		r:  nil,
+		w:  nil,
+		rw: new(bytes.Buffer),
+		m:  ReadWrite,
 	}
 }
 
-// NewBufferFromBytes creates a new Buffer instance using the provided byte slice.
+// NewBufferFromBytes creates a new read/write Buffer instance using the provided byte slice.
 // The new buffer assumes ownership of the byte slice.
 func NewBufferFromBytes(b []byte) *Buffer {
 	return &Buffer{
-		bw: bytes.NewBuffer(b),
+		r:  nil,
+		w:  nil,
+		rw: bytes.NewBuffer(b),
+		m:  ReadWrite,
 	}
 }
 
@@ -47,7 +63,10 @@ func NewBufferFromBytes(b []byte) *Buffer {
 func NewBufferFrom(b *Buffer) *Buffer {
 	bb := b.Bytes()
 	return &Buffer{
-		bw: bytes.NewBuffer(bb),
+		r:  nil,
+		w:  nil,
+		rw: bytes.NewBuffer(bb),
+		m:  ReadWrite,
 	}
 }
 
@@ -55,87 +74,178 @@ func NewBufferFrom(b *Buffer) *Buffer {
 // buffer is set to read-only.
 func NewBufferFromReader(reader io.Reader) *Buffer {
 	return &Buffer{
-		b:  bufio.NewReader(reader),
-		bw: nil,
+		r:  bufio.NewReader(reader),
+		w:  nil,
+		rw: nil,
+		m:  ReadOnly,
+	}
+}
+
+// NewBufferFromWriter creates a new Buffer instance using the provided io.Writer. This
+// buffer is set to write-only.
+func NewBufferFromWriter(writer io.Writer) *Buffer {
+	return &Buffer{
+		r:  nil,
+		w:  bufio.NewWriter(writer),
+		rw: nil,
+		m:  WriteOnly,
 	}
 }
 
 // WriteBool writes a bool value to the buffer
 func (b *Buffer) WriteBool(i bool) {
 	b.checkRO()
-	writeBool(b.bw, i)
+
+	if b.rw != nil {
+		writeBool(b.rw, i)
+		return
+	}
+
+	writeBuffBool(b.w, i)
 }
 
 // WriteInt writes an int value to the buffer.
 func (b *Buffer) WriteInt(i int) {
 	b.checkRO()
-	writeInt(b.bw, i)
+
+	if b.rw != nil {
+		writeInt(b.rw, i)
+		return
+	}
+
+	writeBuffInt(b.w, i)
 }
 
 // WriteInt8 writes an int8 value to the buffer.
 func (b *Buffer) WriteInt8(i int8) {
 	b.checkRO()
-	writeInt8(b.bw, i)
+
+	if b.rw != nil {
+		writeInt8(b.rw, i)
+		return
+	}
+
+	writeBuffInt8(b.w, i)
 }
 
 // WriteInt16 writes an int16 value to the buffer.
 func (b *Buffer) WriteInt16(i int16) {
 	b.checkRO()
-	writeInt16(b.bw, i)
+
+	if b.rw != nil {
+		writeInt16(b.rw, i)
+		return
+	}
+
+	writeBuffInt16(b.w, i)
 }
 
 // WriteInt32 writes an int32 value to the buffer.
 func (b *Buffer) WriteInt32(i int32) {
 	b.checkRO()
-	writeInt32(b.bw, i)
+
+	if b.rw != nil {
+		writeInt32(b.rw, i)
+		return
+	}
+
+	writeBuffInt32(b.w, i)
 }
 
 // WriteInt64 writes an int64 value to the buffer.
 func (b *Buffer) WriteInt64(i int64) {
 	b.checkRO()
-	writeInt64(b.bw, i)
+
+	if b.rw != nil {
+		writeInt64(b.rw, i)
+		return
+	}
+
+	writeBuffInt64(b.w, i)
 }
 
 // WriteUInt writes a uint value to the buffer.
 func (b *Buffer) WriteUInt(i uint) {
 	b.checkRO()
-	writeUint(b.bw, i)
+
+	if b.rw != nil {
+		writeUint(b.rw, i)
+		return
+	}
+
+	writeBuffUint(b.w, i)
 }
 
 // WriteUInt8 writes a uint8 value to the buffer.
 func (b *Buffer) WriteUInt8(i uint8) {
 	b.checkRO()
-	writeUint8(b.bw, i)
+
+	if b.rw != nil {
+		writeUint8(b.rw, i)
+		return
+	}
+
+	writeBuffUint8(b.w, i)
 }
 
 // WriteUInt16 writes a uint16 value to the buffer.
 func (b *Buffer) WriteUInt16(i uint16) {
 	b.checkRO()
-	writeUint16(b.bw, i)
+
+	if b.rw != nil {
+		writeUint16(b.rw, i)
+		return
+	}
+
+	writeBuffUint16(b.w, i)
 }
 
 // WriteUInt32 writes a uint32 value to the buffer.
 func (b *Buffer) WriteUInt32(i uint32) {
 	b.checkRO()
-	writeUint32(b.bw, i)
+
+	if b.rw != nil {
+		writeUint32(b.rw, i)
+		return
+	}
+
+	writeBuffUint32(b.w, i)
 }
 
 // WriteUInt64 writes a uint64 value to the buffer.
 func (b *Buffer) WriteUInt64(i uint64) {
 	b.checkRO()
-	writeUint64(b.bw, i)
+
+	if b.rw != nil {
+		writeUint64(b.rw, i)
+		return
+	}
+
+	writeBuffUint64(b.w, i)
 }
 
 // WriteFloat32 writes a float32 value to the buffer.
 func (b *Buffer) WriteFloat32(i float32) {
 	b.checkRO()
-	writeFloat32(b.bw, i)
+
+	if b.rw != nil {
+		writeFloat32(b.rw, i)
+		return
+	}
+
+	writeBuffFloat32(b.w, i)
 }
 
 // WriteFloat64 writes a float64 value to the buffer.
 func (b *Buffer) WriteFloat64(i float64) {
 	b.checkRO()
-	writeFloat64(b.bw, i)
+
+	if b.rw != nil {
+		writeFloat64(b.rw, i)
+		return
+	}
+
+	writeBuffFloat64(b.w, i)
 }
 
 // WriteString writes the string's length as a uint16 followed by the string contents.
@@ -147,216 +257,295 @@ func (b *Buffer) WriteString(i string) {
 	if len(s) > math.MaxUint16 {
 		s = s[:math.MaxUint16]
 	}
-	writeUint16(b.bw, uint16(len(s)))
-	b.bw.Write(s)
+
+	l := uint16(len(s))
+
+	if b.rw != nil {
+		writeUint16(b.rw, l)
+		b.rw.Write(s)
+		return
+	}
+
+	writeBuffUint16(b.w, l)
+	b.w.Write(s)
 }
 
 // WriteBytes writes the contents of the byte slice to the buffer.
 func (b *Buffer) WriteBytes(bytes []byte) {
 	b.checkRO()
-	b.bw.Write(bytes)
+
+	if b.rw != nil {
+		b.rw.Write(bytes)
+		return
+	}
+
+	b.w.Write(bytes)
 }
 
 // Bytes returns the unread portion of the underlying buffer storage. If the buffer was
 // created with an `io.Reader`, then the remaining unread bytes are drained into a byte
 // slice and returned.
 func (b *Buffer) Bytes() []byte {
-	if b.bw != nil {
-		return b.bw.Bytes()
+	b.checkWO()
+
+	if b.rw != nil {
+		return b.rw.Bytes()
 	}
 
-	bytes, err := io.ReadAll(b.b)
+	bytes, err := io.ReadAll(b.r)
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "failed to read remaining bytes from Buffer: %s\n", err)
 	}
 	return bytes
 }
 
+// Peek will attempt to peek ahead if the buffer is in read-only mode.
 func (b *Buffer) Peek(length int) ([]byte, error) {
-	if b.bw != nil {
+	b.checkWO()
+
+	if b.rw != nil {
 		return nil, fmt.Errorf("unsupported Peek() operation on read/write buffer.")
 	}
-	return b.b.Peek(length)
+
+	return b.r.Peek(length)
+}
+
+// Flush will attempt to flush any pending writes if the buffer is in write-only mode.
+func (b *Buffer) Flush() {
+	if b.IsWriteOnly() {
+		if err := b.w.Flush(); err != nil {
+			fmt.Fprintf(os.Stderr, "Flushing io.Writer failed: %s\n", err)
+		}
+	}
 }
 
 // this should be inlined
 func (b *Buffer) checkRO() {
-	if b.bw == nil {
-		panic("Buffer is set to read-only")
+	if b.IsReadOnly() {
+		panic("Tried to write to a Buffer that is set to read-only")
 	}
 }
 
+func (b *Buffer) checkWO() {
+	if b.IsWriteOnly() {
+		panic("Tried to read from a Buffer that is set to write-only")
+	}
+}
+
+// IsReadOnly returns true if the buffer is set to only read mode.
+func (b *Buffer) IsReadOnly() bool {
+	return b.m == ReadOnly
+}
+
+// IsWriteOnly returns true if the buffer is set to only write mode.
+func (b *Buffer) IsWriteOnly() bool {
+	return b.m == WriteOnly
+}
+
+// IsReadWrite returns true if the buffer can be written to and read from.
+func (b *Buffer) IsReadWrite() bool {
+	return b.m == ReadWrite
+}
+
 // ReadBool reads a bool value from the buffer.
 func (b *Buffer) ReadBool() bool {
+	b.checkWO()
+
 	var i bool
-	if b.bw != nil {
-		readBool(b.bw, &i)
+	if b.rw != nil {
+		readBool(b.rw, &i)
 		return i
 	}
 
-	readBuffBool(b.b, &i)
+	readBuffBool(b.r, &i)
 	return i
 }
 
 // ReadInt reads an int value from the buffer.
 func (b *Buffer) ReadInt() int {
+	b.checkWO()
+
 	var i int
-	if b.bw != nil {
-		readInt(b.bw, &i)
+	if b.rw != nil {
+		readInt(b.rw, &i)
 		return i
 	}
 
-	readBuffInt(b.b, &i)
+	readBuffInt(b.r, &i)
 	return i
 }
 
 // ReadInt8 reads an int8 value from the buffer.
 func (b *Buffer) ReadInt8() int8 {
+	b.checkWO()
+
 	var i int8
-	if b.bw != nil {
-		readInt8(b.bw, &i)
+	if b.rw != nil {
+		readInt8(b.rw, &i)
 		return i
 	}
 
-	readBuffInt8(b.b, &i)
+	readBuffInt8(b.r, &i)
 	return i
 }
 
 // ReadInt16 reads an int16 value from the buffer.
 func (b *Buffer) ReadInt16() int16 {
+	b.checkWO()
+
 	var i int16
-	if b.bw != nil {
-		readInt16(b.bw, &i)
+	if b.rw != nil {
+		readInt16(b.rw, &i)
 		return i
 	}
 
-	readBuffInt16(b.b, &i)
+	readBuffInt16(b.r, &i)
 	return i
 }
 
 // ReadInt32 reads an int32 value from the buffer.
 func (b *Buffer) ReadInt32() int32 {
+	b.checkWO()
+
 	var i int32
-	if b.bw != nil {
-		readInt32(b.bw, &i)
+	if b.rw != nil {
+		readInt32(b.rw, &i)
 		return i
 	}
 
-	readBuffInt32(b.b, &i)
+	readBuffInt32(b.r, &i)
 	return i
 }
 
 // ReadInt64 reads an int64 value from the buffer.
 func (b *Buffer) ReadInt64() int64 {
+	b.checkWO()
+
 	var i int64
-	if b.bw != nil {
-		readInt64(b.bw, &i)
+	if b.rw != nil {
+		readInt64(b.rw, &i)
 		return i
 	}
 
-	readBuffInt64(b.b, &i)
+	readBuffInt64(b.r, &i)
 	return i
 }
 
 // ReadUInt reads a uint value from the buffer.
 func (b *Buffer) ReadUInt() uint {
+	b.checkWO()
+
 	var i uint
-	if b.bw != nil {
-		readUint(b.bw, &i)
+	if b.rw != nil {
+		readUint(b.rw, &i)
 		return i
 	}
 
-	readBuffUint(b.b, &i)
+	readBuffUint(b.r, &i)
 	return i
 }
 
 // ReadUInt8 reads a uint8 value from the buffer.
 func (b *Buffer) ReadUInt8() uint8 {
+	b.checkWO()
+
 	var i uint8
-	if b.bw != nil {
-		readUint8(b.bw, &i)
+	if b.rw != nil {
+		readUint8(b.rw, &i)
 		return i
 	}
 
-	readBuffUint8(b.b, &i)
+	readBuffUint8(b.r, &i)
 	return i
 }
 
 // ReadUInt16 reads a uint16 value from the buffer.
 func (b *Buffer) ReadUInt16() uint16 {
+	b.checkWO()
+
 	var i uint16
-	if b.bw != nil {
-		readUint16(b.bw, &i)
+	if b.rw != nil {
+		readUint16(b.rw, &i)
 		return i
 	}
 
-	readBuffUint16(b.b, &i)
+	readBuffUint16(b.r, &i)
 	return i
 }
 
 // ReadUInt32 reads a uint32 value from the buffer.
 func (b *Buffer) ReadUInt32() uint32 {
+	b.checkWO()
+
 	var i uint32
-	if b.bw != nil {
-		readUint32(b.bw, &i)
+	if b.rw != nil {
+		readUint32(b.rw, &i)
 		return i
 	}
 
-	readBuffUint32(b.b, &i)
+	readBuffUint32(b.r, &i)
 	return i
 }
 
 // ReadUInt64 reads a uint64 value from the buffer.
 func (b *Buffer) ReadUInt64() uint64 {
+	b.checkWO()
+
 	var i uint64
-	if b.bw != nil {
-		readUint64(b.bw, &i)
+	if b.rw != nil {
+		readUint64(b.rw, &i)
 		return i
 	}
 
-	readBuffUint64(b.b, &i)
+	readBuffUint64(b.r, &i)
 	return i
 }
 
 // ReadFloat32 reads a float32 value from the buffer.
 func (b *Buffer) ReadFloat32() float32 {
+	b.checkWO()
+
 	var i float32
-	if b.bw != nil {
-		readFloat32(b.bw, &i)
+	if b.rw != nil {
+		readFloat32(b.rw, &i)
 		return i
 	}
 
-	readBuffFloat32(b.b, &i)
+	readBuffFloat32(b.r, &i)
 	return i
 }
 
 // ReadFloat64 reads a float64 value from the buffer.
 func (b *Buffer) ReadFloat64() float64 {
+	b.checkWO()
+
 	var i float64
-	if b.bw != nil {
-		readFloat64(b.bw, &i)
+	if b.rw != nil {
+		readFloat64(b.rw, &i)
 		return i
 	}
 
-	readBuffFloat64(b.b, &i)
+	readBuffFloat64(b.r, &i)
 	return i
 }
 
 // ReadString reads a uint16 value from the buffer representing the string's length,
 // then uses the length to extract the exact length []byte representing the string.
 func (b *Buffer) ReadString() string {
+	b.checkWO()
+
 	var l uint16
-	if b.bw != nil {
-		readUint16(b.bw, &l)
-		return bytesToString(b.bw.Next(int(l)))
+	if b.rw != nil {
+		readUint16(b.rw, &l)
+		return bytesToString(b.rw.Next(int(l)))
 	}
 
-	readBuffUint16(b.b, &l)
+	readBuffUint16(b.r, &l)
 
 	bytes := bytePool.Get(int(l))
 	defer bytePool.Put(bytes)
 
-	_, err := readBuffFull(b.b, bytes)
+	_, err := readBuffFull(b.r, bytes)
 	if err != nil {
 		return ""
 	}
@@ -366,12 +555,14 @@ func (b *Buffer) ReadString() string {
 
 // ReadBytes reads the specified length from the buffer and returns the byte slice.
 func (b *Buffer) ReadBytes(length int) []byte {
-	if b.bw != nil {
-		return b.bw.Next(length)
+	b.checkWO()
+
+	if b.rw != nil {
+		return b.rw.Next(length)
 	}
 
 	bytes := make([]byte, length)
-	_, err := readBuffFull(b.b, bytes)
+	_, err := readBuffFull(b.r, bytes)
 	if err != nil {
 		return bytes
 	}
@@ -402,7 +593,7 @@ func (b *Buffer) ReadBytes(length int) []byte {
 //	  return bytesAsString(bytes)
 //	}
 //
-// In this case, we've create a byte array just big enough for the string, we extract the string data from the reader
+// In this case, we've created a byte array just big enough for the string, we extract the string data from the reader
 // and then cast the byte array in place to the string, and finally drop the byte array reference. This omits an additional
 // allocation if you were to use string(bytes)
 func bytesAsString(b []byte) string {

+ 45 - 0
core/pkg/util/buffer_test.go

@@ -321,6 +321,51 @@ func (sbr *randomByteReader) Read(b []byte) (int, error) {
 	return bytesCopied, err
 }
 
+func TestBufferWriterSupport(t *testing.T) {
+	byteBuff := new(bytes.Buffer)
+
+	buf := NewBufferFromWriter(byteBuff)
+	buf.WriteBool(true)
+	buf.WriteInt(42)
+	buf.WriteFloat64(3.14)
+	buf.WriteString("Testing, 1, 2, 3!")
+	buf.WriteUInt64(uint64(123456))
+	buf.WriteInt16(44)
+	buf.WriteFloat32(float32(5.0))
+	buf.Flush()
+
+	readerBuff := NewBufferFromBytes(byteBuff.Bytes())
+	b := readerBuff.ReadBool()
+	i := readerBuff.ReadInt()
+	f := readerBuff.ReadFloat64()
+	s := readerBuff.ReadString()
+	ui64 := readerBuff.ReadUInt64()
+	i16 := readerBuff.ReadInt16()
+	f32 := readerBuff.ReadFloat32()
+
+	if !b {
+		t.Errorf("expected true, got: false")
+	}
+	if i != 42 {
+		t.Errorf("expected 42, got: %d", i)
+	}
+	if f != 3.14 {
+		t.Errorf("expected 3.14, got: %f", f)
+	}
+	if s != "Testing, 1, 2, 3!" {
+		t.Errorf("expected 'Testing, 1, 2, 3!', got: '%s'", s)
+	}
+	if ui64 != uint64(123456) {
+		t.Errorf("expected 123456, got: %d", ui64)
+	}
+	if i16 != int16(44) {
+		t.Errorf("expected 44, got: %d", i16)
+	}
+	if f32 != float32(5.0) {
+		t.Errorf("expected 5.0, got: %f", f32)
+	}
+}
+
 func TestBufferReaderSupport(t *testing.T) {
 	buf := NewBuffer()
 	buf.WriteBool(true)

+ 110 - 8
core/pkg/util/bufferhelper.go

@@ -384,22 +384,18 @@ func readFull(r *bytes.Buffer, buf []byte) (n int, err error) {
 
 func writeBool(w *bytes.Buffer, data bool) error {
 	if data {
-		w.WriteByte(1)
-		return nil
+		return w.WriteByte(1)
 	}
 
-	w.WriteByte(0)
-	return nil
+	return w.WriteByte(0)
 }
 
 func writeInt8(w *bytes.Buffer, data int8) error {
-	w.WriteByte(byte(data))
-	return nil
+	return w.WriteByte(byte(data))
 }
 
 func writeUint8(w *bytes.Buffer, data uint8) error {
-	w.WriteByte(byte(data))
-	return nil
+	return w.WriteByte(byte(data))
 }
 
 func writeInt16(w *bytes.Buffer, data int16) error {
@@ -491,3 +487,109 @@ func writeFloat64(w *bytes.Buffer, data float64) error {
 	_, err := w.Write(bs)
 	return err
 }
+
+func writeBuffBool(w *bufio.Writer, data bool) error {
+	if data {
+		return w.WriteByte(1)
+	}
+
+	return w.WriteByte(0)
+}
+
+func writeBuffInt8(w *bufio.Writer, data int8) error {
+	return w.WriteByte(byte(data))
+}
+
+func writeBuffUint8(w *bufio.Writer, data uint8) error {
+	return w.WriteByte(byte(data))
+}
+
+func writeBuffInt16(w *bufio.Writer, data int16) error {
+	var b [2]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint16(bs, uint16(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeBuffUint16(w *bufio.Writer, data uint16) error {
+	var b [2]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint16(bs, data)
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeBuffInt32(w *bufio.Writer, data int32) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, uint32(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeBuffUint32(w *bufio.Writer, data uint32) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, data)
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeBuffInt(w *bufio.Writer, data int) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, uint32(int32(data)))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeBuffUint(w *bufio.Writer, data uint) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, uint32(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeBuffInt64(w *bufio.Writer, data int64) error {
+	var b [8]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint64(bs, uint64(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeBuffUint64(w *bufio.Writer, data uint64) error {
+	var b [8]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint64(bs, data)
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeBuffFloat32(w *bufio.Writer, data float32) error {
+	var b [4]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint32(bs, math.Float32bits(data))
+	_, err := w.Write(bs)
+	return err
+}
+
+func writeBuffFloat64(w *bufio.Writer, data float64) error {
+	var b [8]byte
+	bs := b[:]
+
+	binary.LittleEndian.PutUint64(bs, math.Float64bits(data))
+	_, err := w.Write(bs)
+	return err
+}

+ 24 - 0
core/pkg/util/fileutil/locks_unix.go

@@ -14,6 +14,30 @@ import (
 	"github.com/opencost/opencost/core/pkg/log"
 )
 
+// LockFile directly attempts to flock EX the file instance provided.
+func LockFile(f *os.File) error {
+	if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil {
+		return fmt.Errorf("unexpected error flock()-ing with EX: %w", err)
+	}
+	return nil
+}
+
+// ReadLockFile directly attempts to flock SH the file instance provided.
+func ReadLockFile(f *os.File) error {
+	if err := syscall.Flock(int(f.Fd()), syscall.LOCK_SH); err != nil {
+		return fmt.Errorf("unexpected error flock()-ing FD: %d directly: %w", f.Fd(), err)
+	}
+	return nil
+}
+
+// UnlockFile directly attempts to flock UN the file instance provided.
+func UnlockFile(f *os.File) error {
+	if err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN); err != nil {
+		return fmt.Errorf("unexpected error flock()-ing FD %d with UN: %w", f.Fd(), err)
+	}
+	return nil
+}
+
 // WriteLockedFD uses the flock() syscall to safely write to an open file as
 // long as other users of the file are also using flock()-based access.
 //

+ 15 - 0
core/pkg/util/fileutil/locks_windows.go

@@ -22,3 +22,18 @@ func ReadLockedFD(f *os.File) ([]byte, error) {
 func ReadLocked(filename string) ([]byte, error) {
 	return nil, fmt.Errorf("ReadLocked is not implemented on Windows. Please open an issue.")
 }
+
+// LockFile directly attempts to flock EX the file instance provided.
+func LockFile(f *os.File) error {
+	return fmt.Errorf("LockFile is not implemented on Windows. Please open an issue.")
+}
+
+// ReadLockFile directly attempts to flock SH the file instance provided.
+func ReadLockFile(f *os.File) error {
+	return fmt.Errorf("ReadLockFile is not implemented on Windows. Please open an issue.")
+}
+
+// UnlockFile directly attempts to flock UN the file instance provided.
+func UnlockFile(f *os.File) error {
+	return fmt.Errorf("UnlockFile is not implemented on Windows. Please open an issue.")
+}

+ 54 - 0
core/pkg/util/fileutil/writer.go

@@ -0,0 +1,54 @@
+package fileutil
+
+import (
+	"io"
+	"os"
+)
+
+// LockedFileWriter is an io.WriteCloser implementation of a writer that will use flock
+// for file locking during the write and unlock on close.
+type LockedFileWriter struct {
+	f *os.File
+}
+
+// Creates a new FLocking file writer that will flock a file on open, and unlock when the writer is
+// closed.
+func NewLockedFileWriter(path string) (io.WriteCloser, error) {
+	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0666)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := LockFile(f); err != nil {
+		f.Close()
+		return nil, err
+	}
+
+	if err := f.Truncate(0); err != nil {
+		UnlockFile(f)
+		f.Close()
+		return nil, err
+	}
+
+	if _, err := f.Seek(0, io.SeekStart); err != nil {
+		UnlockFile(f)
+		f.Close()
+		return nil, err
+	}
+
+	return &LockedFileWriter{
+		f: f,
+	}, nil
+}
+
+func (lf *LockedFileWriter) Write(p []byte) (int, error) {
+	return lf.f.Write(p)
+}
+
+func (lf *LockedFileWriter) Close() error {
+	if err := UnlockFile(lf.f); err != nil {
+		lf.f.Close()
+		return err
+	}
+	return lf.f.Close()
+}

+ 295 - 101
modules/collector-source/pkg/metric/metric_codecs.go

@@ -12,11 +12,13 @@
 package metric
 
 import (
+	"cmp"
 	"fmt"
 	"io"
 	"iter"
 	"os"
 	"reflect"
+	"slices"
 	"strings"
 	"sync"
 	"time"
@@ -28,16 +30,12 @@ import (
 const (
 	// GeneratorPackageName is the package the generator is targetting
 	GeneratorPackageName string = "metric"
-)
+	StringHeaderSize            = int64(unsafe.Sizeof(""))
 
-// BinaryTags represent the formatting tag used for specific optimization features
-const (
 	// BinaryTagStringTable is written and/or read prior to the existence of a string
 	// table (where each index is encoded as a string entry in the resource
 	BinaryTagStringTable string = "BGST"
-)
 
-const (
 	// DefaultCodecVersion is used for any resources listed in the Default version set
 	DefaultCodecVersion uint8 = 1
 )
@@ -60,14 +58,19 @@ type BingenConfiguration struct {
 
 	// FileBackedStringTableDir is the directory to write the string table files for reading.
 	FileBackedStringTableDir string
+
+	// FileBackedStringTableMemoMaxBytes limits in-memory memoization for file-backed table lookups.
+	// 0 disables memoization.
+	FileBackedStringTableMemoMaxBytes int64
 }
 
 // DefaultBingenConfiguration creates the default implementation of the bingen configuration
 // and returns it.
 func DefaultBingenConfiguration() *BingenConfiguration {
 	return &BingenConfiguration{
-		FileBackedStringTableEnabled: false,
-		FileBackedStringTableDir:     os.TempDir(),
+		FileBackedStringTableEnabled:      false,
+		FileBackedStringTableDir:          os.TempDir(),
+		FileBackedStringTableMemoMaxBytes: 0,
 	}
 }
 
@@ -99,12 +102,19 @@ func BingenFileBackedStringTableDir() string {
 	return bingenConfig.FileBackedStringTableDir
 }
 
+// BingenFileBackedStringTableMemoMaxBytes returns the maximum bytes used for file-backed memo cache.
+func BingenFileBackedStringTableMemoMaxBytes() int64 {
+	bingenConfigLock.RLock()
+	defer bingenConfigLock.RUnlock()
+
+	return bingenConfig.FileBackedStringTableMemoMaxBytes
+}
+
 //--------------------------------------------------------------------------
 //  Type Map
 //--------------------------------------------------------------------------
 
-// Generated type map for resolving interface implementations to
-// to concrete types
+// Generated type map for resolving interface implementations to to concrete types
 var typeMap map[string]reflect.Type = map[string]reflect.Type{
 	"Update":    reflect.TypeFor[Update](),
 	"UpdateSet": reflect.TypeFor[UpdateSet](),
@@ -136,21 +146,6 @@ func isReaderBinaryTag(buff *util.Buffer, tag string) bool {
 	return string(data[:len(tag)]) == tag
 }
 
-// appendBytes combines a and b into a new byte array
-func appendBytes(a []byte, b []byte) []byte {
-	al := len(a)
-	bl := len(b)
-	tl := al + bl
-
-	// allocate a new byte array for the combined
-	// use native copy for speedy byte copying
-	result := make([]byte, tl)
-	copy(result, a)
-	copy(result[al:], b)
-
-	return result
-}
-
 // typeToString determines the basic properties of the type, the qualifier, package path, and
 // type name, and returns the qualified type
 func typeToString(f interface{}) string {
@@ -263,33 +258,33 @@ type BingenFieldInfo struct {
 //  String Table Writer
 //--------------------------------------------------------------------------
 
-// StringTableWriter maps strings to specific indices for encoding
-type StringTableWriter struct {
-	l       sync.Mutex
+// StringTableWriter is the interface used to write the string table for encoding.
+type StringTableWriter interface {
+	// AddOrGet adds a string to the string table and returns the new index or
+	// an existing index.
+	AddOrGet(s string) int
+
+	// WriteTo will write the StringTable data (with the header) to the provided
+	// Buffer starting a the current write position
+	WriteTo(b *util.Buffer)
+}
+
+// IndexedStringTableWriter maps strings to specific indices for encoding
+type IndexedStringTableWriter struct {
 	indices map[string]int
 	next    int
 }
 
-// NewStringTableWriter Creates a new StringTableWriter instance with provided contents
-func NewStringTableWriter(contents ...string) *StringTableWriter {
-	st := &StringTableWriter{
-		indices: make(map[string]int, len(contents)),
-		next:    len(contents),
-	}
-
-	for i, entry := range contents {
-		st.indices[entry] = i
+// NewIndexedStringTableWriter Creates a new IndexedStringTableWriter instance.
+func NewIndexedStringTableWriter() *IndexedStringTableWriter {
+	return &IndexedStringTableWriter{
+		indices: make(map[string]int),
+		next:    0,
 	}
-
-	return st
 }
 
-// AddOrGet atomically retrieves a string entry's index if it exist. Otherwise, it will
-// add the entry and return the index.
-func (st *StringTableWriter) AddOrGet(s string) int {
-	st.l.Lock()
-	defer st.l.Unlock()
-
+// AddOrGet retrieves a string entry's index if it exists. Otherwise, it adds the entry and returns the new index.
+func (st *IndexedStringTableWriter) AddOrGet(s string) int {
 	if ind, ok := st.indices[s]; ok {
 		return ind
 	}
@@ -302,10 +297,7 @@ func (st *StringTableWriter) AddOrGet(s string) int {
 }
 
 // ToSlice Converts the contents to a string array for encoding.
-func (st *StringTableWriter) ToSlice() []string {
-	st.l.Lock()
-	defer st.l.Unlock()
-
+func (st *IndexedStringTableWriter) ToSlice() []string {
 	if st.next == 0 {
 		return []string{}
 	}
@@ -318,18 +310,95 @@ func (st *StringTableWriter) ToSlice() []string {
 }
 
 // ToBytes Converts the contents to a binary encoded representation
-func (st *StringTableWriter) ToBytes() []byte {
+func (st *IndexedStringTableWriter) ToBytes() []byte {
 	buff := util.NewBuffer()
-	buff.WriteBytes([]byte(BinaryTagStringTable)) // bingen table header
+	st.WriteTo(buff)
+	return buff.Bytes()
+}
+
+// WriteTo will write the StringTable data (with the header) to the provided
+// Buffer starting a the current write position
+func (st *IndexedStringTableWriter) WriteTo(buff *util.Buffer) {
+	// bingen string table header
+	buff.WriteBytes([]byte(BinaryTagStringTable))
 
+	// get an ordered string slice to encode
 	strs := st.ToSlice()
 
 	buff.WriteInt(len(strs)) // table length
 	for _, s := range strs {
 		buff.WriteString(s)
 	}
+}
 
-	return buff.Bytes()
+type indexed struct {
+	s     string
+	count uint64
+	index int
+}
+
+func newIndexed(s string, index int) *indexed {
+	return &indexed{
+		s:     s,
+		count: 1,
+		index: index,
+	}
+}
+
+// PrepassStringTableWriter maps strings to specific indices for encoding, sorted by the total
+// number of times they're accessed
+type PrepassStringTableWriter struct {
+	prepass map[string]*indexed
+	next    int
+}
+
+// NewPrepassStringTableWriter creates a new PrepassStringTableWriter instance.
+func NewPrepassStringTableWriter() *PrepassStringTableWriter {
+	return &PrepassStringTableWriter{
+		prepass: make(map[string]*indexed),
+	}
+}
+
+// AddOrGet retrieves a string entry's index if it exists. Otherwise, it adds the entry and returns the new index.
+func (st *PrepassStringTableWriter) AddOrGet(s string) int {
+	if ind, ok := st.prepass[s]; ok {
+		ind.count += 1
+		return ind.index
+	}
+
+	current := st.next
+	st.next++
+
+	st.prepass[s] = newIndexed(s, current)
+	return current
+}
+
+// WriteSortedTo sorts the string table by the number of accesses, writes the table in that
+// order, then returns a new StringTableWriter implementation that can be used for the new
+// sorted order index lookups.
+func (st *PrepassStringTableWriter) WriteSortedTo(buff *util.Buffer) StringTableWriter {
+	sl := make([]*indexed, st.next)
+	for _, ind := range st.prepass {
+		sl[ind.index] = ind
+	}
+
+	slices.SortFunc(sl, func(a *indexed, b *indexed) int {
+		return -cmp.Compare(a.count, b.count)
+	})
+
+	sti := NewIndexedStringTableWriter()
+	for _, ind := range sl {
+		sti.AddOrGet(ind.s)
+	}
+
+	sti.WriteTo(buff)
+	return sti
+}
+
+// WriteTo will write the StringTable data (with the header) to the provided
+// Buffer starting a the current write position
+func (st *PrepassStringTableWriter) WriteTo(buff *util.Buffer) {
+	panic("Prepass StringTableWriter cannot write directly")
 }
 
 //--------------------------------------------------------------------------
@@ -350,7 +419,7 @@ type StringTableReader interface {
 
 // SliceStringTableReader is a basic pre-loaded []string that provides index-based access.
 // The cost of this implementation is holding all strings in memory, which provides faster
-// lookup performance for memory usage.
+// lookup performance at the expense of memory usage.
 type SliceStringTableReader struct {
 	table []string
 }
@@ -411,11 +480,12 @@ type fileStringRef struct {
 type FileStringTableReader struct {
 	f    *os.File
 	refs []fileStringRef
+	memo []string
 }
 
 // NewFileStringTableFromBuffer reads exactly tl length-prefixed (uint16) string payloads from buffer
 // and appends each payload to a new temp file. It does not retain full strings in memory.
-func NewFileStringTableReaderFrom(buffer *util.Buffer, dir string) StringTableReader {
+func NewFileStringTableReaderFrom(buffer *util.Buffer, dir string, memoMaxBytes int64) StringTableReader {
 	// helper func to cast a string in-place to a byte slice.
 	// NOTE: Return value is READ-ONLY. DO NOT MODIFY!
 	byteSliceFor := func(s string) []byte {
@@ -469,9 +539,40 @@ func NewFileStringTableReaderFrom(buffer *util.Buffer, dir string) StringTableRe
 		}
 	}
 
+	var memo []string
+
+	// Pre-load cache with strings up to memoMaxBytes, respecting string boundaries
+	if memoMaxBytes > 0 && len(refs) > 0 {
+		memo = make([]string, len(refs))
+		var cumulativeSize int64
+		for i, ref := range refs {
+			// Check if adding this string would exceed the limit
+			if cumulativeSize+int64(ref.length)+StringHeaderSize > memoMaxBytes {
+				// Would exceed limit, stop here
+				break
+			}
+
+			// Read string from file and cache it
+			if ref.length > 0 {
+				b := make([]byte, ref.length)
+				_, err := f.ReadAt(b, ref.off)
+				if err != nil {
+					// If we can't read, skip this entry but continue
+					continue
+				}
+
+				// Cast the allocated bytes to a string in-place
+				str := unsafe.String(unsafe.SliceData(b), len(b))
+				memo[i] = str
+				cumulativeSize += int64(ref.length) + StringHeaderSize
+			}
+		}
+	}
+
 	return &FileStringTableReader{
 		f:    f,
 		refs: refs,
+		memo: memo,
 	}
 }
 
@@ -489,14 +590,19 @@ func (fstr *FileStringTableReader) At(index int) string {
 		return ""
 	}
 
+	// Check cache first
+	if fstr.memo != nil && len(fstr.memo) > index && fstr.memo[index] != "" {
+		return fstr.memo[index]
+	}
+
+	// Cache miss - read from file
 	b := make([]byte, ref.length)
 	_, err := fstr.f.ReadAt(b, ref.off)
 	if err != nil {
 		return ""
 	}
 
-	// cast the allocated bytes to a string in-place, as we
-	// were the ones that allocated the bytes
+	// Cast the allocated bytes to a string in-place, as we were the ones that allocated the bytes
 	return unsafe.String(unsafe.SliceData(b), len(b))
 }
 
@@ -519,6 +625,7 @@ func (fstr *FileStringTableReader) Close() error {
 	err := fstr.f.Close()
 	fstr.f = nil
 	fstr.refs = nil
+	fstr.memo = nil
 
 	if path != "" {
 		_ = os.Remove(path)
@@ -535,7 +642,49 @@ func (fstr *FileStringTableReader) Close() error {
 // and table data
 type EncodingContext struct {
 	Buffer *util.Buffer
-	Table  *StringTableWriter
+	Table  StringTableWriter
+}
+
+// NewEncodingContext creates a new EncodingContext instance that will create a new []byte buffer
+// for writing, and return the context
+func NewEncodingContext(tableWriter StringTableWriter) *EncodingContext {
+	return &EncodingContext{
+		Buffer: util.NewBuffer(),
+		Table:  tableWriter,
+	}
+}
+
+// NewEncodingContextFromWriter creates a new EncodingContext instance that will create a new Buffer
+// from the provided io.Writer and StringTableWriter.
+func NewEncodingContextFromWriter(writer io.Writer, tableWriter StringTableWriter) *EncodingContext {
+	return &EncodingContext{
+		Buffer: util.NewBufferFromWriter(writer),
+		Table:  tableWriter,
+	}
+}
+
+// NewEncodingContextFromBuffer creates a new EncodingContext instance that will leverage an existing
+// Buffer and StringTableWriter.
+func NewEncodingContextFromBuffer(buffer *util.Buffer, tableWriter StringTableWriter) *EncodingContext {
+	return &EncodingContext{
+		Buffer: buffer,
+		Table:  tableWriter,
+	}
+}
+
+// ToBytes returns the encoded string table bytes (if applicable) combined with the encoded buffer bytes. If
+// a string table is being used, the string table bytes will be written first to ensure correct ordering for
+// decoding.
+func (ec *EncodingContext) ToBytes() []byte {
+	encBytes := ec.Buffer.Bytes()
+	if ec.Table != nil {
+		buff := util.NewBuffer()
+		ec.Table.WriteTo(buff)
+		buff.WriteBytes(encBytes)
+		return buff.Bytes()
+	}
+
+	return encBytes
 }
 
 // IsStringTable returns true if the table is available
@@ -583,7 +732,7 @@ func NewDecodingContextFromReader(reader io.Reader) *DecodingContext {
 
 		// create correct string table implementation
 		if IsBingenFileBackedStringTableEnabled() {
-			table = NewFileStringTableReaderFrom(buff, BingenFileBackedStringTableDir())
+			table = NewFileStringTableReaderFrom(buff, BingenFileBackedStringTableDir(), BingenFileBackedStringTableMemoMaxBytes())
 		} else {
 			table = NewSliceStringTableReaderFrom(buff)
 		}
@@ -630,18 +779,25 @@ type BinDecoder interface {
 // MarshalBinary serializes the internal properties of this Update instance
 // into a byte array
 func (target *Update) MarshalBinary() (data []byte, err error) {
-	ctx := &EncodingContext{
-		Buffer: util.NewBuffer(),
-		Table:  nil,
-	}
+	ctx := NewEncodingContext(nil)
 
 	e := target.MarshalBinaryWithContext(ctx)
 	if e != nil {
 		return nil, e
 	}
 
-	encBytes := ctx.Buffer.Bytes()
-	return encBytes, nil
+	return ctx.ToBytes(), nil
+}
+
+// MarshalBinary serializes the internal properties of this Update instance
+// into an io.Writer.
+func (target *Update) MarshalBinaryTo(writer io.Writer) error {
+	buff := util.NewBufferFromWriter(writer)
+	defer buff.Flush()
+
+	ctx := NewEncodingContextFromBuffer(buff, nil)
+
+	return target.MarshalBinaryWithContext(ctx)
 }
 
 // MarshalBinaryWithContext serializes the internal properties of this Update instance
@@ -653,9 +809,9 @@ func (target *Update) MarshalBinaryWithContext(ctx *EncodingContext) (err error)
 			if e, ok := r.(error); ok {
 				err = e
 			} else if s, ok := r.(string); ok {
-				err = fmt.Errorf("Unexpected panic: %s", s)
+				err = fmt.Errorf("unexpected panic: %s", s)
 			} else {
-				err = fmt.Errorf("Unexpected panic: %+v", r)
+				err = fmt.Errorf("unexpected panic: %+v", r)
 			}
 		}
 	}()
@@ -669,6 +825,7 @@ func (target *Update) MarshalBinaryWithContext(ctx *EncodingContext) (err error)
 	} else {
 		buff.WriteString(target.Name) // write string
 	}
+
 	if target.Labels == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
 	} else {
@@ -683,17 +840,21 @@ func (target *Update) MarshalBinaryWithContext(ctx *EncodingContext) (err error)
 			} else {
 				buff.WriteString(v) // write string
 			}
+
 			if ctx.IsStringTable() {
 				c := ctx.Table.AddOrGet(z)
 				buff.WriteInt(c) // write table index
 			} else {
 				buff.WriteString(z) // write string
 			}
+
 		}
 		// --- [end][write][map](map[string]string) ---
 
 	}
+
 	buff.WriteFloat64(target.Value) // write float64
+
 	if target.AdditionalInfo == nil {
 		buff.WriteUInt8(uint8(0)) // write nil byte
 	} else {
@@ -708,16 +869,19 @@ func (target *Update) MarshalBinaryWithContext(ctx *EncodingContext) (err error)
 			} else {
 				buff.WriteString(vv) // write string
 			}
+
 			if ctx.IsStringTable() {
 				e := ctx.Table.AddOrGet(zz)
 				buff.WriteInt(e) // write table index
 			} else {
 				buff.WriteString(zz) // write string
 			}
+
 		}
 		// --- [end][write][map](map[string]string) ---
 
 	}
+
 	return nil
 }
 
@@ -726,6 +890,7 @@ func (target *Update) MarshalBinaryWithContext(ctx *EncodingContext) (err error)
 func (target *Update) UnmarshalBinary(data []byte) error {
 	ctx := NewDecodingContextFromBytes(data)
 	defer ctx.Close()
+
 	err := target.UnmarshalBinaryWithContext(ctx)
 	if err != nil {
 		return err
@@ -739,6 +904,7 @@ func (target *Update) UnmarshalBinary(data []byte) error {
 func (target *Update) UnmarshalBinaryFromReader(reader io.Reader) error {
 	ctx := NewDecodingContextFromReader(reader)
 	defer ctx.Close()
+
 	err := target.UnmarshalBinaryWithContext(ctx)
 	if err != nil {
 		return err
@@ -756,9 +922,9 @@ func (target *Update) UnmarshalBinaryWithContext(ctx *DecodingContext) (err erro
 			if e, ok := r.(error); ok {
 				err = e
 			} else if s, ok := r.(string); ok {
-				err = fmt.Errorf("Unexpected panic: %s", s)
+				err = fmt.Errorf("unexpected panic: %s", s)
 			} else {
-				err = fmt.Errorf("Unexpected panic: %+v", r)
+				err = fmt.Errorf("unexpected panic: %+v", r)
 			}
 		}
 	}()
@@ -767,7 +933,7 @@ func (target *Update) UnmarshalBinaryWithContext(ctx *DecodingContext) (err erro
 	version := buff.ReadUInt8()
 
 	if version > DefaultCodecVersion {
-		return fmt.Errorf("Invalid Version Unmarshaling Update. Expected %d or less, got %d", DefaultCodecVersion, version)
+		return fmt.Errorf("Invalid Version Unmarshalling Update. Expected %d or less, got %d", DefaultCodecVersion, version)
 	}
 
 	var b string
@@ -786,7 +952,7 @@ func (target *Update) UnmarshalBinaryWithContext(ctx *DecodingContext) (err erro
 		// --- [begin][read][map](map[string]string) ---
 		e := buff.ReadInt() // map len
 		d := make(map[string]string, e)
-		for i := 0; i < e; i++ {
+		for range e {
 			var v string
 			var g string
 			if ctx.IsStringTable() {
@@ -815,6 +981,7 @@ func (target *Update) UnmarshalBinaryWithContext(ctx *DecodingContext) (err erro
 		// --- [end][read][map](map[string]string) ---
 
 	}
+
 	o := buff.ReadFloat64() // read float64
 	target.Value = o
 
@@ -824,7 +991,7 @@ func (target *Update) UnmarshalBinaryWithContext(ctx *DecodingContext) (err erro
 		// --- [begin][read][map](map[string]string) ---
 		q := buff.ReadInt() // map len
 		p := make(map[string]string, q)
-		for j := 0; j < q; j++ {
+		for range q {
 			var vv string
 			var s string
 			if ctx.IsStringTable() {
@@ -853,6 +1020,7 @@ func (target *Update) UnmarshalBinaryWithContext(ctx *DecodingContext) (err erro
 		// --- [end][read][map](map[string]string) ---
 
 	}
+
 	return nil
 }
 
@@ -863,20 +1031,37 @@ func (target *Update) UnmarshalBinaryWithContext(ctx *DecodingContext) (err erro
 // MarshalBinary serializes the internal properties of this UpdateSet instance
 // into a byte array
 func (target *UpdateSet) MarshalBinary() (data []byte, err error) {
-	ctx := &EncodingContext{
-		Buffer: util.NewBuffer(),
-		Table:  NewStringTableWriter(),
-	}
+	ctx := NewEncodingContext(NewIndexedStringTableWriter())
 
 	e := target.MarshalBinaryWithContext(ctx)
 	if e != nil {
 		return nil, e
 	}
 
-	encBytes := ctx.Buffer.Bytes()
-	sTableBytes := ctx.Table.ToBytes()
-	merged := appendBytes(sTableBytes, encBytes)
-	return merged, nil
+	return ctx.ToBytes(), nil
+}
+
+// MarshalBinary serializes the internal properties of this UpdateSet instance
+// into an io.Writer.
+func (target *UpdateSet) MarshalBinaryTo(writer io.Writer) error {
+	buff := util.NewBufferFromWriter(writer)
+	defer buff.Flush()
+
+	// run a pre-pass to collect all strings into the string table and discard all writes to the main
+	// buffer. Then, we write the string table, sorted by number of repeated uses (descending), to the
+	// main buffer, and use the resulting table as part of the context for the main pass.
+	prepass := NewPrepassStringTableWriter()
+	prepassCtx := NewEncodingContextFromWriter(io.Discard, prepass)
+
+	e := target.MarshalBinaryWithContext(prepassCtx)
+	if e != nil {
+		return e
+	}
+
+	tableWriter := prepass.WriteSortedTo(buff)
+	ctx := NewEncodingContextFromBuffer(buff, tableWriter)
+
+	return target.MarshalBinaryWithContext(ctx)
 }
 
 // MarshalBinaryWithContext serializes the internal properties of this UpdateSet instance
@@ -888,9 +1073,9 @@ func (target *UpdateSet) MarshalBinaryWithContext(ctx *EncodingContext) (err err
 			if e, ok := r.(error); ok {
 				err = e
 			} else if s, ok := r.(string); ok {
-				err = fmt.Errorf("Unexpected panic: %s", s)
+				err = fmt.Errorf("unexpected panic: %s", s)
 			} else {
-				err = fmt.Errorf("Unexpected panic: %+v", r)
+				err = fmt.Errorf("unexpected panic: %+v", r)
 			}
 		}
 	}()
@@ -913,8 +1098,9 @@ func (target *UpdateSet) MarshalBinaryWithContext(ctx *EncodingContext) (err err
 		buff.WriteUInt8(uint8(1)) // write non-nil byte
 
 		// --- [begin][write][slice]([]Update) ---
-		buff.WriteInt(len(target.Updates)) // array length
-		for i := 0; i < len(target.Updates); i++ {
+		buff.WriteInt(len(target.Updates)) // slice length
+		for i := range target.Updates {
+
 			// --- [begin][write][struct](Update) ---
 			buff.WriteInt(0) // [compatibility, unused]
 			errB := target.Updates[i].MarshalBinaryWithContext(ctx)
@@ -927,6 +1113,7 @@ func (target *UpdateSet) MarshalBinaryWithContext(ctx *EncodingContext) (err err
 		// --- [end][write][slice]([]Update) ---
 
 	}
+
 	return nil
 }
 
@@ -935,6 +1122,7 @@ func (target *UpdateSet) MarshalBinaryWithContext(ctx *EncodingContext) (err err
 func (target *UpdateSet) UnmarshalBinary(data []byte) error {
 	ctx := NewDecodingContextFromBytes(data)
 	defer ctx.Close()
+
 	err := target.UnmarshalBinaryWithContext(ctx)
 	if err != nil {
 		return err
@@ -948,6 +1136,7 @@ func (target *UpdateSet) UnmarshalBinary(data []byte) error {
 func (target *UpdateSet) UnmarshalBinaryFromReader(reader io.Reader) error {
 	ctx := NewDecodingContextFromReader(reader)
 	defer ctx.Close()
+
 	err := target.UnmarshalBinaryWithContext(ctx)
 	if err != nil {
 		return err
@@ -965,9 +1154,9 @@ func (target *UpdateSet) UnmarshalBinaryWithContext(ctx *DecodingContext) (err e
 			if e, ok := r.(error); ok {
 				err = e
 			} else if s, ok := r.(string); ok {
-				err = fmt.Errorf("Unexpected panic: %s", s)
+				err = fmt.Errorf("unexpected panic: %s", s)
 			} else {
-				err = fmt.Errorf("Unexpected panic: %+v", r)
+				err = fmt.Errorf("unexpected panic: %+v", r)
 			}
 		}
 	}()
@@ -976,13 +1165,13 @@ func (target *UpdateSet) UnmarshalBinaryWithContext(ctx *DecodingContext) (err e
 	version := buff.ReadUInt8()
 
 	if version > DefaultCodecVersion {
-		return fmt.Errorf("Invalid Version Unmarshaling UpdateSet. Expected %d or less, got %d", DefaultCodecVersion, version)
+		return fmt.Errorf("Invalid Version Unmarshalling UpdateSet. Expected %d or less, got %d", DefaultCodecVersion, version)
 	}
 
 	// --- [begin][read][reference](time.Time) ---
-	a := &time.Time{}
-	b := buff.ReadInt()    // byte array length
-	c := buff.ReadBytes(b) // byte array
+	a := new(time.Time)
+	b := buff.ReadInt() // byte array length
+	c := buff.ReadBytes(b)
 	errA := a.UnmarshalBinary(c)
 	if errA != nil {
 		return errA
@@ -994,11 +1183,12 @@ func (target *UpdateSet) UnmarshalBinaryWithContext(ctx *DecodingContext) (err e
 		target.Updates = nil
 	} else {
 		// --- [begin][read][slice]([]Update) ---
-		e := buff.ReadInt() // array len
+		e := buff.ReadInt() // slice len
 		d := make([]Update, e)
-		for i := 0; i < e; i++ {
+		for i := range e {
+
 			// --- [begin][read][struct](Update) ---
-			g := &Update{}
+			g := new(Update)
 			buff.ReadInt() // [compatibility, unused]
 			errB := g.UnmarshalBinaryWithContext(ctx)
 			if errB != nil {
@@ -1013,6 +1203,7 @@ func (target *UpdateSet) UnmarshalBinaryWithContext(ctx *DecodingContext) (err e
 		// --- [end][read][slice]([]Update) ---
 
 	}
+
 	return nil
 }
 
@@ -1022,7 +1213,7 @@ func (target *UpdateSet) UnmarshalBinaryWithContext(ctx *DecodingContext) (err e
 
 // UpdateSetStream is a single use field stream for the contents of an UpdateSet instance. Instead of creating an instance and populating
 // the fields on that instance, we provide a streaming iterator which yields (BingenFieldInfo, *BingenValue) tuples for each
-// stremable element. All slices and maps will be flattened one depth and each element streamed individually.
+// streamable element. All slices and maps will be flattened one depth and each element streamed individually.
 type UpdateSetStream struct {
 	reader io.Reader
 	ctx    *DecodingContext
@@ -1064,7 +1255,7 @@ func (stream *UpdateSetStream) Stream() iter.Seq2[BingenFieldInfo, *BingenValue]
 		version := buff.ReadUInt8()
 
 		if version > DefaultCodecVersion {
-			stream.err = fmt.Errorf("Invalid Version Unmarshaling UpdateSet. Expected %d or less, got %d", DefaultCodecVersion, version)
+			stream.err = fmt.Errorf("Invalid Version Unmarshalling UpdateSet. Expected %d or less, got %d", DefaultCodecVersion, version)
 			return
 		}
 
@@ -1074,40 +1265,42 @@ func (stream *UpdateSetStream) Stream() iter.Seq2[BingenFieldInfo, *BingenValue]
 		}
 
 		// --- [begin][read][reference](time.Time) ---
-		b := &time.Time{}
-		c := buff.ReadInt()    // byte array length
-		d := buff.ReadBytes(c) // byte array
+		b := new(time.Time)
+		c := buff.ReadInt() // byte array length
+		d := buff.ReadBytes(c)
 		errA := b.UnmarshalBinary(d)
 		if errA != nil {
 			stream.err = errA
 			return
+
 		}
 		a := *b
 		// --- [end][read][reference](time.Time) ---
-
 		if !yield(fi, singleV(a)) {
 			return
 		}
+
 		fi = BingenFieldInfo{
 			Type: reflect.TypeFor[[]Update](),
 			Name: "Updates",
 		}
-
 		if buff.ReadUInt8() == uint8(0) {
 			if !yield(fi, nil) {
 				return
 			}
 		} else {
 			// --- [begin][read][streaming-slice]([]Update) ---
-			e := buff.ReadInt() // array len
-			for i := 0; i < e; i++ {
+			e := buff.ReadInt() // slice len
+			for i := range e {
+
 				// --- [begin][read][struct](Update) ---
-				g := &Update{}
+				g := new(Update)
 				buff.ReadInt() // [compatibility, unused]
 				errB := g.UnmarshalBinaryWithContext(ctx)
 				if errB != nil {
 					stream.err = errB
 					return
+
 				}
 				f := *g
 				// --- [end][read][struct](Update) ---
@@ -1119,5 +1312,6 @@ func (stream *UpdateSetStream) Stream() iter.Seq2[BingenFieldInfo, *BingenValue]
 			// --- [end][read][streaming-slice]([]Update) ---
 
 		}
+
 	}
 }

Некоторые файлы не были показаны из-за большого количества измененных файлов