|
@@ -8,6 +8,7 @@ import (
|
|
|
export "github.com/opencost/opencost/core/pkg/exporter"
|
|
export "github.com/opencost/opencost/core/pkg/exporter"
|
|
|
"github.com/opencost/opencost/core/pkg/exporter/pathing"
|
|
"github.com/opencost/opencost/core/pkg/exporter/pathing"
|
|
|
"github.com/opencost/opencost/core/pkg/exporter/validator"
|
|
"github.com/opencost/opencost/core/pkg/exporter/validator"
|
|
|
|
|
+ "github.com/opencost/opencost/core/pkg/opencost"
|
|
|
"github.com/opencost/opencost/core/pkg/pipelines"
|
|
"github.com/opencost/opencost/core/pkg/pipelines"
|
|
|
"github.com/opencost/opencost/core/pkg/storage"
|
|
"github.com/opencost/opencost/core/pkg/storage"
|
|
|
"github.com/opencost/opencost/core/pkg/util/timeutil"
|
|
"github.com/opencost/opencost/core/pkg/util/timeutil"
|
|
@@ -36,60 +37,34 @@ const (
|
|
|
ExportCompressionLevelDefault ExportCompressionLevel = gzip.DefaultCompression
|
|
ExportCompressionLevelDefault ExportCompressionLevel = gzip.DefaultCompression
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
-// NewComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
|
|
|
|
|
-// by window for a specific pipeline.
|
|
|
|
|
-func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
|
|
- clusterName string,
|
|
|
|
|
- resolution time.Duration,
|
|
|
|
|
- store storage.Storage,
|
|
|
|
|
-) (export.ComputeExporter[T], error) {
|
|
|
|
|
- pipelineName := pipelines.NameFor[T]()
|
|
|
|
|
- if pipelineName == "" {
|
|
|
|
|
- return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- pathing, err := pathing.NewDefaultStoragePathFormatter(clusterName, pipelineName, &resolution)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, fmt.Errorf("failed to create path formatter: %w", err)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- var encoder export.Encoder[T]
|
|
|
|
|
- encoder = export.NewBingenEncoder[T, U]()
|
|
|
|
|
-
|
|
|
|
|
- return export.NewComputeStorageExporter(
|
|
|
|
|
- pathing,
|
|
|
|
|
- encoder,
|
|
|
|
|
- store,
|
|
|
|
|
- validator.NewSetValidator[T, S](resolution),
|
|
|
|
|
- false,
|
|
|
|
|
- ), nil
|
|
|
|
|
|
|
+type ComputeExporterConfig struct {
|
|
|
|
|
+ AppName string
|
|
|
|
|
+ ClusterUID string
|
|
|
|
|
+ ClusterName string
|
|
|
|
|
+ Resolution time.Duration
|
|
|
|
|
+ Streaming bool
|
|
|
|
|
+ Compression ExportCompressionLevel
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// NewStreamingComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
|
|
|
|
|
|
|
+// NewComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
|
|
|
// by window for a specific pipeline.
|
|
// by window for a specific pipeline.
|
|
|
-func NewStreamingComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
|
|
- clusterId string,
|
|
|
|
|
- resolution time.Duration,
|
|
|
|
|
|
|
+func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
|
|
+ config ComputeExporterConfig,
|
|
|
store storage.Storage,
|
|
store storage.Storage,
|
|
|
- compressionLevel ExportCompressionLevel,
|
|
|
|
|
) (export.ComputeExporter[T], error) {
|
|
) (export.ComputeExporter[T], error) {
|
|
|
- pipelineName := pipelines.NameFor[T]()
|
|
|
|
|
- if pipelineName == "" {
|
|
|
|
|
- return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
|
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
- pathing, err := pathing.NewDefaultStoragePathFormatter(clusterId, pipelineName, &resolution)
|
|
|
|
|
|
|
+ pathing, err := GetExporterPathing[T, U, S](config)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("failed to create path formatter: %w", err)
|
|
return nil, fmt.Errorf("failed to create path formatter: %w", err)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if !compressionLevel.IsValid() {
|
|
|
|
|
- return nil, fmt.Errorf("invalid compression level passed: %d is not a valid compression level", int(compressionLevel))
|
|
|
|
|
|
|
+ if !config.Compression.IsValid() {
|
|
|
|
|
+ return nil, fmt.Errorf("invalid compression level passed: %d is not a valid compression level", int(config.Compression))
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var encoder export.Encoder[T]
|
|
var encoder export.Encoder[T]
|
|
|
- if compressionLevel != ExportCompressionLevelNone {
|
|
|
|
|
- encoder = export.NewGZipEncoderWithLevel(export.NewBingenEncoder[T, U](), int(compressionLevel))
|
|
|
|
|
|
|
+ if config.Streaming && config.Compression != ExportCompressionLevelNone {
|
|
|
|
|
+ encoder = export.NewGZipEncoderWithLevel(export.NewBingenEncoder[T, U](), int(config.Compression))
|
|
|
} else {
|
|
} else {
|
|
|
encoder = export.NewBingenEncoder[T, U]()
|
|
encoder = export.NewBingenEncoder[T, U]()
|
|
|
}
|
|
}
|
|
@@ -98,144 +73,48 @@ func NewStreamingComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T],
|
|
|
pathing,
|
|
pathing,
|
|
|
encoder,
|
|
encoder,
|
|
|
store,
|
|
store,
|
|
|
- validator.NewSetValidator[T, S](resolution),
|
|
|
|
|
- true,
|
|
|
|
|
|
|
+ validator.NewSetValidator[T, S](config.Resolution),
|
|
|
|
|
+ config.Streaming,
|
|
|
), nil
|
|
), nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// NewComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to export computed data
|
|
// NewComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to export computed data
|
|
|
// using the provided source, storage, resolution, and source resolution.
|
|
// using the provided source, storage, resolution, and source resolution.
|
|
|
func NewComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
func NewComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
- clusterName string,
|
|
|
|
|
- store storage.Storage,
|
|
|
|
|
- source export.ComputeSource[T],
|
|
|
|
|
- resolution time.Duration,
|
|
|
|
|
-) (*export.ComputeExportController[T], error) {
|
|
|
|
|
- exporter, err := NewComputePipelineExporter[T, U, S](clusterName, resolution, store)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, fmt.Errorf("failed to create compute exporter: %w", err)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- return export.NewComputeExportController(source, exporter, resolution), nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// NewStreamingComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to stream/export the
|
|
|
|
|
-// computed data using the provided source, storage, resolution, and source resolution.
|
|
|
|
|
-func NewStreamingComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
|
|
- clusterId string,
|
|
|
|
|
|
|
+ config ComputeExporterConfig,
|
|
|
store storage.Storage,
|
|
store storage.Storage,
|
|
|
source export.ComputeSource[T],
|
|
source export.ComputeSource[T],
|
|
|
- resolution time.Duration,
|
|
|
|
|
- compressionLevel ExportCompressionLevel,
|
|
|
|
|
) (*export.ComputeExportController[T], error) {
|
|
) (*export.ComputeExportController[T], error) {
|
|
|
- exporter, err := NewStreamingComputePipelineExporter[T, U, S](clusterId, resolution, store, compressionLevel)
|
|
|
|
|
|
|
+ exporter, err := NewComputePipelineExporter[T, U, S](config, store)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("failed to create compute exporter: %w", err)
|
|
return nil, fmt.Errorf("failed to create compute exporter: %w", err)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- return export.NewComputeExportController(source, exporter, resolution), nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// NewKubeModelComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
|
|
|
|
|
-// by window for a specific pipeline.
|
|
|
|
|
-func NewKubeModelComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
|
|
- appName string,
|
|
|
|
|
- clusterId string,
|
|
|
|
|
- resolution time.Duration,
|
|
|
|
|
- store storage.Storage,
|
|
|
|
|
-) (export.ComputeExporter[T], error) {
|
|
|
|
|
- pipelineName := pipelines.NameFor[T]()
|
|
|
|
|
- if pipelineName == "" {
|
|
|
|
|
- return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
|
|
|
|
|
- }
|
|
|
|
|
- res := timeutil.FormatStoreResolution(resolution)
|
|
|
|
|
- pathing, err := pathing.NewKubeModelStoragePathFormatter(appName, clusterId, res)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, fmt.Errorf("failed to create path formatter: %w", err)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- encoder := export.NewBingenFileEncoder[T, U]()
|
|
|
|
|
-
|
|
|
|
|
- return export.NewComputeStorageExporter(
|
|
|
|
|
- pathing,
|
|
|
|
|
- encoder,
|
|
|
|
|
- store,
|
|
|
|
|
- validator.NewSetValidator[T, S](resolution),
|
|
|
|
|
- false,
|
|
|
|
|
- ), nil
|
|
|
|
|
|
|
+ return export.NewComputeExportController(source, exporter, config.Resolution), nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// NewStreamingComputePipelineExporter creates a new `ComputeExporter[T]` instance which is used to export computed data
|
|
|
|
|
-// by window for a specific pipeline.
|
|
|
|
|
-func NewStreamingKubeModelComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
|
|
- appName string,
|
|
|
|
|
- clusterId string,
|
|
|
|
|
- resolution time.Duration,
|
|
|
|
|
- store storage.Storage,
|
|
|
|
|
- compressionLevel ExportCompressionLevel,
|
|
|
|
|
-) (export.ComputeExporter[T], error) {
|
|
|
|
|
|
|
+func GetExporterPathing[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
|
|
+ config ComputeExporterConfig,
|
|
|
|
|
+) (pathing.StoragePathFormatter[opencost.Window], error) {
|
|
|
pipelineName := pipelines.NameFor[T]()
|
|
pipelineName := pipelines.NameFor[T]()
|
|
|
if pipelineName == "" {
|
|
if pipelineName == "" {
|
|
|
return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
|
|
return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
|
|
|
}
|
|
}
|
|
|
|
|
+ res := timeutil.FormatStoreResolution(config.Resolution)
|
|
|
|
|
|
|
|
- res := timeutil.FormatStoreResolution(resolution)
|
|
|
|
|
- pathing, err := pathing.NewKubeModelStoragePathFormatter(appName, clusterId, res)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, fmt.Errorf("failed to create path formatter: %w", err)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if !compressionLevel.IsValid() {
|
|
|
|
|
- return nil, fmt.Errorf("invalid compression level passed: %d is not a valid compression level", int(compressionLevel))
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- var encoder export.Encoder[T]
|
|
|
|
|
- if compressionLevel != ExportCompressionLevelNone {
|
|
|
|
|
- encoder = export.NewGZipEncoderWithLevel(export.NewBingenEncoder[T, U](), int(compressionLevel))
|
|
|
|
|
- } else {
|
|
|
|
|
- encoder = export.NewBingenEncoder[T, U]()
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- return export.NewComputeStorageExporter(
|
|
|
|
|
- pathing,
|
|
|
|
|
- encoder,
|
|
|
|
|
- store,
|
|
|
|
|
- validator.NewSetValidator[T, S](resolution),
|
|
|
|
|
- true,
|
|
|
|
|
- ), nil
|
|
|
|
|
-}
|
|
|
|
|
|
|
+ var pathFormatter pathing.StoragePathFormatter[opencost.Window]
|
|
|
|
|
+ var err error
|
|
|
|
|
|
|
|
-// NewKubeModelComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to export computed data
|
|
|
|
|
-// using the provided source, storage, resolution, and source resolution.
|
|
|
|
|
-func NewKubeModelComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
|
|
- appName string,
|
|
|
|
|
- clusterId string,
|
|
|
|
|
- store storage.Storage,
|
|
|
|
|
- source export.ComputeSource[T],
|
|
|
|
|
- resolution time.Duration,
|
|
|
|
|
-) (*export.ComputeExportController[T], error) {
|
|
|
|
|
- exporter, err := NewKubeModelComputePipelineExporter[T, U, S](appName, clusterId, resolution, store)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- return nil, fmt.Errorf("failed to create compute exporter: %w", err)
|
|
|
|
|
|
|
+ switch pipelineName {
|
|
|
|
|
+ case pipelines.KubeModelPipelineName:
|
|
|
|
|
+ pathFormatter, err = pathing.NewKubeModelStoragePathFormatter(config.AppName, config.ClusterUID, res)
|
|
|
|
|
+ default:
|
|
|
|
|
+ // Use ClusterName for "clusterId" here to maintain legacy pattern
|
|
|
|
|
+ pathFormatter, err = pathing.NewDefaultStoragePathFormatter(config.ClusterName, pipelineName, &config.Resolution)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- return export.NewComputeExportController(source, exporter, resolution), nil
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-// NewStreamingComputePipelineExportController creates a new `ComputeExportController[T]` instance which is used to stream/export the
|
|
|
|
|
-// computed data using the provided source, storage, resolution, and source resolution.
|
|
|
|
|
-func NewStreamingKubeModelComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
|
|
|
|
|
- appName string,
|
|
|
|
|
- clusterId string,
|
|
|
|
|
- store storage.Storage,
|
|
|
|
|
- source export.ComputeSource[T],
|
|
|
|
|
- resolution time.Duration,
|
|
|
|
|
- compressionLevel ExportCompressionLevel,
|
|
|
|
|
-) (*export.ComputeExportController[T], error) {
|
|
|
|
|
- exporter, err := NewStreamingKubeModelComputePipelineExporter[T, U, S](appName, clusterId, resolution, store, compressionLevel)
|
|
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return nil, fmt.Errorf("failed to create compute exporter: %w", err)
|
|
|
|
|
|
|
+ return nil, fmt.Errorf("failed to create path formatter[%s]: %w", pipelineName, err)
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- return export.NewComputeExportController(source, exporter, resolution), nil
|
|
|
|
|
|
|
+ return pathFormatter, nil
|
|
|
}
|
|
}
|