backoff.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. /*
  2. Copyright 2023 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wait
  14. import (
  15. "context"
  16. "math"
  17. "sync"
  18. "time"
  19. "k8s.io/apimachinery/pkg/util/runtime"
  20. "k8s.io/utils/clock"
  21. )
  // Backoff holds the parameters of a backoff schedule, as consumed by Step,
// DelayFunc, Timer and the backoff helpers in this package.
type Backoff struct {
	// Duration is the starting interval. Each step sleeps for the current
	// Duration (plus any jitter) and may then replace it with a scaled value.
	Duration time.Duration
	// Factor scales Duration after every step, as long as Factor is non-zero
	// and neither Steps nor Cap has stopped further growth. It should not be
	// negative. Jitter never feeds back into Duration.
	Factor float64
	// Jitter widens each sleep by an extra amount drawn uniformly at random
	// from the range between zero and Jitter*Duration.
	Jitter float64
	// Steps is the number of remaining iterations in which Duration may still
	// change; reaching Cap can end growth sooner. When Steps is not positive,
	// Duration stays fixed. Together with Factor and Cap it drives
	// exponential backoff.
	Steps int
	// Cap bounds the values Duration can grow to. If multiplying by Factor
	// would push Duration past Cap, Duration becomes Cap and Steps drops to
	// zero. A non-positive Cap imposes no limit.
	Cap time.Duration
}
  47. // Step returns an amount of time to sleep determined by the original
  48. // Duration and Jitter. The backoff is mutated to update its Steps and
  49. // Duration. A nil Backoff always has a zero-duration step.
  50. func (b *Backoff) Step() time.Duration {
  51. if b == nil {
  52. return 0
  53. }
  54. var nextDuration time.Duration
  55. nextDuration, b.Duration, b.Steps = delay(b.Steps, b.Duration, b.Cap, b.Factor, b.Jitter)
  56. return nextDuration
  57. }
  58. // DelayFunc returns a function that will compute the next interval to
  59. // wait given the arguments in b. It does not mutate the original backoff
  60. // but the function is safe to use only from a single goroutine.
  61. func (b Backoff) DelayFunc() DelayFunc {
  62. steps := b.Steps
  63. duration := b.Duration
  64. cap := b.Cap
  65. factor := b.Factor
  66. jitter := b.Jitter
  67. return func() time.Duration {
  68. var nextDuration time.Duration
  69. // jitter is applied per step and is not cumulative over multiple steps
  70. nextDuration, duration, steps = delay(steps, duration, cap, factor, jitter)
  71. return nextDuration
  72. }
  73. }
  74. // Timer returns a timer implementation appropriate to this backoff's parameters
  75. // for use with wait functions.
  76. func (b Backoff) Timer() Timer {
  77. if b.Steps > 1 || b.Jitter != 0 {
  78. return &variableTimer{new: internalClock.NewTimer, fn: b.DelayFunc()}
  79. }
  80. if b.Duration > 0 {
  81. return &fixedTimer{new: internalClock.NewTicker, interval: b.Duration}
  82. }
  83. return newNoopTimer()
  84. }
  85. // delay implements the core delay algorithm used in this package.
  86. func delay(steps int, duration, cap time.Duration, factor, jitter float64) (_ time.Duration, next time.Duration, nextSteps int) {
  87. // when steps is non-positive, do not alter the base duration
  88. if steps < 1 {
  89. if jitter > 0 {
  90. return Jitter(duration, jitter), duration, 0
  91. }
  92. return duration, duration, 0
  93. }
  94. steps--
  95. // calculate the next step's interval
  96. if factor != 0 {
  97. next = time.Duration(float64(duration) * factor)
  98. if cap > 0 && next > cap {
  99. next = cap
  100. steps = 0
  101. }
  102. } else {
  103. next = duration
  104. }
  105. // add jitter for this step
  106. if jitter > 0 {
  107. duration = Jitter(duration, jitter)
  108. }
  109. return duration, next, steps
  110. }
  111. // DelayWithReset returns a DelayFunc that will return the appropriate next interval to
  112. // wait. Every resetInterval the backoff parameters are reset to their initial state.
  113. // This method is safe to invoke from multiple goroutines, but all calls will advance
  114. // the backoff state when Factor is set. If Factor is zero, this method is the same as
  115. // invoking b.DelayFunc() since Steps has no impact without Factor. If resetInterval is
  116. // zero no backoff will be performed as the same calling DelayFunc with a zero factor
  117. // and steps.
  118. func (b Backoff) DelayWithReset(c clock.Clock, resetInterval time.Duration) DelayFunc {
  119. if b.Factor <= 0 {
  120. return b.DelayFunc()
  121. }
  122. if resetInterval <= 0 {
  123. b.Steps = 0
  124. b.Factor = 0
  125. return b.DelayFunc()
  126. }
  127. return (&backoffManager{
  128. backoff: b,
  129. initialBackoff: b,
  130. resetInterval: resetInterval,
  131. clock: c,
  132. lastStart: c.Now(),
  133. timer: nil,
  134. }).Step
  135. }
  136. // Until loops until stop channel is closed, running f every period.
  137. //
  138. // Until is syntactic sugar on top of JitterUntil with zero jitter factor and
  139. // with sliding = true (which means the timer for period starts after the f
  140. // completes).
  141. //
  142. // Contextual logging: UntilWithContext should be used instead of Until in code which supports contextual logging.
  143. func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
  144. JitterUntil(f, period, 0.0, true, stopCh)
  145. }
  146. // UntilWithContext loops until context is done, running f every period.
  147. //
  148. // UntilWithContext is syntactic sugar on top of JitterUntilWithContext
  149. // with zero jitter factor and with sliding = true (which means the timer
  150. // for period starts after the f completes).
  151. func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
  152. JitterUntilWithContext(ctx, f, period, 0.0, true)
  153. }
  154. // NonSlidingUntil loops until stop channel is closed, running f every
  155. // period.
  156. //
  157. // NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter
  158. // factor, with sliding = false (meaning the timer for period starts at the same
  159. // time as the function starts).
  160. //
  161. // Contextual logging: NonSlidingUntilWithContext should be used instead of NonSlidingUntil in code which supports contextual logging.
  162. func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
  163. JitterUntil(f, period, 0.0, false, stopCh)
  164. }
  165. // NonSlidingUntilWithContext loops until context is done, running f every
  166. // period.
  167. //
  168. // NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext
  169. // with zero jitter factor, with sliding = false (meaning the timer for period
  170. // starts at the same time as the function starts).
  171. func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
  172. JitterUntilWithContext(ctx, f, period, 0.0, false)
  173. }
  174. // JitterUntil loops until stop channel is closed, running f every period.
  175. //
  176. // If jitterFactor is positive, the period is jittered before every run of f.
  177. // If jitterFactor is not positive, the period is unchanged and not jittered.
  178. //
  179. // If sliding is true, the period is computed after f runs. If it is false then
  180. // period includes the runtime for f.
  181. //
  182. // Close stopCh to stop. f may not be invoked if stop channel is already
  183. // closed. Pass NeverStop to if you don't want it stop.
  184. //
  185. // Contextual logging: JitterUntilWithContext should be used instead of JitterUntil in code which supports contextual logging.
  186. func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
  187. BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
  188. }
  189. // JitterUntilWithContext loops until context is done, running f every period.
  190. //
  191. // If jitterFactor is positive, the period is jittered before every run of f.
  192. // If jitterFactor is not positive, the period is unchanged and not jittered.
  193. //
  194. // If sliding is true, the period is computed after f runs. If it is false then
  195. // period includes the runtime for f.
  196. //
  197. // Cancel context to stop. f may not be invoked if context is already done.
  198. func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
  199. BackoffUntilWithContext(ctx, f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding)
  200. }
  201. // BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
  202. //
  203. // If sliding is true, the period is computed after f runs. If it is false then
  204. // period includes the runtime for f.
  205. //
  206. // Contextual logging: BackoffUntilWithContext should be used instead of BackoffUntil in code which supports contextual logging.
  207. func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
  208. BackoffUntilWithContext(ContextForChannel(stopCh), func(context.Context) { f() }, backoff, sliding)
  209. }
  210. // BackoffUntilWithContext loops until context is done, run f every duration given by BackoffManager.
  211. //
  212. // If sliding is true, the period is computed after f runs. If it is false then
  213. // period includes the runtime for f.
  214. func BackoffUntilWithContext(ctx context.Context, f func(ctx context.Context), backoff BackoffManager, sliding bool) {
  215. var t clock.Timer
  216. for {
  217. select {
  218. case <-ctx.Done():
  219. return
  220. default:
  221. }
  222. if !sliding {
  223. t = backoff.Backoff()
  224. }
  225. func() {
  226. defer runtime.HandleCrashWithContext(ctx)
  227. f(ctx)
  228. }()
  229. if sliding {
  230. t = backoff.Backoff()
  231. }
  232. // NOTE: b/c there is no priority selection in golang
  233. // it is possible for this to race, meaning we could
  234. // trigger t.C and stopCh, and t.C select falls through.
  235. // In order to mitigate we re-check stopCh at the beginning
  236. // of every loop to prevent extra executions of f().
  237. select {
  238. case <-ctx.Done():
  239. if !t.Stop() {
  240. <-t.C()
  241. }
  242. return
  243. case <-t.C():
  244. }
  245. }
  246. }
  247. // backoffManager provides simple backoff behavior in a threadsafe manner to a caller.
  248. type backoffManager struct {
  249. backoff Backoff
  250. initialBackoff Backoff
  251. resetInterval time.Duration
  252. clock clock.Clock
  253. lock sync.Mutex
  254. lastStart time.Time
  255. timer clock.Timer
  256. }
  257. // Step returns the expected next duration to wait.
  258. func (b *backoffManager) Step() time.Duration {
  259. b.lock.Lock()
  260. defer b.lock.Unlock()
  261. switch {
  262. case b.resetInterval == 0:
  263. b.backoff = b.initialBackoff
  264. case b.clock.Now().Sub(b.lastStart) > b.resetInterval:
  265. b.backoff = b.initialBackoff
  266. b.lastStart = b.clock.Now()
  267. }
  268. return b.backoff.Step()
  269. }
  270. // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer
  271. // for exponential backoff. The returned timer must be drained before calling Backoff() the second
  272. // time.
  273. func (b *backoffManager) Backoff() clock.Timer {
  274. b.lock.Lock()
  275. defer b.lock.Unlock()
  276. if b.timer == nil {
  277. b.timer = b.clock.NewTimer(b.Step())
  278. } else {
  279. b.timer.Reset(b.Step())
  280. }
  281. return b.timer
  282. }
  283. // Timer returns a new Timer instance that shares the clock and the reset behavior with all other
  284. // timers.
  285. func (b *backoffManager) Timer() Timer {
  286. return DelayFunc(b.Step).Timer(b.clock)
  287. }
  288. // BackoffManager manages backoff with a particular scheme based on its underlying implementation.
  289. type BackoffManager interface {
  290. // Backoff returns a shared clock.Timer that is Reset on every invocation. This method is not
  291. // safe for use from multiple threads. It returns a timer for backoff, and caller shall backoff
  292. // until Timer.C() drains. If the second Backoff() is called before the timer from the first
  293. // Backoff() call finishes, the first timer will NOT be drained and result in undetermined
  294. // behavior.
  295. Backoff() clock.Timer
  296. }
  297. // Deprecated: Will be removed when the legacy polling functions are removed.
  298. type exponentialBackoffManagerImpl struct {
  299. backoff *Backoff
  300. backoffTimer clock.Timer
  301. lastBackoffStart time.Time
  302. initialBackoff time.Duration
  303. backoffResetDuration time.Duration
  304. clock clock.Clock
  305. }
  306. // NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
  307. // backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
  308. // This backoff manager is used to reduce load during upstream unhealthiness.
  309. //
  310. // Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
  311. // Backoff struct, use DelayWithReset() to get a DelayFunc that periodically resets itself, and then
  312. // invoke Timer() when calling wait.BackoffUntil.
  313. //
  314. // Instead of:
  315. //
  316. // bm := wait.NewExponentialBackoffManager(init, max, reset, factor, jitter, clock)
  317. // ...
  318. // wait.BackoffUntil(..., bm.Backoff, ...)
  319. //
  320. // Use:
  321. //
  322. // delayFn := wait.Backoff{
  323. // Duration: init,
  324. // Cap: max,
  325. // Steps: int(math.Ceil(float64(max) / float64(init))), // now a required argument
  326. // Factor: factor,
  327. // Jitter: jitter,
  328. // }.DelayWithReset(reset, clock)
  329. // wait.BackoffUntil(..., delayFn.Timer(), ...)
  330. func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
  331. return &exponentialBackoffManagerImpl{
  332. backoff: &Backoff{
  333. Duration: initBackoff,
  334. Factor: backoffFactor,
  335. Jitter: jitter,
  336. // the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
  337. // what we ideally need here, we set it to max int and assume we will never use up the steps
  338. Steps: math.MaxInt32,
  339. Cap: maxBackoff,
  340. },
  341. backoffTimer: nil,
  342. initialBackoff: initBackoff,
  343. lastBackoffStart: c.Now(),
  344. backoffResetDuration: resetDuration,
  345. clock: c,
  346. }
  347. }
  348. func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
  349. if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
  350. b.backoff.Steps = math.MaxInt32
  351. b.backoff.Duration = b.initialBackoff
  352. }
  353. b.lastBackoffStart = b.clock.Now()
  354. return b.backoff.Step()
  355. }
  356. // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
  357. // The returned timer must be drained before calling Backoff() the second time
  358. func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
  359. if b.backoffTimer == nil {
  360. b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
  361. } else {
  362. b.backoffTimer.Reset(b.getNextBackoff())
  363. }
  364. return b.backoffTimer
  365. }
  366. // Deprecated: Will be removed when the legacy polling functions are removed.
  367. type jitteredBackoffManagerImpl struct {
  368. clock clock.Clock
  369. duration time.Duration
  370. jitter float64
  371. backoffTimer clock.Timer
  372. }
  373. // NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
  374. // is negative, backoff will not be jittered.
  375. //
  376. // Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
  377. // Backoff struct and invoke Timer() when calling wait.BackoffUntil.
  378. //
  379. // Instead of:
  380. //
  381. // bm := wait.NewJitteredBackoffManager(duration, jitter, clock)
  382. // ...
  383. // wait.BackoffUntil(..., bm.Backoff, ...)
  384. //
  385. // Use:
  386. //
  387. // wait.BackoffUntil(..., wait.Backoff{Duration: duration, Jitter: jitter}.Timer(), ...)
  388. func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
  389. return &jitteredBackoffManagerImpl{
  390. clock: c,
  391. duration: duration,
  392. jitter: jitter,
  393. backoffTimer: nil,
  394. }
  395. }
  396. func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
  397. jitteredPeriod := j.duration
  398. if j.jitter > 0.0 {
  399. jitteredPeriod = Jitter(j.duration, j.jitter)
  400. }
  401. return jitteredPeriod
  402. }
  403. // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
  404. // The returned timer must be drained before calling Backoff() the second time
  405. func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
  406. backoff := j.getNextBackoff()
  407. if j.backoffTimer == nil {
  408. j.backoffTimer = j.clock.NewTimer(backoff)
  409. } else {
  410. j.backoffTimer.Reset(backoff)
  411. }
  412. return j.backoffTimer
  413. }
  414. // ExponentialBackoff repeats a condition check with exponential backoff.
  415. //
  416. // It repeatedly checks the condition and then sleeps, using `backoff.Step()`
  417. // to determine the length of the sleep and adjust Duration and Steps.
  418. // Stops and returns as soon as:
  419. // 1. the condition check returns true or an error,
  420. // 2. `backoff.Steps` checks of the condition have been done, or
  421. // 3. a sleep truncated by the cap on duration has been completed.
  422. // In case (1) the returned error is what the condition function returned.
  423. // In all other cases, ErrWaitTimeout is returned.
  424. //
  425. // Since backoffs are often subject to cancellation, we recommend using
  426. // ExponentialBackoffWithContext and passing a context to the method.
  427. func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
  428. for backoff.Steps > 0 {
  429. if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
  430. return err
  431. }
  432. if backoff.Steps == 1 {
  433. break
  434. }
  435. time.Sleep(backoff.Step())
  436. }
  437. return ErrWaitTimeout
  438. }
  439. // ExponentialBackoffWithContext repeats a condition check with exponential backoff.
  440. // It immediately returns an error if the condition returns an error, the context is cancelled
  441. // or hits the deadline, or if the maximum attempts defined in backoff is exceeded (ErrWaitTimeout).
  442. // If an error is returned by the condition the backoff stops immediately. The condition will
  443. // never be invoked more than backoff.Steps times.
  444. func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionWithContextFunc) error {
  445. for backoff.Steps > 0 {
  446. select {
  447. case <-ctx.Done():
  448. return ctx.Err()
  449. default:
  450. }
  451. if ok, err := runConditionWithCrashProtectionWithContext(ctx, condition); err != nil || ok {
  452. return err
  453. }
  454. if backoff.Steps == 1 {
  455. break
  456. }
  457. waitBeforeRetry := backoff.Step()
  458. select {
  459. case <-ctx.Done():
  460. return ctx.Err()
  461. case <-time.After(waitBeforeRetry):
  462. }
  463. }
  464. return ErrWaitTimeout
  465. }