Просмотр исходного кода

Added system for exporting timestamped events. Added heartbeat event exporting

Matt Bolt 1 год назад
Родитель
Commit
0ec9998316

+ 1 - 1
core/go.mod

@@ -12,6 +12,7 @@ require (
 	github.com/davecgh/go-spew v1.1.1
 	github.com/goccy/go-json v0.10.5
 	github.com/google/go-cmp v0.7.0
+	github.com/google/uuid v1.6.0
 	github.com/hashicorp/go-multierror v1.1.1
 	github.com/hashicorp/go-plugin v1.6.0
 	github.com/json-iterator/go v1.1.12
@@ -65,7 +66,6 @@ require (
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/s2a-go v0.1.7 // indirect
-	github.com/google/uuid v1.6.0 // indirect
 	github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
 	github.com/googleapis/gax-go/v2 v2.12.0 // indirect
 	github.com/hashicorp/errwrap v1.0.0 // indirect

+ 64 - 4
core/pkg/exporter/controller.go

@@ -26,12 +26,72 @@ type ExportController interface {
 	Stop()
 }
 
+// EventExportController[T] is used to export timestamped events of type T on a specific interval.
+type EventExportController[T any] struct {
+	runState atomic.AtomicRunState
+	source   ExportSource[T]
+	exporter Exporter[T]
+	typeName string
+}
+
+// NewEventExportController creates a new `EventExportController[T]` instance which is used to export timestamped events of type T
+// on a specific interval.
+func NewEventExportController[T any](source ExportSource[T], exporter Exporter[T]) *EventExportController[T] {
+	return &EventExportController[T]{
+		source:   source,
+		exporter: exporter,
+		typeName: reflect.TypeOf((*T)(nil)).Elem().String(),
+	}
+}
+
+// Name returns the name of the controller, which is the name of the T-type
+func (cd *EventExportController[T]) Name() string {
+	return cd.typeName
+}
+
+// Start starts a background export loop, which will create a new event instance for the current second-truncated time
+// and export it on the provided interval. This function will return `true` if the loop was started successfully, and
+// `false` if it was already running.
+func (cd *EventExportController[T]) Start(interval time.Duration) bool {
+	cd.runState.WaitForReset()
+	if !cd.runState.Start() {
+		return false
+	}
+
+	go func() {
+		for {
+			select {
+			case <-cd.runState.OnStop():
+				cd.runState.Reset()
+				return // exit go routine
+
+			case <-time.After(interval):
+			}
+
+			// truncate the time to the second to ensure broad enough coverage for event exports
+			t := time.Now().UTC().Truncate(time.Second)
+
+			err := cd.exporter.Export(cd.source.Make(t))
+			if err != nil {
+				log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
+			}
+		}
+	}()
+
+	return true
+}
+
+// Stops the export loop
+func (cd *EventExportController[T]) Stop() {
+	cd.runState.Stop()
+}
+
 // ComputeExportController[T] is a controller type which leverages a `ComputeSource[T]` and `Exporter[T]`
 // to regularly compute the data for the current resolution and export it on a specific interval.
 type ComputeExportController[T any] struct {
 	runState         atomic.AtomicRunState
 	source           ComputeSource[T]
-	exporter         Exporter[T]
+	exporter         ComputeExporter[T]
 	resolution       time.Duration
 	sourceResolution time.Duration
 	typeName         string
@@ -40,7 +100,7 @@ type ComputeExportController[T any] struct {
 // NewComputeExportController creates a new `ComputeExportController[T]` instance.
 func NewComputeExportController[T any](
 	source ComputeSource[T],
-	exporter Exporter[T],
+	exporter ComputeExporter[T],
 	sourceResolution time.Duration,
 ) *ComputeExportController[T] {
 	return &ComputeExportController[T]{
@@ -133,11 +193,11 @@ func (cd *ComputeExportController[T]) Stop() {
 // temporary
 func (cd *ComputeExportController[T]) logErrors(start, end time.Time, warnings []string, errors []string) {
 	for _, w := range warnings {
-		log.Warnf("[%s] %s", cd.typeName, w)
+		log.Warnf("[%s] (%s-%s) %s", cd.typeName, start.Format(time.RFC3339), end.Format(time.RFC3339), w)
 	}
 
 	for _, e := range errors {
-		log.Errorf("[%s] %s", cd.typeName, e)
+		log.Errorf("[%s] (%s-%s) %s", cd.typeName, start.Format(time.RFC3339), end.Format(time.RFC3339), e)
 	}
 }
 

+ 36 - 1
core/pkg/exporter/encoder.go

@@ -1,10 +1,19 @@
 package exporter
 
-import "encoding"
+import (
+	"encoding"
+
+	"github.com/opencost/opencost/core/pkg/util/json"
+)
 
 // Encoder[T] is a generic interface for encoding an instance of a T type into a byte slice.
 type Encoder[T any] interface {
 	Encode(*T) ([]byte, error)
+
+	// FileExt returns the file extension for the encoded data. This can be used by a pathing strategy
+	// to append the file extension when exporting the data. Returning an empty string will typically
+	// omit the file extension completely.
+	FileExt() string
 }
 
 // BinaryMarshalerPtr[T] is a generic constraint to ensure types passed to the encoder implement
@@ -29,3 +38,29 @@ func (b *BingenEncoder[T, U]) Encode(data *T) ([]byte, error) {
 	var bingenData U = data
 	return bingenData.MarshalBinary()
 }
+
+// FileExt returns the file extension for the encoded data. In this case, it returns an empty string
+// to indicate that there is no specific file extension for the binary encoded data.
+func (b *BingenEncoder[T, U]) FileExt() string {
+	return ""
+}
+
+// JSONEncoder[T] is a generic encoder that uses the JSON encoding format to encode data.
+type JSONEncoder[T any] struct{}
+
+// NewJSONEncoder creates an `Encoder[T]` implementation which supports JSON encoding for the `T`
+// type.
+func NewJSONEncoder[T any]() Encoder[T] {
+	return new(JSONEncoder[T])
+}
+
+// Encode encodes the provided data of type T into a byte slice using JSON encoding.
+func (j *JSONEncoder[T]) Encode(data *T) ([]byte, error) {
+	return json.Marshal(data)
+}
+
+// FileExt returns the file extension for the encoded data. In this case, it returns "json" to indicate
+// that the data is in JSON format.
+func (j *JSONEncoder[T]) FileExt() string {
+	return "json"
+}

+ 67 - 16
core/pkg/exporter/exporter.go

@@ -13,37 +13,89 @@ import (
 
 // Exporter[T] is a generic interface for exporting T instances to a specific storage destination.
 type Exporter[T any] interface {
-	// Export performs the export operation for the given window and data.
+	// Export performs the export operation for the provided data.
+	Export(data *T) error
+}
+
+// StorageExporter[T] is an implementation of an Exporter[T] that writes data to a storage backend using
+// the `github.com/opencost/opencost/core/pkg/storage` package, a pathing strategy, and an encoder.
+type StorageExporter[T any] struct {
+	pipeline string
+	paths    pathing.StoragePathFormatter[time.Time]
+	encoder  Encoder[T]
+	storage  storage.Storage
+}
+
+// NewStorageExporter creates a new StorageExporter instance, which is responsible for exporting data to a storage backend.
+// It uses a pathing strategy to determine the storage location, an encoder to convert the data to binary format, and
+// a storage backend to write the data.
+func NewStorageExporter[T any](
+	pipeline string,
+	paths pathing.StoragePathFormatter[time.Time],
+	encoder Encoder[T],
+	storage storage.Storage,
+) *StorageExporter[T] {
+	return &StorageExporter[T]{
+		pipeline: pipeline,
+		paths:    paths,
+		encoder:  encoder,
+		storage:  storage,
+	}
+}
+
+// Export performs the export operation for the provided data. It encodes the data using the encoder and writes it to
+// the storage backend using the pathing strategy.
+func (se *StorageExporter[T]) Export(data *T) error {
+	t := time.Now().UTC()
+	path := se.paths.ToFullPath("", t, se.encoder.FileExt())
+
+	bin, err := se.encoder.Encode(data)
+	if err != nil {
+		return fmt.Errorf("failed to encode data: %w", err)
+	}
+
+	log.Debugf("writing new binary data to storage %s", path)
+	err = se.storage.Write(path, bin)
+	if err != nil {
+		return fmt.Errorf("failed to write binary data to file '%s': %w", path, err)
+	}
+
+	return nil
+}
+
+// ComputeExporter[T] is an interface that exports windowed data of type T using a specific resolution.
+type ComputeExporter[T any] interface {
+	// Export performs the export operation for the provided data.
 	Export(window opencost.Window, data *T) error
 
 	// Resolution contains the resolution of the data being exported
 	Resolution() time.Duration
 }
 
-// StorageExporter[T] is an implementation of Exporter[T] that writes data to a storage backend using
+// ComputeStorageExporter[T] is an implementation of ComputeExporter[T] that writes data to a storage backend using
 // `github.com/opencost/opencost/core/pkg/storage`, a pathing strategy, and an encoder.
-type StorageExporter[T any] struct {
+type ComputeStorageExporter[T any] struct {
 	pipeline   string
 	resolution time.Duration
-	paths      pathing.StoragePathFormatter
+	paths      pathing.StoragePathFormatter[opencost.Window]
 	encoder    Encoder[T]
 	storage    storage.Storage
 	validator  validator.ExportValidator[T]
 }
 
-// NewStorageExporter creates a new StorageExporter instance, which is responsible for exporting data for
-// a specific window to a storage backend. It uses a pathing strategy to determine the storage location,
-// an encoder to convert the data to binary format, and a validator to check the data before export.
-// The pipeline name and resolution are also provided to help identify the data being exported.
-func NewStorageExporter[T any](
+// NewComputeStorageExporter creates a new ComputeStorageExporter instance, which is responsible for exporting
+// data for a specific window to a storage backend. It uses a pathing strategy to determine the storage location,
+// an encoder to convert the data to binary format, and a validator to check the data before export. The pipeline
+// name and resolution are also provided to help identify the data being exported.
+func NewComputeStorageExporter[T any](
 	pipeline string,
 	resolution time.Duration,
-	paths pathing.StoragePathFormatter,
+	paths pathing.StoragePathFormatter[opencost.Window],
 	encoder Encoder[T],
 	storage storage.Storage,
 	validator validator.ExportValidator[T],
-) *StorageExporter[T] {
-	return &StorageExporter[T]{
+) *ComputeStorageExporter[T] {
+	return &ComputeStorageExporter[T]{
 		pipeline:   pipeline,
 		resolution: resolution,
 		paths:      paths,
@@ -55,7 +107,7 @@ func NewStorageExporter[T any](
 
 // Export performs validation on the provided window and data, determines if it should overwrite existing data,
 // and stores the data in the location specified by the pathing formatter.
-func (se *StorageExporter[T]) Export(window opencost.Window, data *T) error {
+func (se *ComputeStorageExporter[T]) Export(window opencost.Window, data *T) error {
 	if se.validator != nil {
 		err := se.validator.Validate(window, data)
 		if err != nil {
@@ -63,8 +115,7 @@ func (se *StorageExporter[T]) Export(window opencost.Window, data *T) error {
 		}
 	}
 
-	s, e := *window.Start(), *window.End()
-	path := se.paths.ToFullPath("", s, e)
+	path := se.paths.ToFullPath("", window, se.encoder.FileExt())
 
 	currentExists, err := se.storage.Exists(path)
 	if err != nil {
@@ -91,6 +142,6 @@ func (se *StorageExporter[T]) Export(window opencost.Window, data *T) error {
 }
 
 // Resolution returns the resolution of the data being exported.
-func (se *StorageExporter[T]) Resolution() time.Duration {
+func (se *ComputeStorageExporter[T]) Resolution() time.Duration {
 	return se.resolution
 }

+ 20 - 4
core/pkg/exporter/pathing/bingenpath.go

@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/opencost/opencost/core/pkg/exporter/pathing/pathutils"
+	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
 
@@ -28,7 +29,7 @@ type BingenStoragePathFormatter struct {
 // NewBingenStoragePathFormatter creates a StoragePathFormatter for a cluster separated storage path
 // with the given root directory, cluster id, pipeline, and resolution. To omit the resolution directory
 // structure, provide a `nil` resolution.
-func NewBingenStoragePathFormatter(rootDir, clusterId, pipeline string, resolution *time.Duration) (StoragePathFormatter, error) {
+func NewBingenStoragePathFormatter(rootDir, clusterId, pipeline string, resolution *time.Duration) (StoragePathFormatter[opencost.Window], error) {
 	res := "."
 	if resolution != nil {
 		res = timeutil.FormatStoreResolution(*resolution)
@@ -58,8 +59,8 @@ func (bsf *BingenStoragePathFormatter) RootDir() string {
 // ToFullPath returns the full path to a file name within the storage directory using the format:
 //
 //	<root>/federated/<cluster>/etl/bingen/<pipeline>/<resolution>/<prefix>.<start-epoch>-<end-epoch>
-func (bsf *BingenStoragePathFormatter) ToFullPath(prefix string, start, end time.Time) string {
-	fileName := toBingenFileName(prefix, start, end)
+func (bsf *BingenStoragePathFormatter) ToFullPath(prefix string, window opencost.Window, fileExt string) string {
+	fileName := toBingenFileName(prefix, window, fileExt)
 
 	return path.Join(
 		bsf.rootDir,
@@ -74,8 +75,13 @@ func (bsf *BingenStoragePathFormatter) ToFullPath(prefix string, start, end time
 
 // toBingenFileName formats the file name as <prefix>.<start-epoch>-<end-epoch> if a prefix is non-empty.
 // If prefix is an empty string, then just the format <start-epoch>-<end-epoch> is returned.
-func toBingenFileName(prefix string, start, end time.Time) string {
+func toBingenFileName(prefix string, window opencost.Window, fileExt string) string {
+	start, end := derefTimeOrZero(window.Start()), derefTimeOrZero(window.End())
+
 	suffix := pathutils.FormatEpochRange(start, end)
+	if fileExt != "" {
+		suffix = fmt.Sprintf("%s.%s", suffix, fileExt)
+	}
 
 	if prefix == "" {
 		return suffix
@@ -83,3 +89,13 @@ func toBingenFileName(prefix string, start, end time.Time) string {
 
 	return fmt.Sprintf("%s.%s", prefix, suffix)
 }
+
+// derefTimeOrZero dereferences a time.Time pointer and returns the zero value if the pointer is nil.
+// This prevents nil pointer dereference errors when using windows. This is mostly an assertion, as
+// generally windows for pathing will be pre-validated.
+func derefTimeOrZero(t *time.Time) time.Time {
+	if t == nil {
+		return time.Time{}
+	}
+	return *t
+}

+ 76 - 0
core/pkg/exporter/pathing/eventpath.go

@@ -0,0 +1,76 @@
+package pathing
+
+import (
+	"fmt"
+	"path"
+	"time"
+)
+
+// Reference layout (Go time format, RFC3339 style): 2006-01-02T15:04:05Z07:00
+
+// EventStorageTimeFormat is YYYYMMDDHHmmss
+const EventStorageTimeFormat = "20060102150405"
+
+// EventStoragePathFormatter is an implementation of the StoragePathFormatter interface for
+// a cluster separated storage path of the format:
+//
+//	<root>/federated/<cluster>/<event>/YYYYMMDDHHmmss
+type EventStoragePathFormatter struct {
+	rootDir   string
+	clusterId string
+	event     string
+}
+
+// NewEventStoragePathFormatter creates a StoragePathFormatter for a cluster separated event storage path
+// with the given root directory, cluster id, and event name. The clusterId and event must be non-empty;
+// rootDir may be empty to produce paths relative to the storage root.
+func NewEventStoragePathFormatter(rootDir, clusterId, event string) (StoragePathFormatter[time.Time], error) {
+	if clusterId == "" {
+		return nil, fmt.Errorf("cluster id cannot be empty")
+	}
+
+	if event == "" {
+		return nil, fmt.Errorf("event cannot be empty")
+	}
+
+	return &EventStoragePathFormatter{
+		rootDir:   rootDir,
+		clusterId: clusterId,
+		event:     event,
+	}, nil
+}
+
+// RootDir returns the root directory of the storage path formatter.
+func (espf *EventStoragePathFormatter) RootDir() string {
+	return espf.rootDir
+}
+
+// ToFullPath returns the full path to a file name within the storage directory using the format:
+//
+//	<root>/federated/<cluster>/<event>/<prefix>.YYYYMMDDHHmmss[.<fileExt>]
+//
+// where the prefix and file-extension segments are omitted when empty (see toEventFileName).
+func (espf *EventStoragePathFormatter) ToFullPath(prefix string, timestamp time.Time, fileExt string) string {
+	fileName := toEventFileName(prefix, timestamp, fileExt)
+
+	return path.Join(
+		espf.rootDir,
+		federatedDir,
+		espf.clusterId,
+		espf.event,
+		fileName,
+	)
+}
+
+// toEventFileName formats the file name as <prefix>.<timestamp>. if a non-empty fileExt is provided,
+// then the file extension is appended to the file name.
+func toEventFileName(prefix string, timestamp time.Time, fileExt string) string {
+	suffix := timestamp.Format(EventStorageTimeFormat)
+	if fileExt != "" {
+		suffix = fmt.Sprintf("%s.%s", suffix, fileExt)
+	}
+
+	if prefix == "" {
+		return suffix
+	}
+
+	return fmt.Sprintf("%s.%s", prefix, suffix)
+}

+ 79 - 1
core/pkg/exporter/pathing/bingenpath_test.go → core/pkg/exporter/pathing/path_test.go

@@ -3,6 +3,8 @@ package pathing
 import (
 	"testing"
 	"time"
+
+	"github.com/opencost/opencost/core/pkg/opencost"
 )
 
 func TestBingenPathFormatter(t *testing.T) {
@@ -86,7 +88,83 @@ func TestBingenPathFormatter(t *testing.T) {
 				end = start.Add(*tc.resolution)
 			}
 
-			result := pathing.ToFullPath(tc.prefix, start, end)
+			result := pathing.ToFullPath(tc.prefix, opencost.NewClosedWindow(start, end), "")
+			if result != tc.expected {
+				t.Errorf("Expected %s, got %s", tc.expected, result)
+			}
+		})
+	}
+}
+
+func TestEventPathFormatter(t *testing.T) {
+	type testCase struct {
+		name      string
+		rootPath  string
+		clusterID string
+		event     string
+		prefix    string
+		fileExt   string
+		expected  string
+	}
+
+	// EventStorageTimeFormat is "20060102150405" (YYYYMMDDHHmmss), so the formatted
+	// timestamp for 2024-01-01T12:40:00Z is the 14-digit "20240101124000".
+	testCases := []testCase{
+		{
+			name:      "with root path with file extension",
+			rootPath:  "/tmp",
+			clusterID: "cluster-a",
+			event:     "heartbeat",
+			prefix:    "",
+			fileExt:   "json",
+			expected:  "/tmp/federated/cluster-a/heartbeat/20240101124000.json",
+		},
+		{
+			name:      "with file extension",
+			rootPath:  "",
+			clusterID: "cluster-a",
+			event:     "heartbeat",
+			prefix:    "",
+			fileExt:   "json",
+			expected:  "federated/cluster-a/heartbeat/20240101124000.json",
+		},
+		{
+			name:      "without file extension",
+			rootPath:  "",
+			clusterID: "cluster-a",
+			event:     "heartbeat",
+			prefix:    "",
+			fileExt:   "",
+			expected:  "federated/cluster-a/heartbeat/20240101124000",
+		},
+		{
+			name:      "with prefix with file extension",
+			rootPath:  "",
+			clusterID: "cluster-a",
+			event:     "heartbeat",
+			prefix:    "test",
+			fileExt:   "json",
+			expected:  "federated/cluster-a/heartbeat/test.20240101124000.json",
+		},
+		{
+			name:      "with prefix without file extension",
+			rootPath:  "",
+			clusterID: "cluster-a",
+			event:     "heartbeat",
+			prefix:    "test",
+			fileExt:   "",
+			expected:  "federated/cluster-a/heartbeat/test.20240101124000",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			pathing, err := NewEventStoragePathFormatter(tc.rootPath, tc.clusterID, tc.event)
+			if err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+
+			timestamp := time.Date(2024, 1, 1, 12, 40, 0, 0, time.UTC)
+
+			result := pathing.ToFullPath(tc.prefix, timestamp, tc.fileExt)
 			if result != tc.expected {
 				t.Errorf("Expected %s, got %s", tc.expected, result)
 			}

+ 4 - 7
core/pkg/exporter/pathing/pathing.go

@@ -1,14 +1,11 @@
 package pathing
 
-import "time"
-
-// StoragePathFormatter is a contract for an object capable of building storage paths for pipeline files
-// provided a window.
-type StoragePathFormatter interface {
+// StoragePathFormatter is an interface used to format storage paths for exporting data types.
+type StoragePathFormatter[T any] interface {
 	// RootDir returns the root directory for the storage path.
 	RootDir() string
 
 	// ToFullPath returns the full path to a file name within the storage
-	// directory leveraging a prefix and start and end times.
-	ToFullPath(prefix string, start, end time.Time) string
+	// directory leveraging a prefix and an incoming T type (generally a daterange or timestamp).
+	ToFullPath(prefix string, in T, fileExt string) string
 }

+ 8 - 0
core/pkg/exporter/source.go

@@ -2,6 +2,14 @@ package exporter
 
 import "time"
 
+// ExportSource[T] provides a factory style contract for creating new `T` instances for exporting.
+type ExportSource[T any] interface {
+	Make(timestamp time.Time) *T
+
+	// Name returns the name of the ExportSource.
+	Name() string
+}
+
 // ComputeSource[T] provides an interface for a compute data source.
 type ComputeSource[T any] interface {
 	// CanCompute should return true iff the ComputeSource can effectively act as

+ 15 - 0
core/pkg/heartbeat/controller.go

@@ -0,0 +1,15 @@
+package heartbeat
+
+import (
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/storage"
+)
+
+// NewHeartbeatExportController creates a new EventExportController for Heartbeat events.
+// A HeartbeatMetadataProvider can optionally be provided to append metadata to the Heartbeat payload.
+func NewHeartbeatExportController(clusterId string, store storage.Storage, provider HeartbeatMetadataProvider) *exporter.EventExportController[Heartbeat] {
+	return exporter.NewEventExportController(
+		NewHeartbeatSource(provider),
+		NewHeartbeatExporter(clusterId, store),
+	)
+}

+ 8 - 0
core/pkg/heartbeat/encoder.go

@@ -0,0 +1,8 @@
+package heartbeat
+
+import "github.com/opencost/opencost/core/pkg/exporter"
+
+// NewHeartbeatEncoder returns a JSON encoder used to encode Heartbeat events.
+func NewHeartbeatEncoder() exporter.Encoder[Heartbeat] {
+	return exporter.NewJSONEncoder[Heartbeat]()
+}

+ 24 - 0
core/pkg/heartbeat/exporter.go

@@ -0,0 +1,24 @@
+package heartbeat
+
+import (
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/exporter/pathing"
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/storage"
+)
+
+// NewHeartbeatExporter creates a new `StorageExporter[Heartbeat]` instance for exporting Heartbeat events.
+func NewHeartbeatExporter(clusterId string, storage storage.Storage) *exporter.StorageExporter[Heartbeat] {
+	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, HeartbeatEventName)
+	if err != nil {
+		log.Errorf("failed to create pathing formatter: %v", err)
+		return nil
+	}
+
+	return exporter.NewStorageExporter(
+		HeartbeatEventName,
+		pathing,
+		NewHeartbeatEncoder(),
+		storage,
+	)
+}

+ 32 - 0
core/pkg/heartbeat/heartbeat.go

@@ -0,0 +1,32 @@
+package heartbeat
+
+import (
+	"time"
+)
+
+// HeartbeatEventName is used to represent the name of the heartbeat pipeline event to categorize for storage.
+const HeartbeatEventName string = "heartbeat"
+
+// Heartbeat is a payload struct that contains custom information and the timestamp of the heartbeat.
+type Heartbeat struct {
+	Id        string    `json:"id"`
+	Timestamp time.Time `json:"timestamp"`
+	// Uptime is the process uptime. NOTE(review): the constructor doc originally described this as
+	// seconds, but HeartbeatSource.Make computes it as whole minutes — confirm the intended unit.
+	Uptime   uint64         `json:"uptime"`
+	Version  string         `json:"version"`
+	Metadata map[string]any `json:"metadata,omitempty"`
+}
+
+// NewHeartbeat creates a new Heartbeat instance with the provided parameters.
+// The `id` is a unique identifier for the heartbeat, `timestamp` is the time of the heartbeat,
+// `uptime` is the uptime value (see the unit note on the Uptime field), `version` is the version
+// of the heartbeat, and `metadata` is an optional map of additional information. Metadata _can_
+// be omitted by passing `nil` (the field is then dropped from the JSON encoding via omitempty).
+func NewHeartbeat(id string, timestamp time.Time, uptime uint64, version string, metadata map[string]any) *Heartbeat {
+	return &Heartbeat{
+		Id:        id,
+		Timestamp: timestamp,
+		Uptime:    uptime,
+		Version:   version,
+		Metadata:  metadata,
+	}
+}

+ 79 - 0
core/pkg/heartbeat/heartbeat_test.go

@@ -0,0 +1,79 @@
+package heartbeat
+
+import (
+	"encoding/json"
+	"fmt"
+	"path/filepath"
+	"slices"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/storage"
+	"github.com/opencost/opencost/core/pkg/util/sliceutil"
+)
+
+const MockClusterId = "mock-cluster-1"
+
+type MockHeartbeatMetadataProvider struct{}
+
+func NewMockHeartbeatMetadataProvider() *MockHeartbeatMetadataProvider {
+	return &MockHeartbeatMetadataProvider{}
+}
+
+func (m *MockHeartbeatMetadataProvider) GetMetadata() map[string]any {
+	return map[string]any{
+		"cluster_id": MockClusterId,
+	}
+}
+
+func TestHeartbeatExporter(t *testing.T) {
+	mdp := NewMockHeartbeatMetadataProvider()
+	store := NewMockStorage()
+
+	controller := NewHeartbeatExportController(MockClusterId, store, mdp)
+
+	if !controller.Start(time.Second) {
+		t.Fatal("Failed to start controller")
+	}
+
+	time.Sleep(10 * time.Second)
+	controller.Stop()
+
+	files, _ := store.List("federated/mock-cluster-1/heartbeat")
+	if len(files) == 0 {
+		t.Fatal("No files found in storage")
+	}
+
+	fileNames := sliceutil.Map(files, func(stat *storage.StorageInfo) string {
+		return stat.Name
+	})
+
+	slices.Sort(fileNames)
+
+	lastCheck := time.Time{}
+
+	for _, f := range fileNames {
+		fpath := filepath.Join("federated", MockClusterId, "heartbeat", f)
+		data, err := store.Read(fpath)
+		if err != nil {
+			t.Fatalf("Failed to read file %s: %v", fpath, err)
+		}
+
+		hb := new(Heartbeat)
+		if err := json.Unmarshal(data, hb); err != nil {
+			t.Fatalf("Failed to unmarshal heartbeat data: %v", err)
+		}
+
+		fmt.Printf("%s\n%s\n\n", f, string(data))
+
+		if hb.Metadata["cluster_id"] != MockClusterId {
+			t.Fatalf("Expected cluster ID %s, got %s", MockClusterId, hb.Metadata["cluster_id"])
+		}
+
+		if hb.Timestamp.Before(lastCheck) {
+			t.Fatalf("Expected timestamp %s to be after %s", hb.Timestamp, lastCheck)
+		}
+		lastCheck = hb.Timestamp
+
+	}
+}

+ 84 - 0
core/pkg/heartbeat/source.go

@@ -0,0 +1,84 @@
+package heartbeat
+
+import (
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/opencost/opencost/core/pkg/clusters"
+	"github.com/opencost/opencost/core/pkg/log"
+	"github.com/opencost/opencost/core/pkg/version"
+)
+
+// HeartbeatMetadataProvider is an interface that provides metadata for heartbeat instances. It can be used to inject
+// custom metadata into a generic `Heartbeat` payload.
+type HeartbeatMetadataProvider interface {
+	// GetMetadata returns the metadata for new heartbeat instances.
+	GetMetadata() map[string]any
+}
+
+// ClusterInfoMetadataProvider is a `HeartbeatMetadataProvider` implementation that provides metadata about the cluster
+// leveraging a `ClusterInfoProvider` implementation.
+type ClusterInfoMetadataProvider struct {
+	clusterInfoProvider clusters.ClusterInfoProvider
+}
+
+// NewClusterInfoMetadataProvider creates a new `ClusterInfoMetadataProvider` instance. The `provider` parameter is used to
+// inject custom metadata, but can be set to `nil` if no metadata is needed.
+func NewClusterInfoMetadataProvider(provider clusters.ClusterInfoProvider) *ClusterInfoMetadataProvider {
+	return &ClusterInfoMetadataProvider{
+		clusterInfoProvider: provider,
+	}
+}
+
+// GetMetadata returns the metadata for new heartbeat instances. It uses the `ClusterInfoProvider` to get the cluster
+// information and injects it into the metadata map.
+func (c *ClusterInfoMetadataProvider) GetMetadata() map[string]any {
+	m := c.clusterInfoProvider.GetClusterInfo()
+	metadata := make(map[string]any, len(m))
+
+	for k, v := range m {
+		metadata[k] = v
+	}
+
+	return metadata
+}
+
+// HeartbeatSource is an `export.ExportSource` implementation that provides the basic data for a `Heartbeat` payload, and
+// leverages a `HeartbeatMetadataProvider` to inject custom metadata.
+type HeartbeatSource struct {
+	startTime        time.Time
+	metadataProvider HeartbeatMetadataProvider
+}
+
+// NewHeartbeatSource creates a new `HeartbeatSource` instance. The `provider` parameter is used to inject custom metadata,
+// but can be set to `nil` if no metadata is needed.
+func NewHeartbeatSource(provider HeartbeatMetadataProvider) *HeartbeatSource {
+	return &HeartbeatSource{
+		startTime:        time.Now().UTC(),
+		metadataProvider: provider,
+	}
+}
+
+// Make creates a new `Heartbeat` instance with the provided current time.
+func (h *HeartbeatSource) Make(t time.Time) *Heartbeat {
+	uid, err := uuid.NewV7()
+	if err != nil {
+		log.Warnf("failed to generate v7 UUID, replacing with UUID v4: %s", err)
+		uid = uuid.New()
+	}
+
+	id := uid.String()
+	uptime := uint64(t.Sub(h.startTime).Minutes())
+	v := version.FriendlyVersion()
+
+	var metadata map[string]any
+	if h.metadataProvider != nil {
+		metadata = h.metadataProvider.GetMetadata()
+	}
+
+	return NewHeartbeat(id, t, uptime, v, metadata)
+}
+
+func (h *HeartbeatSource) Name() string {
+	return HeartbeatEventName + "-source"
+}

+ 235 - 0
core/pkg/heartbeat/storage_test.go

@@ -0,0 +1,235 @@
+package heartbeat
+
+import (
+	"fmt"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/storage"
+)
+
+type file struct {
+	name     string
+	contents []byte
+}
+
+func newFile(name string, contents []byte) *file {
+	return &file{
+		name:     name,
+		contents: contents,
+	}
+}
+
+type dir struct {
+	name  string
+	dirs  map[string]*dir
+	files map[string]*file
+}
+
+func newDir(name string) *dir {
+	return &dir{
+		name:  name,
+		dirs:  make(map[string]*dir),
+		files: make(map[string]*file),
+	}
+}
+
+func (d *dir) size() int64 {
+	var size int64
+	for _, f := range d.files {
+		size += int64(len(f.contents))
+	}
+	for _, subdir := range d.dirs {
+		size += subdir.size()
+	}
+	return size
+}
+
+func (d *dir) addFile(f *file) {
+	d.files[f.name] = f
+}
+
+func (d *dir) addDir(subdir *dir) {
+	d.dirs[subdir.name] = subdir
+}
+
+func (d *dir) deleteFile(name string) {
+	delete(d.files, name)
+}
+
+func (d *dir) deleteDir(name string) {
+	delete(d.dirs, name)
+}
+
+type MockStorage struct {
+	s map[string][]byte
+	d *dir
+}
+
+func NewMockStorage() *MockStorage {
+	return &MockStorage{
+		s: make(map[string][]byte),
+		d: newDir(""),
+	}
+}
+
+// StorageType returns a string identifier for the type of storage used by the implementation.
+func (ms *MockStorage) StorageType() storage.StorageType {
+	return storage.StorageType("mock")
+}
+
+// FullPath returns the storage working path combined with the path provided
+func (ms *MockStorage) FullPath(path string) string {
+	return path
+}
+
+// Stat returns the StorageStats for the specific path.
+func (ms *MockStorage) Stat(path string) (*storage.StorageInfo, error) {
+	path = filepath.Clean(path)
+	if data, ok := ms.s[path]; ok {
+		return &storage.StorageInfo{
+			Name:    path,
+			Size:    int64(len(data)),
+			ModTime: time.Now(),
+		}, nil
+	}
+
+	return nil, fmt.Errorf("no valid data found")
+}
+
+// Read uses the relative path of the storage combined with the provided path to
+// read the contents.
+func (ms *MockStorage) Read(path string) ([]byte, error) {
+	path = filepath.Clean(path)
+
+	if data, ok := ms.s[path]; ok {
+		return data, nil
+	}
+
+	return nil, fmt.Errorf("no valid data found")
+}
+
+// Write uses the relative path of the storage combined with the provided path
+// to write a new file or overwrite an existing file.
+func (ms *MockStorage) Write(path string, data []byte) error {
+	paths, pFile := pathsAndFile(path)
+
+	f := newFile(pFile, data)
+	currentDir := writeDir(ms.d, paths)
+
+	currentDir.addFile(f)
+	ms.s[path] = data
+	return nil
+}
+
+// Remove uses the relative path of the storage combined with the provided path to
+// remove a file from storage permanently.
+func (ms *MockStorage) Remove(path string) error {
+	paths, pFile := pathsAndFile(path)
+
+	currentDir, err := searchDir(ms.d, paths)
+	if err != nil {
+		return err
+	}
+
+	currentDir.deleteFile(pFile)
+
+	delete(ms.s, path)
+	return nil
+}
+
+// Exists uses the relative path of the storage combined with the provided path to
+// determine if the file exists.
+func (ms *MockStorage) Exists(path string) (bool, error) {
+	path = filepath.Clean(path)
+
+	_, ok := ms.s[path]
+	return ok, nil
+}
+
+// List uses the relative path of the storage combined with the provided path to return
+// storage information for the files.
+func (ms *MockStorage) List(path string) ([]*storage.StorageInfo, error) {
+	paths := toPaths(path)
+	currentDir, err := searchDir(ms.d, paths)
+	if err != nil {
+		return nil, err
+	}
+
+	storageInfos := make([]*storage.StorageInfo, 0, len(currentDir.files))
+	for _, f := range currentDir.files {
+		storageInfos = append(storageInfos, &storage.StorageInfo{
+			Name:    f.name,
+			Size:    int64(len(f.contents)),
+			ModTime: time.Now(),
+		})
+	}
+
+	return storageInfos, nil
+}
+
+// ListDirectories uses the relative path of the storage combined with the provided path
+// to return storage information for only directories contained along the path. This
+// functions as List, but returns storage information for only directories.
+func (ms *MockStorage) ListDirectories(path string) ([]*storage.StorageInfo, error) {
+	paths := toPaths(path)
+	currentDir, err := searchDir(ms.d, paths)
+	if err != nil {
+		return nil, err
+	}
+
+	storageInfos := make([]*storage.StorageInfo, 0, len(currentDir.files))
+	for _, d := range currentDir.dirs {
+		storageInfos = append(storageInfos, &storage.StorageInfo{
+			Name:    d.name,
+			Size:    d.size(),
+			ModTime: time.Now(),
+		})
+	}
+
+	return storageInfos, nil
+}
+
+func toPaths(path string) []string {
+	path = filepath.Clean(path)
+	if path[len(path)-1] == filepath.Separator {
+		path = path[:len(path)-1]
+	}
+	return strings.Split(path, string(filepath.Separator))
+}
+
+func pathsAndFile(path string) ([]string, string) {
+	path = filepath.Clean(path)
+	pDir, pFile := filepath.Split(path)
+	pDir = filepath.Dir(pDir)
+	return strings.Split(pDir, string(filepath.Separator)), pFile
+}
+
+func writeDir(d *dir, paths []string) *dir {
+	currentDir := d
+
+	for i := 0; i < len(paths); i++ {
+		dirName := paths[i]
+		if _, ok := currentDir.dirs[dirName]; !ok {
+			currentDir.addDir(newDir(dirName))
+		}
+		currentDir = currentDir.dirs[dirName]
+	}
+
+	return currentDir
+}
+
+func searchDir(d *dir, paths []string) (*dir, error) {
+	currentDir := d
+
+	for i := 0; i < len(paths); i++ {
+		dirName := paths[i]
+		if _, ok := currentDir.dirs[dirName]; !ok {
+			return nil, fmt.Errorf("directory %s not found", filepath.Join(paths[:i+1]...))
+		}
+		currentDir = currentDir.dirs[dirName]
+	}
+
+	return currentDir, nil
+}

+ 6 - 6
pkg/exporter/exporters.go

@@ -12,14 +12,14 @@ import (
 	"github.com/opencost/opencost/core/pkg/storage"
 )
 
-func NewAllocationStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.Exporter[opencost.AllocationSet] {
+func NewAllocationStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.ComputeExporter[opencost.AllocationSet] {
 	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelines.AllocationPipelineName, &resolution)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil
 	}
 
-	return export.NewStorageExporter(
+	return export.NewComputeStorageExporter(
 		pipelines.AllocationPipelineName,
 		resolution,
 		pathing,
@@ -29,14 +29,14 @@ func NewAllocationStorageExporter(clusterId string, resolution time.Duration, st
 	)
 }
 
-func NewAssetsStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.Exporter[opencost.AssetSet] {
+func NewAssetsStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.ComputeExporter[opencost.AssetSet] {
 	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelines.AssetsPipelineName, &resolution)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil
 	}
 
-	return export.NewStorageExporter(
+	return export.NewComputeStorageExporter(
 		pipelines.AssetsPipelineName,
 		resolution,
 		pathing,
@@ -46,14 +46,14 @@ func NewAssetsStorageExporter(clusterId string, resolution time.Duration, store
 	)
 }
 
-func NewNetworkInsightStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.Exporter[opencost.NetworkInsightSet] {
+func NewNetworkInsightStorageExporter(clusterId string, resolution time.Duration, store storage.Storage) export.ComputeExporter[opencost.NetworkInsightSet] {
 	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelines.NetworkInsightPipelineName, &resolution)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil
 	}
 
-	return export.NewStorageExporter(
+	return export.NewComputeStorageExporter(
 		pipelines.NetworkInsightPipelineName,
 		resolution,
 		pathing,