wait.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wait
  14. import (
  15. "context"
  16. "math/rand"
  17. "sync"
  18. "time"
  19. "k8s.io/apimachinery/pkg/util/runtime"
  20. )
  21. // For any test of the style:
  22. //
  23. // ...
  24. // <- time.After(timeout):
  25. // t.Errorf("Timed out")
  26. //
  27. // The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s
  28. // is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine
  29. // (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test.
  30. var ForeverTestTimeout = time.Second * 30
  31. // NeverStop may be passed to Until to make it never stop.
  32. var NeverStop <-chan struct{} = make(chan struct{})
  33. // Group allows to start a group of goroutines and wait for their completion.
  34. type Group struct {
  35. wg sync.WaitGroup
  36. }
  37. func (g *Group) Wait() {
  38. g.wg.Wait()
  39. }
  40. // StartWithChannel starts f in a new goroutine in the group.
  41. // stopCh is passed to f as an argument. f should stop when stopCh is available.
  42. func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) {
  43. g.Start(func() {
  44. f(stopCh)
  45. })
  46. }
  47. // StartWithContext starts f in a new goroutine in the group.
  48. // ctx is passed to f as an argument. f should stop when ctx.Done() is available.
  49. func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) {
  50. g.Start(func() {
  51. f(ctx)
  52. })
  53. }
  54. // Start starts f in a new goroutine in the group.
  55. func (g *Group) Start(f func()) {
  56. g.wg.Add(1)
  57. go func() {
  58. defer g.wg.Done()
  59. f()
  60. }()
  61. }
  62. // Forever calls f every period for ever.
  63. //
  64. // Forever is syntactic sugar on top of Until.
  65. func Forever(f func(), period time.Duration) {
  66. Until(f, period, NeverStop)
  67. }
  68. // jitterRand is a dedicated random source for jitter calculations.
  69. // It defaults to rand.Float64, but is a package variable so it can be overridden to make unit tests deterministic.
  70. var jitterRand = rand.Float64
  71. // Jitter returns a time.Duration between duration and duration + maxFactor *
  72. // duration.
  73. //
  74. // This allows clients to avoid converging on periodic behavior. If maxFactor
  75. // is 0.0, a suggested default value will be chosen.
  76. func Jitter(duration time.Duration, maxFactor float64) time.Duration {
  77. if maxFactor <= 0.0 {
  78. maxFactor = 1.0
  79. }
  80. wait := duration + time.Duration(jitterRand()*maxFactor*float64(duration))
  81. return wait
  82. }
  83. // ConditionFunc returns true if the condition is satisfied, or an error
  84. // if the loop should be aborted.
  85. type ConditionFunc func() (done bool, err error)
  86. // ConditionWithContextFunc returns true if the condition is satisfied, or an error
  87. // if the loop should be aborted.
  88. //
  89. // The caller passes along a context that can be used by the condition function.
  90. type ConditionWithContextFunc func(context.Context) (done bool, err error)
  91. // WithContext converts a ConditionFunc into a ConditionWithContextFunc
  92. func (cf ConditionFunc) WithContext() ConditionWithContextFunc {
  93. return func(context.Context) (done bool, err error) {
  94. return cf()
  95. }
  96. }
  97. // ContextForChannel provides a context that will be treated as cancelled
  98. // when the provided parentCh is closed. The implementation returns
  99. // context.Canceled for Err() if and only if the parentCh is closed.
  100. func ContextForChannel(parentCh <-chan struct{}) context.Context {
  101. return channelContext{stopCh: parentCh}
  102. }
  103. var _ context.Context = channelContext{}
  104. // channelContext will behave as if the context were cancelled when stopCh is
  105. // closed.
  106. type channelContext struct {
  107. stopCh <-chan struct{}
  108. }
  109. func (c channelContext) Done() <-chan struct{} { return c.stopCh }
  110. func (c channelContext) Err() error {
  111. select {
  112. case <-c.stopCh:
  113. return context.Canceled
  114. default:
  115. return nil
  116. }
  117. }
  118. func (c channelContext) Deadline() (time.Time, bool) { return time.Time{}, false }
  119. func (c channelContext) Value(key any) any { return nil }
  120. // runConditionWithCrashProtection runs a ConditionFunc with crash protection.
  121. //
  122. // Deprecated: Will be removed when the legacy polling methods are removed.
  123. func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) {
  124. //nolint:logcheck // Already deprecated.
  125. defer runtime.HandleCrash()
  126. return condition()
  127. }
  128. // runConditionWithCrashProtectionWithContext runs a ConditionWithContextFunc
  129. // with crash protection.
  130. //
  131. // Deprecated: Will be removed when the legacy polling methods are removed.
  132. func runConditionWithCrashProtectionWithContext(ctx context.Context, condition ConditionWithContextFunc) (bool, error) {
  133. defer runtime.HandleCrashWithContext(ctx)
  134. return condition(ctx)
  135. }
  136. // waitFunc creates a channel that receives an item every time a test
  137. // should be executed and is closed when the last test should be invoked.
  138. //
  139. // Deprecated: Will be removed in a future release in favor of
  140. // loopConditionUntilContext.
  141. type waitFunc func(done <-chan struct{}) <-chan struct{}
  142. // WithContext converts the WaitFunc to an equivalent WaitWithContextFunc
  143. func (w waitFunc) WithContext() waitWithContextFunc {
  144. return func(ctx context.Context) <-chan struct{} {
  145. return w(ctx.Done())
  146. }
  147. }
  148. // waitWithContextFunc creates a channel that receives an item every time a test
  149. // should be executed and is closed when the last test should be invoked.
  150. //
  151. // When the specified context gets cancelled or expires the function
  152. // stops sending item and returns immediately.
  153. //
  154. // Deprecated: Will be removed in a future release in favor of
  155. // loopConditionUntilContext.
  156. type waitWithContextFunc func(ctx context.Context) <-chan struct{}
  157. // waitForWithContext continually checks 'fn' as driven by 'wait'.
  158. //
  159. // waitForWithContext gets a channel from 'wait()”, and then invokes 'fn'
  160. // once for every value placed on the channel and once more when the
  161. // channel is closed. If the channel is closed and 'fn'
  162. // returns false without error, waitForWithContext returns ErrWaitTimeout.
  163. //
  164. // If 'fn' returns an error the loop ends and that error is returned. If
  165. // 'fn' returns true the loop ends and nil is returned.
  166. //
  167. // context.Canceled will be returned if the ctx.Done() channel is closed
  168. // without fn ever returning true.
  169. //
  170. // When the ctx.Done() channel is closed, because the golang `select` statement is
  171. // "uniform pseudo-random", the `fn` might still run one or multiple times,
  172. // though eventually `waitForWithContext` will return.
  173. //
  174. // Deprecated: Will be removed in a future release in favor of
  175. // loopConditionUntilContext.
  176. func waitForWithContext(ctx context.Context, wait waitWithContextFunc, fn ConditionWithContextFunc) error {
  177. waitCtx, cancel := context.WithCancel(context.Background())
  178. defer cancel()
  179. c := wait(waitCtx)
  180. for {
  181. select {
  182. case _, open := <-c:
  183. ok, err := runConditionWithCrashProtectionWithContext(ctx, fn)
  184. if err != nil {
  185. return err
  186. }
  187. if ok {
  188. return nil
  189. }
  190. if !open {
  191. return ErrWaitTimeout
  192. }
  193. case <-ctx.Done():
  194. // returning ctx.Err() will break backward compatibility, use new PollUntilContext*
  195. // methods instead
  196. return ErrWaitTimeout
  197. }
  198. }
  199. }