controller.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "sync"
  19. "time"
  20. "k8s.io/apimachinery/pkg/runtime"
  21. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. clientgofeaturegate "k8s.io/client-go/features"
  24. "k8s.io/utils/clock"
  25. )
  26. // This file implements a low-level controller that is used in
  27. // sharedIndexInformer, which is an implementation of
  28. // SharedIndexInformer. Such informers, in turn, are key components
  29. // in the high level controllers that form the backbone of the
  30. // Kubernetes control plane. Look at those for examples, or the
  31. // example in
  32. // https://github.com/kubernetes/client-go/tree/master/examples/workqueue
  33. // .
  34. // Config contains all the settings for one of these low-level controllers.
  35. type Config struct {
  36. // The queue for your objects - has to be a DeltaFIFO due to
  37. // assumptions in the implementation. Your Process() function
  38. // should accept the output of this Queue's Pop() method.
  39. Queue
  40. // Something that can list and watch your objects.
  41. ListerWatcher
  42. // Process can process a popped Deltas.
  43. Process ProcessFunc
  44. // ProcessBatch can process a batch of popped Deltas, which should return `TransactionError` if not all items
  45. // in the batch were successfully processed.
  46. //
  47. // For batch processing to be used:
  48. // * ProcessBatch must be non-nil
  49. // * Queue must implement QueueWithBatch
  50. // * The client InOrderInformersBatchProcess feature gate must be enabled
  51. //
  52. // If any of those are false, Process is used and no batch processing is done.
  53. ProcessBatch ProcessBatchFunc
  54. // ObjectType is an example object of the type this controller is
  55. // expected to handle.
  56. ObjectType runtime.Object
  57. // ObjectDescription is the description to use when logging type-specific information about this controller.
  58. ObjectDescription string
  59. // FullResyncPeriod is the period at which ShouldResync is considered.
  60. FullResyncPeriod time.Duration
  61. // MinWatchTimeout, if set, will define the minimum timeout for watch requests send
  62. // to kube-apiserver. However, values lower than 5m will not be honored to avoid
  63. // negative performance impact on controlplane.
  64. // Optional - if unset a default value of 5m will be used.
  65. MinWatchTimeout time.Duration
  66. // ShouldResync is periodically used by the reflector to determine
  67. // whether to Resync the Queue. If ShouldResync is `nil` or
  68. // returns true, it means the reflector should proceed with the
  69. // resync.
  70. ShouldResync ShouldResyncFunc
  71. // Called whenever the ListAndWatch drops the connection with an error.
  72. //
  73. // Contextual logging: WatchErrorHandlerWithContext should be used instead of WatchErrorHandler in code which supports contextual logging.
  74. WatchErrorHandler WatchErrorHandler
  75. // Called whenever the ListAndWatch drops the connection with an error
  76. // and WatchErrorHandler is not set.
  77. WatchErrorHandlerWithContext WatchErrorHandlerWithContext
  78. // WatchListPageSize is the requested chunk size of initial and relist watch lists.
  79. WatchListPageSize int64
  80. }
  81. // ShouldResyncFunc is a type of function that indicates if a reflector should perform a
  82. // resync or not. It can be used by a shared informer to support multiple event handlers with custom
  83. // resync periods.
  84. type ShouldResyncFunc func() bool
  85. // ProcessFunc processes a single object.
  86. type ProcessFunc func(obj interface{}, isInInitialList bool) error
  87. // ProcessBatchFunc processes multiple objects in batch.
  88. // The deltas must not contain multiple entries for the same object.
  89. type ProcessBatchFunc func(deltas []Delta, isInInitialList bool) error
  90. // `*controller` implements Controller
  91. type controller struct {
  92. config Config
  93. reflector *Reflector
  94. reflectorMutex sync.RWMutex
  95. clock clock.Clock
  96. }
  97. // Controller is a low-level controller that is parameterized by a
  98. // Config and used in sharedIndexInformer.
  99. type Controller interface {
  100. // RunWithContext does two things. One is to construct and run a Reflector
  101. // to pump objects/notifications from the Config's ListerWatcher
  102. // to the Config's Queue and possibly invoke the occasional Resync
  103. // on that Queue. The other is to repeatedly Pop from the Queue
  104. // and process with the Config's ProcessFunc. Both of these
  105. // continue until the context is canceled.
  106. //
  107. // It's an error to call RunWithContext more than once.
  108. // RunWithContext blocks; call via go.
  109. RunWithContext(ctx context.Context)
  110. // Run does the same as RunWithContext with a stop channel instead of
  111. // a context.
  112. //
  113. // Contextual logging: RunWithcontext should be used instead of Run in code which supports contextual logging.
  114. Run(stopCh <-chan struct{})
  115. // HasSynced delegates to the Config's Queue
  116. HasSynced() bool
  117. // LastSyncResourceVersion delegates to the Reflector when there
  118. // is one, otherwise returns the empty string
  119. LastSyncResourceVersion() string
  120. }
  121. // New makes a new Controller from the given Config.
  122. func New(c *Config) Controller {
  123. ctlr := &controller{
  124. config: *c,
  125. clock: &clock.RealClock{},
  126. }
  127. return ctlr
  128. }
  129. // Run implements [Controller.Run].
  130. func (c *controller) Run(stopCh <-chan struct{}) {
  131. c.RunWithContext(wait.ContextForChannel(stopCh))
  132. }
  133. // RunWithContext implements [Controller.RunWithContext].
  134. func (c *controller) RunWithContext(ctx context.Context) {
  135. defer utilruntime.HandleCrashWithContext(ctx)
  136. go func() {
  137. <-ctx.Done()
  138. c.config.Queue.Close()
  139. }()
  140. r := NewReflectorWithOptions(
  141. c.config.ListerWatcher,
  142. c.config.ObjectType,
  143. c.config.Queue,
  144. ReflectorOptions{
  145. ResyncPeriod: c.config.FullResyncPeriod,
  146. MinWatchTimeout: c.config.MinWatchTimeout,
  147. TypeDescription: c.config.ObjectDescription,
  148. Clock: c.clock,
  149. },
  150. )
  151. r.ShouldResync = c.config.ShouldResync
  152. r.WatchListPageSize = c.config.WatchListPageSize
  153. if c.config.WatchErrorHandler != nil {
  154. r.watchErrorHandler = func(_ context.Context, r *Reflector, err error) {
  155. c.config.WatchErrorHandler(r, err)
  156. }
  157. } else if c.config.WatchErrorHandlerWithContext != nil {
  158. r.watchErrorHandler = c.config.WatchErrorHandlerWithContext
  159. }
  160. c.reflectorMutex.Lock()
  161. c.reflector = r
  162. c.reflectorMutex.Unlock()
  163. var wg wait.Group
  164. wg.StartWithContext(ctx, r.RunWithContext)
  165. wait.UntilWithContext(ctx, c.processLoop, time.Second)
  166. wg.Wait()
  167. }
  168. // Returns true once this controller has completed an initial resource listing
  169. func (c *controller) HasSynced() bool {
  170. return c.config.Queue.HasSynced()
  171. }
  172. func (c *controller) LastSyncResourceVersion() string {
  173. c.reflectorMutex.RLock()
  174. defer c.reflectorMutex.RUnlock()
  175. if c.reflector == nil {
  176. return ""
  177. }
  178. return c.reflector.LastSyncResourceVersion()
  179. }
  180. // processLoop drains the work queue.
  181. // TODO: Consider doing the processing in parallel. This will require a little thought
  182. // to make sure that we don't end up processing the same object multiple times
  183. // concurrently.
  184. func (c *controller) processLoop(ctx context.Context) {
  185. useBatchProcess := false
  186. batchQueue, ok := c.config.Queue.(QueueWithBatch)
  187. if ok && c.config.ProcessBatch != nil && clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformersBatchProcess) {
  188. useBatchProcess = true
  189. }
  190. for {
  191. select {
  192. case <-ctx.Done():
  193. return
  194. default:
  195. var err error
  196. if useBatchProcess {
  197. err = batchQueue.PopBatch(c.config.ProcessBatch)
  198. } else {
  199. // otherwise fallback to non-batch process behavior
  200. _, err = c.config.Pop(PopProcessFunc(c.config.Process))
  201. }
  202. if err != nil {
  203. if errors.Is(err, ErrFIFOClosed) {
  204. return
  205. }
  206. }
  207. }
  208. }
  209. }
  210. // ResourceEventHandler can handle notifications for events that
  211. // happen to a resource. The events are informational only, so you
  212. // can't return an error. The handlers MUST NOT modify the objects
  213. // received; this concerns not only the top level of structure but all
  214. // the data structures reachable from it.
  215. // - OnAdd is called when an object is added.
  216. // - OnUpdate is called when an object is modified. Note that oldObj is the
  217. // last known state of the object-- it is possible that several changes
  218. // were combined together, so you can't use this to see every single
  219. // change. OnUpdate is also called when a re-list happens, and it will
  220. // get called even if nothing changed. This is useful for periodically
  221. // evaluating or syncing something.
  222. // - OnDelete will get the final state of the item if it is known, otherwise
  223. // it will get an object of type DeletedFinalStateUnknown. This can
  224. // happen if the watch is closed and misses the delete event and we don't
  225. // notice the deletion until the subsequent re-list.
  226. type ResourceEventHandler interface {
  227. OnAdd(obj interface{}, isInInitialList bool)
  228. OnUpdate(oldObj, newObj interface{})
  229. OnDelete(obj interface{})
  230. }
  231. // ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
  232. // as few of the notification functions as you want while still implementing
  233. // ResourceEventHandler. This adapter does not remove the prohibition against
  234. // modifying the objects.
  235. //
  236. // See ResourceEventHandlerDetailedFuncs if your use needs to propagate
  237. // HasSynced.
  238. type ResourceEventHandlerFuncs struct {
  239. AddFunc func(obj interface{})
  240. UpdateFunc func(oldObj, newObj interface{})
  241. DeleteFunc func(obj interface{})
  242. }
  243. // OnAdd calls AddFunc if it's not nil.
  244. func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) {
  245. if r.AddFunc != nil {
  246. r.AddFunc(obj)
  247. }
  248. }
  249. // OnUpdate calls UpdateFunc if it's not nil.
  250. func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
  251. if r.UpdateFunc != nil {
  252. r.UpdateFunc(oldObj, newObj)
  253. }
  254. }
  255. // OnDelete calls DeleteFunc if it's not nil.
  256. func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
  257. if r.DeleteFunc != nil {
  258. r.DeleteFunc(obj)
  259. }
  260. }
  261. // ResourceEventHandlerDetailedFuncs is exactly like ResourceEventHandlerFuncs
  262. // except its AddFunc accepts the isInInitialList parameter, for propagating
  263. // HasSynced.
  264. type ResourceEventHandlerDetailedFuncs struct {
  265. AddFunc func(obj interface{}, isInInitialList bool)
  266. UpdateFunc func(oldObj, newObj interface{})
  267. DeleteFunc func(obj interface{})
  268. }
  269. // OnAdd calls AddFunc if it's not nil.
  270. func (r ResourceEventHandlerDetailedFuncs) OnAdd(obj interface{}, isInInitialList bool) {
  271. if r.AddFunc != nil {
  272. r.AddFunc(obj, isInInitialList)
  273. }
  274. }
  275. // OnUpdate calls UpdateFunc if it's not nil.
  276. func (r ResourceEventHandlerDetailedFuncs) OnUpdate(oldObj, newObj interface{}) {
  277. if r.UpdateFunc != nil {
  278. r.UpdateFunc(oldObj, newObj)
  279. }
  280. }
  281. // OnDelete calls DeleteFunc if it's not nil.
  282. func (r ResourceEventHandlerDetailedFuncs) OnDelete(obj interface{}) {
  283. if r.DeleteFunc != nil {
  284. r.DeleteFunc(obj)
  285. }
  286. }
  287. // FilteringResourceEventHandler applies the provided filter to all events coming
  288. // in, ensuring the appropriate nested handler method is invoked. An object
  289. // that starts passing the filter after an update is considered an add, and an
  290. // object that stops passing the filter after an update is considered a delete.
  291. // Like the handlers, the filter MUST NOT modify the objects it is given.
  292. type FilteringResourceEventHandler struct {
  293. FilterFunc func(obj interface{}) bool
  294. Handler ResourceEventHandler
  295. }
  296. // OnAdd calls the nested handler only if the filter succeeds
  297. func (r FilteringResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
  298. if !r.FilterFunc(obj) {
  299. return
  300. }
  301. r.Handler.OnAdd(obj, isInInitialList)
  302. }
  303. // OnUpdate ensures the proper handler is called depending on whether the filter matches
  304. func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
  305. newer := r.FilterFunc(newObj)
  306. older := r.FilterFunc(oldObj)
  307. switch {
  308. case newer && older:
  309. r.Handler.OnUpdate(oldObj, newObj)
  310. case newer && !older:
  311. r.Handler.OnAdd(newObj, false)
  312. case !newer && older:
  313. r.Handler.OnDelete(oldObj)
  314. default:
  315. // do nothing
  316. }
  317. }
  318. // OnDelete calls the nested handler only if the filter succeeds
  319. func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
  320. if !r.FilterFunc(obj) {
  321. return
  322. }
  323. r.Handler.OnDelete(obj)
  324. }
  325. // DeletionHandlingMetaNamespaceKeyFunc checks for
  326. // DeletedFinalStateUnknown objects before calling
  327. // MetaNamespaceKeyFunc.
  328. func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
  329. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  330. return d.Key, nil
  331. }
  332. return MetaNamespaceKeyFunc(obj)
  333. }
  334. // DeletionHandlingObjectToName checks for
  335. // DeletedFinalStateUnknown objects before calling
  336. // ObjectToName.
  337. func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
  338. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  339. return ParseObjectName(d.Key)
  340. }
  341. return ObjectToName(obj)
  342. }
  343. // InformerOptions configure a Reflector.
  344. type InformerOptions struct {
  345. // ListerWatcher implements List and Watch functions for the source of the resource
  346. // the informer will be informing about.
  347. ListerWatcher ListerWatcher
  348. // ObjectType is an object of the type that informer is expected to receive.
  349. ObjectType runtime.Object
  350. // Handler defines functions that should called on object mutations.
  351. Handler ResourceEventHandler
  352. // ResyncPeriod is the underlying Reflector's resync period. If non-zero, the store
  353. // is re-synced with that frequency - Modify events are delivered even if objects
  354. // didn't change.
  355. // This is useful for synchronizing objects that configure external resources
  356. // (e.g. configure cloud provider functionalities).
  357. // Optional - if unset, store resyncing is not happening periodically.
  358. ResyncPeriod time.Duration
  359. // MinWatchTimeout, if set, will define the minimum timeout for watch requests send
  360. // to kube-apiserver. However, values lower than 5m will not be honored to avoid
  361. // negative performance impact on controlplane.
  362. // Optional - if unset a default value of 5m will be used.
  363. MinWatchTimeout time.Duration
  364. // Indexers, if set, are the indexers for the received objects to optimize
  365. // certain queries.
  366. // Optional - if unset no indexes are maintained.
  367. Indexers Indexers
  368. // Transform function, if set, will be called on all objects before they will be
  369. // put into the Store and corresponding Add/Modify/Delete handlers will be invoked
  370. // for them.
  371. // Optional - if unset no additional transforming is happening.
  372. Transform TransformFunc
  373. }
  374. // NewInformerWithOptions returns a Store and a controller for populating the store
  375. // while also providing event notifications. You should only used the returned
  376. // Store for Get/List operations; Add/Modify/Deletes will cause the event
  377. // notifications to be faulty.
  378. func NewInformerWithOptions(options InformerOptions) (Store, Controller) {
  379. var clientState Store
  380. if options.Indexers == nil {
  381. clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
  382. } else {
  383. clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers)
  384. }
  385. return clientState, newInformer(clientState, options)
  386. }
  387. // NewInformer returns a Store and a controller for populating the store
  388. // while also providing event notifications. You should only used the returned
  389. // Store for Get/List operations; Add/Modify/Deletes will cause the event
  390. // notifications to be faulty.
  391. //
  392. // Parameters:
  393. // - lw is list and watch functions for the source of the resource you want to
  394. // be informed of.
  395. // - objType is an object of the type that you expect to receive.
  396. // - resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
  397. // calls, even if nothing changed). Otherwise, re-list will be delayed as
  398. // long as possible (until the upstream source closes the watch or times out,
  399. // or you stop the controller).
  400. // - h is the object you want notifications sent to.
  401. //
  402. // Deprecated: Use NewInformerWithOptions instead.
  403. func NewInformer(
  404. lw ListerWatcher,
  405. objType runtime.Object,
  406. resyncPeriod time.Duration,
  407. h ResourceEventHandler,
  408. ) (Store, Controller) {
  409. // This will hold the client state, as we know it.
  410. clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
  411. options := InformerOptions{
  412. ListerWatcher: lw,
  413. ObjectType: objType,
  414. Handler: h,
  415. ResyncPeriod: resyncPeriod,
  416. }
  417. return clientState, newInformer(clientState, options)
  418. }
  419. // NewIndexerInformer returns an Indexer and a Controller for populating the index
  420. // while also providing event notifications. You should only used the returned
  421. // Index for Get/List operations; Add/Modify/Deletes will cause the event
  422. // notifications to be faulty.
  423. //
  424. // Parameters:
  425. // - lw is list and watch functions for the source of the resource you want to
  426. // be informed of.
  427. // - objType is an object of the type that you expect to receive.
  428. // - resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
  429. // calls, even if nothing changed). Otherwise, re-list will be delayed as
  430. // long as possible (until the upstream source closes the watch or times out,
  431. // or you stop the controller).
  432. // - h is the object you want notifications sent to.
  433. // - indexers is the indexer for the received object type.
  434. //
  435. // Deprecated: Use NewInformerWithOptions instead.
  436. func NewIndexerInformer(
  437. lw ListerWatcher,
  438. objType runtime.Object,
  439. resyncPeriod time.Duration,
  440. h ResourceEventHandler,
  441. indexers Indexers,
  442. ) (Indexer, Controller) {
  443. // This will hold the client state, as we know it.
  444. clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
  445. options := InformerOptions{
  446. ListerWatcher: lw,
  447. ObjectType: objType,
  448. Handler: h,
  449. ResyncPeriod: resyncPeriod,
  450. Indexers: indexers,
  451. }
  452. return clientState, newInformer(clientState, options)
  453. }
  454. // NewTransformingInformer returns a Store and a controller for populating
  455. // the store while also providing event notifications. You should only used
  456. // the returned Store for Get/List operations; Add/Modify/Deletes will cause
  457. // the event notifications to be faulty.
  458. // The given transform function will be called on all objects before they will
  459. // put into the Store and corresponding Add/Modify/Delete handlers will
  460. // be invoked for them.
  461. //
  462. // Deprecated: Use NewInformerWithOptions instead.
  463. func NewTransformingInformer(
  464. lw ListerWatcher,
  465. objType runtime.Object,
  466. resyncPeriod time.Duration,
  467. h ResourceEventHandler,
  468. transformer TransformFunc,
  469. ) (Store, Controller) {
  470. // This will hold the client state, as we know it.
  471. clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
  472. options := InformerOptions{
  473. ListerWatcher: lw,
  474. ObjectType: objType,
  475. Handler: h,
  476. ResyncPeriod: resyncPeriod,
  477. Transform: transformer,
  478. }
  479. return clientState, newInformer(clientState, options)
  480. }
  481. // NewTransformingIndexerInformer returns an Indexer and a controller for
  482. // populating the index while also providing event notifications. You should
  483. // only used the returned Index for Get/List operations; Add/Modify/Deletes
  484. // will cause the event notifications to be faulty.
  485. // The given transform function will be called on all objects before they will
  486. // be put into the Index and corresponding Add/Modify/Delete handlers will
  487. // be invoked for them.
  488. //
  489. // Deprecated: Use NewInformerWithOptions instead.
  490. func NewTransformingIndexerInformer(
  491. lw ListerWatcher,
  492. objType runtime.Object,
  493. resyncPeriod time.Duration,
  494. h ResourceEventHandler,
  495. indexers Indexers,
  496. transformer TransformFunc,
  497. ) (Indexer, Controller) {
  498. // This will hold the client state, as we know it.
  499. clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
  500. options := InformerOptions{
  501. ListerWatcher: lw,
  502. ObjectType: objType,
  503. Handler: h,
  504. ResyncPeriod: resyncPeriod,
  505. Indexers: indexers,
  506. Transform: transformer,
  507. }
  508. return clientState, newInformer(clientState, options)
  509. }
  510. // Multiplexes updates in the form of a list of Deltas into a Store, and informs
  511. // a given handler of events OnUpdate, OnAdd, OnDelete
  512. func processDeltas(
  513. // Object which receives event notifications from the given deltas
  514. handler ResourceEventHandler,
  515. clientState Store,
  516. deltas Deltas,
  517. isInInitialList bool,
  518. ) error {
  519. // from oldest to newest
  520. for _, d := range deltas {
  521. obj := d.Object
  522. switch d.Type {
  523. case Sync, Replaced, Added, Updated:
  524. if old, exists, err := clientState.Get(obj); err == nil && exists {
  525. if err := clientState.Update(obj); err != nil {
  526. return err
  527. }
  528. handler.OnUpdate(old, obj)
  529. } else {
  530. if err := clientState.Add(obj); err != nil {
  531. return err
  532. }
  533. handler.OnAdd(obj, isInInitialList)
  534. }
  535. case Deleted:
  536. if err := clientState.Delete(obj); err != nil {
  537. return err
  538. }
  539. handler.OnDelete(obj)
  540. }
  541. }
  542. return nil
  543. }
  544. // processDeltasInBatch applies a batch of Delta objects to the given Store and
  545. // notifies the ResourceEventHandler of add, update, or delete events.
  546. //
  547. // If the Store supports transactions (TransactionStore), all Deltas are applied
  548. // atomically in a single transaction and corresponding handler callbacks are
  549. // executed afterward. Otherwise, each Delta is processed individually.
  550. //
  551. // Returns an error if any Delta or transaction fails. For TransactionError,
  552. // only successful operations trigger callbacks.
  553. func processDeltasInBatch(
  554. handler ResourceEventHandler,
  555. clientState Store,
  556. deltas []Delta,
  557. isInInitialList bool,
  558. ) error {
  559. // from oldest to newest
  560. txns := make([]Transaction, 0)
  561. callbacks := make([]func(), 0)
  562. txnStore, txnSupported := clientState.(TransactionStore)
  563. if !txnSupported {
  564. var errs []error
  565. for _, delta := range deltas {
  566. if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList); err != nil {
  567. errs = append(errs, err)
  568. }
  569. }
  570. if len(errs) > 0 {
  571. return fmt.Errorf("unexpected error when handling deltas: %v", errs)
  572. }
  573. return nil
  574. }
  575. // deltasList is a list of unique objects
  576. for _, d := range deltas {
  577. obj := d.Object
  578. switch d.Type {
  579. case Sync, Replaced, Added, Updated:
  580. // it will only return one old object for each because items are unique
  581. if old, exists, err := clientState.Get(obj); err == nil && exists {
  582. txn := Transaction{
  583. Type: TransactionTypeUpdate,
  584. Object: obj,
  585. }
  586. txns = append(txns, txn)
  587. callbacks = append(callbacks, func() {
  588. handler.OnUpdate(old, obj)
  589. })
  590. } else {
  591. txn := Transaction{
  592. Type: TransactionTypeAdd,
  593. Object: obj,
  594. }
  595. txns = append(txns, txn)
  596. callbacks = append(callbacks, func() {
  597. handler.OnAdd(obj, isInInitialList)
  598. })
  599. }
  600. case Deleted:
  601. txn := Transaction{
  602. Type: TransactionTypeDelete,
  603. Object: obj,
  604. }
  605. txns = append(txns, txn)
  606. callbacks = append(callbacks, func() {
  607. handler.OnDelete(obj)
  608. })
  609. }
  610. }
  611. err := txnStore.Transaction(txns...)
  612. if err != nil {
  613. // if txn had error, only execute the callbacks for the successful ones
  614. for _, i := range err.SuccessfulIndices {
  615. if i < len(callbacks) {
  616. callbacks[i]()
  617. }
  618. }
  619. // formatting the error so txns doesn't escape and keeps allocated in the stack.
  620. return fmt.Errorf("not all items in the batch successfully processed: %s", err.Error())
  621. }
  622. for _, callback := range callbacks {
  623. callback()
  624. }
  625. return nil
  626. }
  627. // newInformer returns a controller for populating the store while also
  628. // providing event notifications.
  629. //
  630. // Parameters
  631. // - clientState is the store you want to populate
  632. // - options contain the options to configure the controller
  633. func newInformer(clientState Store, options InformerOptions) Controller {
  634. // This will hold incoming changes. Note how we pass clientState in as a
  635. // KeyLister, that way resync operations will result in the correct set
  636. // of update/delete deltas.
  637. fifo := newQueueFIFO(clientState, options.Transform)
  638. cfg := &Config{
  639. Queue: fifo,
  640. ListerWatcher: options.ListerWatcher,
  641. ObjectType: options.ObjectType,
  642. FullResyncPeriod: options.ResyncPeriod,
  643. MinWatchTimeout: options.MinWatchTimeout,
  644. Process: func(obj interface{}, isInInitialList bool) error {
  645. if deltas, ok := obj.(Deltas); ok {
  646. return processDeltas(options.Handler, clientState, deltas, isInInitialList)
  647. }
  648. return errors.New("object given as Process argument is not Deltas")
  649. },
  650. ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
  651. return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList)
  652. },
  653. }
  654. return New(cfg)
  655. }
  656. func newQueueFIFO(clientState Store, transform TransformFunc) Queue {
  657. if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
  658. return NewRealFIFOWithOptions(RealFIFOOptions{
  659. KeyFunction: MetaNamespaceKeyFunc,
  660. KnownObjects: clientState,
  661. Transformer: transform,
  662. })
  663. } else {
  664. return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
  665. KnownObjects: clientState,
  666. EmitDeltaTypeReplaced: true,
  667. Transformer: transform,
  668. })
  669. }
  670. }