delta_fifo.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "k8s.io/apimachinery/pkg/util/sets"
  19. "k8s.io/klog/v2"
  20. )
  21. // DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
  22. // optional.
  23. type DeltaFIFOOptions struct {
  24. // KeyFunction is used to figure out what key an object should have. (It's
  25. // exposed in the returned DeltaFIFO's KeyOf() method, with additional
  26. // handling around deleted objects and queue state).
  27. // Optional, the default is MetaNamespaceKeyFunc.
  28. KeyFunction KeyFunc
  29. // KnownObjects is expected to return a list of keys that the consumer of
  30. // this queue "knows about". It is used to decide which items are missing
  31. // when Replace() is called; 'Deleted' deltas are produced for the missing items.
  32. // KnownObjects may be nil if you can tolerate missing deletions on Replace().
  33. KnownObjects KeyListerGetter
  34. // EmitDeltaTypeReplaced indicates that the queue consumer
  35. // understands the Replaced DeltaType. Before the `Replaced` event type was
  36. // added, calls to Replace() were handled the same as Sync(). For
  37. // backwards-compatibility purposes, this is false by default.
  38. // When true, `Replaced` events will be sent for items passed to a Replace() call.
  39. // When false, `Sync` events will be sent instead.
  40. EmitDeltaTypeReplaced bool
  41. }
  42. // DeltaFIFO is like FIFO, but differs in two ways. One is that the
  43. // accumulator associated with a given object's key is not that object
  44. // but rather a Deltas, which is a slice of Delta values for that
  45. // object. Applying an object to a Deltas means to append a Delta
  46. // except when the potentially appended Delta is a Deleted and the
  47. // Deltas already ends with a Deleted. In that case the Deltas does
  48. // not grow, although the terminal Deleted will be replaced by the new
  49. // Deleted if the older Deleted's object is a
  50. // DeletedFinalStateUnknown.
  51. //
  52. // The other difference is that DeltaFIFO has two additional ways that
  53. // an object can be applied to an accumulator: Replaced and Sync.
  54. // If EmitDeltaTypeReplaced is not set to true, Sync will be used in
  55. // replace events for backwards compatibility. Sync is used for periodic
  56. // resync events.
  57. //
  58. // DeltaFIFO is a producer-consumer queue, where a Reflector is
  59. // intended to be the producer, and the consumer is whatever calls
  60. // the Pop() method.
  61. //
  62. // DeltaFIFO solves this use case:
  63. // * You want to process every object change (delta) at most once.
  64. // * When you process an object, you want to see everything
  65. // that's happened to it since you last processed it.
  66. // * You want to process the deletion of some of the objects.
  67. // * You might want to periodically reprocess objects.
  68. //
  69. // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
  70. // interface{} to satisfy the Store/Queue interfaces, but they
  71. // will always return an object of type Deltas. List() returns
  72. // the newest object from each accumulator in the FIFO.
  73. //
  74. // A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
  75. // to list Store keys and to get objects by Store key. The objects in
  76. // question are called "known objects" and this set of objects
  77. // modifies the behavior of the Delete, Replace, and Resync methods
  78. // (each in a different way).
  79. //
  80. // A note on threading: If you call Pop() in parallel from multiple
  81. // threads, you could end up with multiple threads processing slightly
  82. // different versions of the same object.
  83. type DeltaFIFO struct {
  84. // lock/cond protects access to 'items' and 'queue'.
  85. lock sync.RWMutex
  86. cond sync.Cond
  87. // `items` maps a key to a Deltas.
  88. // Each such Deltas has at least one Delta.
  89. items map[string]Deltas
  90. // `queue` maintains FIFO order of keys for consumption in Pop().
  91. // There are no duplicates in `queue`.
  92. // A key is in `queue` if and only if it is in `items`.
  93. queue []string
  94. // populated is true if the first batch of items inserted by Replace() has been populated
  95. // or Delete/Add/Update/AddIfNotPresent was called first.
  96. populated bool
  97. // initialPopulationCount is the number of items inserted by the first call of Replace()
  98. initialPopulationCount int
  99. // keyFunc is used to make the key used for queued item
  100. // insertion and retrieval, and should be deterministic.
  101. keyFunc KeyFunc
  102. // knownObjects list keys that are "known" --- affecting Delete(),
  103. // Replace(), and Resync()
  104. knownObjects KeyListerGetter
  105. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  106. // Currently, not used to gate any of CRED operations.
  107. closed bool
  108. // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
  109. // DeltaType when Replace() is called (to preserve backwards compat).
  110. emitDeltaTypeReplaced bool
  111. }
  112. // DeltaType is the type of a change (addition, deletion, etc)
  113. type DeltaType string
  114. // Change type definition
  115. const (
  116. Added DeltaType = "Added"
  117. Updated DeltaType = "Updated"
  118. Deleted DeltaType = "Deleted"
  119. // Replaced is emitted when we encountered watch errors and had to do a
  120. // relist. We don't know if the replaced object has changed.
  121. //
  122. // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
  123. // as well. Hence, Replaced is only emitted when the option
  124. // EmitDeltaTypeReplaced is true.
  125. Replaced DeltaType = "Replaced"
  126. // Sync is for synthetic events during a periodic resync.
  127. Sync DeltaType = "Sync"
  128. )
  129. // Delta is a member of Deltas (a list of Delta objects) which
  130. // in its turn is the type stored by a DeltaFIFO. It tells you what
  131. // change happened, and the object's state after* that change.
  132. //
  133. // [*] Unless the change is a deletion, and then you'll get the final
  134. // state of the object before it was deleted.
  135. type Delta struct {
  136. Type DeltaType
  137. Object interface{}
  138. }
  139. // Deltas is a list of one or more 'Delta's to an individual object.
  140. // The oldest delta is at index 0, the newest delta is the last one.
  141. type Deltas []Delta
  142. // NewDeltaFIFO returns a Queue which can be used to process changes to items.
  143. //
  144. // keyFunc is used to figure out what key an object should have. (It is
  145. // exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
  146. // around deleted objects and queue state).
  147. //
  148. // 'knownObjects' may be supplied to modify the behavior of Delete,
  149. // Replace, and Resync. It may be nil if you do not need those
  150. // modifications.
  151. //
  152. // TODO: consider merging keyLister with this object, tracking a list of
  153. // "known" keys when Pop() is called. Have to think about how that
  154. // affects error retrying.
  155. // NOTE: It is possible to misuse this and cause a race when using an
  156. // external known object source.
  157. // Whether there is a potential race depends on how the consumer
  158. // modifies knownObjects. In Pop(), process function is called under
  159. // lock, so it is safe to update data structures in it that need to be
  160. // in sync with the queue (e.g. knownObjects).
  161. //
  162. // Example:
  163. // In case of sharedIndexInformer being a consumer
  164. // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
  165. // src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
  166. // there is no race as knownObjects (s.indexer) is modified safely
  167. // under DeltaFIFO's lock. The only exceptions are GetStore() and
  168. // GetIndexer() methods, which expose ways to modify the underlying
  169. // storage. Currently these two methods are used for creating Lister
  170. // and internal tests.
  171. //
  172. // Also see the comment on DeltaFIFO.
  173. //
  174. // Warning: This constructs a DeltaFIFO that does not differentiate between
  175. // events caused by a call to Replace (e.g., from a relist, which may
  176. // contain object updates), and synthetic events caused by a periodic resync
  177. // (which just emit the existing object). See https://issue.k8s.io/86015 for details.
  178. //
  179. // Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
  180. // instead to receive a `Replaced` event depending on the type.
  181. //
  182. // Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
  183. func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
  184. return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
  185. KeyFunction: keyFunc,
  186. KnownObjects: knownObjects,
  187. })
  188. }
  189. // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
  190. // items. See also the comment on DeltaFIFO.
  191. func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
  192. if opts.KeyFunction == nil {
  193. opts.KeyFunction = MetaNamespaceKeyFunc
  194. }
  195. f := &DeltaFIFO{
  196. items: map[string]Deltas{},
  197. queue: []string{},
  198. keyFunc: opts.KeyFunction,
  199. knownObjects: opts.KnownObjects,
  200. emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
  201. }
  202. f.cond.L = &f.lock
  203. return f
  204. }
  205. var (
  206. _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
  207. )
  208. var (
  209. // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
  210. // object with zero length is encountered (should be impossible,
  211. // but included for completeness).
  212. ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
  213. )
  214. // Close the queue.
  215. func (f *DeltaFIFO) Close() {
  216. f.lock.Lock()
  217. defer f.lock.Unlock()
  218. f.closed = true
  219. f.cond.Broadcast()
  220. }
  221. // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
  222. // DeletedFinalStateUnknown objects.
  223. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
  224. if d, ok := obj.(Deltas); ok {
  225. if len(d) == 0 {
  226. return "", KeyError{obj, ErrZeroLengthDeltasObject}
  227. }
  228. obj = d.Newest().Object
  229. }
  230. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  231. return d.Key, nil
  232. }
  233. return f.keyFunc(obj)
  234. }
  235. // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
  236. // or the first batch of items inserted by Replace() has been popped.
  237. func (f *DeltaFIFO) HasSynced() bool {
  238. f.lock.Lock()
  239. defer f.lock.Unlock()
  240. return f.populated && f.initialPopulationCount == 0
  241. }
  242. // Add inserts an item, and puts it in the queue. The item is only enqueued
  243. // if it doesn't already exist in the set.
  244. func (f *DeltaFIFO) Add(obj interface{}) error {
  245. f.lock.Lock()
  246. defer f.lock.Unlock()
  247. f.populated = true
  248. return f.queueActionLocked(Added, obj)
  249. }
  250. // Update is just like Add, but makes an Updated Delta.
  251. func (f *DeltaFIFO) Update(obj interface{}) error {
  252. f.lock.Lock()
  253. defer f.lock.Unlock()
  254. f.populated = true
  255. return f.queueActionLocked(Updated, obj)
  256. }
  257. // Delete is just like Add, but makes a Deleted Delta. If the given
  258. // object does not already exist, it will be ignored. (It may have
  259. // already been deleted by a Replace (re-list), for example.) In this
  260. // method `f.knownObjects`, if not nil, provides (via GetByKey)
  261. // _additional_ objects that are considered to already exist.
  262. func (f *DeltaFIFO) Delete(obj interface{}) error {
  263. id, err := f.KeyOf(obj)
  264. if err != nil {
  265. return KeyError{obj, err}
  266. }
  267. f.lock.Lock()
  268. defer f.lock.Unlock()
  269. f.populated = true
  270. if f.knownObjects == nil {
  271. if _, exists := f.items[id]; !exists {
  272. // Presumably, this was deleted when a relist happened.
  273. // Don't provide a second report of the same deletion.
  274. return nil
  275. }
  276. } else {
  277. // We only want to skip the "deletion" action if the object doesn't
  278. // exist in knownObjects and it doesn't have corresponding item in items.
  279. // Note that even if there is a "deletion" action in items, we can ignore it,
  280. // because it will be deduped automatically in "queueActionLocked"
  281. _, exists, err := f.knownObjects.GetByKey(id)
  282. _, itemsExist := f.items[id]
  283. if err == nil && !exists && !itemsExist {
  284. // Presumably, this was deleted when a relist happened.
  285. // Don't provide a second report of the same deletion.
  286. return nil
  287. }
  288. }
  289. // exist in items and/or KnownObjects
  290. return f.queueActionLocked(Deleted, obj)
  291. }
  292. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  293. // present in the set, it is neither enqueued nor added to the set.
  294. //
  295. // This is useful in a single producer/consumer scenario so that the consumer can
  296. // safely retry items without contending with the producer and potentially enqueueing
  297. // stale items.
  298. //
  299. // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
  300. // different from the Add/Update/Delete functions.
  301. func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
  302. deltas, ok := obj.(Deltas)
  303. if !ok {
  304. return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
  305. }
  306. id, err := f.KeyOf(deltas.Newest().Object)
  307. if err != nil {
  308. return KeyError{obj, err}
  309. }
  310. f.lock.Lock()
  311. defer f.lock.Unlock()
  312. f.addIfNotPresent(id, deltas)
  313. return nil
  314. }
  315. // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
  316. // already holds the fifo lock.
  317. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
  318. f.populated = true
  319. if _, exists := f.items[id]; exists {
  320. return
  321. }
  322. f.queue = append(f.queue, id)
  323. f.items[id] = deltas
  324. f.cond.Broadcast()
  325. }
  326. // re-listing and watching can deliver the same update multiple times in any
  327. // order. This will combine the most recent two deltas if they are the same.
  328. func dedupDeltas(deltas Deltas) Deltas {
  329. n := len(deltas)
  330. if n < 2 {
  331. return deltas
  332. }
  333. a := &deltas[n-1]
  334. b := &deltas[n-2]
  335. if out := isDup(a, b); out != nil {
  336. // `a` and `b` are duplicates. Only keep the one returned from isDup().
  337. // TODO: This extra array allocation and copy seems unnecessary if
  338. // all we do to dedup is compare the new delta with the last element
  339. // in `items`, which could be done by mutating `items` directly.
  340. // Might be worth profiling and investigating if it is safe to optimize.
  341. d := append(Deltas{}, deltas[:n-2]...)
  342. return append(d, *out)
  343. }
  344. return deltas
  345. }
  346. // If a & b represent the same event, returns the delta that ought to be kept.
  347. // Otherwise, returns nil.
  348. // TODO: is there anything other than deletions that need deduping?
  349. func isDup(a, b *Delta) *Delta {
  350. if out := isDeletionDup(a, b); out != nil {
  351. return out
  352. }
  353. // TODO: Detect other duplicate situations? Are there any?
  354. return nil
  355. }
  356. // keep the one with the most information if both are deletions.
  357. func isDeletionDup(a, b *Delta) *Delta {
  358. if b.Type != Deleted || a.Type != Deleted {
  359. return nil
  360. }
  361. // Do more sophisticated checks, or is this sufficient?
  362. if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
  363. return a
  364. }
  365. return b
  366. }
  367. // queueActionLocked appends to the delta list for the object.
  368. // Caller must lock first.
  369. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  370. id, err := f.KeyOf(obj)
  371. if err != nil {
  372. return KeyError{obj, err}
  373. }
  374. oldDeltas := f.items[id]
  375. newDeltas := append(oldDeltas, Delta{actionType, obj})
  376. newDeltas = dedupDeltas(newDeltas)
  377. if len(newDeltas) > 0 {
  378. if _, exists := f.items[id]; !exists {
  379. f.queue = append(f.queue, id)
  380. }
  381. f.items[id] = newDeltas
  382. f.cond.Broadcast()
  383. } else {
  384. // This never happens, because dedupDeltas never returns an empty list
  385. // when given a non-empty list (as it is here).
  386. // If somehow it happens anyway, deal with it but complain.
  387. if oldDeltas == nil {
  388. klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
  389. return nil
  390. }
  391. klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
  392. f.items[id] = newDeltas
  393. return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
  394. }
  395. return nil
  396. }
  397. // List returns a list of all the items; it returns the object
  398. // from the most recent Delta.
  399. // You should treat the items returned inside the deltas as immutable.
  400. func (f *DeltaFIFO) List() []interface{} {
  401. f.lock.RLock()
  402. defer f.lock.RUnlock()
  403. return f.listLocked()
  404. }
  405. func (f *DeltaFIFO) listLocked() []interface{} {
  406. list := make([]interface{}, 0, len(f.items))
  407. for _, item := range f.items {
  408. list = append(list, item.Newest().Object)
  409. }
  410. return list
  411. }
  412. // ListKeys returns a list of all the keys of the objects currently
  413. // in the FIFO.
  414. func (f *DeltaFIFO) ListKeys() []string {
  415. f.lock.RLock()
  416. defer f.lock.RUnlock()
  417. list := make([]string, 0, len(f.items))
  418. for key := range f.items {
  419. list = append(list, key)
  420. }
  421. return list
  422. }
  423. // Get returns the complete list of deltas for the requested item,
  424. // or sets exists=false.
  425. // You should treat the items returned inside the deltas as immutable.
  426. func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  427. key, err := f.KeyOf(obj)
  428. if err != nil {
  429. return nil, false, KeyError{obj, err}
  430. }
  431. return f.GetByKey(key)
  432. }
  433. // GetByKey returns the complete list of deltas for the requested item,
  434. // setting exists=false if that list is empty.
  435. // You should treat the items returned inside the deltas as immutable.
  436. func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  437. f.lock.RLock()
  438. defer f.lock.RUnlock()
  439. d, exists := f.items[key]
  440. if exists {
  441. // Copy item's slice so operations on this slice
  442. // won't interfere with the object we return.
  443. d = copyDeltas(d)
  444. }
  445. return d, exists, nil
  446. }
  447. // IsClosed checks if the queue is closed
  448. func (f *DeltaFIFO) IsClosed() bool {
  449. f.lock.Lock()
  450. defer f.lock.Unlock()
  451. return f.closed
  452. }
  453. // Pop blocks until the queue has some items, and then returns one. If
  454. // multiple items are ready, they are returned in the order in which they were
  455. // added/updated. The item is removed from the queue (and the store) before it
  456. // is returned, so if you don't successfully process it, you need to add it back
  457. // with AddIfNotPresent().
  458. // process function is called under lock, so it is safe to update data structures
  459. // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
  460. // may return an instance of ErrRequeue with a nested error to indicate the current
  461. // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
  462. // process should avoid expensive I/O operation so that other queue operations, i.e.
  463. // Add() and Get(), won't be blocked for too long.
  464. //
  465. // Pop returns a 'Deltas', which has a complete list of all the things
  466. // that happened to the object (deltas) while it was sitting in the queue.
  467. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  468. f.lock.Lock()
  469. defer f.lock.Unlock()
  470. for {
  471. for len(f.queue) == 0 {
  472. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  473. // When Close() is called, the f.closed is set and the condition is broadcasted.
  474. // Which causes this loop to continue and return from the Pop().
  475. if f.closed {
  476. return nil, ErrFIFOClosed
  477. }
  478. f.cond.Wait()
  479. }
  480. id := f.queue[0]
  481. f.queue = f.queue[1:]
  482. if f.initialPopulationCount > 0 {
  483. f.initialPopulationCount--
  484. }
  485. item, ok := f.items[id]
  486. if !ok {
  487. // This should never happen
  488. klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
  489. continue
  490. }
  491. delete(f.items, id)
  492. err := process(item)
  493. if e, ok := err.(ErrRequeue); ok {
  494. f.addIfNotPresent(id, item)
  495. err = e.Err
  496. }
  497. // Don't need to copyDeltas here, because we're transferring
  498. // ownership to the caller.
  499. return item, err
  500. }
  501. }
  502. // Replace atomically does two things: (1) it adds the given objects
  503. // using the Sync or Replace DeltaType and then (2) it does some deletions.
  504. // In particular: for every pre-existing key K that is not the key of
  505. // an object in `list` there is the effect of
  506. // `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
  507. // of K. If `f.knownObjects == nil` then the pre-existing keys are
  508. // those in `f.items` and the current object of K is the `.Newest()`
  509. // of the Deltas associated with K. Otherwise the pre-existing keys
  510. // are those listed by `f.knownObjects` and the current object of K is
  511. // what `f.knownObjects.GetByKey(K)` returns.
  512. func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
  513. f.lock.Lock()
  514. defer f.lock.Unlock()
  515. keys := make(sets.String, len(list))
  516. // keep backwards compat for old clients
  517. action := Sync
  518. if f.emitDeltaTypeReplaced {
  519. action = Replaced
  520. }
  521. // Add Sync/Replaced action for each new item.
  522. for _, item := range list {
  523. key, err := f.KeyOf(item)
  524. if err != nil {
  525. return KeyError{item, err}
  526. }
  527. keys.Insert(key)
  528. if err := f.queueActionLocked(action, item); err != nil {
  529. return fmt.Errorf("couldn't enqueue object: %v", err)
  530. }
  531. }
  532. if f.knownObjects == nil {
  533. // Do deletion detection against our own list.
  534. queuedDeletions := 0
  535. for k, oldItem := range f.items {
  536. if keys.Has(k) {
  537. continue
  538. }
  539. // Delete pre-existing items not in the new list.
  540. // This could happen if watch deletion event was missed while
  541. // disconnected from apiserver.
  542. var deletedObj interface{}
  543. if n := oldItem.Newest(); n != nil {
  544. deletedObj = n.Object
  545. }
  546. queuedDeletions++
  547. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  548. return err
  549. }
  550. }
  551. if !f.populated {
  552. f.populated = true
  553. // While there shouldn't be any queued deletions in the initial
  554. // population of the queue, it's better to be on the safe side.
  555. f.initialPopulationCount = keys.Len() + queuedDeletions
  556. }
  557. return nil
  558. }
  559. // Detect deletions not already in the queue.
  560. knownKeys := f.knownObjects.ListKeys()
  561. queuedDeletions := 0
  562. for _, k := range knownKeys {
  563. if keys.Has(k) {
  564. continue
  565. }
  566. deletedObj, exists, err := f.knownObjects.GetByKey(k)
  567. if err != nil {
  568. deletedObj = nil
  569. klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
  570. } else if !exists {
  571. deletedObj = nil
  572. klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
  573. }
  574. queuedDeletions++
  575. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  576. return err
  577. }
  578. }
  579. if !f.populated {
  580. f.populated = true
  581. f.initialPopulationCount = keys.Len() + queuedDeletions
  582. }
  583. return nil
  584. }
  585. // Resync adds, with a Sync type of Delta, every object listed by
  586. // `f.knownObjects` whose key is not already queued for processing.
  587. // If `f.knownObjects` is `nil` then Resync does nothing.
  588. func (f *DeltaFIFO) Resync() error {
  589. f.lock.Lock()
  590. defer f.lock.Unlock()
  591. if f.knownObjects == nil {
  592. return nil
  593. }
  594. keys := f.knownObjects.ListKeys()
  595. for _, k := range keys {
  596. if err := f.syncKeyLocked(k); err != nil {
  597. return err
  598. }
  599. }
  600. return nil
  601. }
  602. func (f *DeltaFIFO) syncKeyLocked(key string) error {
  603. obj, exists, err := f.knownObjects.GetByKey(key)
  604. if err != nil {
  605. klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
  606. return nil
  607. } else if !exists {
  608. klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
  609. return nil
  610. }
  611. // If we are doing Resync() and there is already an event queued for that object,
  612. // we ignore the Resync for it. This is to avoid the race, in which the resync
  613. // comes with the previous value of object (since queueing an event for the object
  614. // doesn't trigger changing the underlying store <knownObjects>.
  615. id, err := f.KeyOf(obj)
  616. if err != nil {
  617. return KeyError{obj, err}
  618. }
  619. if len(f.items[id]) > 0 {
  620. return nil
  621. }
  622. if err := f.queueActionLocked(Sync, obj); err != nil {
  623. return fmt.Errorf("couldn't queue object: %v", err)
  624. }
  625. return nil
  626. }
  627. // A KeyListerGetter is anything that knows how to list its keys and look up by key.
  628. type KeyListerGetter interface {
  629. KeyLister
  630. KeyGetter
  631. }
  632. // A KeyLister is anything that knows how to list its keys.
  633. type KeyLister interface {
  634. ListKeys() []string
  635. }
  636. // A KeyGetter is anything that knows how to get the value stored under a given key.
  637. type KeyGetter interface {
  638. // GetByKey returns the value associated with the key, or sets exists=false.
  639. GetByKey(key string) (value interface{}, exists bool, err error)
  640. }
  641. // Oldest is a convenience function that returns the oldest delta, or
  642. // nil if there are no deltas.
  643. func (d Deltas) Oldest() *Delta {
  644. if len(d) > 0 {
  645. return &d[0]
  646. }
  647. return nil
  648. }
  649. // Newest is a convenience function that returns the newest delta, or
  650. // nil if there are no deltas.
  651. func (d Deltas) Newest() *Delta {
  652. if n := len(d); n > 0 {
  653. return &d[n-1]
  654. }
  655. return nil
  656. }
  657. // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
  658. // the objects in the slice. This allows Get/List to return an object that we
  659. // know won't be clobbered by a subsequent modifications.
  660. func copyDeltas(d Deltas) Deltas {
  661. d2 := make(Deltas, len(d))
  662. copy(d2, d)
  663. return d2
  664. }
  665. // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
  666. // was deleted but the watch deletion event was missed while disconnected from
  667. // apiserver. In this case we don't know the final "resting" state of the object, so
  668. // there's a chance the included `Obj` is stale.
  669. type DeletedFinalStateUnknown struct {
  670. Key string
  671. Obj interface{}
  672. }