delaying_queue.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package workqueue
  14. import (
  15. "container/heap"
  16. "sync"
  17. "time"
  18. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  19. "k8s.io/klog/v2"
  20. "k8s.io/utils/clock"
  21. )
  22. // DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
  23. // requeue items after failures without ending up in a hot-loop.
  24. //
  25. // Deprecated: use TypedDelayingInterface instead.
  26. type DelayingInterface TypedDelayingInterface[any]
  27. // TypedDelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
  28. // requeue items after failures without ending up in a hot-loop.
  29. type TypedDelayingInterface[T comparable] interface {
  30. TypedInterface[T]
  31. // AddAfter adds an item to the workqueue after the indicated duration has passed
  32. AddAfter(item T, duration time.Duration)
  33. }
  34. // DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
  35. //
  36. // Deprecated: use TypedDelayingQueueConfig instead.
  37. type DelayingQueueConfig = TypedDelayingQueueConfig[any]
  38. // TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
  39. type TypedDelayingQueueConfig[T comparable] struct {
  40. // An optional logger. The name of the queue does *not* get added to it, this should
  41. // be done by the caller if desired.
  42. Logger *klog.Logger
  43. // Name for the queue. If unnamed, the metrics will not be registered.
  44. Name string
  45. // MetricsProvider optionally allows specifying a metrics provider to use for the queue
  46. // instead of the global provider.
  47. MetricsProvider MetricsProvider
  48. // Clock optionally allows injecting a real or fake clock for testing purposes.
  49. Clock clock.WithTicker
  50. // Queue optionally allows injecting custom queue Interface instead of the default one.
  51. Queue TypedInterface[T]
  52. }
  53. // NewDelayingQueue constructs a new workqueue with delayed queuing ability.
  54. // NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
  55. // NewDelayingQueueWithConfig instead and specify a name.
  56. //
  57. // Deprecated: use NewTypedDelayingQueue instead.
  58. func NewDelayingQueue() DelayingInterface {
  59. return NewDelayingQueueWithConfig(DelayingQueueConfig{})
  60. }
  61. // NewTypedDelayingQueue constructs a new workqueue with delayed queuing ability.
  62. // NewTypedDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
  63. // NewTypedDelayingQueueWithConfig instead and specify a name.
  64. func NewTypedDelayingQueue[T comparable]() TypedDelayingInterface[T] {
  65. return NewTypedDelayingQueueWithConfig(TypedDelayingQueueConfig[T]{})
  66. }
  67. // NewDelayingQueueWithConfig constructs a new workqueue with options to
  68. // customize different properties.
  69. //
  70. // Deprecated: use NewTypedDelayingQueueWithConfig instead.
  71. func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
  72. return NewTypedDelayingQueueWithConfig[any](config)
  73. }
  74. // TypedNewDelayingQueue exists for backwards compatibility only.
  75. //
  76. // Deprecated: use NewTypedDelayingQueueWithConfig instead.
  77. func TypedNewDelayingQueue[T comparable]() TypedDelayingInterface[T] {
  78. return NewTypedDelayingQueue[T]()
  79. }
  80. // NewTypedDelayingQueueWithConfig constructs a new workqueue with options to
  81. // customize different properties.
  82. func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T] {
  83. logger := klog.Background()
  84. if config.Logger != nil {
  85. logger = *config.Logger
  86. }
  87. if config.Clock == nil {
  88. config.Clock = clock.RealClock{}
  89. }
  90. if config.Queue == nil {
  91. config.Queue = NewTypedWithConfig[T](TypedQueueConfig[T]{
  92. Name: config.Name,
  93. MetricsProvider: config.MetricsProvider,
  94. Clock: config.Clock,
  95. })
  96. }
  97. return newDelayingQueue(logger, config.Clock, config.Queue, config.Name, config.MetricsProvider)
  98. }
  99. // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
  100. // inject custom queue Interface instead of the default one
  101. // Deprecated: Use NewDelayingQueueWithConfig instead.
  102. func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
  103. return NewDelayingQueueWithConfig(DelayingQueueConfig{
  104. Name: name,
  105. Queue: q,
  106. })
  107. }
  108. // NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
  109. // Deprecated: Use NewDelayingQueueWithConfig instead.
  110. func NewNamedDelayingQueue(name string) DelayingInterface {
  111. return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
  112. }
  113. // NewDelayingQueueWithCustomClock constructs a new named workqueue
  114. // with ability to inject real or fake clock for testing purposes.
  115. // Deprecated: Use NewDelayingQueueWithConfig instead.
  116. func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
  117. return NewDelayingQueueWithConfig(DelayingQueueConfig{
  118. Name: name,
  119. Clock: clock,
  120. })
  121. }
  122. func newDelayingQueue[T comparable](logger klog.Logger, clock clock.WithTicker, q TypedInterface[T], name string, provider MetricsProvider) *delayingType[T] {
  123. ret := &delayingType[T]{
  124. TypedInterface: q,
  125. clock: clock,
  126. heartbeat: clock.NewTicker(maxWait),
  127. stopCh: make(chan struct{}),
  128. waitingForAddCh: make(chan *waitFor[T], 1000),
  129. metrics: newRetryMetrics(name, provider),
  130. }
  131. go ret.waitingLoop(logger)
  132. return ret
  133. }
  134. // delayingType wraps an Interface and provides delayed re-enquing
  135. type delayingType[T comparable] struct {
  136. TypedInterface[T]
  137. // clock tracks time for delayed firing
  138. clock clock.Clock
  139. // stopCh lets us signal a shutdown to the waiting loop
  140. stopCh chan struct{}
  141. // stopOnce guarantees we only signal shutdown a single time
  142. stopOnce sync.Once
  143. // heartbeat ensures we wait no more than maxWait before firing
  144. heartbeat clock.Ticker
  145. // waitingForAddCh is a buffered channel that feeds waitingForAdd
  146. waitingForAddCh chan *waitFor[T]
  147. // metrics counts the number of retries
  148. metrics retryMetrics
  149. }
  150. // waitFor holds the data to add and the time it should be added
  151. type waitFor[T any] struct {
  152. data T
  153. readyAt time.Time
  154. // index in the priority queue (heap)
  155. index int
  156. }
  157. // waitForPriorityQueue implements a priority queue for waitFor items.
  158. //
  159. // waitForPriorityQueue implements heap.Interface. The item occurring next in
  160. // time (i.e., the item with the smallest readyAt) is at the root (index 0).
  161. // Peek returns this minimum item at index 0. Pop returns the minimum item after
  162. // it has been removed from the queue and placed at index Len()-1 by
  163. // container/heap. Push adds an item at index Len(), and container/heap
  164. // percolates it into the correct location.
  165. type waitForPriorityQueue[T any] []*waitFor[T]
  166. func (pq waitForPriorityQueue[T]) Len() int {
  167. return len(pq)
  168. }
  169. func (pq waitForPriorityQueue[T]) Less(i, j int) bool {
  170. return pq[i].readyAt.Before(pq[j].readyAt)
  171. }
  172. func (pq waitForPriorityQueue[T]) Swap(i, j int) {
  173. pq[i], pq[j] = pq[j], pq[i]
  174. pq[i].index = i
  175. pq[j].index = j
  176. }
  177. // Push adds an item to the queue. Push should not be called directly; instead,
  178. // use `heap.Push`.
  179. func (pq *waitForPriorityQueue[T]) Push(x interface{}) {
  180. n := len(*pq)
  181. item := x.(*waitFor[T])
  182. item.index = n
  183. *pq = append(*pq, item)
  184. }
  185. // Pop removes an item from the queue. Pop should not be called directly;
  186. // instead, use `heap.Pop`.
  187. func (pq *waitForPriorityQueue[T]) Pop() interface{} {
  188. n := len(*pq)
  189. item := (*pq)[n-1]
  190. item.index = -1
  191. *pq = (*pq)[0:(n - 1)]
  192. return item
  193. }
  194. // Peek returns the item at the beginning of the queue, without removing the
  195. // item or otherwise mutating the queue. It is safe to call directly.
  196. func (pq waitForPriorityQueue[T]) Peek() interface{} {
  197. return pq[0]
  198. }
  199. // ShutDown stops the queue. After the queue drains, the returned shutdown bool
  200. // on Get() will be true. This method may be invoked more than once.
  201. func (q *delayingType[T]) ShutDown() {
  202. q.stopOnce.Do(func() {
  203. q.TypedInterface.ShutDown()
  204. close(q.stopCh)
  205. q.heartbeat.Stop()
  206. })
  207. }
  208. // AddAfter adds the given item to the work queue after the given delay
  209. func (q *delayingType[T]) AddAfter(item T, duration time.Duration) {
  210. // don't add if we're already shutting down
  211. if q.ShuttingDown() {
  212. return
  213. }
  214. q.metrics.retry()
  215. // immediately add things with no delay
  216. if duration <= 0 {
  217. q.Add(item)
  218. return
  219. }
  220. select {
  221. case <-q.stopCh:
  222. // unblock if ShutDown() is called
  223. case q.waitingForAddCh <- &waitFor[T]{data: item, readyAt: q.clock.Now().Add(duration)}:
  224. }
  225. }
  226. // maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
  227. // Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
  228. // expired item sitting for more than 10 seconds.
  229. const maxWait = 10 * time.Second
  230. // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
  231. func (q *delayingType[T]) waitingLoop(logger klog.Logger) {
  232. defer utilruntime.HandleCrashWithLogger(logger)
  233. // Make a placeholder channel to use when there are no items in our list
  234. never := make(<-chan time.Time)
  235. // Make a timer that expires when the item at the head of the waiting queue is ready
  236. var nextReadyAtTimer clock.Timer
  237. waitingForQueue := &waitForPriorityQueue[T]{}
  238. heap.Init(waitingForQueue)
  239. waitingEntryByData := map[T]*waitFor[T]{}
  240. for {
  241. if q.TypedInterface.ShuttingDown() {
  242. return
  243. }
  244. now := q.clock.Now()
  245. // Add ready entries
  246. for waitingForQueue.Len() > 0 {
  247. entry := waitingForQueue.Peek().(*waitFor[T])
  248. if entry.readyAt.After(now) {
  249. break
  250. }
  251. entry = heap.Pop(waitingForQueue).(*waitFor[T])
  252. q.Add(entry.data)
  253. delete(waitingEntryByData, entry.data)
  254. }
  255. // Set up a wait for the first item's readyAt (if one exists)
  256. nextReadyAt := never
  257. if waitingForQueue.Len() > 0 {
  258. if nextReadyAtTimer != nil {
  259. nextReadyAtTimer.Stop()
  260. }
  261. entry := waitingForQueue.Peek().(*waitFor[T])
  262. nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
  263. nextReadyAt = nextReadyAtTimer.C()
  264. }
  265. select {
  266. case <-q.stopCh:
  267. return
  268. case <-q.heartbeat.C():
  269. // continue the loop, which will add ready items
  270. case <-nextReadyAt:
  271. // continue the loop, which will add ready items
  272. case waitEntry := <-q.waitingForAddCh:
  273. if waitEntry.readyAt.After(q.clock.Now()) {
  274. insert(waitingForQueue, waitingEntryByData, waitEntry)
  275. } else {
  276. q.Add(waitEntry.data)
  277. }
  278. drained := false
  279. for !drained {
  280. select {
  281. case waitEntry := <-q.waitingForAddCh:
  282. if waitEntry.readyAt.After(q.clock.Now()) {
  283. insert(waitingForQueue, waitingEntryByData, waitEntry)
  284. } else {
  285. q.Add(waitEntry.data)
  286. }
  287. default:
  288. drained = true
  289. }
  290. }
  291. }
  292. }
  293. }
  294. // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
  295. func insert[T comparable](q *waitForPriorityQueue[T], knownEntries map[T]*waitFor[T], entry *waitFor[T]) {
  296. // if the entry already exists, update the time only if it would cause the item to be queued sooner
  297. existing, exists := knownEntries[entry.data]
  298. if exists {
  299. if existing.readyAt.After(entry.readyAt) {
  300. existing.readyAt = entry.readyAt
  301. heap.Fix(q, existing.index)
  302. }
  303. return
  304. }
  305. heap.Push(q, entry)
  306. knownEntries[entry.data] = entry
  307. }