Ver código fonte

Add an AtomicRunState utility that provides basic start/stop state tracking with a selectable stop channel for goroutine loops.
Use AtomicRunState in configfile.

Matt Bolt 4 anos atrás
pai
commit
0f19e19cbf

+ 6 - 12
pkg/config/configfile.go

@@ -10,6 +10,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/kubecost/cost-model/pkg/log"
 	"github.com/kubecost/cost-model/pkg/storage"
+	"github.com/kubecost/cost-model/pkg/util/atomic"
 )
 
 // HandlerID is a unique identifier assigned to a provided ConfigChangedHandler. This is used to remove a handler
@@ -58,7 +59,7 @@ type ConfigFile struct {
 	data       []byte
 	watchLock  *sync.Mutex
 	watchers   []*pHandler
-	watchStop  chan struct{}
+	runState   atomic.AtomicRunState
 	lastChange time.Time
 }
 
@@ -71,7 +72,6 @@ func NewConfigFile(store storage.Storage, file string) *ConfigFile {
 		dataLock:  new(sync.Mutex),
 		data:      nil,
 		watchLock: new(sync.Mutex),
-		watchStop: nil,
 	}
 }
 
@@ -237,13 +237,11 @@ func (cf *ConfigFile) RemoveAllHandlers() {
 // runWatcher creates a go routine which will poll the stat of a storage target on a specific
 // interval and dispatch created, modified, and deleted events for that file.
 func (cf *ConfigFile) runWatcher() {
-	if cf.watchStop != nil {
+	if !cf.runState.Start() {
 		log.Warningf("Run watcher already running for file: %s", cf.file)
 		return
 	}
 
-	cf.watchStop = make(chan struct{})
-
 	go func() {
 		first := true
 
@@ -253,7 +251,8 @@ func (cf *ConfigFile) runWatcher() {
 		for {
 			// Each iteration, check for the stop trigger, or wait 10 seconds
 			select {
-			case <-cf.watchStop:
+			case <-cf.runState.OnStop():
+				cf.runState.Reset()
 				return
 			case <-time.After(10 * time.Second):
 			}
@@ -318,12 +317,7 @@ func (cf *ConfigFile) runWatcher() {
 // stopWatcher closes the stop channel, returning from the runWatcher go routine. Allows us
 // to remove any polling stat checks on files when there are no change handlers.
 func (cf *ConfigFile) stopWatcher() {
-	if cf.watchStop == nil {
-		return
-	}
-
-	close(cf.watchStop)
-	cf.watchStop = nil
+	cf.runState.Stop()
 }
 
 // onFileChange is internally called when the core watcher recognizes a change in the ConfigFile. This

+ 64 - 0
pkg/util/atomic/atomicrunstate.go

@@ -0,0 +1,64 @@
+package atomic
+
+import "sync"
+
+// AtomicRunState can be used to provide thread-safe start/stop functionality to internal run-loops
+// inside a goroutine.
+type AtomicRunState struct {
+	lock     sync.Mutex
+	stopping bool
+	stop     chan struct{}
+}
+
+// Start checks for an existing run state and returns false if the run state has already started. If
+// the run state has not started, then it will advance to the started state and return true.
+func (ars *AtomicRunState) Start() bool {
+	ars.lock.Lock()
+	defer ars.lock.Unlock()
+
+	if ars.stop != nil {
+		return false
+	}
+
+	ars.stop = make(chan struct{}, 1)
+	return true
+}
+
+// OnStop returns a channel that should be used within a select goroutine run loop. It is set to signal
+// whenever Stop() is executed. Once the channel is signaled, Reset() should be called if the runstate
+// is to be used again.
+func (ars *AtomicRunState) OnStop() <-chan struct{} {
+	ars.lock.Lock()
+	defer ars.lock.Unlock()
+
+	return ars.stop
+}
+
+// Stop closes the stop channel, triggering any selects waiting on OnStop(). Calling Stop() before
+// Start(), or more than once before Reset(), has no effect.
+func (ars *AtomicRunState) Stop() {
+	ars.lock.Lock()
+	defer ars.lock.Unlock()
+
+	if !ars.stopping && ars.stop != nil {
+		ars.stopping = true
+		close(ars.stop)
+	}
+}
+
+// Reset should be called in the select case for OnStop(). Note that calling Reset() before the
+// OnStop() case has been selected clears the stop channel, so the Stop signal will never be received.
+func (ars *AtomicRunState) Reset() {
+	ars.lock.Lock()
+	defer ars.lock.Unlock()
+
+	ars.stopping = false
+	ars.stop = nil
+}
+
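+// IsRunning returns true once Start() has succeeded and until Reset() is called. This includes the
+// window after Stop() has been signaled but before the run loop has called Reset().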
+func (ars *AtomicRunState) IsRunning() bool {
+	ars.lock.Lock()
+	defer ars.lock.Unlock()
+
+	return ars.stop != nil
+}

+ 56 - 0
pkg/util/atomic/atomicrunstate_test.go

@@ -0,0 +1,56 @@
+package atomic
+
+import (
+	"testing"
+	"time"
+)
+
+// NOTE: This test uses time.Sleep() in an attempt to specifically schedule concurrent actions for testing
+// NOTE: Testing concurrency is hard, so if there are inconsistent results, make sure it's not just the timing
+// NOTE: of the test on the testing hardware.
+func TestRunState(t *testing.T) {
+	var ars AtomicRunState
+
+	if !ars.Start() {
+		t.Fatalf("Failed to Start() AtomicRunState")
+	}
+
+	if ars.Start() {
+		t.Fatalf("Started AtomicRunState a second time")
+	}
+
+	success := make(chan bool)
+
+	go func() {
+		cycles := 0
+		for {
+			// Our test expects exactly 1 cycle, so if we exceed that, we fail!
+			if cycles >= 2 {
+				success <- false
+				return
+			}
+			// create a "work" time before the select
+			time.Sleep(1 * time.Second)
+
+			select {
+			case <-ars.OnStop():
+				t.Logf("Stopped\n")
+				ars.Reset()
+				success <- true
+				return
+			case <-time.After(2 * time.Second):
+				t.Logf("Tick\n")
+			}
+			cycles++
+		}
+	}()
+
+	// Wait for one full work cycle (3 seconds), attempt Stop during "work" phase
+	time.Sleep(3500 * time.Millisecond)
+	ars.Stop()
+
+	result := <-success
+	if !result {
+		t.Fatalf("Executed too many work cycles, expected 1 cycle")
+	}
+}