Explorar o código

fix: guard cloudcost Status coverage read and stop ClusterMap ticker (#3865)

Signed-off-by: Tushar Verma <tusharmyself06@gmail.com>
Tushar-Verma hai 1 día
pai
achega
4d117aabe1

+ 1 - 0
modules/prometheus-source/pkg/prom/clustermap.go

@@ -47,6 +47,7 @@ func newPrometheusClusterMap(contextFactory *ContextFactory, cip clusters.Cluste
 
 		// Tick on interval and refresh clusters
 		ticker := time.NewTicker(refresh)
+		defer ticker.Stop()
 		for {
 			select {
 			case <-ticker.C:

+ 7 - 1
pkg/cloudcost/ingestor.go

@@ -188,12 +188,18 @@ func (ing *ingestor) Stop() {
 
 // Status returns an IngestorStatus that describes the current state of the ingestor
 func (ing *ingestor) Status() IngestorStatus {
+	// Read coverage under the lock; the build loop reassigns it under
+	// coverageLock, so an unlocked read here is a data race.
+	ing.coverageLock.Lock()
+	coverage := ing.coverage
+	ing.coverageLock.Unlock()
+
 	return IngestorStatus{
 		Created:          ing.creationTime,
 		LastRun:          ing.lastRun,
 		NextRun:          ing.lastRun.Add(ing.config.RefreshRate).UTC(),
 		Runs:             ing.runs,
-		Coverage:         ing.coverage,
+		Coverage:         coverage,
 		ConnectionStatus: ing.integration.GetStatus(),
 	}
 }

+ 47 - 0
pkg/cloudcost/ingestor_test.go

@@ -0,0 +1,47 @@
+package cloudcost
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/opencost/opencost/core/pkg/opencost"
+	"github.com/opencost/opencost/pkg/cloud"
+)
+
+type fakeIntegration struct{}
+
+func (fakeIntegration) GetCloudCost(time.Time, time.Time) (*opencost.CloudCostSetRange, error) {
+	return nil, nil
+}
+func (fakeIntegration) GetStatus() cloud.ConnectionStatus     { return cloud.InitialStatus }
+func (fakeIntegration) RefreshStatus() cloud.ConnectionStatus { return cloud.InitialStatus }
+
+// TestIngestor_Status_ConcurrentWithCoverageWrite guards against a data race on
+// the coverage window: Status() used to read it without holding coverageLock
+// while the build loop reassigns it under the lock. Run with -race to detect it.
+func TestIngestor_Status_ConcurrentWithCoverageWrite(t *testing.T) {
+	now := time.Now().UTC()
+	ing := &ingestor{
+		integration: fakeIntegration{},
+		coverage:    opencost.NewClosedWindow(now, now.Add(time.Hour)),
+	}
+
+	window := opencost.NewClosedWindow(now, now.Add(2*time.Hour))
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 2000; i++ {
+			ing.expandCoverage(window)
+		}
+	}()
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 2000; i++ {
+			_ = ing.Status()
+		}
+	}()
+	wg.Wait()
+}