Ver Fonte

Add encoders and compute sources for allocations, assets, and network insights.

Matt Bolt há 1 ano atrás
pai
commit
fd59c51665

+ 11 - 5
core/pkg/exporter/controller.go

@@ -10,23 +10,29 @@ import (
 	"github.com/opencost/opencost/core/pkg/util/atomic"
 	"github.com/opencost/opencost/core/pkg/util/atomic"
 )
 )
 
 
+// ComputeExportController[T] is a controller type which leverages a `ComputeSource[T]` and `Exporter[T]`
+// to regularly compute the data for the current resolution and export it on a specific interval.
 type ComputeExportController[T any] struct {
 type ComputeExportController[T any] struct {
 	runState   atomic.AtomicRunState
 	runState   atomic.AtomicRunState
 	source     ComputeSource[T]
 	source     ComputeSource[T]
 	exporter   Exporter[T]
 	exporter   Exporter[T]
 	resolution time.Duration
 	resolution time.Duration
-	tName      string
+	typeName   string
 }
 }
 
 
+// NewComputeExportController creates a new `ComputeExportController[T]` instance.
 func NewComputeExportController[T any](source ComputeSource[T], exporter Exporter[T], resolution time.Duration) *ComputeExportController[T] {
 func NewComputeExportController[T any](source ComputeSource[T], exporter Exporter[T], resolution time.Duration) *ComputeExportController[T] {
 	return &ComputeExportController[T]{
 	return &ComputeExportController[T]{
 		source:     source,
 		source:     source,
 		resolution: resolution,
 		resolution: resolution,
 		exporter:   exporter,
 		exporter:   exporter,
-		tName:      reflect.TypeOf((*T)(nil)).Elem().String(),
+		typeName:   reflect.TypeOf((*T)(nil)).Elem().String(),
 	}
 	}
 }
 }
 
 
+// Start starts a background compute processing loop, which will compute the data for the current resolution and export it
+// on the provided interval. This function will return `true` if the loop was started successfully, and `false` if it was
+// already running.
 func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
 func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
 	// Before we attempt to start, we must ensure we are not in a stopping state
 	// Before we attempt to start, we must ensure we are not in a stopping state
 	cd.runState.WaitForReset()
 	cd.runState.WaitForReset()
@@ -56,9 +62,9 @@ func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
 			start := time.Now().UTC().Truncate(cd.resolution)
 			start := time.Now().UTC().Truncate(cd.resolution)
 			end := start.Add(cd.resolution)
 			end := start.Add(cd.resolution)
 
 
-			log.Debugf("[%s] Reporting for window: %s - %s", cd.tName, start.UTC(), end.UTC())
+			log.Debugf("[%s] Reporting for window: %s - %s", cd.typeName, start.UTC(), end.UTC())
 			if !cd.source.CanCompute(start, end) {
 			if !cd.source.CanCompute(start, end) {
-				log.Errorf("[%s] Cannot compute window: [Start: %s, End: %s]", cd.tName, start, end)
+				log.Errorf("[%s] Cannot compute window: [Start: %s, End: %s]", cd.typeName, start, end)
 				continue
 				continue
 			}
 			}
 
 
@@ -82,7 +88,7 @@ func (cd *ComputeExportController[T]) Start(interval time.Duration) bool {
 
 
 			err = cd.exporter.Export(opencost.NewClosedWindow(start, end), set)
 			err = cd.exporter.Export(opencost.NewClosedWindow(start, end), set)
 			if err != nil {
 			if err != nil {
-				log.Warnf("[%s] Error during Write: %s", cd.tName, err)
+				log.Warnf("[%s] Error during Write: %s", cd.typeName, err)
 			}
 			}
 		}
 		}
 	}()
 	}()

+ 7 - 0
core/pkg/exporter/encoder.go

@@ -7,17 +7,24 @@ type Encoder[T any] interface {
 	Encode(*T) ([]byte, error)
 	Encode(*T) ([]byte, error)
 }
 }
 
 
+// BinaryMarshalerPtr[T] is a generic constraint to ensure types passed to the encoder implement
+// encoding.BinaryMarshaler and are pointers to T.
 type BinaryMarshalerPtr[T any] interface {
 type BinaryMarshalerPtr[T any] interface {
 	encoding.BinaryMarshaler
 	encoding.BinaryMarshaler
 	*T
 	*T
 }
 }
 
 
+// BingenEncoder[T, U] is a generic encoder that uses the BinaryMarshaler interface to encode data.
+// It supports any type T that implements the encoding.BinaryMarshaler interface.
 type BingenEncoder[T any, U BinaryMarshalerPtr[T]] struct{}
 type BingenEncoder[T any, U BinaryMarshalerPtr[T]] struct{}
 
 
+// NewBingenEncoder creates an `Encoder[T]` implementation which supports binary encoding for the `T`
+// type.
 func NewBingenEncoder[T any, U BinaryMarshalerPtr[T]]() Encoder[T] {
 func NewBingenEncoder[T any, U BinaryMarshalerPtr[T]]() Encoder[T] {
 	return new(BingenEncoder[T, U])
 	return new(BingenEncoder[T, U])
 }
 }
 
 
+// Encode encodes the provided data of type T into a byte slice using the BinaryMarshaler interface.
 func (b *BingenEncoder[T, U]) Encode(data *T) ([]byte, error) {
 func (b *BingenEncoder[T, U]) Encode(data *T) ([]byte, error) {
 	var bingenData U = data
 	var bingenData U = data
 	return bingenData.MarshalBinary()
 	return bingenData.MarshalBinary()

+ 6 - 0
core/pkg/exporter/exporter.go

@@ -27,6 +27,10 @@ type StorageExporter[T any] struct {
 	validator  validator.ExportValidator[T]
 	validator  validator.ExportValidator[T]
 }
 }
 
 
+// NewStorageExporter creates a new StorageExporter instance, which is responsible for exporting data for
+// a specific window to a storage backend. It uses a pathing strategy to determine the storage location,
+// an encoder to convert the data to binary format, and a validator to check the data before export.
+// The pipeline name and resolution are also provided to help identify the data being exported.
 func NewStorageExporter[T any](
 func NewStorageExporter[T any](
 	pipeline string,
 	pipeline string,
 	resolution time.Duration,
 	resolution time.Duration,
@@ -45,6 +49,8 @@ func NewStorageExporter[T any](
 	}
 	}
 }
 }
 
 
+// Export performs validation on the provided window and data, determines if it should overwrite existing data,
+// and stores the data in the location specified by the pathing formatter.
 func (se *StorageExporter[T]) Export(window opencost.Window, data *T) error {
 func (se *StorageExporter[T]) Export(window opencost.Window, data *T) error {
 	if se.validator != nil {
 	if se.validator != nil {
 		err := se.validator.Validate(window, data)
 		err := se.validator.Validate(window, data)

+ 1 - 1
pkg/costmodel/networkinsight.go

@@ -10,7 +10,7 @@ import (
 	"github.com/opencost/opencost/pkg/env"
 	"github.com/opencost/opencost/pkg/env"
 )
 )
 
 
-func (cm *CostModel) Compute(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error) {
+func (cm *CostModel) ComputeNetworkInsights(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error) {
 	log.Debugf("Network Insight compute called on prometheus source for window  %s", opencost.NewClosedWindow(start, end).String())
 	log.Debugf("Network Insight compute called on prometheus source for window  %s", opencost.NewClosedWindow(start, end).String())
 
 
 	// If the duration is short enough, compute the network insight directly
 	// If the duration is short enough, compute the network insight directly

+ 9 - 1
pkg/exporter/allocationsource.go → pkg/exporter/allocation/source.go

@@ -1,8 +1,9 @@
-package source
+package allocation
 
 
 import (
 import (
 	"time"
 	"time"
 
 
+	"github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/pipelines"
 	"github.com/opencost/opencost/core/pkg/pipelines"
 	"github.com/opencost/opencost/pkg/costmodel"
 	"github.com/opencost/opencost/pkg/costmodel"
@@ -12,6 +13,13 @@ type AllocationComputeSource struct {
 	cm *costmodel.CostModel
 	cm *costmodel.CostModel
 }
 }
 
 
+// NewAllocationComputeSource creates an `exporter.ComputeSource[opencost.AssetSet]` implementation
+func NewAllocationComputeSource(cm *costmodel.CostModel) exporter.ComputeSource[opencost.AllocationSet] {
+	return &AllocationComputeSource{
+		cm: cm,
+	}
+}
+
 // CanCompute should return true iff the ComputeSource can effectively act as
 // CanCompute should return true iff the ComputeSource can effectively act as
 // a source of T data for the given time range. For example, a ComputeSource
 // a source of T data for the given time range. For example, a ComputeSource
 // with two-day coverage cannot fulfill a range from three days ago, and should
 // with two-day coverage cannot fulfill a range from three days ago, and should

+ 40 - 0
pkg/exporter/asset/source.go

@@ -0,0 +1,40 @@
+package asset
+
+import (
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/pipelines"
+	"github.com/opencost/opencost/pkg/costmodel"
+)
+
+type AssetsComputeSource struct {
+	cm *costmodel.CostModel
+}
+
+// NewAssetsComputeSource creates an `exporter.ComputeSource[opencost.AssetSet]` implementation
+func NewAssetsComputeSource(cm *costmodel.CostModel) exporter.ComputeSource[opencost.AssetSet] {
+	return &AssetsComputeSource{
+		cm: cm,
+	}
+}
+
+// CanCompute should return true iff the ComputeSource can effectively act as
+// a source of T data for the given time range. For example, a ComputeSource
+// with two-day coverage cannot fulfill a range from three days ago, and should
+// not be left to return an error in Compute. Instead, it should report that is
+// cannot compute and allow another Source to handle the computation.
+func (acs *AssetsComputeSource) CanCompute(start, end time.Time) bool {
+	return true
+}
+
+// Compute should compute a single T for the given time range, optionally using the given resolution.
+func (acs *AssetsComputeSource) Compute(start, end time.Time, resolution time.Duration) (*opencost.AssetSet, error) {
+	return acs.cm.ComputeAssets(start, end)
+}
+
+// Name returns the name of the ComputeSource
+func (acs *AssetsComputeSource) Name() string {
+	return pipelines.AssetsPipelineName
+}

+ 24 - 0
pkg/exporter/encoder.go

@@ -0,0 +1,24 @@
+package exporter
+
+import (
+	export "github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/opencost"
+)
+
+// NewAllocationEncoder creates an `export.Encoder[opencost.AllocationSet]` implementation for
+// encoding AllocationSet data.
+func NewAllocationEncoder() export.Encoder[opencost.AllocationSet] {
+	return export.NewBingenEncoder[opencost.AllocationSet]()
+}
+
+// NewAssetsEncoder creates an `export.Encoder[opencost.AssetSet]` implementation for
+// encoding AssetSet data.
+func NewAssetsEncoder() export.Encoder[opencost.AssetSet] {
+	return export.NewBingenEncoder[opencost.AssetSet]()
+}
+
+// NewNetworkInsightEncoder creates an `export.Encoder[opencost.NetworkInsightSet]` implementation for
+// encoding NetworkInsightSet data.
+func NewNetworkInsightEncoder() export.Encoder[opencost.NetworkInsightSet] {
+	return export.NewBingenEncoder[opencost.NetworkInsightSet]()
+}

+ 40 - 0
pkg/exporter/networkinsight/source.go

@@ -0,0 +1,40 @@
+package networkinsight
+
+import (
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/exporter"
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/pipelines"
+	"github.com/opencost/opencost/pkg/costmodel"
+)
+
+type NetworkInsightsComputeSource struct {
+	cm *costmodel.CostModel
+}
+
+// NewNetworkInsightsComputeSource creates an `exporter.ComputeSource[opencost.NetworkInsightSet]` implementation
+func NewNetworkInsightsComputeSource(cm *costmodel.CostModel) exporter.ComputeSource[opencost.NetworkInsightSet] {
+	return &NetworkInsightsComputeSource{
+		cm: cm,
+	}
+}
+
+// CanCompute should return true iff the ComputeSource can effectively act as
+// a source of T data for the given time range. For example, a ComputeSource
+// with two-day coverage cannot fulfill a range from three days ago, and should
+// not be left to return an error in Compute. Instead, it should report that is
+// cannot compute and allow another Source to handle the computation.
+func (acs *NetworkInsightsComputeSource) CanCompute(start, end time.Time) bool {
+	return true
+}
+
+// Compute should compute a single T for the given time range, optionally using the given resolution.
+func (acs *NetworkInsightsComputeSource) Compute(start, end time.Time, resolution time.Duration) (*opencost.NetworkInsightSet, error) {
+	return acs.cm.ComputeNetworkInsights(start, end, resolution)
+}
+
+// Name returns the name of the ComputeSource
+func (acs *NetworkInsightsComputeSource) Name() string {
+	return pipelines.NetworkInsightPipelineName
+}