| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package workqueue
- import (
- "math"
- "sync"
- "time"
- "golang.org/x/time/rate"
- )
- type RateLimiter interface {
- // When gets an item and gets to decide how long that item should wait
- When(item interface{}) time.Duration
- // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing
- // or for success, we'll stop tracking it
- Forget(item interface{})
- // NumRequeues returns back how many failures the item has had
- NumRequeues(item interface{}) int
- }
- // DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
- // both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
- func DefaultControllerRateLimiter() RateLimiter {
- return NewMaxOfRateLimiter(
- NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
- // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
- &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
- )
- }
- // BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
- type BucketRateLimiter struct {
- *rate.Limiter
- }
- var _ RateLimiter = &BucketRateLimiter{}
- func (r *BucketRateLimiter) When(item interface{}) time.Duration {
- return r.Limiter.Reserve().Delay()
- }
- func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
- return 0
- }
- func (r *BucketRateLimiter) Forget(item interface{}) {
- }
- // ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
- // dealing with max failures and expiration are up to the caller
- type ItemExponentialFailureRateLimiter struct {
- failuresLock sync.Mutex
- failures map[interface{}]int
- baseDelay time.Duration
- maxDelay time.Duration
- }
- var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
- func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
- return &ItemExponentialFailureRateLimiter{
- failures: map[interface{}]int{},
- baseDelay: baseDelay,
- maxDelay: maxDelay,
- }
- }
- func DefaultItemBasedRateLimiter() RateLimiter {
- return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
- }
- func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
- r.failuresLock.Lock()
- defer r.failuresLock.Unlock()
- exp := r.failures[item]
- r.failures[item] = r.failures[item] + 1
- // The backoff is capped such that 'calculated' value never overflows.
- backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
- if backoff > math.MaxInt64 {
- return r.maxDelay
- }
- calculated := time.Duration(backoff)
- if calculated > r.maxDelay {
- return r.maxDelay
- }
- return calculated
- }
- func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
- r.failuresLock.Lock()
- defer r.failuresLock.Unlock()
- return r.failures[item]
- }
- func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
- r.failuresLock.Lock()
- defer r.failuresLock.Unlock()
- delete(r.failures, item)
- }
- // ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
- type ItemFastSlowRateLimiter struct {
- failuresLock sync.Mutex
- failures map[interface{}]int
- maxFastAttempts int
- fastDelay time.Duration
- slowDelay time.Duration
- }
- var _ RateLimiter = &ItemFastSlowRateLimiter{}
- func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
- return &ItemFastSlowRateLimiter{
- failures: map[interface{}]int{},
- fastDelay: fastDelay,
- slowDelay: slowDelay,
- maxFastAttempts: maxFastAttempts,
- }
- }
- func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
- r.failuresLock.Lock()
- defer r.failuresLock.Unlock()
- r.failures[item] = r.failures[item] + 1
- if r.failures[item] <= r.maxFastAttempts {
- return r.fastDelay
- }
- return r.slowDelay
- }
- func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
- r.failuresLock.Lock()
- defer r.failuresLock.Unlock()
- return r.failures[item]
- }
- func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
- r.failuresLock.Lock()
- defer r.failuresLock.Unlock()
- delete(r.failures, item)
- }
- // MaxOfRateLimiter calls every RateLimiter and returns the worst case response
- // When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
- // were separately delayed a longer time.
- type MaxOfRateLimiter struct {
- limiters []RateLimiter
- }
- func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
- ret := time.Duration(0)
- for _, limiter := range r.limiters {
- curr := limiter.When(item)
- if curr > ret {
- ret = curr
- }
- }
- return ret
- }
- func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
- return &MaxOfRateLimiter{limiters: limiters}
- }
- func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
- ret := 0
- for _, limiter := range r.limiters {
- curr := limiter.NumRequeues(item)
- if curr > ret {
- ret = curr
- }
- }
- return ret
- }
- func (r *MaxOfRateLimiter) Forget(item interface{}) {
- for _, limiter := range r.limiters {
- limiter.Forget(item)
- }
- }
|