Ver código fonte

Remove federated as built-in path

Matt Bolt 1 ano atrás
pai
commit
a8a2745094

+ 6 - 1
core/pkg/diagnostics/exporter/controller.go

@@ -7,7 +7,12 @@ import (
 )
 
 // NewDiagnosticsExportController creates a new EventExportController for DiagnosticsRunReport events.
-func NewDiagnosticsExportController(clusterId string, applicationName string, store storage.Storage, service diagnostics.DiagnosticService) *exporter.EventExportController[diagnostics.DiagnosticsRunReport] {
+func NewDiagnosticsExportController(
+	clusterId string,
+	applicationName string,
+	store storage.Storage,
+	service diagnostics.DiagnosticService,
+) *exporter.EventExportController[diagnostics.DiagnosticsRunReport] {
 	return exporter.NewEventExportController(
 		NewDiagnosticSource(service),
 		NewDiagnosticExporter(clusterId, applicationName, store),

+ 1 - 1
core/pkg/diagnostics/exporter/exporter.go

@@ -10,7 +10,7 @@ import (
 
 // NewDiagnosticExporter creates a new `StorageExporter[DiagnosticsRunReport]` instance for exporting diagnostic run events.
 func NewDiagnosticExporter(clusterId string, applicationName string, storage storage.Storage) exporter.EventExporter[diagnostics.DiagnosticsRunReport] {
-	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, diagnostics.DiagnosticsEventName, applicationName)
+	pathing, err := pathing.NewEventStoragePathFormatter("federated", clusterId, diagnostics.DiagnosticsEventName, applicationName)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil

+ 2 - 2
core/pkg/diagnostics/exporter/source.go

@@ -12,8 +12,8 @@ type DiagnosticSource struct {
 	diagnosticService diagnostics.DiagnosticService
 }
 
-// NewDiagnosticSource creates a new `HeartbeatSource` instance. The `provider` parameter is used to inject custom metadata,
-// but can be set to `nil` if no metadata is needed.
+// NewDiagnosticSource creates a new `DiagnosticSource` instance. It accepts the `DiagnosticService` implementation
+// that will be used to retrieve the diagnostic results.
 func NewDiagnosticSource(diagnosticService diagnostics.DiagnosticService) *DiagnosticSource {
 	return &DiagnosticSource{
 		diagnosticService: diagnosticService,

+ 2 - 2
core/pkg/exporter/exporter_test.go

@@ -24,7 +24,7 @@ type TestData struct {
 func TestStorageExporters(t *testing.T) {
 	t.Run("test event storage exporter", func(t *testing.T) {
 		store := storage.NewMemoryStorage()
-		p, err := pathing.NewEventStoragePathFormatter("", TestClusterId, TestEventName)
+		p, err := pathing.NewEventStoragePathFormatter("federated", TestClusterId, TestEventName)
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -65,7 +65,7 @@ func TestStorageExporters(t *testing.T) {
 	t.Run("test compute storage exporter", func(t *testing.T) {
 		res := 24 * time.Hour
 		store := storage.NewMemoryStorage()
-		p, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AllocationPipelineName, &res)
+		p, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.AllocationPipelineName, &res)
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}

+ 0 - 2
core/pkg/exporter/pathing/bingenpath.go

@@ -11,7 +11,6 @@ import (
 )
 
 const (
-	federatedDir   string = "federated"
 	baseStorageDir string = "etl/bingen"
 )
 
@@ -64,7 +63,6 @@ func (bsf *BingenStoragePathFormatter) ToFullPath(prefix string, window opencost
 
 	return path.Join(
 		bsf.rootDir,
-		federatedDir,
 		bsf.clusterId,
 		baseStorageDir,
 		bsf.pipeline,

+ 0 - 1
core/pkg/exporter/pathing/eventpath.go

@@ -61,7 +61,6 @@ func (espf *EventStoragePathFormatter) ToFullPath(prefix string, timestamp time.
 
 	return path.Join(
 		espf.rootDir,
-		federatedDir,
 		espf.clusterId,
 		espf.event,
 		path.Join(espf.subPaths...),

+ 14 - 14
core/pkg/exporter/pathing/path_test.go

@@ -21,7 +21,7 @@ func TestBingenPathFormatter(t *testing.T) {
 	testCases := []testCase{
 		{
 			name:       "no resolution",
-			rootPath:   "",
+			rootPath:   "federated",
 			clusterID:  "cluster-a",
 			pipeline:   "allocation",
 			resolution: nil,
@@ -30,7 +30,7 @@ func TestBingenPathFormatter(t *testing.T) {
 		},
 		{
 			name:       "with resolution",
-			rootPath:   "",
+			rootPath:   "federated",
 			clusterID:  "cluster-a",
 			pipeline:   "allocation",
 			resolution: &[]time.Duration{1 * time.Hour}[0],
@@ -39,7 +39,7 @@ func TestBingenPathFormatter(t *testing.T) {
 		},
 		{
 			name:       "no resolution with prefix",
-			rootPath:   "",
+			rootPath:   "federated",
 			clusterID:  "cluster-a",
 			pipeline:   "allocation",
 			resolution: nil,
@@ -48,7 +48,7 @@ func TestBingenPathFormatter(t *testing.T) {
 		},
 		{
 			name:       "with resolution with prefix",
-			rootPath:   "",
+			rootPath:   "federated",
 			clusterID:  "cluster-a",
 			pipeline:   "allocation",
 			resolution: &[]time.Duration{1 * time.Hour}[0],
@@ -57,7 +57,7 @@ func TestBingenPathFormatter(t *testing.T) {
 		},
 		{
 			name:       "daily resolution",
-			rootPath:   "",
+			rootPath:   "federated",
 			clusterID:  "cluster-a",
 			pipeline:   "allocation",
 			resolution: &[]time.Duration{24 * time.Hour}[0],
@@ -66,7 +66,7 @@ func TestBingenPathFormatter(t *testing.T) {
 		},
 		{
 			name:       "weekly resolution",
-			rootPath:   "",
+			rootPath:   "federated",
 			clusterID:  "cluster-a",
 			pipeline:   "allocation",
 			resolution: &[]time.Duration{7 * 24 * time.Hour}[0],
@@ -111,7 +111,7 @@ func TestEventPathFormatter(t *testing.T) {
 	testCases := []testCase{
 		{
 			name:      "with root path with file extension",
-			rootPath:  "/tmp",
+			rootPath:  "/tmp/federated",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
 			subPaths:  []string{},
@@ -121,7 +121,7 @@ func TestEventPathFormatter(t *testing.T) {
 		},
 		{
 			name:      "with file extension",
-			rootPath:  "",
+			rootPath:  "federated",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
 			subPaths:  []string{},
@@ -131,7 +131,7 @@ func TestEventPathFormatter(t *testing.T) {
 		},
 		{
 			name:      "with root path with file extension with sub-paths",
-			rootPath:  "/tmp",
+			rootPath:  "/tmp/federated",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
 			subPaths:  []string{"foo", "bar"},
@@ -141,7 +141,7 @@ func TestEventPathFormatter(t *testing.T) {
 		},
 		{
 			name:      "without file extension",
-			rootPath:  "",
+			rootPath:  "federated",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
 			subPaths:  []string{},
@@ -151,7 +151,7 @@ func TestEventPathFormatter(t *testing.T) {
 		},
 		{
 			name:      "with prefix with file extension",
-			rootPath:  "",
+			rootPath:  "federated",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
 			subPaths:  []string{},
@@ -161,7 +161,7 @@ func TestEventPathFormatter(t *testing.T) {
 		},
 		{
 			name:      "with prefix with file extension with sub-paths",
-			rootPath:  "",
+			rootPath:  "federated",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
 			subPaths:  []string{"foo", "bar", "baz"},
@@ -171,7 +171,7 @@ func TestEventPathFormatter(t *testing.T) {
 		},
 		{
 			name:      "with prefix without file extension",
-			rootPath:  "",
+			rootPath:  "federated",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
 			subPaths:  []string{},
@@ -181,7 +181,7 @@ func TestEventPathFormatter(t *testing.T) {
 		},
 		{
 			name:      "with prefix without file extension with sub-paths",
-			rootPath:  "",
+			rootPath:  "federated",
 			clusterID: "cluster-a",
 			event:     "heartbeat",
 			subPaths:  []string{"foo"},

+ 1 - 1
core/pkg/heartbeat/exporter/exporter.go

@@ -10,7 +10,7 @@ import (
 
 // NewHeartbeatExporter creates a new `StorageExporter[Heartbeat]` instance for exporting Heartbeat events.
 func NewHeartbeatExporter(clusterId string, applicationName string, storage storage.Storage) exporter.EventExporter[heartbeat.Heartbeat] {
-	pathing, err := pathing.NewEventStoragePathFormatter("", clusterId, heartbeat.HeartbeatEventName, applicationName)
+	pathing, err := pathing.NewEventStoragePathFormatter("federated", clusterId, heartbeat.HeartbeatEventName, applicationName)
 	if err != nil {
 		log.Errorf("failed to create pathing formatter: %v", err)
 		return nil

+ 9 - 9
core/pkg/opencost/exporter/exporter_test.go

@@ -144,7 +144,7 @@ func TestExporters(t *testing.T) {
 	t.Run("allocation exporter", func(t *testing.T) {
 		allocSource := NewMockAllocationSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
+		p, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -173,7 +173,7 @@ func TestExporters(t *testing.T) {
 	t.Run("asset exporter", func(t *testing.T) {
 		assetSource := NewMockAssetSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
+		p, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -202,7 +202,7 @@ func TestExporters(t *testing.T) {
 	t.Run("network insight exporter", func(t *testing.T) {
 		netInsightSource := NewMockNetworkInsightSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		p, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -264,15 +264,15 @@ func TestPipelineExportControllers(t *testing.T) {
 		time.Sleep(time.Second + (750 * time.Millisecond))
 		exportControllers.Stop()
 
-		allocPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
+		allocPath, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create allocations path formatter: %v", err)
 		}
-		assetPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
+		assetPath, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create assets path formatter: %v", err)
 		}
-		netPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		netPath, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create net insights path formatter: %v", err)
 		}
@@ -300,15 +300,15 @@ func TestPipelineExportControllers(t *testing.T) {
 		time.Sleep(time.Second + (750 * time.Millisecond))
 		exportControllers.Stop()
 
-		allocPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
+		allocPath, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.AllocationPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create allocations path formatter: %v", err)
 		}
-		assetPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
+		assetPath, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.AssetsPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create assets path formatter: %v", err)
 		}
-		netPath, err := pathing.NewBingenStoragePathFormatter("", TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		netPath, err := pathing.NewBingenStoragePathFormatter("federated", TestClusterId, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create net insights path formatter: %v", err)
 		}

+ 1 - 1
core/pkg/opencost/exporter/exporters.go

@@ -24,7 +24,7 @@ func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validat
 		return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
 	}
 
-	pathing, err := pathing.NewBingenStoragePathFormatter("", clusterId, pipelineName, &resolution)
+	pathing, err := pathing.NewBingenStoragePathFormatter("federated", clusterId, pipelineName, &resolution)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create path formatter: %w", err)
 	}