Przeglądaj źródła

Fix bug where AtomicRunState allowed waiting on a nil stop channel.

Matt Bolt 1 rok temu
rodzic
commit
0b3e1f8ada

+ 21 - 0
core/pkg/util/atomic/atomicrunstate.go

@@ -11,6 +11,9 @@ type AtomicRunState struct {
 	stopping bool
 	stop     chan struct{}
 	reset    chan struct{}
+
+	// buffer contains channels that are returned before Start() is called.
+	stopBuffer []chan struct{}
 }
 
 // Start checks for an existing run state and returns false if the run state has already started. If
@@ -24,6 +27,18 @@ func (ars *AtomicRunState) Start() bool {
 	}
 
 	ars.stop = make(chan struct{})
+
+	// if there are any channels in the buffer, assign them a wait routine on
+	// the stop channel.
+	for _, ch := range ars.stopBuffer {
+		go func(ch chan struct{}) {
+			defer close(ch)
+
+			<-ars.stop
+		}(ch)
+	}
+	ars.stopBuffer = nil
+
 	return true
 }
 
@@ -34,6 +49,12 @@ func (ars *AtomicRunState) OnStop() <-chan struct{} {
 	ars.lock.Lock()
 	defer ars.lock.Unlock()
 
+	if ars.stop == nil {
+		ch := make(chan struct{})
+		ars.stopBuffer = append(ars.stopBuffer, ch)
+		return ch
+	}
+
 	return ars.stop
 }
 

+ 70 - 21
core/pkg/util/atomic/atomicrunstate_test.go

@@ -1,6 +1,7 @@
 package atomic
 
 import (
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -124,46 +125,49 @@ func TestContinuousConcurrentStartsAndStops(t *testing.T) {
 	// continuously try and start the ars on a tight loop
 	// throttled by OnStop and WaitForReset()
 	go func() {
-		defer func() {
-			if e := recover(); e != nil {
-				// sometimes the waitgroup will hit a negative value at the end of the test
-				// this is ok given the way the test behaves (chaos star/stop calls), so
-				// we can safely ignore.
-			}
-		}()
-
-		firstCycle := true
-		for {
+		c := cycles
+		for c > 0 {
 			ars.WaitForReset()
 			if ars.Start() {
 				t.Logf("Started")
-				if firstCycle {
-					firstCycle = false
+				if c == cycles {
 					started <- true
 				}
-				wg.Done()
+				c--
 			}
-
-			<-ars.OnStop()
-			t.Logf("Stopped")
 		}
 	}()
 
 	// wait for an initial start
 	<-started
 
-	// Loop Stop/Resets from other goroutines
+	// Loop Stop from other goroutines
 	go func() {
-		for {
+		c := cycles
+		for c > 0 {
 			time.Sleep(100 * time.Millisecond)
 			if ars.Stop() {
-				<-ars.OnStop()
-				time.Sleep(500 * time.Millisecond)
-				ars.Reset()
+				t.Logf("Wait for stop")
+				c--
 			}
 		}
 	}()
 
+	// Loop OnStop and Resets
+	go func() {
+		c := cycles
+
+		time.Sleep(150 * time.Millisecond)
+		for c > 0 {
+			<-ars.OnStop()
+			t.Logf("Stopped")
+			time.Sleep(500 * time.Millisecond)
+			ars.Reset()
+			c--
+			wg.Done()
+		}
+	}()
+
 	// Wait for full cycles
 	select {
 	case <-time.After(5 * time.Second):
@@ -172,3 +176,48 @@ func TestContinuousConcurrentStartsAndStops(t *testing.T) {
 		t.Logf("Completed!")
 	}
 }
+
+func TestStopChannelWhenStopped(t *testing.T) {
+	t.Parallel()
+
+	// This scenario is a bit odd, but there was a bug where waiting on `OnStop()`
+	// before the run state is started will indefinitely block. The problem is resolved by
+	// buffering the stop channel with intermediate channels until Start() is called.
+
+	var ars AtomicRunState
+
+	finished := make(chan struct{})
+	errors := make(chan error)
+
+	go func() {
+		<-ars.OnStop()
+		t.Logf("Stopped")
+		finished <- struct{}{}
+	}()
+
+	// wait a bit, then start and stop the run state -- the OnStop
+	// channel should complete.
+	go func() {
+		time.Sleep(1 * time.Second)
+		ars.WaitForReset()
+
+		if !ars.Start() {
+			errors <- fmt.Errorf("Failed to Start() AtomicRunState")
+		}
+		time.Sleep(500 * time.Millisecond)
+
+		if !ars.Stop() {
+			errors <- fmt.Errorf("Failed to Stop() AtomicRunState")
+		}
+	}()
+
+	select {
+	case <-time.After(5 * time.Second):
+		t.Fatalf("Didn't complete after 5 seconds")
+	case e := <-errors:
+		t.Fatalf("Received error from goroutine: %s", e)
+	case <-finished:
+		t.Logf("Completed!")
+	}
+
+}