default_rate_limiters.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package workqueue
  14. import (
  15. "math"
  16. "sync"
  17. "time"
  18. "golang.org/x/time/rate"
  19. )
  20. type RateLimiter interface {
  21. // When gets an item and gets to decide how long that item should wait
  22. When(item interface{}) time.Duration
  23. // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
  24. // or for success, we'll stop tracking it
  25. Forget(item interface{})
  26. // NumRequeues returns back how many failures the item has had
  27. NumRequeues(item interface{}) int
  28. }
  29. // DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
  30. // both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
  31. func DefaultControllerRateLimiter() RateLimiter {
  32. return NewMaxOfRateLimiter(
  33. NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
  34. // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
  35. &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
  36. )
  37. }
  38. // BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
  39. type BucketRateLimiter struct {
  40. *rate.Limiter
  41. }
  42. var _ RateLimiter = &BucketRateLimiter{}
  43. func (r *BucketRateLimiter) When(item interface{}) time.Duration {
  44. return r.Limiter.Reserve().Delay()
  45. }
  46. func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
  47. return 0
  48. }
  49. func (r *BucketRateLimiter) Forget(item interface{}) {
  50. }
  51. // ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
  52. // dealing with max failures and expiration are up to the caller
  53. type ItemExponentialFailureRateLimiter struct {
  54. failuresLock sync.Mutex
  55. failures map[interface{}]int
  56. baseDelay time.Duration
  57. maxDelay time.Duration
  58. }
  59. var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
  60. func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
  61. return &ItemExponentialFailureRateLimiter{
  62. failures: map[interface{}]int{},
  63. baseDelay: baseDelay,
  64. maxDelay: maxDelay,
  65. }
  66. }
  67. func DefaultItemBasedRateLimiter() RateLimiter {
  68. return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
  69. }
  70. func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
  71. r.failuresLock.Lock()
  72. defer r.failuresLock.Unlock()
  73. exp := r.failures[item]
  74. r.failures[item] = r.failures[item] + 1
  75. // The backoff is capped such that 'calculated' value never overflows.
  76. backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
  77. if backoff > math.MaxInt64 {
  78. return r.maxDelay
  79. }
  80. calculated := time.Duration(backoff)
  81. if calculated > r.maxDelay {
  82. return r.maxDelay
  83. }
  84. return calculated
  85. }
  86. func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
  87. r.failuresLock.Lock()
  88. defer r.failuresLock.Unlock()
  89. return r.failures[item]
  90. }
  91. func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
  92. r.failuresLock.Lock()
  93. defer r.failuresLock.Unlock()
  94. delete(r.failures, item)
  95. }
  96. // ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
  97. type ItemFastSlowRateLimiter struct {
  98. failuresLock sync.Mutex
  99. failures map[interface{}]int
  100. maxFastAttempts int
  101. fastDelay time.Duration
  102. slowDelay time.Duration
  103. }
  104. var _ RateLimiter = &ItemFastSlowRateLimiter{}
  105. func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
  106. return &ItemFastSlowRateLimiter{
  107. failures: map[interface{}]int{},
  108. fastDelay: fastDelay,
  109. slowDelay: slowDelay,
  110. maxFastAttempts: maxFastAttempts,
  111. }
  112. }
  113. func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
  114. r.failuresLock.Lock()
  115. defer r.failuresLock.Unlock()
  116. r.failures[item] = r.failures[item] + 1
  117. if r.failures[item] <= r.maxFastAttempts {
  118. return r.fastDelay
  119. }
  120. return r.slowDelay
  121. }
  122. func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
  123. r.failuresLock.Lock()
  124. defer r.failuresLock.Unlock()
  125. return r.failures[item]
  126. }
  127. func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
  128. r.failuresLock.Lock()
  129. defer r.failuresLock.Unlock()
  130. delete(r.failures, item)
  131. }
  132. // MaxOfRateLimiter calls every RateLimiter and returns the worst case response
  133. // When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
  134. // were separately delayed a longer time.
  135. type MaxOfRateLimiter struct {
  136. limiters []RateLimiter
  137. }
  138. func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
  139. ret := time.Duration(0)
  140. for _, limiter := range r.limiters {
  141. curr := limiter.When(item)
  142. if curr > ret {
  143. ret = curr
  144. }
  145. }
  146. return ret
  147. }
  148. func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
  149. return &MaxOfRateLimiter{limiters: limiters}
  150. }
  151. func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
  152. ret := 0
  153. for _, limiter := range r.limiters {
  154. curr := limiter.NumRequeues(item)
  155. if curr > ret {
  156. ret = curr
  157. }
  158. }
  159. return ret
  160. }
  161. func (r *MaxOfRateLimiter) Forget(item interface{}) {
  162. for _, limiter := range r.limiters {
  163. limiter.Forget(item)
  164. }
  165. }