queue.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package workqueue
  14. import (
  15. "sync"
  16. "time"
  17. "k8s.io/apimachinery/pkg/util/sets"
  18. "k8s.io/utils/clock"
  19. )
  20. // Deprecated: Interface is deprecated, use TypedInterface instead.
  21. type Interface TypedInterface[any]
  // TypedInterface is the method set of a work queue holding items of type T.
  // The per-method notes below summarize how Typed, the implementation in
  // this file, behaves.
  type TypedInterface[T comparable] interface {
  	// Add marks item as needing processing.
  	Add(item T)

  	// Len returns the current queue length, for informational purposes only.
  	Len() int

  	// Get blocks until it can return an item to be processed. When shutdown
  	// is true the caller should end its goroutine. Done must be called with
  	// the returned item once processing has finished.
  	Get() (item T, shutdown bool)

  	// Done marks item as done processing.
  	Done(item T)

  	// ShutDown makes the queue ignore all newly added items.
  	ShutDown()

  	// ShutDownWithDrain shuts the queue down like ShutDown and additionally
  	// waits for items that are still being processed.
  	ShutDownWithDrain()

  	// ShuttingDown reports whether a shutdown has been requested.
  	ShuttingDown() bool
  }
  // Queue is the pluggable storage that decides the order in which items are
  // handed out. The Typed methods in this file call into it only while
  // holding the work queue's lock, so an implementation does not see
  // overlapping calls from them.
  type Queue[T comparable] interface {
  	// Touch is a hook invoked when an item that is already queued gets added
  	// again. An implementation that supports priorities may use it to change
  	// the priority of that item.
  	Touch(item T)

  	// Push stores a new item.
  	Push(item T)

  	// Len reports how many items are currently stored.
  	Len() int

  	// Pop removes one item and returns it.
  	Pop() T
  }
  44. // DefaultQueue is a slice based FIFO queue.
  45. func DefaultQueue[T comparable]() Queue[T] {
  46. return new(queue[T])
  47. }
  // queue is a slice-backed FIFO that implements Queue: items are appended at
  // the tail and removed from the head.
  type queue[T comparable] []T

  // Touch does nothing; a plain FIFO has no per-item state to refresh.
  func (q *queue[T]) Touch(item T) {}

  // Push appends item to the tail of the queue.
  func (q *queue[T]) Push(item T) {
  	items := *q
  	*q = append(items, item)
  }

  // Len returns the number of items currently stored.
  func (q *queue[T]) Len() int {
  	return len(*q)
  }

  // Pop removes the head of the queue and returns it. It indexes element 0
  // unconditionally, so it must only be called when Len() is non-zero.
  func (q *queue[T]) Pop() (item T) {
  	items := *q
  	head := items[0]

  	// The backing array outlives the re-slice below and would keep
  	// referencing the popped value, so overwrite the slot with the zero
  	// value to let the value be garbage collected.
  	var zero T
  	items[0] = zero

  	*q = items[1:]
  	return head
  }
  64. // QueueConfig specifies optional configurations to customize an Interface.
  65. // Deprecated: use TypedQueueConfig instead.
  66. type QueueConfig = TypedQueueConfig[any]
  67. type TypedQueueConfig[T comparable] struct {
  68. // Name for the queue. If unnamed, the metrics will not be registered.
  69. Name string
  70. // MetricsProvider optionally allows specifying a metrics provider to use for the queue
  71. // instead of the global provider.
  72. MetricsProvider MetricsProvider
  73. // Clock ability to inject real or fake clock for testing purposes.
  74. Clock clock.WithTicker
  75. // Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
  76. Queue Queue[T]
  77. }
  78. // New constructs a new work queue (see the package comment).
  79. //
  80. // Deprecated: use NewTyped instead.
  81. func New() *Type {
  82. return NewWithConfig(QueueConfig{
  83. Name: "",
  84. })
  85. }
  86. // NewTyped constructs a new work queue (see the package comment).
  87. func NewTyped[T comparable]() *Typed[T] {
  88. return NewTypedWithConfig(TypedQueueConfig[T]{
  89. Name: "",
  90. })
  91. }
  92. // NewWithConfig constructs a new workqueue with ability to
  93. // customize different properties.
  94. //
  95. // Deprecated: use NewTypedWithConfig instead.
  96. func NewWithConfig(config QueueConfig) *Type {
  97. return NewTypedWithConfig(config)
  98. }
  99. // NewTypedWithConfig constructs a new workqueue with ability to
  100. // customize different properties.
  101. func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T] {
  102. return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
  103. }
  104. // NewNamed creates a new named queue.
  105. // Deprecated: Use NewWithConfig instead.
  106. func NewNamed(name string) *Type {
  107. return NewWithConfig(QueueConfig{
  108. Name: name,
  109. })
  110. }
  111. // newQueueWithConfig constructs a new named workqueue
  112. // with the ability to customize different properties for testing purposes
  113. func newQueueWithConfig[T comparable](config TypedQueueConfig[T], updatePeriod time.Duration) *Typed[T] {
  114. metricsProvider := globalMetricsProvider
  115. if config.MetricsProvider != nil {
  116. metricsProvider = config.MetricsProvider
  117. }
  118. if config.Clock == nil {
  119. config.Clock = clock.RealClock{}
  120. }
  121. if config.Queue == nil {
  122. config.Queue = DefaultQueue[T]()
  123. }
  124. return newQueue(
  125. config.Clock,
  126. config.Queue,
  127. newQueueMetrics[T](metricsProvider, config.Name, config.Clock),
  128. updatePeriod,
  129. )
  130. }
  131. func newQueue[T comparable](c clock.WithTicker, queue Queue[T], metrics queueMetrics[T], updatePeriod time.Duration) *Typed[T] {
  132. t := &Typed[T]{
  133. clock: c,
  134. queue: queue,
  135. dirty: sets.Set[T]{},
  136. processing: sets.Set[T]{},
  137. cond: sync.NewCond(&sync.Mutex{}),
  138. metrics: metrics,
  139. unfinishedWorkUpdatePeriod: updatePeriod,
  140. stopCh: make(chan struct{}),
  141. }
  142. // Don't start the goroutine for a type of noMetrics so we don't consume
  143. // resources unnecessarily
  144. if _, ok := metrics.(noMetrics[T]); !ok {
  145. t.wg.Go(t.updateUnfinishedWorkLoop)
  146. }
  147. return t
  148. }
  // defaultUnfinishedWorkUpdatePeriod is the update period that
  // NewTypedWithConfig passes to newQueueWithConfig; it becomes the ticker
  // interval of updateUnfinishedWorkLoop.
  const defaultUnfinishedWorkUpdatePeriod time.Duration = 500 * time.Millisecond
  150. // Type is a work queue (see the package comment).
  151. // Deprecated: Use Typed instead.
  152. type Type = Typed[any]
  153. type Typed[t comparable] struct {
  154. // queue defines the order in which we will work on items. Every
  155. // element of queue should be in the dirty set and not in the
  156. // processing set.
  157. queue Queue[t]
  158. // dirty defines all of the items that need to be processed.
  159. dirty sets.Set[t]
  160. // Things that are currently being processed are in the processing set.
  161. // These things may be simultaneously in the dirty set. When we finish
  162. // processing something and remove it from this set, we'll check if
  163. // it's in the dirty set, and if so, add it to the queue.
  164. processing sets.Set[t]
  165. cond *sync.Cond
  166. shuttingDown bool
  167. drain bool
  168. metrics queueMetrics[t]
  169. unfinishedWorkUpdatePeriod time.Duration
  170. clock clock.WithTicker
  171. // wg manages goroutines started by the queue to allow graceful shutdown
  172. // ShutDown() will wait for goroutines to exit before returning.
  173. wg sync.WaitGroup
  174. stopCh chan struct{}
  175. // stopOnce guarantees we only signal shutdown a single time
  176. stopOnce sync.Once
  177. }
  178. // Add marks item as needing processing. When the queue is shutdown new
  179. // items will silently be ignored and not queued or marked as dirty for
  180. // reprocessing.
  181. func (q *Typed[T]) Add(item T) {
  182. q.cond.L.Lock()
  183. defer q.cond.L.Unlock()
  184. if q.shuttingDown {
  185. return
  186. }
  187. if q.dirty.Has(item) {
  188. // the same item is added again before it is processed, call the Touch
  189. // function if the queue cares about it (for e.g, reset its priority)
  190. if !q.processing.Has(item) {
  191. q.queue.Touch(item)
  192. }
  193. return
  194. }
  195. q.metrics.add(item)
  196. q.dirty.Insert(item)
  197. if q.processing.Has(item) {
  198. return
  199. }
  200. q.queue.Push(item)
  201. q.cond.Signal()
  202. }
  203. // Len returns the current queue length, for informational purposes only. You
  204. // shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
  205. // value, that can't be synchronized properly.
  206. func (q *Typed[T]) Len() int {
  207. q.cond.L.Lock()
  208. defer q.cond.L.Unlock()
  209. return q.queue.Len()
  210. }
  211. // Get blocks until it can return an item to be processed. If shutdown = true,
  212. // the caller should end their goroutine. You must call Done with item when you
  213. // have finished processing it.
  214. func (q *Typed[T]) Get() (item T, shutdown bool) {
  215. q.cond.L.Lock()
  216. defer q.cond.L.Unlock()
  217. for q.queue.Len() == 0 && !q.shuttingDown {
  218. q.cond.Wait()
  219. }
  220. if q.queue.Len() == 0 {
  221. // We must be shutting down.
  222. return *new(T), true
  223. }
  224. item = q.queue.Pop()
  225. q.metrics.get(item)
  226. q.processing.Insert(item)
  227. q.dirty.Delete(item)
  228. return item, false
  229. }
  230. // Done marks item as done processing, and if it has been marked as dirty again
  231. // while it was being processed, it will be re-added to the queue for
  232. // re-processing.
  233. func (q *Typed[T]) Done(item T) {
  234. q.cond.L.Lock()
  235. defer q.cond.L.Unlock()
  236. q.metrics.done(item)
  237. q.processing.Delete(item)
  238. if q.dirty.Has(item) {
  239. q.queue.Push(item)
  240. q.cond.Signal()
  241. } else if q.processing.Len() == 0 {
  242. q.cond.Signal()
  243. }
  244. }
  245. // ShutDown will cause q to ignore all new items added to it. Worker
  246. // goroutines will continue processing items in the queue until it is
  247. // empty and then receive the shutdown signal.
  248. func (q *Typed[T]) ShutDown() {
  249. defer q.wg.Wait()
  250. q.stopOnce.Do(func() {
  251. defer close(q.stopCh)
  252. })
  253. q.cond.L.Lock()
  254. defer q.cond.L.Unlock()
  255. q.drain = false
  256. q.shuttingDown = true
  257. q.cond.Broadcast()
  258. }
  259. // ShutDownWithDrain is equivalent to ShutDown but waits until all items
  260. // in the queue have been processed.
  261. // ShutDown can be called after ShutDownWithDrain to force
  262. // ShutDownWithDrain to stop waiting.
  263. // Workers must call Done on an item after processing it, otherwise
  264. // ShutDownWithDrain will block indefinitely.
  265. func (q *Typed[T]) ShutDownWithDrain() {
  266. defer q.wg.Wait()
  267. q.stopOnce.Do(func() {
  268. defer close(q.stopCh)
  269. })
  270. q.cond.L.Lock()
  271. defer q.cond.L.Unlock()
  272. q.drain = true
  273. q.shuttingDown = true
  274. q.cond.Broadcast()
  275. for q.processing.Len() != 0 && q.drain {
  276. q.cond.Wait()
  277. }
  278. }
  279. func (q *Typed[T]) ShuttingDown() bool {
  280. q.cond.L.Lock()
  281. defer q.cond.L.Unlock()
  282. return q.shuttingDown
  283. }
  284. func (q *Typed[T]) updateUnfinishedWork() {
  285. q.cond.L.Lock()
  286. defer q.cond.L.Unlock()
  287. if !q.shuttingDown {
  288. q.metrics.updateUnfinishedWork()
  289. }
  290. }
  291. func (q *Typed[T]) updateUnfinishedWorkLoop() {
  292. t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
  293. defer t.Stop()
  294. for {
  295. select {
  296. case <-t.C():
  297. q.updateUnfinishedWork()
  298. case <-q.stopCh:
  299. return
  300. }
  301. }
  302. }