atomicrunstate_test.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package atomic
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. "time"
  7. )
  8. // NOTE: This test uses time.Sleep() in an attempt to specifically schedule concurrent actions for testing
  9. // NOTE: Testing concurrency is hard, so if there are inconsistent results, make sure it's not just the timing
  10. // NOTE: of the test on the testing hardware.
  11. func TestRunState(t *testing.T) {
  12. t.Parallel()
  13. var ars AtomicRunState
  14. if !ars.Start() {
  15. t.Fatalf("Failed to Start() AtomicRunState")
  16. }
  17. if ars.Start() {
  18. t.Fatalf("Started AtomicRunState a second time")
  19. }
  20. success := make(chan bool)
  21. go func() {
  22. cycles := 0
  23. for {
  24. // Our test expects exactly 1 cycle, so if we exceed that, we fail!
  25. if cycles >= 2 {
  26. success <- false
  27. return
  28. }
  29. // create a "work" time before the select
  30. time.Sleep(1 * time.Second)
  31. select {
  32. case <-ars.OnStop():
  33. t.Logf("Stopped\n")
  34. ars.Reset()
  35. success <- true
  36. return
  37. case <-time.After(2 * time.Second):
  38. t.Logf("Tick\n")
  39. }
  40. cycles++
  41. }
  42. }()
  43. // Wait for one full work cycle (3 seconds), attempt Stop during "work" phase
  44. time.Sleep(3500 * time.Millisecond)
  45. ars.Stop()
  46. result := <-success
  47. if !result {
  48. t.Fatalf("Executed too many work cycles, expected 1 cycle")
  49. }
  50. }
  51. // leaks goroutines potentially, so only use in testing!
  52. func waitChannelFor(wg *sync.WaitGroup) chan bool {
  53. ch := make(chan bool)
  54. go func() {
  55. wg.Wait()
  56. ch <- true
  57. }()
  58. return ch
  59. }
  60. func TestDoubleWait(t *testing.T) {
  61. t.Parallel()
  62. var ars AtomicRunState
  63. ars.WaitForReset()
  64. if !ars.Start() {
  65. t.Fatalf("Failed to Start() AtomicRunState")
  66. }
  67. if ars.Start() {
  68. t.Fatalf("Started AtomicRunState a second time")
  69. }
  70. var wg sync.WaitGroup
  71. wg.Add(2)
  72. go func() {
  73. t.Logf("GoRoutine 1 Waiting....")
  74. <-ars.OnStop()
  75. wg.Done()
  76. }()
  77. go func() {
  78. t.Logf("GoRoutine 2 Waiting....")
  79. <-ars.OnStop()
  80. wg.Done()
  81. }()
  82. time.Sleep(1 * time.Second)
  83. ars.Stop()
  84. select {
  85. case <-time.After(time.Second):
  86. t.Fatalf("Did not receive signal from both go routines after a second\n")
  87. return
  88. case <-waitChannelFor(&wg):
  89. t.Logf("Received signals from both go routines\n")
  90. }
  91. ars.Reset()
  92. }
  93. func TestContinuousConcurrentStartsAndStops(t *testing.T) {
  94. t.Parallel()
  95. const cycles = 5
  96. var ars AtomicRunState
  97. started := make(chan bool)
  98. var wg sync.WaitGroup
  99. wg.Add(cycles)
  100. // continuously try and start the ars on a tight loop
  101. // throttled by OnStop and WaitForReset()
  102. go func() {
  103. c := cycles
  104. for c > 0 {
  105. ars.WaitForReset()
  106. if ars.Start() {
  107. t.Logf("Started")
  108. if c == cycles {
  109. started <- true
  110. }
  111. c--
  112. }
  113. }
  114. }()
  115. // wait for an initial start
  116. <-started
  117. // Loop Stop from other goroutines
  118. go func() {
  119. c := cycles
  120. for c > 0 {
  121. time.Sleep(100 * time.Millisecond)
  122. if ars.Stop() {
  123. t.Logf("Wait for stop")
  124. c--
  125. }
  126. }
  127. }()
  128. // Loop OnStop and Resets
  129. go func() {
  130. c := cycles
  131. time.Sleep(150 * time.Millisecond)
  132. for c > 0 {
  133. <-ars.OnStop()
  134. t.Logf("Stopped")
  135. time.Sleep(500 * time.Millisecond)
  136. ars.Reset()
  137. c--
  138. wg.Done()
  139. }
  140. }()
  141. // Wait for full cycles
  142. select {
  143. case <-time.After(5 * time.Second):
  144. t.Fatalf("Didn't complete %d cycles after 10 seconds", cycles)
  145. case <-waitChannelFor(&wg):
  146. t.Logf("Completed!")
  147. }
  148. }
  149. func TestStopChannelWhenStopped(t *testing.T) {
  150. t.Parallel()
  151. // This scenario is a bit odd, but there was a bug where waiting on `OnStop()`
  152. // before the run state is started will indefinitely block. The problem is resolved by
  153. // buffering the stop channel with intermediate channels until Start() is called.
  154. var ars AtomicRunState
  155. finished := make(chan struct{})
  156. errors := make(chan error)
  157. go func() {
  158. <-ars.OnStop()
  159. t.Logf("Stopped")
  160. finished <- struct{}{}
  161. }()
  162. // wait a bit, then start and stop the run state -- the OnStop
  163. // channel should complete.
  164. go func() {
  165. time.Sleep(1 * time.Second)
  166. ars.WaitForReset()
  167. if !ars.Start() {
  168. errors <- fmt.Errorf("Failed to Start() AtomicRunState")
  169. }
  170. time.Sleep(500 * time.Millisecond)
  171. if !ars.Stop() {
  172. errors <- fmt.Errorf("Failed to Stop() AtomicRunState")
  173. }
  174. }()
  175. select {
  176. case <-time.After(5 * time.Second):
  177. t.Fatalf("Didn't complete after 5 seconds")
  178. case e := <-errors:
  179. t.Fatalf("Received error from goroutine: %s", e)
  180. case <-finished:
  181. t.Logf("Completed!")
  182. }
  183. }