delta_fifo.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "time"
  19. "k8s.io/apimachinery/pkg/util/sets"
  20. "k8s.io/klog/v2"
  21. utiltrace "k8s.io/utils/trace"
  22. )
// DeltaFIFOOptions holds the configuration parameters accepted by
// NewDeltaFIFOWithOptions. Every field is optional.
type DeltaFIFOOptions struct {
	// KeyFunction is used to figure out what key an object should have. (It's
	// exposed in the returned DeltaFIFO's KeyOf() method, with additional
	// handling around deleted objects and queue state).
	// Optional; when nil, NewDeltaFIFOWithOptions falls back to
	// MetaNamespaceKeyFunc.
	KeyFunction KeyFunc

	// KnownObjects is expected to return a list of keys that the consumer of
	// this queue "knows about". It is used to decide which items are missing
	// when Replace() is called; 'Deleted' deltas are produced for the missing
	// items. KnownObjects may be nil if you can tolerate missing deletions on
	// Replace().
	KnownObjects KeyListerGetter

	// EmitDeltaTypeReplaced indicates that the queue consumer understands the
	// Replaced DeltaType. Before the `Replaced` event type was added, calls to
	// Replace() were handled the same as Sync(). For backwards-compatibility
	// purposes, this is false by default.
	// When true, `Replaced` events will be sent for items passed to a
	// Replace() call. When false, `Sync` events will be sent instead.
	EmitDeltaTypeReplaced bool

	// Transformer, if set, is called for objects before they are enqueued.
	// Please see the comment on TransformFunc for details.
	Transformer TransformFunc

	// Logger, if set, receives the log output of the queue instead of
	// klog.Background().
	Logger *klog.Logger
}
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
// accumulator associated with a given object's key is not that object
// but rather a Deltas, which is a slice of Delta values for that
// object. Applying an object to a Deltas means to append a Delta
// except when the potentially appended Delta is a Deleted and the
// Deltas already ends with a Deleted. In that case the Deltas does
// not grow, although the terminal Deleted will be replaced by the new
// Deleted if the older Deleted's object is a
// DeletedFinalStateUnknown.
//
// The other difference is that DeltaFIFO has two additional ways that
// an object can be applied to an accumulator: Replaced and Sync.
// If EmitDeltaTypeReplaced is not set to true, Sync will be used in
// replace events for backwards compatibility. Sync is used for periodic
// resync events.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//   - You want to process every object change (delta) at most once.
//   - When you process an object, you want to see everything
//     that's happened to it since you last processed it.
//   - You want to process the deletion of some of the objects.
//   - You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas. List() returns
// the newest object from each accumulator in the FIFO.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in
// question are called "known objects" and this set of objects
// modifies the behavior of the Delete, Replace, and Resync methods
// (each in a different way).
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	// cond uses lock as its Locker (wired up in NewDeltaFIFOWithOptions).
	lock sync.RWMutex
	cond sync.Cond

	// `items` maps a key to a Deltas.
	// Each such Deltas has at least one Delta.
	items map[string]Deltas

	// `queue` maintains FIFO order of keys for consumption in Pop().
	// There are no duplicates in `queue`.
	// A key is in `queue` if and only if it is in `items`.
	queue []string

	// populated is true once the first batch of items has been inserted by
	// Replace(), or once Add/Update/Delete has been called.
	populated bool
	// initialPopulationCount is the number of items queued by the first call
	// of Replace() (listed keys plus queued deletions). Pop() decrements it
	// while it is positive.
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects lists keys that are "known" --- affecting Delete(),
	// Replace(), and Resync().
	knownObjects KeyListerGetter

	// closed indicates that the queue is closed, so a control loop can exit
	// when the queue is empty.
	// Currently, it is not used to gate any of the CRUD operations.
	closed bool

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool

	// transformer, if non-nil, is called with every object before it is
	// queued (except tombstones and Sync deltas; see
	// queueActionInternalLocked).
	transformer TransformFunc

	// logger is a per-instance logger. This gets chosen when constructing
	// the instance, with klog.Background() as default.
	logger klog.Logger
}
// TransformFunc allows for transforming an object before it will be processed.
// It receives the object and returns the (possibly modified) object, or an
// error, in which case the object is not enqueued.
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
//
// New in v1.27: TransformFunc sees the object before any other actor, and it
// is now safe to mutate the object in place instead of making a copy.
//
// It's recommended for the TransformFunc to be idempotent.
// It MUST be idempotent if objects already present in the cache are passed to
// the Replace() to avoid re-mutating them. Default informers do not pass
// existing objects to Replace though.
//
// Note that TransformFunc is called while inserting objects into the
// notification queue and is therefore extremely performance sensitive; please
// do not do anything that will take a long time.
type TransformFunc func(interface{}) (interface{}, error)
  141. // DeltaType is the type of a change (addition, deletion, etc)
  142. type DeltaType string
  143. // Change type definition
  144. const (
  145. Added DeltaType = "Added"
  146. Updated DeltaType = "Updated"
  147. Deleted DeltaType = "Deleted"
  148. // Replaced is emitted when we encountered watch errors and had to do a
  149. // relist. We don't know if the replaced object has changed.
  150. //
  151. // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
  152. // as well. Hence, Replaced is only emitted when the option
  153. // EmitDeltaTypeReplaced is true.
  154. Replaced DeltaType = "Replaced"
  155. // Sync is for synthetic events during a periodic resync.
  156. Sync DeltaType = "Sync"
  157. )
// Delta is a member of Deltas (a list of Delta objects) which
// in its turn is the type stored by a DeltaFIFO. It tells you what
// change happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
//
// NOTE: the field order matters; the queue builds values with positional
// literals of the form Delta{actionType, obj}.
type Delta struct {
	// Type is the kind of change that happened.
	Type DeltaType
	// Object is the object the change applies to (see the note above
	// regarding deletions).
	Object interface{}
}
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one
// (see the Oldest and Newest accessors).
type Deltas []Delta
  171. // NewDeltaFIFO returns a Queue which can be used to process changes to items.
  172. //
  173. // keyFunc is used to figure out what key an object should have. (It is
  174. // exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
  175. // around deleted objects and queue state).
  176. //
  177. // 'knownObjects' may be supplied to modify the behavior of Delete,
  178. // Replace, and Resync. It may be nil if you do not need those
  179. // modifications.
  180. //
  181. // TODO: consider merging keyLister with this object, tracking a list of
  182. // "known" keys when Pop() is called. Have to think about how that
  183. // affects error retrying.
  184. //
  185. // NOTE: It is possible to misuse this and cause a race when using an
  186. // external known object source.
  187. // Whether there is a potential race depends on how the consumer
  188. // modifies knownObjects. In Pop(), process function is called under
  189. // lock, so it is safe to update data structures in it that need to be
  190. // in sync with the queue (e.g. knownObjects).
  191. //
  192. // Example:
  193. // In case of sharedIndexInformer being a consumer
  194. // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
  195. // there is no race as knownObjects (s.indexer) is modified safely
  196. // under DeltaFIFO's lock. The only exceptions are GetStore() and
  197. // GetIndexer() methods, which expose ways to modify the underlying
  198. // storage. Currently these two methods are used for creating Lister
  199. // and internal tests.
  200. //
  201. // Also see the comment on DeltaFIFO.
  202. //
  203. // Warning: This constructs a DeltaFIFO that does not differentiate between
  204. // events caused by a call to Replace (e.g., from a relist, which may
  205. // contain object updates), and synthetic events caused by a periodic resync
  206. // (which just emit the existing object). See https://issue.k8s.io/86015 for details.
  207. //
  208. // Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
  209. // instead to receive a `Replaced` event depending on the type.
  210. //
  211. // Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
  212. func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
  213. return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
  214. KeyFunction: keyFunc,
  215. KnownObjects: knownObjects,
  216. })
  217. }
  218. // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
  219. // items. See also the comment on DeltaFIFO.
  220. func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
  221. if opts.KeyFunction == nil {
  222. opts.KeyFunction = MetaNamespaceKeyFunc
  223. }
  224. f := &DeltaFIFO{
  225. items: map[string]Deltas{},
  226. queue: []string{},
  227. keyFunc: opts.KeyFunction,
  228. knownObjects: opts.KnownObjects,
  229. emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
  230. transformer: opts.Transformer,
  231. logger: klog.Background(),
  232. }
  233. if opts.Logger != nil {
  234. f.logger = *opts.Logger
  235. }
  236. f.cond.L = &f.lock
  237. return f
  238. }
  239. var (
  240. _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
  241. _ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
  242. )
  243. var (
  244. // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
  245. // object with zero length is encountered (should be impossible,
  246. // but included for completeness).
  247. ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
  248. )
  249. // Close the queue.
  250. func (f *DeltaFIFO) Close() {
  251. f.lock.Lock()
  252. defer f.lock.Unlock()
  253. f.closed = true
  254. f.cond.Broadcast()
  255. }
  256. // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
  257. // DeletedFinalStateUnknown objects.
  258. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
  259. if d, ok := obj.(Deltas); ok {
  260. if len(d) == 0 {
  261. return "", KeyError{obj, ErrZeroLengthDeltasObject}
  262. }
  263. obj = d.Newest().Object
  264. }
  265. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  266. return d.Key, nil
  267. }
  268. return f.keyFunc(obj)
  269. }
  270. // Transformer implements the TransformingStore interface.
  271. func (f *DeltaFIFO) Transformer() TransformFunc {
  272. return f.transformer
  273. }
  274. // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
  275. // or the first batch of items inserted by Replace() has been popped.
  276. func (f *DeltaFIFO) HasSynced() bool {
  277. f.lock.Lock()
  278. defer f.lock.Unlock()
  279. return f.hasSynced_locked()
  280. }
  281. func (f *DeltaFIFO) hasSynced_locked() bool {
  282. return f.populated && f.initialPopulationCount == 0
  283. }
  284. // Add inserts an item, and puts it in the queue. The item is only enqueued
  285. // if it doesn't already exist in the set.
  286. func (f *DeltaFIFO) Add(obj interface{}) error {
  287. f.lock.Lock()
  288. defer f.lock.Unlock()
  289. f.populated = true
  290. return f.queueActionLocked(Added, obj)
  291. }
  292. // Update is just like Add, but makes an Updated Delta.
  293. func (f *DeltaFIFO) Update(obj interface{}) error {
  294. f.lock.Lock()
  295. defer f.lock.Unlock()
  296. f.populated = true
  297. return f.queueActionLocked(Updated, obj)
  298. }
  299. // Delete is just like Add, but makes a Deleted Delta. If the given
  300. // object does not already exist, it will be ignored. (It may have
  301. // already been deleted by a Replace (re-list), for example.) In this
  302. // method `f.knownObjects`, if not nil, provides (via GetByKey)
  303. // _additional_ objects that are considered to already exist.
  304. func (f *DeltaFIFO) Delete(obj interface{}) error {
  305. id, err := f.KeyOf(obj)
  306. if err != nil {
  307. return KeyError{obj, err}
  308. }
  309. f.lock.Lock()
  310. defer f.lock.Unlock()
  311. f.populated = true
  312. if f.knownObjects == nil {
  313. if _, exists := f.items[id]; !exists {
  314. // Presumably, this was deleted when a relist happened.
  315. // Don't provide a second report of the same deletion.
  316. return nil
  317. }
  318. } else {
  319. // We only want to skip the "deletion" action if the object doesn't
  320. // exist in knownObjects and it doesn't have corresponding item in items.
  321. // Note that even if there is a "deletion" action in items, we can ignore it,
  322. // because it will be deduped automatically in "queueActionLocked"
  323. _, exists, err := f.knownObjects.GetByKey(id)
  324. _, itemsExist := f.items[id]
  325. if err == nil && !exists && !itemsExist {
  326. // Presumably, this was deleted when a relist happened.
  327. // Don't provide a second report of the same deletion.
  328. return nil
  329. }
  330. }
  331. // exist in items and/or KnownObjects
  332. return f.queueActionLocked(Deleted, obj)
  333. }
  334. // re-listing and watching can deliver the same update multiple times in any
  335. // order. This will combine the most recent two deltas if they are the same.
  336. func dedupDeltas(deltas Deltas) Deltas {
  337. n := len(deltas)
  338. if n < 2 {
  339. return deltas
  340. }
  341. a := &deltas[n-1]
  342. b := &deltas[n-2]
  343. if out := isDup(a, b); out != nil {
  344. deltas[n-2] = *out
  345. return deltas[:n-1]
  346. }
  347. return deltas
  348. }
  349. // If a & b represent the same event, returns the delta that ought to be kept.
  350. // Otherwise, returns nil.
  351. // TODO: is there anything other than deletions that need deduping?
  352. func isDup(a, b *Delta) *Delta {
  353. if out := isDeletionDup(a, b); out != nil {
  354. return out
  355. }
  356. // TODO: Detect other duplicate situations? Are there any?
  357. return nil
  358. }
  359. // keep the one with the most information if both are deletions.
  360. func isDeletionDup(a, b *Delta) *Delta {
  361. if b.Type != Deleted || a.Type != Deleted {
  362. return nil
  363. }
  364. // Do more sophisticated checks, or is this sufficient?
  365. if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
  366. return a
  367. }
  368. return b
  369. }
  370. // queueActionLocked appends to the delta list for the object.
  371. // Caller must lock first.
  372. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  373. return f.queueActionInternalLocked(actionType, actionType, obj)
  374. }
  375. // queueActionInternalLocked appends to the delta list for the object.
  376. // The actionType is emitted and must honor emitDeltaTypeReplaced.
  377. // The internalActionType is only used within this function and must
  378. // ignore emitDeltaTypeReplaced.
  379. // Caller must lock first.
  380. func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType DeltaType, obj interface{}) error {
  381. id, err := f.KeyOf(obj)
  382. if err != nil {
  383. return KeyError{obj, err}
  384. }
  385. // Every object comes through this code path once, so this is a good
  386. // place to call the transform func.
  387. //
  388. // If obj is a DeletedFinalStateUnknown tombstone or the action is a Sync,
  389. // then the object have already gone through the transformer.
  390. //
  391. // If the objects already present in the cache are passed to Replace(),
  392. // the transformer must be idempotent to avoid re-mutating them,
  393. // or coordinate with all readers from the cache to avoid data races.
  394. // Default informers do not pass existing objects to Replace.
  395. if f.transformer != nil {
  396. _, isTombstone := obj.(DeletedFinalStateUnknown)
  397. if !isTombstone && internalActionType != Sync {
  398. var err error
  399. obj, err = f.transformer(obj)
  400. if err != nil {
  401. return err
  402. }
  403. }
  404. }
  405. oldDeltas := f.items[id]
  406. newDeltas := append(oldDeltas, Delta{actionType, obj})
  407. newDeltas = dedupDeltas(newDeltas)
  408. if len(newDeltas) > 0 {
  409. if _, exists := f.items[id]; !exists {
  410. f.queue = append(f.queue, id)
  411. }
  412. f.items[id] = newDeltas
  413. f.cond.Broadcast()
  414. } else {
  415. // This never happens, because dedupDeltas never returns an empty list
  416. // when given a non-empty list (as it is here).
  417. // If somehow it happens anyway, deal with it but complain.
  418. if oldDeltas == nil {
  419. f.logger.Error(nil, "Impossible dedupDeltas, ignoring", "id", id, "oldDeltas", oldDeltas, "obj", obj)
  420. return nil
  421. }
  422. f.logger.Error(nil, "Impossible dedupDeltas, breaking invariant by storing empty Deltas", "id", id, "oldDeltas", oldDeltas, "obj", obj)
  423. f.items[id] = newDeltas
  424. return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
  425. }
  426. return nil
  427. }
  428. // IsClosed checks if the queue is closed
  429. func (f *DeltaFIFO) IsClosed() bool {
  430. f.lock.Lock()
  431. defer f.lock.Unlock()
  432. return f.closed
  433. }
  434. // Pop blocks until the queue has some items, and then returns one. If
  435. // multiple items are ready, they are returned in the order in which they were
  436. // added/updated. The item is removed from the queue (and the store) before it
  437. // is returned, so if you don't successfully process it, you need to add it back
  438. // with AddIfNotPresent().
  439. // process function is called under lock, so it is safe to update data structures
  440. // in it that need to be in sync with the queue (e.g. knownKeys).
  441. // process should avoid expensive I/O operation so that other queue operations, i.e.
  442. // Add() and Get(), won't be blocked for too long.
  443. //
  444. // Pop returns a 'Deltas', which has a complete list of all the things
  445. // that happened to the object (deltas) while it was sitting in the queue.
  446. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  447. f.lock.Lock()
  448. defer f.lock.Unlock()
  449. for {
  450. for len(f.queue) == 0 {
  451. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  452. // When Close() is called, the f.closed is set and the condition is broadcasted.
  453. // Which causes this loop to continue and return from the Pop().
  454. if f.closed {
  455. return nil, ErrFIFOClosed
  456. }
  457. f.cond.Wait()
  458. }
  459. isInInitialList := !f.hasSynced_locked()
  460. id := f.queue[0]
  461. f.queue = f.queue[1:]
  462. depth := len(f.queue)
  463. if f.initialPopulationCount > 0 {
  464. f.initialPopulationCount--
  465. }
  466. item, ok := f.items[id]
  467. if !ok {
  468. // This should never happen
  469. f.logger.Error(nil, "Inconceivable! Item was in f.queue but not f.items; ignoring", "id", id)
  470. continue
  471. }
  472. delete(f.items, id)
  473. // Only log traces if the queue depth is greater than 10 and it takes more than
  474. // 100 milliseconds to process one item from the queue.
  475. // Queue depth never goes high because processing an item is locking the queue,
  476. // and new items can't be added until processing finish.
  477. // https://github.com/kubernetes/kubernetes/issues/103789
  478. if depth > 10 {
  479. trace := utiltrace.New("DeltaFIFO Pop Process",
  480. utiltrace.Field{Key: "ID", Value: id},
  481. utiltrace.Field{Key: "Depth", Value: depth},
  482. utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
  483. defer trace.LogIfLong(100 * time.Millisecond)
  484. }
  485. err := process(item, isInInitialList)
  486. // Don't need to copyDeltas here, because we're transferring
  487. // ownership to the caller.
  488. return item, err
  489. }
  490. }
  491. // Replace atomically does two things: (1) it adds the given objects
  492. // using the Sync or Replace DeltaType and then (2) it does some deletions.
  493. // In particular: for every pre-existing key K that is not the key of
  494. // an object in `list` there is the effect of
  495. // `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
  496. // object of K. The pre-existing keys are those in the union set of the keys in
  497. // `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
  498. // the one present in the last delta in `f.items`. If there is no delta for K
  499. // in `f.items`, it is the object in `f.knownObjects`
  500. func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
  501. f.lock.Lock()
  502. defer f.lock.Unlock()
  503. keys := make(sets.Set[string], len(list))
  504. // keep backwards compat for old clients
  505. action := Sync
  506. if f.emitDeltaTypeReplaced {
  507. action = Replaced
  508. }
  509. // Add Sync/Replaced action for each new item.
  510. for _, item := range list {
  511. key, err := f.KeyOf(item)
  512. if err != nil {
  513. return KeyError{item, err}
  514. }
  515. keys.Insert(key)
  516. if err := f.queueActionInternalLocked(action, Replaced, item); err != nil {
  517. return fmt.Errorf("couldn't enqueue object: %v", err)
  518. }
  519. }
  520. // Do deletion detection against objects in the queue
  521. queuedDeletions := 0
  522. for k, oldItem := range f.items {
  523. if keys.Has(k) {
  524. continue
  525. }
  526. // Delete pre-existing items not in the new list.
  527. // This could happen if watch deletion event was missed while
  528. // disconnected from apiserver.
  529. var deletedObj interface{}
  530. if n := oldItem.Newest(); n != nil {
  531. deletedObj = n.Object
  532. // if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
  533. if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
  534. deletedObj = d.Obj
  535. }
  536. }
  537. queuedDeletions++
  538. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  539. return err
  540. }
  541. }
  542. if f.knownObjects != nil {
  543. // Detect deletions for objects not present in the queue, but present in KnownObjects
  544. knownKeys := f.knownObjects.ListKeys()
  545. for _, k := range knownKeys {
  546. if keys.Has(k) {
  547. continue
  548. }
  549. if len(f.items[k]) > 0 {
  550. continue
  551. }
  552. deletedObj, exists, err := f.knownObjects.GetByKey(k)
  553. if err != nil {
  554. deletedObj = nil
  555. f.logger.Error(err, "Unexpected error during lookup, placing DeleteFinalStateUnknown marker without object", "key", k)
  556. } else if !exists {
  557. deletedObj = nil
  558. f.logger.Info("Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", k)
  559. }
  560. queuedDeletions++
  561. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  562. return err
  563. }
  564. }
  565. }
  566. if !f.populated {
  567. f.populated = true
  568. f.initialPopulationCount = keys.Len() + queuedDeletions
  569. }
  570. return nil
  571. }
  572. // Resync adds, with a Sync type of Delta, every object listed by
  573. // `f.knownObjects` whose key is not already queued for processing.
  574. // If `f.knownObjects` is `nil` then Resync does nothing.
  575. func (f *DeltaFIFO) Resync() error {
  576. f.lock.Lock()
  577. defer f.lock.Unlock()
  578. if f.knownObjects == nil {
  579. return nil
  580. }
  581. keys := f.knownObjects.ListKeys()
  582. for _, k := range keys {
  583. if err := f.syncKeyLocked(k); err != nil {
  584. return err
  585. }
  586. }
  587. return nil
  588. }
  589. func (f *DeltaFIFO) syncKeyLocked(key string) error {
  590. obj, exists, err := f.knownObjects.GetByKey(key)
  591. if err != nil {
  592. f.logger.Error(err, "Unexpected error during lookup, unable to queue object for sync", "key", key)
  593. return nil
  594. } else if !exists {
  595. f.logger.Info("Key does not exist in known objects store, unable to queue object for sync", "key", key)
  596. return nil
  597. }
  598. // If we are doing Resync() and there is already an event queued for that object,
  599. // we ignore the Resync for it. This is to avoid the race, in which the resync
  600. // comes with the previous value of object (since queueing an event for the object
  601. // doesn't trigger changing the underlying store <knownObjects>.
  602. id, err := f.KeyOf(obj)
  603. if err != nil {
  604. return KeyError{obj, err}
  605. }
  606. if len(f.items[id]) > 0 {
  607. return nil
  608. }
  609. if err := f.queueActionLocked(Sync, obj); err != nil {
  610. return fmt.Errorf("couldn't queue object: %v", err)
  611. }
  612. return nil
  613. }
// A KeyListerGetter is anything that knows how to list its keys and look up
// by key; it is the combination of KeyLister and KeyGetter.
type KeyListerGetter interface {
	KeyLister
	KeyGetter
}
// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
	// ListKeys returns the keys currently held.
	ListKeys() []string
}
// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
	// GetByKey returns the value associated with the key, or sets exists=false.
	GetByKey(key string) (value interface{}, exists bool, err error)
}
  628. // Oldest is a convenience function that returns the oldest delta, or
  629. // nil if there are no deltas.
  630. func (d Deltas) Oldest() *Delta {
  631. if len(d) > 0 {
  632. return &d[0]
  633. }
  634. return nil
  635. }
  636. // Newest is a convenience function that returns the newest delta, or
  637. // nil if there are no deltas.
  638. func (d Deltas) Newest() *Delta {
  639. if n := len(d); n > 0 {
  640. return &d[n-1]
  641. }
  642. return nil
  643. }
  644. // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
  645. // the objects in the slice. This allows Get/List to return an object that we
  646. // know won't be clobbered by a subsequent modifications.
  647. func copyDeltas(d Deltas) Deltas {
  648. d2 := make(Deltas, len(d))
  649. copy(d2, d)
  650. return d2
  651. }
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
// was deleted but the watch deletion event was missed while disconnected from
// apiserver. In this case we don't know the final "resting" state of the object, so
// there's a chance the included `Obj` is stale.
//
// NOTE: the field order matters; Replace builds values with positional
// literals of the form DeletedFinalStateUnknown{key, obj}.
type DeletedFinalStateUnknown struct {
	// Key is the queue key of the deleted object.
	Key string
	// Obj is the last known state of the deleted object. Replace stores nil
	// here when no object could be retrieved for the key.
	Obj interface{}
}