atomicrunstate_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package atomic
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. )
  7. // NOTE: This test uses time.Sleep() in an attempt to specifically schedule concurrent actions for testing
  8. // NOTE: Testing concurrency is hard, so if there are inconsistent results, make sure it's not just the timing
  9. // NOTE: of the test on the testing hardware.
  10. func TestRunState(t *testing.T) {
  11. t.Parallel()
  12. var ars AtomicRunState
  13. if !ars.Start() {
  14. t.Fatalf("Failed to Start() AtomicRunState")
  15. }
  16. if ars.Start() {
  17. t.Fatalf("Started AtomicRunState a second time")
  18. }
  19. success := make(chan bool)
  20. go func() {
  21. cycles := 0
  22. for {
  23. // Our test expects exactly 1 cycle, so if we exceed that, we fail!
  24. if cycles >= 2 {
  25. success <- false
  26. return
  27. }
  28. // create a "work" time before the select
  29. time.Sleep(1 * time.Second)
  30. select {
  31. case <-ars.OnStop():
  32. t.Logf("Stopped\n")
  33. ars.Reset()
  34. success <- true
  35. return
  36. case <-time.After(2 * time.Second):
  37. t.Logf("Tick\n")
  38. }
  39. cycles++
  40. }
  41. }()
  42. // Wait for one full work cycle (3 seconds), attempt Stop during "work" phase
  43. time.Sleep(3500 * time.Millisecond)
  44. ars.Stop()
  45. result := <-success
  46. if !result {
  47. t.Fatalf("Executed too many work cycles, expected 1 cycle")
  48. }
  49. }
  50. // leaks goroutines potentially, so only use in testing!
  51. func waitChannelFor(wg *sync.WaitGroup) chan bool {
  52. ch := make(chan bool)
  53. go func() {
  54. wg.Wait()
  55. ch <- true
  56. }()
  57. return ch
  58. }
  59. func TestDoubleWait(t *testing.T) {
  60. t.Parallel()
  61. var ars AtomicRunState
  62. ars.WaitForReset()
  63. if !ars.Start() {
  64. t.Fatalf("Failed to Start() AtomicRunState")
  65. }
  66. if ars.Start() {
  67. t.Fatalf("Started AtomicRunState a second time")
  68. }
  69. var wg sync.WaitGroup
  70. wg.Add(2)
  71. go func() {
  72. t.Logf("GoRoutine 1 Waiting....")
  73. <-ars.OnStop()
  74. wg.Done()
  75. }()
  76. go func() {
  77. t.Logf("GoRoutine 2 Waiting....")
  78. <-ars.OnStop()
  79. wg.Done()
  80. }()
  81. time.Sleep(1 * time.Second)
  82. ars.Stop()
  83. select {
  84. case <-time.After(time.Second):
  85. t.Fatalf("Did not receive signal from both go routines after a second\n")
  86. return
  87. case <-waitChannelFor(&wg):
  88. t.Logf("Received signals from both go routines\n")
  89. }
  90. ars.Reset()
  91. }
  92. func TestContinuousConcurrentStartsAndStops(t *testing.T) {
  93. t.Parallel()
  94. const cycles = 5
  95. var ars AtomicRunState
  96. started := make(chan bool)
  97. var wg sync.WaitGroup
  98. wg.Add(cycles)
  99. // continuously try and start the ars on a tight loop
  100. // throttled by OnStop and WaitForReset()
  101. go func() {
  102. defer func() {
  103. if e := recover(); e != nil {
  104. // sometimes the waitgroup will hit a negative value at the end of the test
  105. // this is ok given the way the test behaves (chaos star/stop calls), so
  106. // we can safely ignore.
  107. }
  108. }()
  109. firstCycle := true
  110. for {
  111. ars.WaitForReset()
  112. if ars.Start() {
  113. t.Logf("Started")
  114. if firstCycle {
  115. firstCycle = false
  116. started <- true
  117. }
  118. wg.Done()
  119. }
  120. <-ars.OnStop()
  121. t.Logf("Stopped")
  122. }
  123. }()
  124. // wait for an initial start
  125. <-started
  126. // Loop Stop/Resets from other goroutines
  127. go func() {
  128. for {
  129. time.Sleep(100 * time.Millisecond)
  130. if ars.Stop() {
  131. <-ars.OnStop()
  132. time.Sleep(500 * time.Millisecond)
  133. ars.Reset()
  134. }
  135. }
  136. }()
  137. // Wait for full cycles
  138. select {
  139. case <-time.After(5 * time.Second):
  140. t.Fatalf("Didn't complete %d cycles after 10 seconds", cycles)
  141. case <-waitChannelFor(&wg):
  142. t.Logf("Completed!")
  143. }
  144. }