Quellcode durchsuchen

Rework some of the export API, and unit tests galore!

Matt Bolt vor 1 Jahr
Ursprung
Commit
b6751c0624

+ 2 - 2
core/pkg/diagnostics/exporter/exporter.go

@@ -9,14 +9,14 @@ import (
 )
 
 // NewDiagnosticExporter creates a new `StorageExporter[DiagnosticsRunReport]` instance for exporting diagnostic run events.
-func NewDiagnosticExporter(clusterId string, applicationName string, storage storage.Storage) *exporter.StorageExporter[diagnostics.DiagnosticsRunReport] {
+func NewDiagnosticExporter(clusterId string, applicationName string, storage storage.Storage) exporter.EventExporter[diagnostics.DiagnosticsRunReport] {
 	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, diagnostics.DiagnosticsEventName, applicationName)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil
 	}
 
-	return exporter.NewStorageExporter(
+	return exporter.NewEventStorageExporter(
 		diagnostics.DiagnosticsEventName,
 		pathing,
 		NewDiagnosticsEncoder(),

+ 20 - 4
core/pkg/exporter/controller.go

@@ -11,6 +11,7 @@ import (
 	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/util/atomic"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
+	"github.com/opencost/opencost/core/pkg/util/typeutil"
 )
 
 // ExportController is a controller interface that is responsible for exporting data on a specific interval.
@@ -31,13 +32,13 @@ type ExportController interface {
 type EventExportController[T any] struct {
 	runState atomic.AtomicRunState
 	source   ExportSource[T]
-	exporter Exporter[T]
+	exporter EventExporter[T]
 	typeName string
 }
 
 // NewEventExportController creates a new `EventExportController[T]` instance which is used to export timestamped events of type T
 // on a specific interval.
-func NewEventExportController[T any](source ExportSource[T], exporter Exporter[T]) *EventExportController[T] {
+func NewEventExportController[T any](source ExportSource[T], exporter EventExporter[T]) *EventExportController[T] {
 	return &EventExportController[T]{
 		source:   source,
 		exporter: exporter,
@@ -78,7 +79,7 @@ func (cd *EventExportController[T]) Start(interval time.Duration) bool {
 				continue
 			}
 
-			err := cd.exporter.Export(evt)
+			err := cd.exporter.Export(t, evt)
 			if err != nil {
 				log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
 			}
@@ -109,11 +110,12 @@ type ComputeExportController[T any] struct {
 func NewComputeExportController[T any](
 	source ComputeSource[T],
 	exporter ComputeExporter[T],
+	resolution time.Duration,
 	sourceResolution time.Duration,
 ) *ComputeExportController[T] {
 	return &ComputeExportController[T]{
 		source:           source,
-		resolution:       exporter.Resolution(),
+		resolution:       resolution,
 		sourceResolution: sourceResolution,
 		exporter:         exporter,
 		typeName:         reflect.TypeOf((*T)(nil)).Elem().String(),
@@ -277,11 +279,17 @@ func (g *ComputeExportControllerGroup[T]) Name() string {
 }
 
 func (g *ComputeExportControllerGroup[T]) Start(interval time.Duration) bool {
+	if len(g.controllers) == 0 {
+		log.Warnf("ComputeExportControllerGroup[%s] has no controllers to start", typeutil.TypeOf[T]())
+		return false
+	}
+
 	for _, c := range g.controllers {
 		if !c.Start(interval) {
 			return false
 		}
 	}
+
 	return true
 }
 
@@ -290,3 +298,11 @@ func (g *ComputeExportControllerGroup[T]) Stop() {
 		c.Stop()
 	}
 }
+
+func (g *ComputeExportControllerGroup[T]) Resolutions() []time.Duration {
+	resolutions := make([]time.Duration, 0, len(g.controllers))
+	for _, c := range g.controllers {
+		resolutions = append(resolutions, c.resolution)
+	}
+	return resolutions
+}

+ 16 - 25
core/pkg/exporter/exporter.go

@@ -12,30 +12,36 @@ import (
 )
 
 // Exporter[T] is a generic interface for exporting T instances to a specific storage destination.
-type Exporter[T any] interface {
+type Exporter[TimeUnit any, T any] interface {
 	// Export performs the export operation for the provided data.
-	Export(data *T) error
+	Export(time TimeUnit, data *T) error
 }
 
-// StorageExporter[T] is an implementation of an Exporter[T] that writes data to a storage backend using
+// EventExporter[T] is an alias type of an Exporter[time.Time, T] that writes data that is timestamped.
+type EventExporter[T any] Exporter[time.Time, T]
+
+// ComputeExporter[T] is an alias type of an Exporter[opencost.Window, T] that writes data for a specific window.
+type ComputeExporter[T any] Exporter[opencost.Window, T]
+
+// EventStorageExporter[T] is an implementation of an Exporter[T] that writes data to a storage backend using
 // the `github.com/opencost/opencost/core/pkg/storage` package, a pathing strategy, and an encoder.
-type StorageExporter[T any] struct {
+type EventStorageExporter[T any] struct {
 	pipeline string
 	paths    pathing.StoragePathFormatter[time.Time]
 	encoder  Encoder[T]
 	storage  storage.Storage
 }
 
-// NewStorageExporter creates a new StorageExporter instance, which is responsible for exporting data to a storage backend.
+// NewEventStorageExporter creates a new StorageExporter instance, which is responsible for exporting data to a storage backend.
 // It uses a pathing strategy to determine the storage location, an encoder to convert the data to binary format, and
 // a storage backend to write the data.
-func NewStorageExporter[T any](
+func NewEventStorageExporter[T any](
 	pipeline string,
 	paths pathing.StoragePathFormatter[time.Time],
 	encoder Encoder[T],
 	storage storage.Storage,
-) *StorageExporter[T] {
-	return &StorageExporter[T]{
+) EventExporter[T] {
+	return &EventStorageExporter[T]{
 		pipeline: pipeline,
 		paths:    paths,
 		encoder:  encoder,
@@ -45,8 +51,7 @@ func NewStorageExporter[T any](
 
 // Export performs the export operation for the provided data. It encodes the data using the encoder and writes it to
 // the storage backend using the pathing strategy.
-func (se *StorageExporter[T]) Export(data *T) error {
-	t := time.Now().UTC()
+func (se *EventStorageExporter[T]) Export(t time.Time, data *T) error {
 	path := se.paths.ToFullPath("", t, se.encoder.FileExt())
 
 	bin, err := se.encoder.Encode(data)
@@ -63,15 +68,6 @@ func (se *StorageExporter[T]) Export(data *T) error {
 	return nil
 }
 
-// ComputeExporter[T] is an interface that exports windowed data of type T using a specific resolution.
-type ComputeExporter[T any] interface {
-	// Export performs the export operation for the provided data.
-	Export(window opencost.Window, data *T) error
-
-	// Resolution contains the resolution of the data being exported
-	Resolution() time.Duration
-}
-
 // ComputeStorageExporter[T] is an implementation of ComputeExporter[T] that writes data to a storage backend using
 // `github.com/opencost/opencost/core/pkg/storage`, a pathing strategy, and an encoder.
 type ComputeStorageExporter[T any] struct {
@@ -94,7 +90,7 @@ func NewComputeStorageExporter[T any](
 	encoder Encoder[T],
 	storage storage.Storage,
 	validator validator.ExportValidator[T],
-) *ComputeStorageExporter[T] {
+) ComputeExporter[T] {
 	return &ComputeStorageExporter[T]{
 		pipeline:   pipeline,
 		resolution: resolution,
@@ -140,8 +136,3 @@ func (se *ComputeStorageExporter[T]) Export(window opencost.Window, data *T) err
 
 	return nil
 }
-
-// Resolution returns the resolution of the data being exported.
-func (se *ComputeStorageExporter[T]) Resolution() time.Duration {
-	return se.resolution
-}

+ 111 - 0
core/pkg/exporter/exporter_test.go

@@ -0,0 +1,111 @@
+package exporter
+
+import (
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	"github.com/opencost/opencost/core/pkg/exporter/validator"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/pipelines"
+	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/json"
+)
+
+const (
+	TestPipelineName = "test-pipeline"
+	TestClusterId    = "test-cluster"
+	TestEventName    = "test-event-path"
+)
+
+type TestData struct {
+	Message string `json:"message"`
+}
+
+func TestStorageExporters(t *testing.T) {
+	t.Run("test event storage exporter", func(t *testing.T) {
+		store := storage.NewMemoryStorage()
+		p, err := pathing.NewEventStoragePathFormatter("", TestClusterId, TestEventName)
+		if err != nil {
+			t.Fatalf("failed to create path formatter: %v", err)
+		}
+
+		encoder := NewJSONEncoder[TestData]()
+		export := NewEventStorageExporter(TestPipelineName, p, encoder, store)
+
+		ts := time.Now().UTC().Truncate(time.Minute)
+
+		export.Export(ts, &TestData{
+			Message: "TestMessage-1",
+		})
+
+		expectedPath := p.ToFullPath("", ts, "json")
+
+		data, err := store.Read(expectedPath)
+		if err != nil {
+			t.Fatalf("failed to read data from store: %v", err)
+		}
+
+		if len(data) == 0 {
+			t.Fatalf("expected data to be non-empty, got empty")
+		}
+
+		var td *TestData = new(TestData)
+		if err := json.Unmarshal(data, td); err != nil {
+			t.Fatalf("failed to unmarshal data: %v", err)
+		}
+
+		if td.Message != "TestMessage-1" {
+			t.Fatalf("expected message to be 'TestMessage-1', got '%s'", td.Message)
+		}
+	})
+
+	t.Run("test compute storage exporter", func(t *testing.T) {
+		res := 24 * time.Hour
+		store := storage.NewMemoryStorage()
+		p, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AllocationPipelineName, &res)
+		if err != nil {
+			t.Fatalf("failed to create path formatter: %v", err)
+		}
+
+		encoder := NewBingenEncoder[opencost.AllocationSet]()
+		export := NewComputeStorageExporter[opencost.AllocationSet](
+			pipelines.AllocationPipelineName,
+			24*time.Hour,
+			p,
+			encoder,
+			store,
+			validator.NewSetValidator[opencost.AllocationSet](24*time.Hour),
+		)
+
+		start := time.Now().UTC().Truncate(res)
+		end := start.Add(res)
+
+		toExport := opencost.GenerateMockAllocationSet(start)
+		err = export.Export(opencost.NewClosedWindow(start, end), toExport)
+		if err != nil {
+			t.Fatalf("failed to export data: %v", err)
+		}
+
+		expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "")
+
+		data, err := store.Read(expectedPath)
+		if err != nil {
+			t.Fatalf("failed to read data from store: %v", err)
+		}
+
+		if len(data) == 0 {
+			t.Fatalf("expected data to be non-empty, got empty")
+		}
+
+		var as *opencost.AllocationSet = new(opencost.AllocationSet)
+		err = as.UnmarshalBinary(data)
+		if err != nil {
+			t.Fatalf("failed to unmarshal data: %v", err)
+		}
+
+		if as.IsEmpty() {
+			t.Fatalf("expected allocation set to be non-empty, got empty")
+		}
+	})
+}

+ 37 - 1
core/pkg/exporter/pathing/path_test.go

@@ -102,6 +102,7 @@ func TestEventPathFormatter(t *testing.T) {
 		rootPath  string
 		clusterID string
 		event     string
+		subPaths  []string
 		prefix    string
 		fileExt   string
 		expected  string
@@ -113,6 +114,7 @@ func TestEventPathFormatter(t *testing.T) {
 			rootPath:  "/tmp",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
+			subPaths:  []string{},
 			prefix:    "",
 			fileExt:   "json",
 			expected:  "/tmp/federated/cluster-a/heartbeat/20240101124000.json",
@@ -122,15 +124,27 @@ func TestEventPathFormatter(t *testing.T) {
 			rootPath:  "",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
+			subPaths:  []string{},
 			prefix:    "",
 			fileExt:   "json",
 			expected:  "federated/cluster-a/heartbeat/20240101124000.json",
 		},
+		{
+			name:      "with root path with file extension with sub-paths",
+			rootPath:  "/tmp",
+			clusterID: "cluster-a",
+			event:     "heartbeat",
+			subPaths:  []string{"foo", "bar"},
+			prefix:    "",
+			fileExt:   "json",
+			expected:  "/tmp/federated/cluster-a/heartbeat/foo/bar/20240101124000.json",
+		},
 		{
 			name:      "without file extension",
 			rootPath:  "",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
+			subPaths:  []string{},
 			prefix:    "",
 			fileExt:   "",
 			expected:  "federated/cluster-a/heartbeat/20240101124000",
@@ -140,24 +154,46 @@ func TestEventPathFormatter(t *testing.T) {
 			rootPath:  "",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
+			subPaths:  []string{},
 			prefix:    "test",
 			fileExt:   "json",
 			expected:  "federated/cluster-a/heartbeat/test.20240101124000.json",
 		},
+		{
+			name:      "with prefix with file extension with sub-paths",
+			rootPath:  "",
+			clusterID: "cluster-a",
+			event:     "heartbeat",
+			subPaths:  []string{"foo", "bar", "baz"},
+			prefix:    "test",
+			fileExt:   "json",
+			expected:  "federated/cluster-a/heartbeat/foo/bar/baz/test.20240101124000.json",
+		},
 		{
 			name:      "with prefix without file extension",
 			rootPath:  "",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
+			subPaths:  []string{},
 			prefix:    "test",
 			fileExt:   "",
 			expected:  "federated/cluster-a/heartbeat/test.20240101124000",
 		},
+		{
+			name:      "with prefix without file extension with sub-paths",
+			rootPath:  "",
+			clusterID: "cluster-a",
+			event:     "heartbeat",
+			subPaths:  []string{"foo"},
+			prefix:    "test",
+			fileExt:   "",
+			expected:  "federated/cluster-a/heartbeat/foo/test.20240101124000",
+		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			pathing, err := NewEventStoragePathFormatter(tc.rootPath, tc.clusterID, tc.event)
+			pathing, err := NewEventStoragePathFormatter(tc.rootPath, tc.clusterID, tc.event, tc.subPaths...)
 			if err != nil {
 				t.Fatalf("Unexpected error: %v", err)
 			}

+ 2 - 2
core/pkg/heartbeat/exporter/exporter.go

@@ -9,14 +9,14 @@ import (
 )
 
 // NewHeartbeatExporter creates a new `StorageExporter[Heartbeat]` instance for exporting Heartbeat events.
-func NewHeartbeatExporter(clusterId string, applicationName string, storage storage.Storage) *exporter.StorageExporter[heartbeat.Heartbeat] {
+func NewHeartbeatExporter(clusterId string, applicationName string, storage storage.Storage) exporter.EventExporter[heartbeat.Heartbeat] {
 	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, heartbeat.HeartbeatEventName, applicationName)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil
 	}
 
-	return exporter.NewStorageExporter(
+	return exporter.NewEventStorageExporter(
 		heartbeat.HeartbeatEventName,
 		pathing,
 		NewHeartbeatEncoder(),

+ 4 - 0
pkg/costmodel/costmodel.go

@@ -1796,3 +1796,7 @@ func computeIdleAllocations(allocSet *opencost.AllocationSet, assetSet *opencost
 
 	return idleSet, nil
 }
+
+func (cm *CostModel) GetDataSource() source.OpenCostDataSource {
+	return cm.DataSource
+}

+ 9 - 6
pkg/exporter/allocation/source.go

@@ -6,17 +6,20 @@ import (
 	"github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/pipelines"
-	"github.com/opencost/opencost/pkg/costmodel"
 )
 
+type AllocationSource interface {
+	ComputeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error)
+}
+
 type AllocationComputeSource struct {
-	cm *costmodel.CostModel
+	src AllocationSource
 }
 
-// NewAllocationComputeSource creates an `exporter.ComputeSource[opencost.AssetSet]` implementation
-func NewAllocationComputeSource(cm *costmodel.CostModel) exporter.ComputeSource[opencost.AllocationSet] {
+// NewAllocationComputeSource creates an `exporter.ComputeSource[opencost.AllocationSet]` implementation
+func NewAllocationComputeSource(src AllocationSource) exporter.ComputeSource[opencost.AllocationSet] {
 	return &AllocationComputeSource{
-		cm: cm,
+		src: src,
 	}
 }
 
@@ -31,7 +34,7 @@ func (acs *AllocationComputeSource) CanCompute(start, end time.Time) bool {
 
 // Compute should compute a single T for the given time range, optionally using the given resolution.
 func (acs *AllocationComputeSource) Compute(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
-	return acs.cm.ComputeAllocation(start, end, resolution)
+	return acs.src.ComputeAllocation(start, end, resolution)
 }
 
 // Name returns the name of the ComputeSource

+ 8 - 5
pkg/exporter/asset/source.go

@@ -6,17 +6,20 @@ import (
 	"github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/pipelines"
-	"github.com/opencost/opencost/pkg/costmodel"
 )
 
+type AssetSource interface {
+	ComputeAssets(start, end time.Time) (*opencost.AssetSet, error)
+}
+
 type AssetsComputeSource struct {
-	cm *costmodel.CostModel
+	src AssetSource
 }
 
 // NewAssetsComputeSource creates an `exporter.ComputeSource[opencost.AssetSet]` implementation
-func NewAssetsComputeSource(cm *costmodel.CostModel) exporter.ComputeSource[opencost.AssetSet] {
+func NewAssetsComputeSource(src AssetSource) exporter.ComputeSource[opencost.AssetSet] {
 	return &AssetsComputeSource{
-		cm: cm,
+		src: src,
 	}
 }
 
@@ -31,7 +34,7 @@ func (acs *AssetsComputeSource) CanCompute(start, end time.Time) bool {
 
 // Compute should compute a single T for the given time range, optionally using the given resolution.
 func (acs *AssetsComputeSource) Compute(start, end time.Time, resolution time.Duration) (*opencost.AssetSet, error) {
-	return acs.cm.ComputeAssets(start, end)
+	return acs.src.ComputeAssets(start, end)
 }
 
 // Name returns the name of the ComputeSource

+ 31 - 9
pkg/exporter/controllers.go

@@ -6,13 +6,25 @@ import (
 	export "github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/source"
 	"github.com/opencost/opencost/core/pkg/storage"
-	"github.com/opencost/opencost/pkg/costmodel"
+	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/pkg/exporter/allocation"
 	"github.com/opencost/opencost/pkg/exporter/asset"
 	"github.com/opencost/opencost/pkg/exporter/networkinsight"
 )
 
+// ComputePipelineSource is an interface that defines methods for computing all pipeline data.
+// For all intents and purposes, this represents costmodel.CostModel. To interface allows tests to
+// mock the costmodel.CostModel and return a different source for the pipeline.
+type ComputePipelineSource interface {
+	allocation.AllocationSource
+	asset.AssetSource
+	networkinsight.NetworkInsightSource
+
+	GetDataSource() source.OpenCostDataSource
+}
+
 // PipelinesExportConfig is a configuration struct that contains the export resolutions for
 // allocation, assets, and network insights pipelines.
 type PipelinesExportConfig struct {
@@ -49,12 +61,12 @@ type PipelineExportControllers struct {
 
 // NewPipelineExportControllers creates a new PipelineExportControllers instance with the given cluster ID, storage implementation, cost model, and configuration.
 // Setting the config to nil will use the default hourly and daily export resolutions for each pipeline.
-func NewPipelineExportControllers(clusterId string, store storage.Storage, cm *costmodel.CostModel, config *PipelinesExportConfig) *PipelineExportControllers {
+func NewPipelineExportControllers(clusterId string, store storage.Storage, cm ComputePipelineSource, config *PipelinesExportConfig) *PipelineExportControllers {
 	if config == nil {
 		config = DefaultPipelinesExportConfig()
 	}
 
-	mins := int(cm.DataSource.Resolution().Minutes())
+	mins := int(cm.GetDataSource().Resolution().Minutes())
 	if mins <= 0 {
 		mins = 1
 	}
@@ -72,8 +84,11 @@ func NewPipelineExportControllers(clusterId string, store storage.Storage, cm *c
 			continue
 		}
 
-		allocExporter := NewAllocationStorageExporter(clusterId, res, store)
-		allocController := export.NewComputeExportController(allocSource, allocExporter, sourceResolution)
+		allocController, err := NewComputePipelineExportController(clusterId, store, allocSource, res, sourceResolution)
+		if err != nil {
+			log.Errorf("Failed to create allocation export controller for resolution: %s - %v", timeutil.DurationString(res), err)
+			continue
+		}
 
 		allocExportControllers = append(allocExportControllers, allocController)
 	}
@@ -87,8 +102,12 @@ func NewPipelineExportControllers(clusterId string, store storage.Storage, cm *c
 			log.Warnf("Configured asset pipeline resolution %dm is less than source resolution %dm. Not configuring the exporter for this resolution.", int64(res.Minutes()), int64(sourceResolution.Minutes()))
 			continue
 		}
-		assetExporter := NewAssetsStorageExporter(clusterId, res, store)
-		assetController := export.NewComputeExportController(assetSource, assetExporter, sourceResolution)
+
+		assetController, err := NewComputePipelineExportController(clusterId, store, assetSource, res, sourceResolution)
+		if err != nil {
+			log.Errorf("Failed to create asset export controller for resolution: %s - %v", timeutil.DurationString(res), err)
+			continue
+		}
 
 		assetExportControllers = append(assetExportControllers, assetController)
 	}
@@ -103,8 +122,11 @@ func NewPipelineExportControllers(clusterId string, store storage.Storage, cm *c
 			continue
 		}
 
-		networkInsightExporter := NewNetworkInsightStorageExporter(clusterId, res, store)
-		networkInsightController := export.NewComputeExportController(networkInsightSource, networkInsightExporter, sourceResolution)
+		networkInsightController, err := NewComputePipelineExportController(clusterId, store, networkInsightSource, res, sourceResolution)
+		if err != nil {
+			log.Errorf("Failed to create network insight export controller for resolution: %s - %v", timeutil.DurationString(res), err)
+			continue
+		}
 
 		networkInsightExportControllers = append(networkInsightExportControllers, networkInsightController)
 	}

+ 0 - 24
pkg/exporter/encoder.go

@@ -1,24 +0,0 @@
-package exporter
-
-import (
-	export "github.com/opencost/opencost/core/pkg/exporter"
-	"github.com/opencost/opencost/core/pkg/opencost"
-)
-
-// NewAllocationEncoder creates an `export.Encoder[opencost.AllocationSet]` implementation for
-// encoding AllocationSet data.
-func NewAllocationEncoder() export.Encoder[opencost.AllocationSet] {
-	return export.NewBingenEncoder[opencost.AllocationSet]()
-}
-
-// NewAssetsEncoder creates an `export.Encoder[opencost.AssetSet]` implementation for
-// encoding AssetSet data.
-func NewAssetsEncoder() export.Encoder[opencost.AssetSet] {
-	return export.NewBingenEncoder[opencost.AssetSet]()
-}
-
-// NewNetworkInsightEncoder creates an `export.Encoder[opencost.NetworkInsightSet]` implementation for
-// encoding NetworkInsightSet data.
-func NewNetworkInsightEncoder() export.Encoder[opencost.NetworkInsightSet] {
-	return export.NewBingenEncoder[opencost.NetworkInsightSet]()
-}

+ 397 - 0
pkg/exporter/exporter_test.go

@@ -0,0 +1,397 @@
+package exporter
+
+import (
+	"testing"
+	"time"
+
+	"github.com/julienschmidt/httprouter"
+	"github.com/opencost/opencost/core/pkg/clusters"
+	"github.com/opencost/opencost/core/pkg/diagnostics"
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/pipelines"
+	"github.com/opencost/opencost/core/pkg/source"
+	"github.com/opencost/opencost/core/pkg/storage"
+)
+
+const (
+	TestClusterId  = "test-cluster"
+	TestResolution = 24 * time.Hour
+)
+
+type GenerateMockSet[T any] func(start, end time.Time) *T
+
+type MockSource[T any] struct {
+	generate GenerateMockSet[T]
+}
+
+func (ms *MockSource[T]) CanCompute(start, end time.Time) bool {
+	return true
+}
+func (ms *MockSource[T]) Compute(start, end time.Time, resolution time.Duration) (*T, error) {
+	return ms.generate(start, end), nil
+}
+func (ms *MockSource[T]) Name() string {
+	return pipelines.NameFor[T]()
+}
+
+func NewMockAllocationSource() exporter.ComputeSource[opencost.AllocationSet] {
+	return &MockSource[opencost.AllocationSet]{
+		generate: func(start, end time.Time) *opencost.AllocationSet { return opencost.GenerateMockAllocationSet(start) },
+	}
+}
+
+func NewMockAssetSource() exporter.ComputeSource[opencost.AssetSet] {
+	return &MockSource[opencost.AssetSet]{
+		generate: func(start, end time.Time) *opencost.AssetSet {
+			return opencost.GenerateMockAssetSet(start, TestResolution)
+		},
+	}
+}
+
+func NewMockNetworkInsightSource() exporter.ComputeSource[opencost.NetworkInsightSet] {
+	return &MockSource[opencost.NetworkInsightSet]{
+		generate: func(start, end time.Time) *opencost.NetworkInsightSet {
+			return opencost.GenerateMockNetworkInsightSet(start, end)
+		},
+	}
+}
+
+type MockDataSource struct {
+	resolution time.Duration
+}
+
+func NewMockDataSource() *MockDataSource {
+	return NewMockDataSourceWith(time.Minute)
+}
+
+func NewMockDataSourceWith(resolution time.Duration) *MockDataSource {
+	return &MockDataSource{
+		resolution: resolution,
+	}
+}
+
+func (mds *MockDataSource) RegisterEndPoints(router *httprouter.Router)                   {}
+func (mds *MockDataSource) RegisterDiagnostics(diagService diagnostics.DiagnosticService) {}
+func (mds *MockDataSource) Metrics() source.MetricsQuerier                                { return nil }
+func (mds *MockDataSource) ClusterMap() clusters.ClusterMap                               { return nil }
+func (mds *MockDataSource) ClusterInfo() clusters.ClusterInfoProvider                     { return nil }
+func (mds *MockDataSource) BatchDuration() time.Duration                                  { return time.Hour * 20000 }
+func (mds *MockDataSource) Resolution() time.Duration                                     { return mds.resolution }
+
+type MockPipelineComputeSource struct {
+	allocSource exporter.ComputeSource[opencost.AllocationSet]
+	assetSource exporter.ComputeSource[opencost.AssetSet]
+	netSource   exporter.ComputeSource[opencost.NetworkInsightSet]
+	ds          *MockDataSource
+}
+
+func NewMockPipelineComputeSource() *MockPipelineComputeSource {
+	return &MockPipelineComputeSource{
+		allocSource: NewMockAllocationSource(),
+		assetSource: NewMockAssetSource(),
+		netSource:   NewMockNetworkInsightSource(),
+		ds:          NewMockDataSource(),
+	}
+}
+
+func NewMockPipelineComputeSourceWith(srcResolution time.Duration) *MockPipelineComputeSource {
+	return &MockPipelineComputeSource{
+		allocSource: NewMockAllocationSource(),
+		assetSource: NewMockAssetSource(),
+		netSource:   NewMockNetworkInsightSource(),
+		ds:          NewMockDataSourceWith(srcResolution),
+	}
+}
+
+func (mpcs *MockPipelineComputeSource) ComputeAllocation(start, end time.Time, resolution time.Duration) (*opencost.AllocationSet, error) {
+	return mpcs.allocSource.Compute(start, end, resolution)
+}
+func (mpcs *MockPipelineComputeSource) ComputeAssets(start, end time.Time) (*opencost.AssetSet, error) {
+	return mpcs.assetSource.Compute(start, end, TestResolution)
+}
+func (mpcs *MockPipelineComputeSource) ComputeNetworkInsights(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error) {
+	return mpcs.netSource.Compute(start, end, resolution)
+}
+func (mpcs *MockPipelineComputeSource) GetDataSource() source.OpenCostDataSource {
+	return mpcs.ds
+}
+
+type UnknownSet struct{}
+
+func (u *UnknownSet) MarshalBinary() ([]byte, error) {
+	return []byte{}, nil
+}
+func (u *UnknownSet) UnmarshalBinary(data []byte) error {
+	return nil
+}
+func (u *UnknownSet) IsEmpty() bool {
+	return false
+}
+
+type PipelineData[T any] interface {
+	UnmarshalBinary(data []byte) error
+	IsEmpty() bool
+	*T
+}
+
+func ptr[T any](v T) *T {
+	return &v
+}
+
+func TestExporters(t *testing.T) {
+	t.Run("allocation exporter", func(t *testing.T) {
+		allocSource := NewMockAllocationSource()
+		memStore := storage.NewMemoryStorage()
+		p, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create path formatter: %v", err)
+		}
+
+		allocExporter, err := NewComputePipelineExporter[opencost.AllocationSet](TestClusterId, TestResolution, memStore)
+		if err != nil {
+			t.Fatalf("failed to create allocation exporter: %v", err)
+		}
+
+		end := time.Now().UTC().Truncate(TestResolution)
+		start := end.Add(-TestResolution)
+
+		data, err := allocSource.Compute(start, end, TestResolution)
+		if err != nil {
+			t.Fatalf("failed to compute allocation data: %v", err)
+		}
+
+		err = allocExporter.Export(opencost.NewClosedWindow(start, end), data)
+		if err != nil {
+			t.Fatalf("failed to export allocation data: %v", err)
+		}
+
+		validateFileCreation[opencost.AllocationSet](t, memStore, p, start, end)
+	})
+
+	t.Run("asset exporter", func(t *testing.T) {
+		assetSource := NewMockAssetSource()
+		memStore := storage.NewMemoryStorage()
+		p, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create path formatter: %v", err)
+		}
+
+		assetExporter, err := NewComputePipelineExporter[opencost.AssetSet](TestClusterId, TestResolution, memStore)
+		if err != nil {
+			t.Fatalf("failed to create allocation exporter: %v", err)
+		}
+
+		end := time.Now().UTC().Truncate(TestResolution)
+		start := end.Add(-TestResolution)
+
+		data, err := assetSource.Compute(start, end, TestResolution)
+		if err != nil {
+			t.Fatalf("failed to compute asset data: %v", err)
+		}
+
+		err = assetExporter.Export(opencost.NewClosedWindow(start, end), data)
+		if err != nil {
+			t.Fatalf("failed to export asset data: %v", err)
+		}
+
+		validateFileCreation[opencost.AssetSet](t, memStore, p, start, end)
+	})
+
+	t.Run("network insight exporter", func(t *testing.T) {
+		netInsightSource := NewMockNetworkInsightSource()
+		memStore := storage.NewMemoryStorage()
+		p, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create path formatter: %v", err)
+		}
+
+		netInsightExporter, err := NewComputePipelineExporter[opencost.NetworkInsightSet](TestClusterId, TestResolution, memStore)
+		if err != nil {
+			t.Fatalf("failed to create net insights exporter: %v", err)
+		}
+
+		end := time.Now().UTC().Truncate(TestResolution)
+		start := end.Add(-TestResolution)
+
+		data, err := netInsightSource.Compute(start, end, TestResolution)
+		if err != nil {
+			t.Fatalf("failed to compute net insights data: %v", err)
+		}
+
+		err = netInsightExporter.Export(opencost.NewClosedWindow(start, end), data)
+		if err != nil {
+			t.Fatalf("failed to export net insights data: %v", err)
+		}
+
+		validateFileCreation[opencost.NetworkInsightSet](t, memStore, p, start, end)
+	})
+
+	t.Run("unknown exporter", func(t *testing.T) {
+		memStore := storage.NewMemoryStorage()
+
+		// Invalid pipeline
+		_, err := NewComputePipelineExporter[UnknownSet](TestClusterId, TestResolution, memStore)
+		if err == nil {
+			t.Fatalf("expected error creating unknown pipeline exporter, got nil")
+		}
+
+		// Invalid cluster id
+		_, err = NewComputePipelineExporter[opencost.AllocationSet]("", TestResolution, memStore)
+		if err == nil {
+			t.Fatalf("expected error creating allocation pipeline exporter with empty cluster id, got nil")
+		}
+	})
+}
+
+func TestPipelineExportControllers(t *testing.T) {
+	t.Run("with custom export config", func(t *testing.T) {
+		pipelineComputeSource := NewMockPipelineComputeSource()
+		memStore := storage.NewMemoryStorage()
+
+		exportControllers := NewPipelineExportControllers(TestClusterId, memStore, pipelineComputeSource, &PipelinesExportConfig{
+			AllocationPiplineResolutions:      []time.Duration{TestResolution},
+			AssetPipelineResolutons:           []time.Duration{TestResolution},
+			NetworkInsightPipelineResolutions: []time.Duration{TestResolution},
+		})
+
+		start := time.Now().UTC().Truncate(TestResolution)
+		end := start.Add(TestResolution)
+
+		// allow a single export to occur
+		exportControllers.Start(time.Second)
+		time.Sleep(time.Second + (750 * time.Millisecond))
+		exportControllers.Stop()
+
+		allocPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create allocations path formatter: %v", err)
+		}
+		assetPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create assets path formatter: %v", err)
+		}
+		netPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create net insights path formatter: %v", err)
+		}
+
+		validateFileCreation[opencost.AllocationSet](t, memStore, allocPath, start, end)
+		validateFileCreation[opencost.AssetSet](t, memStore, assetPath, start, end)
+		validateFileCreation[opencost.NetworkInsightSet](t, memStore, netPath, start, end)
+	})
+
+	t.Run("with auto-set to minute resolution", func(t *testing.T) {
+		pipelineComputeSource := NewMockPipelineComputeSourceWith(30 * time.Second)
+		memStore := storage.NewMemoryStorage()
+
+		exportControllers := NewPipelineExportControllers(TestClusterId, memStore, pipelineComputeSource, &PipelinesExportConfig{
+			AllocationPiplineResolutions:      []time.Duration{TestResolution},
+			AssetPipelineResolutons:           []time.Duration{TestResolution},
+			NetworkInsightPipelineResolutions: []time.Duration{TestResolution},
+		})
+
+		start := time.Now().UTC().Truncate(TestResolution)
+		end := start.Add(TestResolution)
+
+		// allow a single export to occur
+		exportControllers.Start(time.Second)
+		time.Sleep(time.Second + (750 * time.Millisecond))
+		exportControllers.Stop()
+
+		allocPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create allocations path formatter: %v", err)
+		}
+		assetPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create assets path formatter: %v", err)
+		}
+		netPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		if err != nil {
+			t.Fatalf("failed to create net insights path formatter: %v", err)
+		}
+
+		validateFileCreation[opencost.AllocationSet](t, memStore, allocPath, start, end)
+		validateFileCreation[opencost.AssetSet](t, memStore, assetPath, start, end)
+		validateFileCreation[opencost.NetworkInsightSet](t, memStore, netPath, start, end)
+	})
+
+	t.Run("with default export config", func(t *testing.T) {
+		pipelineComputeSource := NewMockPipelineComputeSource()
+		memStore := storage.NewMemoryStorage()
+
+		exportControllers := NewPipelineExportControllers(TestClusterId, memStore, pipelineComputeSource, nil)
+
+		if len(exportControllers.AllocationExportController.Resolutions()) != 2 {
+			t.Fatalf("expected 2 allocation resolutions, got %d", len(exportControllers.AllocationExportController.Resolutions()))
+		}
+		if len(exportControllers.AssetExportController.Resolutions()) != 2 {
+			t.Fatalf("expected 2 asset resolutions, got %d", len(exportControllers.AssetExportController.Resolutions()))
+		}
+		if len(exportControllers.NetworkInsightExportController.Resolutions()) != 2 {
+			t.Fatalf("expected 2 network insight resolutions, got %d", len(exportControllers.NetworkInsightExportController.Resolutions()))
+		}
+	})
+
+	t.Run("with 2day source resolution", func(t *testing.T) {
+		// make compute source use a source resolution of 48 hours
+		pipelineComputeSource := NewMockPipelineComputeSourceWith(48 * time.Hour)
+		memStore := storage.NewMemoryStorage()
+
+		exportControllers := NewPipelineExportControllers(TestClusterId, memStore, pipelineComputeSource, nil)
+
+		if len(exportControllers.AllocationExportController.Resolutions()) != 0 {
+			t.Fatalf("expected 0 allocation resolutions, got %d", len(exportControllers.AllocationExportController.Resolutions()))
+		}
+		if len(exportControllers.AssetExportController.Resolutions()) != 0 {
+			t.Fatalf("expected 0 asset resolutions, got %d", len(exportControllers.AssetExportController.Resolutions()))
+		}
+		if len(exportControllers.NetworkInsightExportController.Resolutions()) != 0 {
+			t.Fatalf("expected 0 network insight resolutions, got %d", len(exportControllers.NetworkInsightExportController.Resolutions()))
+		}
+	})
+
+	t.Run("with empty cluster id", func(t *testing.T) {
+		pipelineComputeSource := NewMockPipelineComputeSource()
+		memStore := storage.NewMemoryStorage()
+
+		exportControllers := NewPipelineExportControllers("", memStore, pipelineComputeSource, nil)
+
+		if len(exportControllers.AllocationExportController.Resolutions()) != 0 {
+			t.Fatalf("expected 0 allocation resolutions, got %d", len(exportControllers.AllocationExportController.Resolutions()))
+		}
+		if len(exportControllers.AssetExportController.Resolutions()) != 0 {
+			t.Fatalf("expected 0 asset resolutions, got %d", len(exportControllers.AssetExportController.Resolutions()))
+		}
+		if len(exportControllers.NetworkInsightExportController.Resolutions()) != 0 {
+			t.Fatalf("expected 0 network insight resolutions, got %d", len(exportControllers.NetworkInsightExportController.Resolutions()))
+		}
+	})
+}
+
+// test helper function that will load a path from a storage implementation and ensure that the file is not empty and can be decoded, etc...
+func validateFileCreation[T any, U PipelineData[T]](t *testing.T, memStore storage.Storage, p pathing.StoragePathFormatter[opencost.Window], start, end time.Time) {
+	t.Helper()
+
+	expectedPath := p.ToFullPath("", opencost.NewClosedWindow(start, end), "")
+
+	fileContents, err := memStore.Read(expectedPath)
+	if err != nil {
+		t.Fatalf("failed to read file %s: %v", expectedPath, err)
+	}
+	if len(fileContents) == 0 {
+		t.Fatalf("file %s is empty", expectedPath)
+	}
+
+	var set U = new(T)
+	err = set.UnmarshalBinary(fileContents)
+	if err != nil {
+		t.Fatalf("failed to unmarshal data: %v", err)
+	}
+
+	if set.IsEmpty() {
+		t.Fatalf("data set is empty")
+	}
+}

+ 30 - 37
pkg/exporter/exporters.go

@@ -1,64 +1,57 @@
 package exporter
 
 import (
+	"fmt"
 	"time"
 
 	export "github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/exporter/pathing"
 	"github.com/opencost/opencost/core/pkg/exporter/validator"
-	"github.com/opencost/opencost/core/pkg/log"
-	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/pipelines"
 	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/typeutil"
 )
 
-func NewAllocationStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.ComputeExporter[opencost.AllocationSet] {
-	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelines.AllocationPipelineName, &resolution)
-	if err != nil {
-		log.Errorf("failed to create pathing formatter: %v", err)
-		return nil
+// NewComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
+// by window for a specific pipeline.
+func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
+	clusterId string,
+	resolution time.Duration,
+	store storage.Storage,
+) (export.ComputeExporter[T], error) {
+	pipelineName := pipelines.NameFor[T]()
+	if pipelineName == "" {
+		return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
 	}
 
-	return export.NewComputeStorageExporter(
-		pipelines.AllocationPipelineName,
-		resolution,
-		pathing,
-		NewAllocationEncoder(),
-		store,
-		validator.NewSetValidator[opencost.AllocationSet](resolution),
-	)
-}
-
-func NewAssetsStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.ComputeExporter[opencost.AssetSet] {
-	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelines.AssetsPipelineName, &resolution)
+	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelineName, &resolution)
 	if err != nil {
-		log.Errorf("failed to create pathing formatter: %v", err)
-		return nil
+		return nil, fmt.Errorf("failed to create path formatter: %w", err)
 	}
 
 	return export.NewComputeStorageExporter(
-		pipelines.AssetsPipelineName,
+		pipelineName,
 		resolution,
 		pathing,
-		NewAssetsEncoder(),
+		export.NewBingenEncoder[T, U](),
 		store,
-		validator.NewSetValidator[opencost.AssetSet](resolution),
-	)
+		validator.NewSetValidator[T, S](resolution),
+	), nil
 }
 
-func NewNetworkInsightStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.ComputeExporter[opencost.NetworkInsightSet] {
-	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelines.NetworkInsightPipelineName, &resolution)
+// NewComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to export computed data
+// using the provided source, storage, resolution, and source resolution.
+func NewComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
+	clusterId string,
+	store storage.Storage,
+	source export.ComputeSource[T],
+	resolution time.Duration,
+	sourceResolution time.Duration,
+) (*export.ComputeExportController[T], error) {
+	exporter, err := NewComputePipelineExporter[T, U, S](clusterId, resolution, store)
 	if err != nil {
-		log.Errorf("failed to create pathing formatter: %v", err)
-		return nil
+		return nil, fmt.Errorf("failed to create compute exporter: %w", err)
 	}
 
-	return export.NewComputeStorageExporter(
-		pipelines.NetworkInsightPipelineName,
-		resolution,
-		pathing,
-		NewNetworkInsightEncoder(),
-		store,
-		validator.NewSetValidator[opencost.NetworkInsightSet](resolution),
-	)
+	return export.NewComputeExportController(source, exporter, resolution, sourceResolution), nil
 }

+ 8 - 5
pkg/exporter/networkinsight/source.go

@@ -6,17 +6,20 @@ import (
 	"github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/pipelines"
-	"github.com/opencost/opencost/pkg/costmodel"
 )
 
+type NetworkInsightSource interface {
+	ComputeNetworkInsights(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error)
+}
+
 type NetworkInsightsComputeSource struct {
-	cm *costmodel.CostModel
+	src NetworkInsightSource
 }
 
 // NewNetworkInsightsComputeSource creates an `exporter.ComputeSource[opencost.NetworkInsightSet]` implementation
-func NewNetworkInsightsComputeSource(cm *costmodel.CostModel) exporter.ComputeSource[opencost.NetworkInsightSet] {
+func NewNetworkInsightsComputeSource(src NetworkInsightSource) exporter.ComputeSource[opencost.NetworkInsightSet] {
 	return &NetworkInsightsComputeSource{
-		cm: cm,
+		src: src,
 	}
 }
 
@@ -31,7 +34,7 @@ func (acs *NetworkInsightsComputeSource) CanCompute(start, end time.Time) bool {
 
 // Compute should compute a single T for the given time range, optionally using the given resolution.
 func (acs *NetworkInsightsComputeSource) Compute(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error) {
-	return acs.cm.ComputeNetworkInsights(start, end, resolution)
+	return acs.src.ComputeNetworkInsights(start, end, resolution)
 }
 
 // Name returns the name of the ComputeSource