Просмотр исходного кода

Switch up pathing init

Signed-off-by: Sean Holcomb <seanholcomb@gmail.com>
Sean Holcomb 6 дней назад
Родитель
Сommit
1bdd3bf65c

+ 8 - 6
core/pkg/exporter/exporter_test.go

@@ -15,8 +15,10 @@ import (
 )
 
 const (
-	TestClusterId = "test-cluster"
-	TestEventName = "test-event-path"
+	TestAppName     = "test-app"
+	TestClusterID   = "test-cluster-id"
+	TestClusterName = "test-cluster"
+	TestEventName   = "test-event-path"
 )
 
 type TestData struct {
@@ -26,7 +28,7 @@ type TestData struct {
 func TestStorageExporters(t *testing.T) {
 	t.Run("test event storage exporter", func(t *testing.T) {
 		store := storage.NewMemoryStorage()
-		p, err := pathing.NewEventStoragePathFormatter("root", TestClusterId, TestEventName)
+		p, err := pathing.NewEventStoragePathFormatter("root", TestClusterName, TestEventName)
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -67,7 +69,7 @@ func TestStorageExporters(t *testing.T) {
 	t.Run("test compute storage exporter", func(t *testing.T) {
 		res := 24 * time.Hour
 		store := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, &res)
+		p, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AllocationPipelineName, &res)
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -115,7 +117,7 @@ func TestStorageExporters(t *testing.T) {
 	t.Run("test streaming compute storage exporter", func(t *testing.T) {
 		res := 24 * time.Hour
 		store := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, &res)
+		p, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AllocationPipelineName, &res)
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -163,7 +165,7 @@ func TestStorageExporters(t *testing.T) {
 	t.Run("test compressed streaming compute storage exporter", func(t *testing.T) {
 		res := 24 * time.Hour
 		store := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterId, pipelines.AllocationPipelineName, &res)
+		p, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AllocationPipelineName, &res)
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}

+ 9 - 2
core/pkg/exporter/pathing/bingenpath.go

@@ -7,6 +7,7 @@ import (
 
 	"github.com/opencost/opencost/core/pkg/exporter/pathing/pathutils"
 	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/core/pkg/pipelines"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
 
@@ -26,13 +27,19 @@ type BingenStoragePathFormatter struct {
 	resolution string
 }
 
-func NewDefaultStoragePathFormatter(clusterId, pipeline string, resolution *time.Duration) (StoragePathFormatter[opencost.Window], error) {
+func NewDefaultStoragePathFormatter(appName, clusterID, clusterName, pipeline string, resolution *time.Duration) (StoragePathFormatter[opencost.Window], error) {
 	res := "."
 	if resolution != nil {
 		res = timeutil.FormatStoreResolution(*resolution)
 	}
 
-	return NewBingenStoragePathFormatter(DefaultRootDir, clusterId, pipeline, res)
+	// KubeModel uses a distinct pathing pattern which breaks with the original
+	// Allocations and Assets bingen pathing.
+	if pipeline == pipelines.KubeModelPipelineName {
+		return NewKubeModelStoragePathFormatter(appName, clusterID, res)
+	}
+
+	return NewBingenStoragePathFormatter(DefaultRootDir, clusterName, pipeline, res)
 }
 
 // NewBingenStoragePathFormatter creates a StoragePathFormatter for a cluster separated storage path

+ 58 - 44
core/pkg/exporter/pathing/path_test.go

@@ -9,70 +9,84 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestBingenPathFormatter(t *testing.T) {
+func TestDefaultPathFormatter(t *testing.T) {
 	type testCase struct {
-		name       string
-		clusterID  string
-		pipeline   string
-		resolution *time.Duration
-		prefix     string
-		expected   string
+		name        string
+		appName     string
+		clusterID   string
+		clusterName string
+		pipeline    string
+		resolution  *time.Duration
+		prefix      string
+		expected    string
 	}
 
 	testCases := []testCase{
 		{
-			name:       "no resolution",
-			clusterID:  "cluster-a",
-			pipeline:   "allocation",
-			resolution: nil,
-			prefix:     "",
-			expected:   fmt.Sprintf("%s/cluster-a/%s/allocation/1704110400-1704114000", DefaultRootDir, BaseStorageDir),
+			name:        "no resolution",
+			appName:     "test-app",
+			clusterID:   "clusterID-a",
+			clusterName: "cluster-a",
+			pipeline:    "allocation",
+			resolution:  nil,
+			prefix:      "",
+			expected:    fmt.Sprintf("%s/cluster-a/%s/allocation/1704110400-1704114000", DefaultRootDir, BaseStorageDir),
 		},
 		{
-			name:       "with resolution",
-			clusterID:  "cluster-a",
-			pipeline:   "allocation",
-			resolution: &[]time.Duration{1 * time.Hour}[0],
-			prefix:     "",
-			expected:   fmt.Sprintf("%s/cluster-a/%s/allocation/1h/1704110400-1704114000", DefaultRootDir, BaseStorageDir),
+			name:        "with resolution",
+			appName:     "test-app",
+			clusterID:   "clusterID-a",
+			clusterName: "cluster-a",
+			pipeline:    "allocation",
+			resolution:  &[]time.Duration{1 * time.Hour}[0],
+			prefix:      "",
+			expected:    fmt.Sprintf("%s/cluster-a/%s/allocation/1h/1704110400-1704114000", DefaultRootDir, BaseStorageDir),
 		},
 		{
-			name:       "no resolution with prefix",
-			clusterID:  "cluster-a",
-			pipeline:   "allocation",
-			resolution: nil,
-			prefix:     "test",
-			expected:   fmt.Sprintf("%s/cluster-a/%s/allocation/test.1704110400-1704114000", DefaultRootDir, BaseStorageDir),
+			name:        "no resolution with prefix",
+			appName:     "test-app",
+			clusterID:   "clusterID-a",
+			clusterName: "cluster-a",
+			pipeline:    "allocation",
+			resolution:  nil,
+			prefix:      "test",
+			expected:    fmt.Sprintf("%s/cluster-a/%s/allocation/test.1704110400-1704114000", DefaultRootDir, BaseStorageDir),
 		},
 		{
-			name:       "with resolution with prefix",
-			clusterID:  "cluster-a",
-			pipeline:   "allocation",
-			resolution: &[]time.Duration{1 * time.Hour}[0],
-			prefix:     "test",
-			expected:   fmt.Sprintf("%s/cluster-a/%s/allocation/1h/test.1704110400-1704114000", DefaultRootDir, BaseStorageDir),
+			name:        "with resolution with prefix",
+			appName:     "test-app",
+			clusterID:   "clusterID-a",
+			clusterName: "cluster-a",
+			pipeline:    "allocation",
+			resolution:  &[]time.Duration{1 * time.Hour}[0],
+			prefix:      "test",
+			expected:    fmt.Sprintf("%s/cluster-a/%s/allocation/1h/test.1704110400-1704114000", DefaultRootDir, BaseStorageDir),
 		},
 		{
-			name:       "daily resolution",
-			clusterID:  "cluster-a",
-			pipeline:   "allocation",
-			resolution: &[]time.Duration{24 * time.Hour}[0],
-			prefix:     "",
-			expected:   fmt.Sprintf("%s/cluster-a/%s/allocation/1d/1704110400-1704196800", DefaultRootDir, BaseStorageDir),
+			name:        "daily resolution",
+			appName:     "test-app",
+			clusterID:   "clusterID-a",
+			clusterName: "cluster-a",
+			pipeline:    "allocation",
+			resolution:  &[]time.Duration{24 * time.Hour}[0],
+			prefix:      "",
+			expected:    fmt.Sprintf("%s/cluster-a/%s/allocation/1d/1704110400-1704196800", DefaultRootDir, BaseStorageDir),
 		},
 		{
-			name:       "weekly resolution",
-			clusterID:  "cluster-a",
-			pipeline:   "allocation",
-			resolution: &[]time.Duration{7 * 24 * time.Hour}[0],
-			prefix:     "",
-			expected:   fmt.Sprintf("%s/cluster-a/%s/allocation/1w/1704110400-1704715200", DefaultRootDir, BaseStorageDir),
+			name:        "weekly resolution",
+			appName:     "test-app",
+			clusterID:   "clusterID-a",
+			clusterName: "cluster-a",
+			pipeline:    "allocation",
+			resolution:  &[]time.Duration{7 * 24 * time.Hour}[0],
+			prefix:      "",
+			expected:    fmt.Sprintf("%s/cluster-a/%s/allocation/1w/1704110400-1704715200", DefaultRootDir, BaseStorageDir),
 		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			pathing, err := NewDefaultStoragePathFormatter(tc.clusterID, tc.pipeline, tc.resolution)
+			pathing, err := NewDefaultStoragePathFormatter(tc.appName, tc.clusterID, tc.clusterName, tc.pipeline, tc.resolution)
 			if err != nil {
 				t.Fatalf("Unexpected error: %v", err)
 			}

+ 10 - 10
core/pkg/opencost/exporter/exporter_test.go

@@ -167,7 +167,7 @@ func TestExporters(t *testing.T) {
 	t.Run("allocation exporter", func(t *testing.T) {
 		allocSource := NewMockAllocationSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AllocationPipelineName, ptr(TestResolution))
+		p, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AllocationPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -196,7 +196,7 @@ func TestExporters(t *testing.T) {
 	t.Run("asset exporter", func(t *testing.T) {
 		assetSource := NewMockAssetSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AssetsPipelineName, ptr(TestResolution))
+		p, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AssetsPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -225,7 +225,7 @@ func TestExporters(t *testing.T) {
 	t.Run("network insight exporter", func(t *testing.T) {
 		netInsightSource := NewMockNetworkInsightSource()
 		memStore := storage.NewMemoryStorage()
-		p, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		p, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create path formatter: %v", err)
 		}
@@ -290,7 +290,7 @@ func TestExporters(t *testing.T) {
 			t.Fatalf("expected error creating unknown pipeline exporter, got nil")
 		}
 
-		// Invalid cluster id
+		// Invalid cluster name
 		_, err = NewComputePipelineExporter[opencost.AllocationSet](ComputeExporterConfig{ClusterName: "", Resolution: TestResolution}, memStore)
 		if err == nil {
 			t.Fatalf("expected error creating allocation pipeline exporter with empty cluster id, got nil")
@@ -320,15 +320,15 @@ func TestPipelineExportControllers(t *testing.T) {
 		time.Sleep(time.Second + (750 * time.Millisecond))
 		exportControllers.Stop()
 
-		allocPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AllocationPipelineName, ptr(TestResolution))
+		allocPath, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AllocationPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create allocations path formatter: %v", err)
 		}
-		assetPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AssetsPipelineName, ptr(TestResolution))
+		assetPath, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AssetsPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create assets path formatter: %v", err)
 		}
-		netPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		netPath, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create net insights path formatter: %v", err)
 		}
@@ -359,15 +359,15 @@ func TestPipelineExportControllers(t *testing.T) {
 		time.Sleep(time.Second + (750 * time.Millisecond))
 		exportControllers.Stop()
 
-		allocPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AllocationPipelineName, ptr(TestResolution))
+		allocPath, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AllocationPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create allocations path formatter: %v", err)
 		}
-		assetPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.AssetsPipelineName, ptr(TestResolution))
+		assetPath, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.AssetsPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create assets path formatter: %v", err)
 		}
-		netPath, err := pathing.NewDefaultStoragePathFormatter(TestClusterName, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
+		netPath, err := pathing.NewDefaultStoragePathFormatter(TestAppName, TestClusterID, TestClusterName, pipelines.NetworkInsightPipelineName, ptr(TestResolution))
 		if err != nil {
 			t.Fatalf("failed to create net insights path formatter: %v", err)
 		}

+ 5 - 29
core/pkg/opencost/exporter/exporters.go

@@ -8,10 +8,8 @@ import (
 	export "github.com/opencost/opencost/core/pkg/exporter"
 	"github.com/opencost/opencost/core/pkg/exporter/pathing"
 	"github.com/opencost/opencost/core/pkg/exporter/validator"
-	"github.com/opencost/opencost/core/pkg/opencost"
 	"github.com/opencost/opencost/core/pkg/pipelines"
 	"github.com/opencost/opencost/core/pkg/storage"
-	"github.com/opencost/opencost/core/pkg/util/timeutil"
 	"github.com/opencost/opencost/core/pkg/util/typeutil"
 )
 
@@ -52,8 +50,12 @@ func NewComputePipelineExporter[T any, U export.BinaryMarshalerPtr[T], S validat
 	config ComputeExporterConfig,
 	store storage.Storage,
 ) (export.ComputeExporter[T], error) {
+	pipelineName := pipelines.NameFor[T]()
+	if pipelineName == "" {
+		return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
+	}
 
-	pathing, err := GetExporterPathing[T, U, S](config)
+	pathing, err := pathing.NewDefaultStoragePathFormatter(config.AppName, config.ClusterUID, config.ClusterName, pipelineName, &config.Resolution)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create path formatter: %w", err)
 	}
@@ -92,29 +94,3 @@ func NewComputePipelineExportController[T any, U export.BinaryMarshalerPtr[T], S
 
 	return export.NewComputeExportController(source, exporter, config.Resolution), nil
 }
-
-func GetExporterPathing[T any, U export.BinaryMarshalerPtr[T], S validator.SetConstraint[T]](
-	config ComputeExporterConfig,
-) (pathing.StoragePathFormatter[opencost.Window], error) {
-	pipelineName := pipelines.NameFor[T]()
-	if pipelineName == "" {
-		return nil, fmt.Errorf("failed to extract pipeline name for type: %s", typeutil.TypeOf[T]())
-	}
-	res := timeutil.FormatStoreResolution(config.Resolution)
-
-	var pathFormatter pathing.StoragePathFormatter[opencost.Window]
-	var err error
-
-	switch pipelineName {
-	case pipelines.KubeModelPipelineName:
-		pathFormatter, err = pathing.NewKubeModelStoragePathFormatter(config.AppName, config.ClusterUID, res)
-	default:
-		// Use ClusterName for "clusterId" here to maintain legacy pattern
-		pathFormatter, err = pathing.NewDefaultStoragePathFormatter(config.ClusterName, pipelineName, &config.Resolution)
-	}
-
-	if err != nil {
-		return nil, fmt.Errorf("failed to create path formatter[%s]: %w", pipelineName, err)
-	}
-	return pathFormatter, nil
-}