| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- package atomic
- import (
- "sync"
- "testing"
- "time"
- )
- // NOTE: This test uses time.Sleep() in an attempt to specifically schedule concurrent actions for testing
- // NOTE: Testing concurrency is hard, so if there are inconsistent results, make sure it's not just the timing
- // NOTE: of the test on the testing hardware.
- func TestRunState(t *testing.T) {
- t.Parallel()
- var ars AtomicRunState
- if !ars.Start() {
- t.Fatalf("Failed to Start() AtomicRunState")
- }
- if ars.Start() {
- t.Fatalf("Started AtomicRunState a second time")
- }
- success := make(chan bool)
- go func() {
- cycles := 0
- for {
- // Our test expects exactly 1 cycle, so if we exceed that, we fail!
- if cycles >= 2 {
- success <- false
- return
- }
- // create a "work" time before the select
- time.Sleep(1 * time.Second)
- select {
- case <-ars.OnStop():
- t.Logf("Stopped\n")
- ars.Reset()
- success <- true
- return
- case <-time.After(2 * time.Second):
- t.Logf("Tick\n")
- }
- cycles++
- }
- }()
- // Wait for one full work cycle (3 seconds), attempt Stop during "work" phase
- time.Sleep(3500 * time.Millisecond)
- ars.Stop()
- result := <-success
- if !result {
- t.Fatalf("Executed too many work cycles, expected 1 cycle")
- }
- }
- // leaks goroutines potentially, so only use in testing!
- func waitChannelFor(wg *sync.WaitGroup) chan bool {
- ch := make(chan bool)
- go func() {
- wg.Wait()
- ch <- true
- }()
- return ch
- }
- func TestDoubleWait(t *testing.T) {
- t.Parallel()
- var ars AtomicRunState
- ars.WaitForReset()
- if !ars.Start() {
- t.Fatalf("Failed to Start() AtomicRunState")
- }
- if ars.Start() {
- t.Fatalf("Started AtomicRunState a second time")
- }
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- t.Logf("GoRoutine 1 Waiting....")
- <-ars.OnStop()
- wg.Done()
- }()
- go func() {
- t.Logf("GoRoutine 2 Waiting....")
- <-ars.OnStop()
- wg.Done()
- }()
- time.Sleep(1 * time.Second)
- ars.Stop()
- select {
- case <-time.After(time.Second):
- t.Fatalf("Did not receive signal from both go routines after a second\n")
- return
- case <-waitChannelFor(&wg):
- t.Logf("Received signals from both go routines\n")
- }
- ars.Reset()
- }
- func TestContinuousConcurrentStartsAndStops(t *testing.T) {
- t.Parallel()
- const cycles = 5
- var ars AtomicRunState
- started := make(chan bool)
- var wg sync.WaitGroup
- wg.Add(cycles)
- // continuously try and start the ars on a tight loop
- // throttled by OnStop and WaitForReset()
- go func() {
- defer func() {
- if e := recover(); e != nil {
- // sometimes the waitgroup will hit a negative value at the end of the test
- // this is ok given the way the test behaves (chaos star/stop calls), so
- // we can safely ignore.
- }
- }()
- firstCycle := true
- for {
- ars.WaitForReset()
- if ars.Start() {
- t.Logf("Started")
- if firstCycle {
- firstCycle = false
- started <- true
- }
- wg.Done()
- }
- <-ars.OnStop()
- t.Logf("Stopped")
- }
- }()
- // wait for an initial start
- <-started
- // Loop Stop/Resets from other goroutines
- go func() {
- for {
- time.Sleep(100 * time.Millisecond)
- if ars.Stop() {
- <-ars.OnStop()
- time.Sleep(500 * time.Millisecond)
- ars.Reset()
- }
- }
- }()
- // Wait for full cycles
- select {
- case <-time.After(5 * time.Second):
- t.Fatalf("Didn't complete %d cycles after 10 seconds", cycles)
- case <-waitChannelFor(&wg):
- t.Logf("Completed!")
- }
- }
|