atomicrunstate.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package atomic
  2. import (
  3. "sync"
  4. )
  5. // AtomicRunState can be used to provide thread-safe start/stop functionality to internal run-loops
  6. // inside a goroutine.
  7. type AtomicRunState struct {
  8. lock sync.Mutex
  9. stopping bool
  10. stop chan struct{}
  11. reset chan struct{}
  12. // buffer contains channels that are returned before Start() is called.
  13. stopBuffer []chan struct{}
  14. }
  15. // Start checks for an existing run state and returns false if the run state has already started. If
  16. // the run state has not started, then it will advance to the started state and return true.
  17. func (ars *AtomicRunState) Start() bool {
  18. ars.lock.Lock()
  19. defer ars.lock.Unlock()
  20. if ars.stop != nil {
  21. return false
  22. }
  23. ars.stop = make(chan struct{})
  24. // if there are any channels in the buffer, assign them a wait routine on
  25. // the stop channel.
  26. for _, ch := range ars.stopBuffer {
  27. go func(ch chan struct{}) {
  28. defer close(ch)
  29. <-ars.stop
  30. }(ch)
  31. }
  32. ars.stopBuffer = nil
  33. return true
  34. }
  35. // OnStop returns a channel that should be used within a select goroutine run loop. It is set to signal
  36. // whenever Stop() is executed. Once the channel is signaled, Reset() should be called if the runstate
  37. // is to be used again.
  38. func (ars *AtomicRunState) OnStop() <-chan struct{} {
  39. ars.lock.Lock()
  40. defer ars.lock.Unlock()
  41. if ars.stop == nil {
  42. ch := make(chan struct{})
  43. ars.stopBuffer = append(ars.stopBuffer, ch)
  44. return ch
  45. }
  46. return ars.stop
  47. }
  48. // Stops closes the stop channel triggering any selects waiting for OnStop()
  49. func (ars *AtomicRunState) Stop() bool {
  50. ars.lock.Lock()
  51. defer ars.lock.Unlock()
  52. if !ars.stopping && ars.stop != nil {
  53. ars.stopping = true
  54. ars.reset = make(chan struct{})
  55. close(ars.stop)
  56. return true
  57. }
  58. return false
  59. }
  60. // Reset should be called in the select case for OnStop(). Note that calling Reset() prior to
  61. // selecting OnStop() will result in failed Stop signal receive.
  62. func (ars *AtomicRunState) Reset() {
  63. ars.lock.Lock()
  64. defer ars.lock.Unlock()
  65. close(ars.reset)
  66. ars.stopping = false
  67. ars.stop = nil
  68. }
  69. // IsRunning returns true if the state is running or in the process of stopping.
  70. func (ars *AtomicRunState) IsRunning() bool {
  71. ars.lock.Lock()
  72. defer ars.lock.Unlock()
  73. return ars.stop != nil
  74. }
  75. // IsStopping returns true if the run state has been stopped, but not yet reset.
  76. func (ars *AtomicRunState) IsStopping() bool {
  77. ars.lock.Lock()
  78. defer ars.lock.Unlock()
  79. return ars.stopping && ars.stop != nil
  80. }
  81. // WaitForStop will wait for a stop to occur IFF the run state is in the process of stopping.
  82. func (ars *AtomicRunState) WaitForReset() {
  83. if ars.IsStopping() {
  84. <-ars.reset
  85. }
  86. }