Kaynağa Gözat

Fix unsafe type assertion in custom cost plugin loader (#3721)

Signed-off-by: Claude <noreply@anthropic.com>
Co-authored-by: Claude <noreply@anthropic.com>
Warwick 1 ay önce
ebeveyn
işleme
22da667c04

+ 21 - 3
pkg/customcost/ingestor.go

@@ -21,6 +21,12 @@ import (
 	"github.com/opencost/opencost/pkg/env"
 )
 
+// pluginConnector abstracts *plugin.Client so buildSingleDomain can be tested with mocks.
+type pluginConnector interface {
+	Client() (plugin.ClientProtocol, error)
+	Kill()
+}
+
 // IngestorStatus includes diagnostic values for a given Ingestor
 type IngestorStatus struct {
 	Created     time.Time
@@ -65,14 +71,22 @@ type CustomCostIngestor struct {
 	isStopping   atomic.Bool
 	exitBuildCh  chan string
 	exitRunCh    chan string
-	plugins      map[string]*plugin.Client
+	plugins      map[string]pluginConnector
 	pluginsLock  sync.RWMutex
 	resolution   time.Duration
 	refreshRate  time.Duration
 }
 
-// NewIngestor is an initializer for ingestor
+// NewCustomCostIngestor is an initializer for ingestor
 func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.Client, res time.Duration) (*CustomCostIngestor, error) {
+	connectors := make(map[string]pluginConnector, len(plugins))
+	for k, v := range plugins {
+		connectors[k] = v
+	}
+	return newCustomCostIngestor(ingestorConfig, repo, connectors, res)
+}
+
+func newCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]pluginConnector, res time.Duration) (*CustomCostIngestor, error) {
 	if repo == nil {
 		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository connot be nil")
 	}
@@ -194,7 +208,11 @@ func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain st
 		return
 	}
 
-	custCostSrc := raw.(ocplugin.CustomCostSource)
+	custCostSrc, ok := raw.(ocplugin.CustomCostSource)
+	if !ok {
+		log.Errorf("plugin %s returned invalid type: expected CustomCostSource, got %T", domain, raw)
+		return
+	}
 
 	custCostResps := custCostSrc.GetCustomCosts(req)
 	// loop through each customCostResponse, adding to repo

+ 95 - 5
pkg/customcost/ingestor_test.go

@@ -1,6 +1,7 @@
 package customcost
 
 import (
+	"fmt"
 	"os/exec"
 	"runtime"
 	"sync"
@@ -8,6 +9,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/go-plugin"
+	"github.com/opencost/opencost/core/pkg/opencost"
 )
 
 func TestIngestor_Stop_KillsPluginProcesses(t *testing.T) {
@@ -25,7 +27,7 @@ func TestIngestor_Stop_KillsPluginProcesses(t *testing.T) {
 	_, _ = client.Client()
 
 	ingestor := &CustomCostIngestor{
-		plugins: map[string]*plugin.Client{
+		plugins: map[string]pluginConnector{
 			"test-plugin": client,
 		},
 	}
@@ -37,6 +39,7 @@ func TestIngestor_Stop_KillsPluginProcesses(t *testing.T) {
 }
 
 func TestIngestor_Stop_MultiplePlugins(t *testing.T) {
+	connectors := make(map[string]pluginConnector)
 	clients := make(map[string]*plugin.Client)
 	for _, name := range []string{"plugin-a", "plugin-b", "plugin-c"} {
 		cmd := exec.Command("sleep", "60")
@@ -50,10 +53,11 @@ func TestIngestor_Stop_MultiplePlugins(t *testing.T) {
 			StartTimeout: 2 * time.Second,
 		})
 		_, _ = client.Client()
+		connectors[name] = client
 		clients[name] = client
 	}
 
-	ingestor := &CustomCostIngestor{plugins: clients}
+	ingestor := &CustomCostIngestor{plugins: connectors}
 	ingestor.Stop()
 
 	for name, client := range clients {
@@ -65,7 +69,7 @@ func TestIngestor_Stop_MultiplePlugins(t *testing.T) {
 
 func TestIngestor_Stop_EmptyPluginsMap(t *testing.T) {
 	ingestor := &CustomCostIngestor{
-		plugins: map[string]*plugin.Client{},
+		plugins: map[string]pluginConnector{},
 	}
 	ingestor.Stop() // covers lock path with 0 iterations
 }
@@ -77,7 +81,7 @@ func TestIngestor_Stop_NilPluginsMap(t *testing.T) {
 
 func TestIngestor_Stop_AlreadyStopping(t *testing.T) {
 	ingestor := &CustomCostIngestor{
-		plugins: map[string]*plugin.Client{},
+		plugins: map[string]pluginConnector{},
 	}
 	ingestor.isStopping.Store(true) // atomic.Bool must use Store()!
 	ingestor.Stop()                 // should return immediately
@@ -85,7 +89,7 @@ func TestIngestor_Stop_AlreadyStopping(t *testing.T) {
 
 func TestIngestor_Stop_ConcurrentCalls(t *testing.T) {
 	ingestor := &CustomCostIngestor{
-		plugins: map[string]*plugin.Client{},
+		plugins: map[string]pluginConnector{},
 	}
 
 	var wg sync.WaitGroup
@@ -187,3 +191,89 @@ func TestIngestor_BuildWindow_WithPlugin(t *testing.T) {
 	// BuildWindow and buildSingleDomain; client.Client() fails fast (false exits)
 	ingestor.BuildWindow(now.Add(-time.Hour), now)
 }
+
+// mockClientProtocol implements plugin.ClientProtocol for testing.
+type mockClientProtocol struct {
+	dispenseResult interface{}
+	dispenseErr    error
+}
+
+func (m *mockClientProtocol) Dispense(string) (interface{}, error) {
+	return m.dispenseResult, m.dispenseErr
+}
+func (m *mockClientProtocol) Ping() error  { return nil }
+func (m *mockClientProtocol) Close() error { return nil }
+
+// mockPluginConnector implements pluginConnector for testing.
+type mockPluginConnector struct {
+	protocol  plugin.ClientProtocol
+	clientErr error
+	killed    bool
+}
+
+func (m *mockPluginConnector) Client() (plugin.ClientProtocol, error) {
+	if m.clientErr != nil {
+		return nil, m.clientErr
+	}
+	return m.protocol, nil
+}
+
+func (m *mockPluginConnector) Kill() { m.killed = true }
+
+func TestBuildSingleDomain_InvalidPluginType_NoPanic(t *testing.T) {
+	mock := &mockPluginConnector{
+		protocol: &mockClientProtocol{
+			dispenseResult: "not a CustomCostSource", // wrong type
+		},
+	}
+
+	repo := NewMemoryRepository()
+	ingestor := &CustomCostIngestor{
+		plugins:    map[string]pluginConnector{"bad-plugin": mock},
+		resolution: time.Hour,
+		repo:       repo,
+		coverage:   map[string]opencost.Window{},
+	}
+
+	now := time.Now().UTC()
+	// Before the fix this would panic; now it should log an error and return.
+	ingestor.BuildWindow(now.Add(-time.Hour), now)
+}
+
+func TestBuildSingleDomain_DispenseError(t *testing.T) {
+	mock := &mockPluginConnector{
+		protocol: &mockClientProtocol{
+			dispenseErr: fmt.Errorf("dispense failed"),
+		},
+	}
+
+	repo := NewMemoryRepository()
+	ingestor := &CustomCostIngestor{
+		plugins:    map[string]pluginConnector{"err-plugin": mock},
+		resolution: time.Hour,
+		repo:       repo,
+		coverage:   map[string]opencost.Window{},
+	}
+
+	now := time.Now().UTC()
+	// Should handle the error gracefully without panic.
+	ingestor.BuildWindow(now.Add(-time.Hour), now)
+}
+
+func TestBuildSingleDomain_ClientError(t *testing.T) {
+	mock := &mockPluginConnector{
+		clientErr: fmt.Errorf("connection failed"),
+	}
+
+	repo := NewMemoryRepository()
+	ingestor := &CustomCostIngestor{
+		plugins:    map[string]pluginConnector{"fail-plugin": mock},
+		resolution: time.Hour,
+		repo:       repo,
+		coverage:   map[string]opencost.Window{},
+	}
+
+	now := time.Now().UTC()
+	// Should handle the error gracefully without panic.
+	ingestor.BuildWindow(now.Add(-time.Hour), now)
+}

+ 7 - 8
pkg/customcost/pipelineservice_test.go

@@ -10,7 +10,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/hashicorp/go-plugin"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
@@ -280,7 +279,7 @@ func TestPipelineService_Stop_WithNilIngestors(t *testing.T) {
 func TestPipelineService_Stop_PartialNilIngestors(t *testing.T) {
 	hourly := &CustomCostIngestor{
 		key:     "hourly",
-		plugins: make(map[string]*plugin.Client),
+		plugins: make(map[string]pluginConnector),
 	}
 
 	ps := &PipelineService{
@@ -298,11 +297,11 @@ func TestPipelineService_Stop_ShutdownLogging(t *testing.T) {
 	ps := &PipelineService{
 		hourlyIngestor: &CustomCostIngestor{
 			key:     "hourly",
-			plugins: make(map[string]*plugin.Client),
+			plugins: make(map[string]pluginConnector),
 		},
 		dailyIngestor: &CustomCostIngestor{
 			key:     "daily",
-			plugins: make(map[string]*plugin.Client),
+			plugins: make(map[string]pluginConnector),
 		},
 		domains: []string{},
 	}
@@ -324,8 +323,8 @@ func TestPipelineService_Stop_NilIngestors(t *testing.T) {
 }
 
 func TestPipelineService_Stop_WithIngestors(t *testing.T) {
-	hourly := &CustomCostIngestor{plugins: map[string]*plugin.Client{}}
-	daily := &CustomCostIngestor{plugins: map[string]*plugin.Client{}}
+	hourly := &CustomCostIngestor{plugins: map[string]pluginConnector{}}
+	daily := &CustomCostIngestor{plugins: map[string]pluginConnector{}}
 	ps := &PipelineService{
 		hourlyIngestor: hourly,
 		dailyIngestor:  daily,
@@ -335,14 +334,14 @@ func TestPipelineService_Stop_WithIngestors(t *testing.T) {
 
 func TestPipelineService_Stop_OnlyHourlyIngestor(t *testing.T) {
 	ps := &PipelineService{
-		hourlyIngestor: &CustomCostIngestor{plugins: map[string]*plugin.Client{}},
+		hourlyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}},
 	}
 	ps.Stop() // should not panic when dailyIngestor is nil
 }
 
 func TestPipelineService_Stop_OnlyDailyIngestor(t *testing.T) {
 	ps := &PipelineService{
-		dailyIngestor: &CustomCostIngestor{plugins: map[string]*plugin.Client{}},
+		dailyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}},
 	}
 	ps.Stop() // should not panic when hourlyIngestor is nil
 }