| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "errors"
- "sync"
- "k8s.io/apimachinery/pkg/util/sets"
- )
- // PopProcessFunc is passed to Pop() method of Queue interface.
- // It is supposed to process the accumulator popped from the queue.
- type PopProcessFunc func(obj interface{}, isInInitialList bool) error
- // ErrFIFOClosed used when FIFO is closed
- var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue")
- // Queue extends ReflectorStore with a collection of Store keys to "process".
- // Every Add, Update, or Delete may put the object's key in that collection.
- // A Queue has a way to derive the corresponding key given an accumulator.
- // A Queue can be accessed concurrently from multiple goroutines.
- // A Queue can be "closed", after which Pop operations return an error.
- type Queue interface {
- ReflectorStore
- // Pop blocks until there is at least one key to process or the
- // Queue is closed. In the latter case Pop returns with an error.
- // In the former case Pop atomically picks one key to process,
- // removes that (key, accumulator) association from the Store, and
- // processes the accumulator. Pop returns the accumulator that
- // was processed and the result of processing. The PopProcessFunc
- // may return an ErrRequeue{inner} and in this case Pop will (a)
- // return that (key, accumulator) association to the Queue as part
- // of the atomic processing and (b) return the inner error from
- // Pop.
- Pop(PopProcessFunc) (interface{}, error)
- // HasSynced returns true if the first batch of keys have all been
- // popped. The first batch of keys are those of the first Replace
- // operation if that happened before any Add, AddIfNotPresent,
- // Update, or Delete; otherwise the first batch is empty.
- HasSynced() bool
- // Close the queue
- Close()
- }
- // QueueWithBatch extends the Queue interface with support for batch processing.
- //
- // In addition to the standard single-item Pop method, QueueWithBatch provides
- // PopBatch, which allows multiple items to be popped and processed together as
- // a batch. This can be used to improve processing efficiency when it is
- // beneficial to handle multiple queued keys or accumulators in a single
- // operation.
- // TODO: Consider merging this interface into Queue after feature gate GA
- type QueueWithBatch interface {
- Queue
- // PopBatch behaves similarly to Queue#Pop, but processes multiple keys
- // as a batch. The implementation determines the batching strategy,
- // such as the number of keys to include per batch.
- PopBatch(ProcessBatchFunc) error
- }
- // Pop is helper function for popping from Queue.
- // WARNING: Do NOT use this function in non-test code to avoid races
- // unless you really really really really know what you are doing.
- //
- // NOTE: This function is deprecated and may be removed in the future without
- // additional warning.
- func Pop(queue Queue) interface{} {
- var result interface{}
- queue.Pop(func(obj interface{}, isInInitialList bool) error {
- result = obj
- return nil
- })
- return result
- }
- // FIFO is a Queue in which (a) each accumulator is simply the most
- // recently provided object and (b) the collection of keys to process
- // is a FIFO. The accumulators all start out empty, and deleting an
- // object from its accumulator empties the accumulator. The Resync
- // operation is a no-op.
- //
- // Thus: if multiple adds/updates of a single object happen while that
- // object's key is in the queue before it has been processed then it
- // will only be processed once, and when it is processed the most
- // recent version will be processed. This can't be done with a channel
- //
- // FIFO solves this use case:
- // - You want to process every object (exactly) once.
- // - You want to process the most recent version of the object when you process it.
- // - You do not want to process deleted objects, they should be removed from the queue.
- // - You do not want to periodically reprocess objects.
- //
- // Compare with DeltaFIFO for other use cases.
- type FIFO struct {
- lock sync.RWMutex
- cond sync.Cond
- // We depend on the property that every key in `items` is also in `queue`
- items map[string]interface{}
- queue []string
- // populated is true if the first batch of items inserted by Replace() has been populated
- // or Delete/Add/Update was called first.
- populated bool
- // initialPopulationCount is the number of items inserted by the first call of Replace()
- initialPopulationCount int
- // keyFunc is used to make the key used for queued item insertion and retrieval, and
- // should be deterministic.
- keyFunc KeyFunc
- // Indication the queue is closed.
- // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
- // Currently, not used to gate any of CRUD operations.
- closed bool
- }
- var (
- _ = Queue(&FIFO{}) // FIFO is a Queue
- )
- // Close the queue.
- func (f *FIFO) Close() {
- f.lock.Lock()
- defer f.lock.Unlock()
- f.closed = true
- f.cond.Broadcast()
- }
- // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
- // or the first batch of items inserted by Replace() has been popped.
- func (f *FIFO) HasSynced() bool {
- f.lock.Lock()
- defer f.lock.Unlock()
- return f.hasSynced_locked()
- }
- func (f *FIFO) hasSynced_locked() bool {
- return f.populated && f.initialPopulationCount == 0
- }
- // Add inserts an item, and puts it in the queue. The item is only enqueued
- // if it doesn't already exist in the set.
- func (f *FIFO) Add(obj interface{}) error {
- id, err := f.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- if _, exists := f.items[id]; !exists {
- f.queue = append(f.queue, id)
- }
- f.items[id] = obj
- f.cond.Broadcast()
- return nil
- }
- // Update is the same as Add in this implementation.
- func (f *FIFO) Update(obj interface{}) error {
- return f.Add(obj)
- }
- // Delete removes an item. It doesn't add it to the queue, because
- // this implementation assumes the consumer only cares about the objects,
- // not the order in which they were created/added.
- func (f *FIFO) Delete(obj interface{}) error {
- id, err := f.keyFunc(obj)
- if err != nil {
- return KeyError{obj, err}
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- f.populated = true
- delete(f.items, id)
- return err
- }
- // IsClosed checks if the queue is closed
- func (f *FIFO) IsClosed() bool {
- f.lock.Lock()
- defer f.lock.Unlock()
- return f.closed
- }
- // Pop waits until an item is ready and processes it. If multiple items are
- // ready, they are returned in the order in which they were added/updated.
- // The item is removed from the queue (and the store) before it is processed,
- // so if you don't successfully process it, it should be added back with
- // AddIfNotPresent(). process function is called under lock, so it is safe
- // update data structures in it that need to be in sync with the queue.
- func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
- f.lock.Lock()
- defer f.lock.Unlock()
- for {
- for len(f.queue) == 0 {
- // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
- // When Close() is called, the f.closed is set and the condition is broadcasted.
- // Which causes this loop to continue and return from the Pop().
- if f.closed {
- return nil, ErrFIFOClosed
- }
- f.cond.Wait()
- }
- isInInitialList := !f.hasSynced_locked()
- id := f.queue[0]
- f.queue = f.queue[1:]
- if f.initialPopulationCount > 0 {
- f.initialPopulationCount--
- }
- item, ok := f.items[id]
- if !ok {
- // Item may have been deleted subsequently.
- continue
- }
- delete(f.items, id)
- err := process(item, isInInitialList)
- return item, err
- }
- }
- // Replace will delete the contents of 'f', using instead the given map.
- // 'f' takes ownership of the map, you should not reference the map again
- // after calling this function. f's queue is reset, too; upon return, it
- // will contain the items in the map, in no particular order.
- func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
- items := make(map[string]interface{}, len(list))
- for _, item := range list {
- key, err := f.keyFunc(item)
- if err != nil {
- return KeyError{item, err}
- }
- items[key] = item
- }
- f.lock.Lock()
- defer f.lock.Unlock()
- if !f.populated {
- f.populated = true
- f.initialPopulationCount = len(items)
- }
- f.items = items
- f.queue = f.queue[:0]
- for id := range items {
- f.queue = append(f.queue, id)
- }
- if len(f.queue) > 0 {
- f.cond.Broadcast()
- }
- return nil
- }
- // Resync will ensure that every object in the Store has its key in the queue.
- // This should be a no-op, because that property is maintained by all operations.
- func (f *FIFO) Resync() error {
- f.lock.Lock()
- defer f.lock.Unlock()
- inQueue := sets.NewString()
- for _, id := range f.queue {
- inQueue.Insert(id)
- }
- for id := range f.items {
- if !inQueue.Has(id) {
- f.queue = append(f.queue, id)
- }
- }
- if len(f.queue) > 0 {
- f.cond.Broadcast()
- }
- return nil
- }
- // NewFIFO returns a Store which can be used to queue up items to
- // process.
- func NewFIFO(keyFunc KeyFunc) *FIFO {
- f := &FIFO{
- items: map[string]interface{}{},
- queue: []string{},
- keyFunc: keyFunc,
- }
- f.cond.L = &f.lock
- return f
- }
|