fifo.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "sync"
  17. "k8s.io/apimachinery/pkg/util/sets"
  18. )
// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the accumulator popped from the queue.
//
// obj is the popped accumulator. isInInitialList is supplied by the queue
// implementation; FIFO.Pop passes true when the queue had not yet synced
// (see HasSynced) at the moment the key was dequeued. The returned error is
// handed back to the caller of Pop.
type PopProcessFunc func(obj interface{}, isInInitialList bool) error
// ErrFIFOClosed is the error FIFO.Pop returns when the queue has been closed
// and has no keys left to process.
// NOTE(review): the message is prefixed with "DeltaFIFO"; presumably the value
// is shared with DeltaFIFO — confirm before touching the text.
var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue")
// Queue extends ReflectorStore with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface {
	ReflectorStore

	// Pop blocks until there is at least one key to process or the
	// Queue is closed. In the latter case Pop returns with an error.
	// In the former case Pop atomically picks one key to process,
	// removes that (key, accumulator) association from the Store, and
	// processes the accumulator. Pop returns the accumulator that
	// was processed and the result of processing. The PopProcessFunc
	// may return an ErrRequeue{inner} and in this case Pop will (a)
	// return that (key, accumulator) association to the Queue as part
	// of the atomic processing and (b) return the inner error from
	// Pop.
	//
	// NOTE(review): FIFO.Pop in this file returns the PopProcessFunc
	// error unchanged and never requeues; confirm whether the
	// ErrRequeue part of this contract still applies.
	Pop(PopProcessFunc) (interface{}, error)

	// HasSynced returns true if the first batch of keys have all been
	// popped. The first batch of keys are those of the first Replace
	// operation if that happened before any Add, AddIfNotPresent,
	// Update, or Delete; otherwise the first batch is empty.
	HasSynced() bool

	// Close the queue.
	Close()
}
// QueueWithBatch extends the Queue interface with support for batch processing.
//
// In addition to the standard single-item Pop method, QueueWithBatch provides
// PopBatch, which allows multiple items to be popped and processed together as
// a batch. This can be used to improve processing efficiency when it is
// beneficial to handle multiple queued keys or accumulators in a single
// operation.
//
// TODO: Consider merging this interface into Queue after feature gate GA
type QueueWithBatch interface {
	Queue

	// PopBatch behaves similarly to Queue#Pop, but processes multiple keys
	// as a batch. The implementation determines the batching strategy,
	// such as the number of keys to include per batch.
	PopBatch(ProcessBatchFunc) error
}
  65. // Pop is helper function for popping from Queue.
  66. // WARNING: Do NOT use this function in non-test code to avoid races
  67. // unless you really really really really know what you are doing.
  68. //
  69. // NOTE: This function is deprecated and may be removed in the future without
  70. // additional warning.
  71. func Pop(queue Queue) interface{} {
  72. var result interface{}
  73. queue.Pop(func(obj interface{}, isInInitialList bool) error {
  74. result = obj
  75. return nil
  76. })
  77. return result
  78. }
// FIFO is a Queue in which (a) each accumulator is simply the most
// recently provided object and (b) the collection of keys to process
// is a FIFO. The accumulators all start out empty, and deleting an
// object from its accumulator empties the accumulator. The Resync
// operation is a no-op.
//
// Thus: if multiple adds/updates of a single object happen while that
// object's key is in the queue before it has been processed then it
// will only be processed once, and when it is processed the most
// recent version will be processed. This can't be done with a channel.
//
// FIFO solves this use case:
//   - You want to process every object (exactly) once.
//   - You want to process the most recent version of the object when you process it.
//   - You do not want to process deleted objects, they should be removed from the queue.
//   - You do not want to periodically reprocess objects.
//
// Compare with DeltaFIFO for other use cases.
type FIFO struct {
	// lock guards items, queue, populated, initialPopulationCount and closed.
	// Every method in this file takes it exclusively (Lock, never RLock).
	lock sync.RWMutex
	// cond is broadcast when keys are enqueued or the queue is closed; Pop
	// waits on it while the queue is empty. NewFIFO sets its Locker to &lock.
	cond sync.Cond

	// We depend on the property that every key in `items` is also in `queue`
	items map[string]interface{}
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace().
	// Pop decrements it (down to zero) each time it dequeues a key.
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item insertion and retrieval, and
	// should be deterministic.
	keyFunc KeyFunc

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed bool
}
  116. var (
  117. _ = Queue(&FIFO{}) // FIFO is a Queue
  118. )
// Close marks the queue as closed and wakes every goroutine blocked in Pop.
// Keys that are still queued stay available to Pop; once the queue is empty,
// Pop returns ErrFIFOClosed instead of blocking.
func (f *FIFO) Close() {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.closed = true
	// Every waiter in Pop must re-check f.closed, so wake all of them.
	f.cond.Broadcast()
}
  126. // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
  127. // or the first batch of items inserted by Replace() has been popped.
  128. func (f *FIFO) HasSynced() bool {
  129. f.lock.Lock()
  130. defer f.lock.Unlock()
  131. return f.hasSynced_locked()
  132. }
  133. func (f *FIFO) hasSynced_locked() bool {
  134. return f.populated && f.initialPopulationCount == 0
  135. }
  136. // Add inserts an item, and puts it in the queue. The item is only enqueued
  137. // if it doesn't already exist in the set.
  138. func (f *FIFO) Add(obj interface{}) error {
  139. id, err := f.keyFunc(obj)
  140. if err != nil {
  141. return KeyError{obj, err}
  142. }
  143. f.lock.Lock()
  144. defer f.lock.Unlock()
  145. f.populated = true
  146. if _, exists := f.items[id]; !exists {
  147. f.queue = append(f.queue, id)
  148. }
  149. f.items[id] = obj
  150. f.cond.Broadcast()
  151. return nil
  152. }
// Update is the same as Add in this implementation: it delegates to Add and
// returns its result unchanged.
func (f *FIFO) Update(obj interface{}) error {
	return f.Add(obj)
}
  157. // Delete removes an item. It doesn't add it to the queue, because
  158. // this implementation assumes the consumer only cares about the objects,
  159. // not the order in which they were created/added.
  160. func (f *FIFO) Delete(obj interface{}) error {
  161. id, err := f.keyFunc(obj)
  162. if err != nil {
  163. return KeyError{obj, err}
  164. }
  165. f.lock.Lock()
  166. defer f.lock.Unlock()
  167. f.populated = true
  168. delete(f.items, id)
  169. return err
  170. }
  171. // IsClosed checks if the queue is closed
  172. func (f *FIFO) IsClosed() bool {
  173. f.lock.Lock()
  174. defer f.lock.Unlock()
  175. return f.closed
  176. }
  177. // Pop waits until an item is ready and processes it. If multiple items are
  178. // ready, they are returned in the order in which they were added/updated.
  179. // The item is removed from the queue (and the store) before it is processed,
  180. // so if you don't successfully process it, it should be added back with
  181. // AddIfNotPresent(). process function is called under lock, so it is safe
  182. // update data structures in it that need to be in sync with the queue.
  183. func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
  184. f.lock.Lock()
  185. defer f.lock.Unlock()
  186. for {
  187. for len(f.queue) == 0 {
  188. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  189. // When Close() is called, the f.closed is set and the condition is broadcasted.
  190. // Which causes this loop to continue and return from the Pop().
  191. if f.closed {
  192. return nil, ErrFIFOClosed
  193. }
  194. f.cond.Wait()
  195. }
  196. isInInitialList := !f.hasSynced_locked()
  197. id := f.queue[0]
  198. f.queue = f.queue[1:]
  199. if f.initialPopulationCount > 0 {
  200. f.initialPopulationCount--
  201. }
  202. item, ok := f.items[id]
  203. if !ok {
  204. // Item may have been deleted subsequently.
  205. continue
  206. }
  207. delete(f.items, id)
  208. err := process(item, isInInitialList)
  209. return item, err
  210. }
  211. }
  212. // Replace will delete the contents of 'f', using instead the given map.
  213. // 'f' takes ownership of the map, you should not reference the map again
  214. // after calling this function. f's queue is reset, too; upon return, it
  215. // will contain the items in the map, in no particular order.
  216. func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
  217. items := make(map[string]interface{}, len(list))
  218. for _, item := range list {
  219. key, err := f.keyFunc(item)
  220. if err != nil {
  221. return KeyError{item, err}
  222. }
  223. items[key] = item
  224. }
  225. f.lock.Lock()
  226. defer f.lock.Unlock()
  227. if !f.populated {
  228. f.populated = true
  229. f.initialPopulationCount = len(items)
  230. }
  231. f.items = items
  232. f.queue = f.queue[:0]
  233. for id := range items {
  234. f.queue = append(f.queue, id)
  235. }
  236. if len(f.queue) > 0 {
  237. f.cond.Broadcast()
  238. }
  239. return nil
  240. }
  241. // Resync will ensure that every object in the Store has its key in the queue.
  242. // This should be a no-op, because that property is maintained by all operations.
  243. func (f *FIFO) Resync() error {
  244. f.lock.Lock()
  245. defer f.lock.Unlock()
  246. inQueue := sets.NewString()
  247. for _, id := range f.queue {
  248. inQueue.Insert(id)
  249. }
  250. for id := range f.items {
  251. if !inQueue.Has(id) {
  252. f.queue = append(f.queue, id)
  253. }
  254. }
  255. if len(f.queue) > 0 {
  256. f.cond.Broadcast()
  257. }
  258. return nil
  259. }
  260. // NewFIFO returns a Store which can be used to queue up items to
  261. // process.
  262. func NewFIFO(keyFunc KeyFunc) *FIFO {
  263. f := &FIFO{
  264. items: map[string]interface{}{},
  265. queue: []string{},
  266. keyFunc: keyFunc,
  267. }
  268. f.cond.L = &f.lock
  269. return f
  270. }