Pārlūkot izejas kodu

Add mechanism for waiting on resets between stopping and starting again, add to cluster exporter

Matt Bolt 4 gadi atpakaļ
vecāks
revīzija
3b8f98b801

+ 11 - 11
pkg/clustercache/clusterexporter.go

@@ -5,6 +5,7 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/config"
 	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/kubecost/cost-model/pkg/util/atomic"
 	"github.com/kubecost/cost-model/pkg/util/json"
 
 	appsv1 "k8s.io/api/apps/v1"
@@ -38,7 +39,7 @@ type ClusterExporter struct {
 	cluster  ClusterCache
 	target   *config.ConfigFile
 	interval time.Duration
-	stop     chan struct{}
+	runState atomic.AtomicRunState
 }
 
 // NewClusterExporter creates a new ClusterExporter instance for exporting the kubernetes cluster.
@@ -52,11 +53,15 @@ func NewClusterExporter(cluster ClusterCache, target *config.ConfigFile, interva
 
 // Run starts the automated process of running Export on a specific interval.
 func (ce *ClusterExporter) Run() {
-	if ce.stop != nil {
+	// in the event there is a race that occurs between Run() and Stop(), we
+	// ensure that we wait for the reset to occur before starting again
+	ce.runState.WaitForReset()
+
+	if !ce.runState.Start() {
+		log.Warningf("ClusterExporter already running")
 		return
 	}
 
-	ce.stop = make(chan struct{})
 	go func() {
 		for {
 			err := ce.Export()
@@ -66,7 +71,8 @@ func (ce *ClusterExporter) Run() {
 
 			select {
 			case <-time.After(ce.interval):
-			case <-ce.stop:
+			case <-ce.runState.OnStop():
+				ce.runState.Reset()
 				return
 			}
 		}
@@ -75,13 +81,7 @@ func (ce *ClusterExporter) Run() {
 
 // Stop halts the Cluster export on an interval
 func (ce *ClusterExporter) Stop() {
-	if ce.stop == nil {
-		log.Warningf("Cluster exporter is already stopped.")
-		return
-	}
-
-	close(ce.stop)
-	ce.stop = nil
+	ce.runState.Stop()
 }
 
 // Export stores the cluster cache data into a PODO, marshals as JSON, and saves it to the

+ 9 - 8
pkg/config/configfile.go

@@ -237,6 +237,12 @@ func (cf *ConfigFile) RemoveAllHandlers() {
 // runWatcher creates a go routine which will poll the stat of a storage target on a specific
 // interval and dispatch created, modified, and deleted events for that file.
 func (cf *ConfigFile) runWatcher() {
+	// we wait for a reset on the run state prior to starting, which
+	// will only block iff the run state is in the process of stopping
+	cf.runState.WaitForReset()
+
+	// if start fails after waiting for a reset, it means that another thread
+	// beat this thread to the start
 	if !cf.runState.Start() {
 		log.Warningf("Run watcher already running for file: %s", cf.file)
 		return
@@ -337,7 +343,9 @@ func (cf *ConfigFile) onFileChange(changeType ChangeType, newData []byte) {
 	copy(toNotify, cf.watchers)
 	cf.watchLock.Unlock()
 
-	sort.Sort(byPriority(toNotify))
+	sort.SliceStable(toNotify, func(i, j int) bool {
+		return toNotify[i].priority < toNotify[j].priority
+	})
 
 	for _, handler := range toNotify {
 		handler.handler(changeType, newData)
@@ -354,10 +362,3 @@ type pHandler struct {
 	handler  ConfigChangedHandler
 	priority int
 }
-
-// byPriority is an implementation of sort.Interface to allow sorting a slice of pHandlers by priority
-type byPriority []*pHandler
-
-func (b byPriority) Len() int           { return len(b) }
-func (b byPriority) Less(i, j int) bool { return b[i].priority < b[j].priority }
-func (b byPriority) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }

+ 26 - 3
pkg/util/atomic/atomicrunstate.go

@@ -1,6 +1,8 @@
 package atomic
 
-import "sync"
+import (
+	"sync"
+)
 
 // AtomicRunState can be used to provide thread-safe start/stop functionality to internal run-loops
 // inside a goroutine.
@@ -8,6 +10,7 @@ type AtomicRunState struct {
 	lock     sync.Mutex
 	stopping bool
 	stop     chan struct{}
+	reset    chan struct{}
 }
 
 // Start checks for an existing run state and returns false if the run state has already started. If
@@ -35,14 +38,18 @@ func (ars *AtomicRunState) OnStop() <-chan struct{} {
 }
 
 // Stops closes the stop channel triggering any selects waiting for OnStop()
-func (ars *AtomicRunState) Stop() {
+func (ars *AtomicRunState) Stop() bool {
 	ars.lock.Lock()
 	defer ars.lock.Unlock()
 
 	if !ars.stopping && ars.stop != nil {
 		ars.stopping = true
+		ars.reset = make(chan struct{}, 1)
 		close(ars.stop)
+		return true
 	}
+
+	return false
 }
 
 // Reset should be called in the select case for OnStop(). Note that calling Reset() prior to
@@ -51,14 +58,30 @@ func (ars *AtomicRunState) Reset() {
 	ars.lock.Lock()
 	defer ars.lock.Unlock()
 
+	close(ars.reset)
 	ars.stopping = false
 	ars.stop = nil
 }
 
-// IsRunning returns true if metric recording is running.
+// IsRunning returns true if metric recording is running or in the process of stopping.
 func (ars *AtomicRunState) IsRunning() bool {
 	ars.lock.Lock()
 	defer ars.lock.Unlock()
 
 	return ars.stop != nil
 }
+
+// IsStopping returns true if the run state has been stopped, but not yet reset.
+func (ars *AtomicRunState) IsStopping() bool {
+	ars.lock.Lock()
+	defer ars.lock.Unlock()
+
+	return ars.stopping && ars.stop != nil
+}
+
+// WaitForStop will wait for a stop to occur IFF the run state is in the process of stopping.
+func (ars *AtomicRunState) WaitForReset() {
+	if ars.IsStopping() {
+		<-ars.reset
+	}
+}

+ 110 - 0
pkg/util/atomic/atomicrunstate_test.go

@@ -1,6 +1,7 @@
 package atomic
 
 import (
+	"sync"
 	"testing"
 	"time"
 )
@@ -9,6 +10,8 @@ import (
 // NOTE: Testing concurrency is hard, so if there are inconsistent results, make sure it's not just the timing
 // NOTE: of the test on the testing hardware.
 func TestRunState(t *testing.T) {
+	t.Parallel()
+
 	var ars AtomicRunState
 
 	if !ars.Start() {
@@ -54,3 +57,110 @@ func TestRunState(t *testing.T) {
 		t.Fatalf("Executed too many work cycles, expected 1 cycle")
 	}
 }
+
+// leaks goroutines potentially, so only use in testing!
+func waitChannelFor(wg *sync.WaitGroup) chan bool {
+	ch := make(chan bool)
+	go func() {
+		wg.Wait()
+		ch <- true
+	}()
+	return ch
+}
+
+func TestDoubleWait(t *testing.T) {
+	t.Parallel()
+
+	var ars AtomicRunState
+
+	ars.WaitForReset()
+
+	if !ars.Start() {
+		t.Fatalf("Failed to Start() AtomicRunState")
+	}
+
+	if ars.Start() {
+		t.Fatalf("Started AtomicRunState a second time")
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	go func() {
+		t.Logf("GoRoutine 1 Waiting....")
+		<-ars.OnStop()
+		wg.Done()
+	}()
+
+	go func() {
+		t.Logf("GoRoutine 2 Waiting....")
+		<-ars.OnStop()
+		wg.Done()
+	}()
+
+	time.Sleep(1 * time.Second)
+	ars.Stop()
+	select {
+	case <-time.After(time.Second):
+		t.Fatalf("Did not receive signal from both go routines after a second\n")
+		return
+	case <-waitChannelFor(&wg):
+		t.Logf("Received signals from both go routines\n")
+	}
+	ars.Reset()
+}
+
+func TestContinuousConcurrentStartsAndStops(t *testing.T) {
+	t.Parallel()
+
+	const cycles = 5
+
+	var ars AtomicRunState
+	started := make(chan bool)
+
+	var wg sync.WaitGroup
+	wg.Add(cycles)
+
+	// continuously try and start the ars on a tight loop
+	// throttled by OnStop and WaitForReset()
+	go func() {
+		firstCycle := true
+		for {
+			ars.WaitForReset()
+			if ars.Start() {
+				t.Logf("Started")
+				if firstCycle {
+					firstCycle = false
+					started <- true
+				}
+				wg.Done()
+			}
+
+			<-ars.OnStop()
+			t.Logf("Stopped")
+		}
+	}()
+
+	// wait for an initial start
+	<-started
+
+	// Loop Stop/Resets from other goroutines
+	go func() {
+		for {
+			time.Sleep(100 * time.Millisecond)
+			if ars.Stop() {
+				<-ars.OnStop()
+				time.Sleep(500 * time.Millisecond)
+				ars.Reset()
+			}
+		}
+	}()
+
+	// Wait for full cycles
+	select {
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Didn't complete %d cycles after 10 seconds", cycles)
+	case <-waitChannelFor(&wg):
+		t.Logf("Completed!")
+	}
+}