
Merge branch 'develop' into dependabot/go_modules/modules/prometheus-source/github.com/rs/zerolog-1.35.0

Warwick 1 month ago
parent
commit
87bcae9cb4

+ 25 - 0
CONTRIBUTING.md

@@ -126,6 +126,31 @@ To run these tests:
 - Navigate to cost-model/test
 - Run `go test -timeout 700s` from the testing directory. The tests right now take about 10 minutes (600s) to run because they bring up and down pods and wait for Prometheus to scrape data about them.
 
+## Code Review Standards
+
+All pull requests must be reviewed before merging.
+
+### What reviewers check:
+- **Correctness:** Does the code do what it claims?
+- **Tests:** Are new features and bug fixes covered by tests?
+- **Style:** Does the code follow Go conventions (`gofmt`, `go vet`)?
+- **Security:** Are inputs validated? Are credentials handled safely?
+- **Performance:** Are there obvious performance issues (unbounded allocations, N+1 queries)?
+
+### Review requirements:
+- At least one approval from a Committer or Maintainer is required
+- The reviewer must be a different person than the PR author
+- For security-sensitive changes, review by a Maintainer is required
+- Emergency fixes may bypass review with post-merge review required within 48 hours (per [GOVERNANCE.md](GOVERNANCE.md))
+
+## Regression Tests
+
+When fixing a bug, contributors SHOULD add a test that reproduces the bug before applying the fix. This ensures the bug does not recur. As a project-wide goal, at least 50% of bugs fixed in any six-month window should have corresponding regression tests. This is tracked by maintainers using issues labeled `bug` and measured during release reviews; it is an aspirational target for the project as a whole, not a requirement applied to individual contributors.
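The bug-first workflow above can be sketched in Go. Everything here is hypothetical for illustration (`clampPercent` and the failing input are not part of OpenCost): the assertion is written to reproduce the reported bug, fails against the pre-fix implementation, and passes once the fix lands.

```go
package main

import "fmt"

// clampPercent is a hypothetical helper used only to illustrate the workflow.
// The (pretend) bug report: values over 100 were returned unchanged.
func clampPercent(v int) int {
	if v < 0 {
		return 0
	}
	if v > 100 { // the fix: clamp the upper bound
		return 100
	}
	return v
}

func main() {
	// Regression check written first, directly from the bug report:
	// before the fix, clampPercent(150) returned 150 and this panicked.
	if got := clampPercent(150); got != 100 {
		panic(fmt.Sprintf("regression: clampPercent(150) = %d, want 100", got))
	}
	fmt.Println("regression test passed")
}
```

In the real repository this would live as a `TestXxx` function in a `_test.go` file, ideally named after the issue it reproduces.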
+
+## Finding Issues to Work On
+
+Look for issues labeled [`good first issue`](https://github.com/opencost/opencost/labels/good%20first%20issue) or [`help wanted`](https://github.com/opencost/opencost/labels/help%20wanted) for a curated list of tasks suitable for new contributors.
+
 ## Certificate of Origin
 
 By contributing to this project, you certify that your contribution was created in whole or in part by you and that you have the right to submit it under the open source license indicated in the project. In other words, please confirm that you, as a contributor, have the legal right to make the contribution. This is enforced on Pull Requests and requires `Signed-off-by` with the email address for the author in the commit message.
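The simplest way to satisfy the `Signed-off-by` requirement is `git commit -s`, which appends the trailer from your configured identity. A self-contained demo in a throwaway repo (the name and email are placeholders; use your own):

```shell
# Scratch repo so the demo runs anywhere.
tmp=$(mktemp -d) && cd "$tmp" && git init -q

# Placeholder identity; substitute your real name and email.
git config user.email "you@example.com"
git config user.name "Your Name"

echo change > file.txt && git add file.txt

# -s appends "Signed-off-by: Your Name <you@example.com>" to the message.
git commit -q -s -m "fix: example change"

# Forgot to sign off? Amend the last commit in place; -s is idempotent.
git commit -q --amend -s --no-edit

git log -1 --format=%B
```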

+ 37 - 0
docs/ROADMAP.md

@@ -0,0 +1,37 @@
+# OpenCost Roadmap
+
+This roadmap reflects the current priorities for the OpenCost project. It is reviewed quarterly and discussed in the biweekly [Working Group meetings](https://zoom-lfx.platform.linuxfoundation.org/meetings/opencost?view=list).
+
+## Current Focus Areas
+
+- **Cloud cost integration:** Connecting cloud billing data to the demo environment, cloud cost bug fixes, and multi-account support
+- **UI revamp:** Major frontend overhaul via LFX mentorship — new UI released, stabilizing before next core release
+- **OpenCost AI:** New sub-project for airgapped private cost models (CI/CD, testing models at scale, finding smallest viable model)
+- **First-class LLM cost support:** Design proposal for native LLM cost tracking in OpenCost core
+- **Integration test expansion:** Pod restart tests, network cost tests, resolving Prometheus-less (promless) vs Prometheus-backed test discrepancies
+- **Plugin ecosystem:** Snowflake, GitHub, and currency conversion plugins proposed; MongoDB reference implementation for currency support
+- **Helm chart signing:** Cryptographic signing of Helm charts (research in progress)
+- **Data persistence and export:** Mounting persistence for promless mode, potential S3 export for cost data
+- **Supply chain security:** Achieving [OpenSSF Best Practices](https://www.bestpractices.dev/projects/6219) Silver and Gold badges, cryptographically signed releases via Sigstore/cosign, SLSA build provenance, and SPDX license compliance across all source files
+- **Community growth:** EMEA/APAC meeting cadence, YouTube channel for meeting recordings, DigitalOcean cloud sponsorship for testing
+
+## Recent Milestones
+
+- New OpenCost UI released (v1.0 via LFX mentorship)
+- OpenCost AI sub-project introduced (first PR merged)
+- MCP server released in v1.118 with right-sizing recommendations
+- KubeModel 1.0 shipped (Fall 2025 LFX mentorship)
+- SBOM generation integrated across core and UI repos (SPDX + CycloneDX)
+- OpenSSF Scorecard integration
+- Community Maintainer role introduced
+- Gateway API deployed for infrastructure
+- Spot node testing enabled in integration test cluster
+- Copilot AI review bot enabled across repositories (provided by CNCF)
+- OpenCost Specification v0.1 published
+- Collector data source shipped (alternative to Prometheus)
+
+## How to Influence the Roadmap
+
+- Join the [OpenCost Working Group](https://zoom-lfx.platform.linuxfoundation.org/meetings/opencost?view=week) (biweekly, alternating between EMEA/APAC at 15:00 UTC and NA at 21:00 UTC)
+- Propose changes via [GitHub Issues](https://github.com/opencost/opencost/issues)
+- Discuss ideas in the [#opencost](https://cloud-native.slack.com/archives/C03D56FPD4G) channel on [CNCF Slack](https://slack.cncf.io/)

+ 45 - 8
pkg/cloud/alibaba/provider.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"math"
 	"os"
 	"regexp"
 	"strconv"
@@ -60,8 +61,8 @@ const (
 )
 
 var (
-	// Regular expression to get the numerical value of PV suffix with GiB from *v1.PersistentVolume.
-	sizeRegEx = regexp.MustCompile("(.*?)Gi")
+	// sizeRegEx parses a PV capacity string into a numeric part and an optional binary SI suffix (Ki, Mi, Gi, Ti).
+	sizeRegEx = regexp.MustCompile(`^(\d+(?:\.\d+)?)(Ki|Mi|Gi|Ti)?$`)
 )
 
// Variable to keep track of instance families that fail in DescribePrice API due to improper defaulting of systemDisk if the information is not available
@@ -1296,21 +1297,57 @@ func generateSlimK8sNodeFromV1Node(node *clustercache.Node) *SlimK8sNode {
 	return NewSlimK8sNode(instanceType, regionID, priceUnit, memorySizeInKiB, osType, providerID, instanceFamily, IsIoOptimized, systemDisk)
 }
 
-// getNumericalValueFromResourceQuantity returns the numericalValue of the resourceQuantity
-// An example is: 20Gi returns to 20. If any error occurs it returns the default value used in describePrice API which is 2000.
+// getNumericalValueFromResourceQuantity converts a Kubernetes PV capacity string (e.g. "20Gi", "48828125Ki")
+// into a whole GiB integer string, as required by the Alibaba DescribePrice API.
+// Returns ALIBABA_DEFAULT_DATADISK_SIZE if the quantity cannot be parsed.
 func getNumericalValueFromResourceQuantity(quantity string) (value string) {
-	// defaulting when any panic or empty string occurs.
 	defer func() {
-		log.Debugf("unable to determine the size of the PV so defaulting the size to %s", ALIBABA_DEFAULT_DATADISK_SIZE)
 		if err := recover(); err != nil {
+			log.Debugf("panic while parsing PV capacity %q, defaulting to %s: %v", quantity, ALIBABA_DEFAULT_DATADISK_SIZE, err)
 			value = ALIBABA_DEFAULT_DATADISK_SIZE
 		}
 		if value == "" {
+			log.Debugf("unable to determine the size of the PV from quantity %q, defaulting to %s", quantity, ALIBABA_DEFAULT_DATADISK_SIZE)
 			value = ALIBABA_DEFAULT_DATADISK_SIZE
 		}
 	}()
-	res := sizeRegEx.FindAllStringSubmatch(quantity, 1)
-	value = res[0][1]
+
+	res := sizeRegEx.FindStringSubmatch(strings.TrimSpace(quantity))
+	if len(res) < 2 || res[1] == "" {
+		return
+	}
+
+	numericPart, err := strconv.ParseFloat(res[1], 64)
+	if err != nil || numericPart <= 0 {
+		return
+	}
+
+	unit := ""
+	if len(res) >= 3 {
+		unit = res[2]
+	}
+
+	var sizeInGiB float64
+	switch unit {
+	case "Ki":
+		sizeInGiB = numericPart / (1024 * 1024)
+	case "Mi":
+		sizeInGiB = numericPart / 1024
+	case "Gi":
+		sizeInGiB = numericPart
+	case "Ti":
+		sizeInGiB = numericPart * 1024
+	default:
+		sizeInGiB = numericPart / (1024 * 1024 * 1024)
+	}
+
+	// ceil so we never underreport disk size to the DescribePrice API.
+	sizeInGiBInt := int64(math.Ceil(sizeInGiB))
+	if sizeInGiBInt <= 0 {
+		return
+	}
+
+	value = strconv.FormatInt(sizeInGiBInt, 10)
 	return
 }
 

+ 69 - 3
pkg/cloud/alibaba/provider_test.go

@@ -770,20 +770,86 @@ func TestGetNumericalValueFromResourceQuantity(t *testing.T) {
 			expectedValue:        "10",
 		},
 		{
-			name:                 "negative scenario: when inputResourceQuantity is Gi",
+			name:                 "negative scenario: when inputResourceQuantity is Gi (no numeric prefix)",
 			inputResourceQuanity: "Gi",
 			expectedValue:        ALIBABA_DEFAULT_DATADISK_SIZE,
 		},
 		{
-			name:                 "negative scenario: when inputResourceQuantity is 10",
+			name:                 "edge case: when inputResourceQuantity is 10 (plain bytes, rounds up to 1 GiB)",
 			inputResourceQuanity: "10",
-			expectedValue:        ALIBABA_DEFAULT_DATADISK_SIZE,
+			expectedValue:        "1",
 		},
 		{
 			name:                 "negative scenario: when inputResourceQuantity is empty string",
 			inputResourceQuanity: "",
 			expectedValue:        ALIBABA_DEFAULT_DATADISK_SIZE,
 		},
+		{
+			name:                 "negative scenario: when inputResourceQuantity is non-numeric",
+			inputResourceQuanity: "abc",
+			expectedValue:        ALIBABA_DEFAULT_DATADISK_SIZE,
+		},
+		{
+			// 48828125Ki / (1024*1024) = 46.875 GiB, rounds up to 47
+			name:                 "positive scenario: Ki unit - 48828125Ki",
+			inputResourceQuanity: "48828125Ki",
+			expectedValue:        "47",
+		},
+		{
+			name:                 "positive scenario: Ki unit - 1048576Ki (exactly 1 GiB)",
+			inputResourceQuanity: "1048576Ki",
+			expectedValue:        "1",
+		},
+		{
+			name:                 "positive scenario: Ki unit - 2097152Ki (exactly 2 GiB)",
+			inputResourceQuanity: "2097152Ki",
+			expectedValue:        "2",
+		},
+		{
+			name:                 "positive scenario: Mi unit - 512Mi (rounds up to 1 GiB)",
+			inputResourceQuanity: "512Mi",
+			expectedValue:        "1",
+		},
+		{
+			name:                 "positive scenario: Mi unit - 1024Mi (exactly 1 GiB)",
+			inputResourceQuanity: "1024Mi",
+			expectedValue:        "1",
+		},
+		{
+			name:                 "positive scenario: Mi unit - 20480Mi (exactly 20 GiB)",
+			inputResourceQuanity: "20480Mi",
+			expectedValue:        "20",
+		},
+		{
+			name:                 "positive scenario: Gi unit - 20Gi",
+			inputResourceQuanity: "20Gi",
+			expectedValue:        "20",
+		},
+		{
+			name:                 "positive scenario: Gi unit - 100Gi",
+			inputResourceQuanity: "100Gi",
+			expectedValue:        "100",
+		},
+		{
+			name:                 "positive scenario: Ti unit - 1Ti",
+			inputResourceQuanity: "1Ti",
+			expectedValue:        "1024",
+		},
+		{
+			name:                 "positive scenario: Ti unit - 2Ti",
+			inputResourceQuanity: "2Ti",
+			expectedValue:        "2048",
+		},
+		{
+			name:                 "positive scenario: fractional Gi unit - 1.5Gi (rounds up to 2 GiB)",
+			inputResourceQuanity: "1.5Gi",
+			expectedValue:        "2",
+		},
+		{
+			name:                 "positive scenario: fractional Mi unit - 1536Mi (1.5 GiB, rounds up to 2 GiB)",
+			inputResourceQuanity: "1536Mi",
+			expectedValue:        "2",
+		},
 	}
 	for _, c := range cases {
 		t.Run(c.name, func(t *testing.T) {

+ 6 - 2
pkg/cloud/aws/provider.go

@@ -1403,7 +1403,9 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
 			UsageType:    PreemptibleType,
 		}, meta, nil
 	} else if aws.isPreemptible(key) { // Preemptible but we don't have any data in the pricing report.
-		log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
+		if aws.SpotRefreshEnabled() {
+			log.DedupedWarningf(5, "Node %s marked preemptible but we have no data in spot feed", k.ID())
+		}
 		if publicPricingFound {
 			// return public price if found
 			return &models.Node{
@@ -1419,7 +1421,9 @@ func (aws *AWS) createNode(terms *AWSProductTerms, usageType string, k models.Ke
 			}, meta, nil
 		} else {
 			// return defaults if public pricing not found
-			log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
+			if aws.SpotRefreshEnabled() {
+				log.DedupedWarningf(5, "Could not find Node %s's public pricing info, using default configured spot prices instead", k.ID())
+			}
 			return &models.Node{
 				VCPU:         terms.VCpu,
 				VCPUCost:     aws.BaseSpotCPUPrice,

+ 9 - 0
pkg/costmodel/csv_export.go

@@ -183,6 +183,15 @@ func (e *csvExporter) writeCSVToWriter(ctx context.Context, w io.Writer, dates [
 				return data.alloc.Properties.Namespace
 			},
 		},
+		{
+			column: "Cluster",
+			value: func(data rowData) string {
+				if data.alloc.Properties == nil {
+					return ""
+				}
+				return data.alloc.Properties.Cluster
+			},
+		},
 		{
 			column: "ControllerKind",
 			value: func(data rowData) string {

+ 8 - 7
pkg/costmodel/csv_export_test.go

@@ -49,6 +49,7 @@ func Test_UpdateCSV(t *testing.T) {
 								},
 							}, // 2 PVBytes, 2 PVCost
 							Properties: &opencost.AllocationProperties{
+								Cluster:        "test-cluster",
 								Namespace:      "test-namespace",
 								Controller:     "test-controller-name",
 								ControllerKind: "test-controller-kind",
@@ -66,8 +67,8 @@ func Test_UpdateCSV(t *testing.T) {
 		assert.Len(t, model.ComputeAllocationCalls(), 1)
 		assert.Equal(t, time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), model.ComputeAllocationCalls()[0].Start)
 		assert.Equal(t, time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), model.ComputeAllocationCalls()[0].End)
-		assert.Equal(t, `Date,Namespace,ControllerKind,ControllerName,Pod,Container,CPUCoreUsageAverage,CPUCoreRequestAverage,RAMBytesUsageAverage,RAMBytesRequestAverage,NetworkReceiveBytes,NetworkTransferBytes,GPUs,PVBytes,CPUCost,RAMCost,NetworkCost,PVCost,GPUCost,TotalCost
-2021-01-01,test-namespace,test-controller-kind,test-controller-name,test-pod,test-container,0.1,0.2,0.4,0.5,11,10,2,2,0.3,0.6,0.9,2,0.8,4.6000000000000005
+		assert.Equal(t, `Date,Namespace,Cluster,ControllerKind,ControllerName,Pod,Container,CPUCoreUsageAverage,CPUCoreRequestAverage,RAMBytesUsageAverage,RAMBytesRequestAverage,NetworkReceiveBytes,NetworkTransferBytes,GPUs,PVBytes,CPUCost,RAMCost,NetworkCost,PVCost,GPUCost,TotalCost
+2021-01-01,test-namespace,test-cluster,test-controller-kind,test-controller-name,test-pod,test-container,0.1,0.2,0.4,0.5,11,10,2,2,0.3,0.6,0.9,2,0.8,4.6000000000000005
 `, string(storage.Data))
 	})
 
@@ -101,8 +102,8 @@ func Test_UpdateCSV(t *testing.T) {
 		require.NoError(t, err)
 		// uploaded a single file with the data
 		assert.Len(t, model.ComputeAllocationCalls(), 1)
-		assert.Equal(t, `Date,Namespace,ControllerKind,ControllerName,Pod,Container,CPUCoreUsageAverage,CPUCoreRequestAverage,RAMBytesUsageAverage,RAMBytesRequestAverage,NetworkReceiveBytes,NetworkTransferBytes,GPUs,PVBytes,CPUCost,RAMCost,NetworkCost,PVCost,GPUCost,TotalCost,Labels,Label_test-label1,Label_test-label2
-2021-01-01,test-namespace,test-controller-kind,test-controller-name,test-pod,test-container,0,0,0,0,0,0,0,0,0,0,0,0,0,0,"{""test-label1"":""test-value1"",""test-label2"":""test-value2""}",test-value1,test-value2
+		assert.Equal(t, `Date,Namespace,Cluster,ControllerKind,ControllerName,Pod,Container,CPUCoreUsageAverage,CPUCoreRequestAverage,RAMBytesUsageAverage,RAMBytesRequestAverage,NetworkReceiveBytes,NetworkTransferBytes,GPUs,PVBytes,CPUCost,RAMCost,NetworkCost,PVCost,GPUCost,TotalCost,Labels,Label_test-label1,Label_test-label2
+2021-01-01,test-namespace,,test-controller-kind,test-controller-name,test-pod,test-container,0,0,0,0,0,0,0,0,0,0,0,0,0,0,"{""test-label1"":""test-value1"",""test-label2"":""test-value2""}",test-value1,test-value2
 `, string(storage.Data))
 	})
 
@@ -137,9 +138,9 @@ func Test_UpdateCSV(t *testing.T) {
 		// 2021-01-01 is already in the export file, so we only compute for 2021-01-02
 		assert.Equal(t, time.Date(2021, 1, 2, 0, 0, 0, 0, time.UTC), model.ComputeAllocationCalls()[0].Start)
 		assert.Equal(t, time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), model.ComputeAllocationCalls()[0].End)
-		assert.Equal(t, `Date,Namespace,CPUCoreUsageAverage,CPUCoreRequestAverage,CPUCost,RAMBytesUsageAverage,RAMBytesRequestAverage,RAMCost,Label_app,ControllerKind,ControllerName,Pod,Container,NetworkReceiveBytes,NetworkTransferBytes,GPUs,PVBytes,NetworkCost,PVCost,GPUCost,TotalCost
-2021-01-01,test-namespace,0.1,0.2,0.3,0.4,0.5,0.6,app1,,,,,,,,,,,,
-2021-01-02,test-namespace,0,0,1,0,0,0,,,,,,0,0,0,0,0,0,0,1
+		assert.Equal(t, `Date,Namespace,CPUCoreUsageAverage,CPUCoreRequestAverage,CPUCost,RAMBytesUsageAverage,RAMBytesRequestAverage,RAMCost,Label_app,Cluster,ControllerKind,ControllerName,Pod,Container,NetworkReceiveBytes,NetworkTransferBytes,GPUs,PVBytes,NetworkCost,PVCost,GPUCost,TotalCost
+2021-01-01,test-namespace,0.1,0.2,0.3,0.4,0.5,0.6,app1,,,,,,,,,,,,,
+2021-01-02,test-namespace,0,0,1,0,0,0,,,,,,,0,0,0,0,0,0,0,1
 `, string(storage.Data))
 	})
 

+ 21 - 3
pkg/customcost/ingestor.go

@@ -21,6 +21,12 @@ import (
 	"github.com/opencost/opencost/pkg/env"
 )
 
+// pluginConnector abstracts *plugin.Client so buildSingleDomain can be tested with mocks.
+type pluginConnector interface {
+	Client() (plugin.ClientProtocol, error)
+	Kill()
+}
+
 // IngestorStatus includes diagnostic values for a given Ingestor
 type IngestorStatus struct {
 	Created     time.Time
@@ -65,14 +71,22 @@ type CustomCostIngestor struct {
 	isStopping   atomic.Bool
 	exitBuildCh  chan string
 	exitRunCh    chan string
-	plugins      map[string]*plugin.Client
+	plugins      map[string]pluginConnector
 	pluginsLock  sync.RWMutex
 	resolution   time.Duration
 	refreshRate  time.Duration
 }
 
-// NewIngestor is an initializer for ingestor
+// NewCustomCostIngestor is an initializer for ingestor
 func NewCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]*plugin.Client, res time.Duration) (*CustomCostIngestor, error) {
+	connectors := make(map[string]pluginConnector, len(plugins))
+	for k, v := range plugins {
+		connectors[k] = v
+	}
+	return newCustomCostIngestor(ingestorConfig, repo, connectors, res)
+}
+
+func newCustomCostIngestor(ingestorConfig *CustomCostIngestorConfig, repo Repository, plugins map[string]pluginConnector, res time.Duration) (*CustomCostIngestor, error) {
 	if repo == nil {
 		return nil, fmt.Errorf("CustomCost: NewCustomCostIngestor: repository cannot be nil")
 	}
@@ -194,7 +208,11 @@ func (ing *CustomCostIngestor) buildSingleDomain(start, end time.Time, domain st
 		return
 	}
 
-	custCostSrc := raw.(ocplugin.CustomCostSource)
+	custCostSrc, ok := raw.(ocplugin.CustomCostSource)
+	if !ok {
+		log.Errorf("plugin %s returned invalid type: expected CustomCostSource, got %T", domain, raw)
+		return
+	}
 
 	custCostResps := custCostSrc.GetCustomCosts(req)
 	// loop through each customCostResponse, adding to repo

+ 95 - 5
pkg/customcost/ingestor_test.go

@@ -1,6 +1,7 @@
 package customcost
 
 import (
+	"fmt"
 	"os/exec"
 	"runtime"
 	"sync"
@@ -8,6 +9,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/go-plugin"
+	"github.com/opencost/opencost/core/pkg/opencost"
 )
 
 func TestIngestor_Stop_KillsPluginProcesses(t *testing.T) {
@@ -25,7 +27,7 @@ func TestIngestor_Stop_KillsPluginProcesses(t *testing.T) {
 	_, _ = client.Client()
 
 	ingestor := &CustomCostIngestor{
-		plugins: map[string]*plugin.Client{
+		plugins: map[string]pluginConnector{
 			"test-plugin": client,
 		},
 	}
@@ -37,6 +39,7 @@ func TestIngestor_Stop_KillsPluginProcesses(t *testing.T) {
 }
 
 func TestIngestor_Stop_MultiplePlugins(t *testing.T) {
+	connectors := make(map[string]pluginConnector)
 	clients := make(map[string]*plugin.Client)
 	for _, name := range []string{"plugin-a", "plugin-b", "plugin-c"} {
 		cmd := exec.Command("sleep", "60")
@@ -50,10 +53,11 @@ func TestIngestor_Stop_MultiplePlugins(t *testing.T) {
 			StartTimeout: 2 * time.Second,
 		})
 		_, _ = client.Client()
+		connectors[name] = client
 		clients[name] = client
 	}
 
-	ingestor := &CustomCostIngestor{plugins: clients}
+	ingestor := &CustomCostIngestor{plugins: connectors}
 	ingestor.Stop()
 
 	for name, client := range clients {
@@ -65,7 +69,7 @@ func TestIngestor_Stop_MultiplePlugins(t *testing.T) {
 
 func TestIngestor_Stop_EmptyPluginsMap(t *testing.T) {
 	ingestor := &CustomCostIngestor{
-		plugins: map[string]*plugin.Client{},
+		plugins: map[string]pluginConnector{},
 	}
 	ingestor.Stop() // covers lock path with 0 iterations
 }
@@ -77,7 +81,7 @@ func TestIngestor_Stop_NilPluginsMap(t *testing.T) {
 
 func TestIngestor_Stop_AlreadyStopping(t *testing.T) {
 	ingestor := &CustomCostIngestor{
-		plugins: map[string]*plugin.Client{},
+		plugins: map[string]pluginConnector{},
 	}
 	ingestor.isStopping.Store(true) // atomic.Bool must use Store()!
 	ingestor.Stop()                 // should return immediately
@@ -85,7 +89,7 @@ func TestIngestor_Stop_AlreadyStopping(t *testing.T) {
 
 func TestIngestor_Stop_ConcurrentCalls(t *testing.T) {
 	ingestor := &CustomCostIngestor{
-		plugins: map[string]*plugin.Client{},
+		plugins: map[string]pluginConnector{},
 	}
 
 	var wg sync.WaitGroup
@@ -187,3 +191,89 @@ func TestIngestor_BuildWindow_WithPlugin(t *testing.T) {
 	// BuildWindow and buildSingleDomain; client.Client() fails fast (false exits)
 	ingestor.BuildWindow(now.Add(-time.Hour), now)
 }
+
+// mockClientProtocol implements plugin.ClientProtocol for testing.
+type mockClientProtocol struct {
+	dispenseResult interface{}
+	dispenseErr    error
+}
+
+func (m *mockClientProtocol) Dispense(string) (interface{}, error) {
+	return m.dispenseResult, m.dispenseErr
+}
+func (m *mockClientProtocol) Ping() error  { return nil }
+func (m *mockClientProtocol) Close() error { return nil }
+
+// mockPluginConnector implements pluginConnector for testing.
+type mockPluginConnector struct {
+	protocol  plugin.ClientProtocol
+	clientErr error
+	killed    bool
+}
+
+func (m *mockPluginConnector) Client() (plugin.ClientProtocol, error) {
+	if m.clientErr != nil {
+		return nil, m.clientErr
+	}
+	return m.protocol, nil
+}
+
+func (m *mockPluginConnector) Kill() { m.killed = true }
+
+func TestBuildSingleDomain_InvalidPluginType_NoPanic(t *testing.T) {
+	mock := &mockPluginConnector{
+		protocol: &mockClientProtocol{
+			dispenseResult: "not a CustomCostSource", // wrong type
+		},
+	}
+
+	repo := NewMemoryRepository()
+	ingestor := &CustomCostIngestor{
+		plugins:    map[string]pluginConnector{"bad-plugin": mock},
+		resolution: time.Hour,
+		repo:       repo,
+		coverage:   map[string]opencost.Window{},
+	}
+
+	now := time.Now().UTC()
+	// Before the fix this would panic; now it should log an error and return.
+	ingestor.BuildWindow(now.Add(-time.Hour), now)
+}
+
+func TestBuildSingleDomain_DispenseError(t *testing.T) {
+	mock := &mockPluginConnector{
+		protocol: &mockClientProtocol{
+			dispenseErr: fmt.Errorf("dispense failed"),
+		},
+	}
+
+	repo := NewMemoryRepository()
+	ingestor := &CustomCostIngestor{
+		plugins:    map[string]pluginConnector{"err-plugin": mock},
+		resolution: time.Hour,
+		repo:       repo,
+		coverage:   map[string]opencost.Window{},
+	}
+
+	now := time.Now().UTC()
+	// Should handle the error gracefully without panic.
+	ingestor.BuildWindow(now.Add(-time.Hour), now)
+}
+
+func TestBuildSingleDomain_ClientError(t *testing.T) {
+	mock := &mockPluginConnector{
+		clientErr: fmt.Errorf("connection failed"),
+	}
+
+	repo := NewMemoryRepository()
+	ingestor := &CustomCostIngestor{
+		plugins:    map[string]pluginConnector{"fail-plugin": mock},
+		resolution: time.Hour,
+		repo:       repo,
+		coverage:   map[string]opencost.Window{},
+	}
+
+	now := time.Now().UTC()
+	// Should handle the error gracefully without panic.
+	ingestor.BuildWindow(now.Add(-time.Hour), now)
+}

+ 7 - 8
pkg/customcost/pipelineservice_test.go

@@ -10,7 +10,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/hashicorp/go-plugin"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/opencost/opencost/core/pkg/util/timeutil"
 )
@@ -280,7 +279,7 @@ func TestPipelineService_Stop_WithNilIngestors(t *testing.T) {
 func TestPipelineService_Stop_PartialNilIngestors(t *testing.T) {
 	hourly := &CustomCostIngestor{
 		key:     "hourly",
-		plugins: make(map[string]*plugin.Client),
+		plugins: make(map[string]pluginConnector),
 	}
 
 	ps := &PipelineService{
@@ -298,11 +297,11 @@ func TestPipelineService_Stop_ShutdownLogging(t *testing.T) {
 	ps := &PipelineService{
 		hourlyIngestor: &CustomCostIngestor{
 			key:     "hourly",
-			plugins: make(map[string]*plugin.Client),
+			plugins: make(map[string]pluginConnector),
 		},
 		dailyIngestor: &CustomCostIngestor{
 			key:     "daily",
-			plugins: make(map[string]*plugin.Client),
+			plugins: make(map[string]pluginConnector),
 		},
 		domains: []string{},
 	}
@@ -324,8 +323,8 @@ func TestPipelineService_Stop_NilIngestors(t *testing.T) {
 }
 
 func TestPipelineService_Stop_WithIngestors(t *testing.T) {
-	hourly := &CustomCostIngestor{plugins: map[string]*plugin.Client{}}
-	daily := &CustomCostIngestor{plugins: map[string]*plugin.Client{}}
+	hourly := &CustomCostIngestor{plugins: map[string]pluginConnector{}}
+	daily := &CustomCostIngestor{plugins: map[string]pluginConnector{}}
 	ps := &PipelineService{
 		hourlyIngestor: hourly,
 		dailyIngestor:  daily,
@@ -335,14 +334,14 @@ func TestPipelineService_Stop_WithIngestors(t *testing.T) {
 
 func TestPipelineService_Stop_OnlyHourlyIngestor(t *testing.T) {
 	ps := &PipelineService{
-		hourlyIngestor: &CustomCostIngestor{plugins: map[string]*plugin.Client{}},
+		hourlyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}},
 	}
 	ps.Stop() // should not panic when dailyIngestor is nil
 }
 
 func TestPipelineService_Stop_OnlyDailyIngestor(t *testing.T) {
 	ps := &PipelineService{
-		dailyIngestor: &CustomCostIngestor{plugins: map[string]*plugin.Client{}},
+		dailyIngestor: &CustomCostIngestor{plugins: map[string]pluginConnector{}},
 	}
 	ps.Stop() // should not panic when hourlyIngestor is nil
 }