shared_informer.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "sync"
  17. "time"
  18. "k8s.io/apimachinery/pkg/api/meta"
  19. "k8s.io/apimachinery/pkg/runtime"
  20. "k8s.io/apimachinery/pkg/util/clock"
  21. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  22. "k8s.io/apimachinery/pkg/util/wait"
  23. "k8s.io/utils/buffer"
  24. "k8s.io/klog/v2"
  25. )
  26. // SharedInformer provides eventually consistent linkage of its
  27. // clients to the authoritative state of a given collection of
  28. // objects. An object is identified by its API group, kind/resource,
  29. // namespace (if any), and name; the `ObjectMeta.UID` is not part of
  30. // an object's ID as far as this contract is concerned. One
  31. // SharedInformer provides linkage to objects of a particular API
  32. // group and kind/resource. The linked object collection of a
  33. // SharedInformer may be further restricted to one namespace (if
  34. // applicable) and/or by label selector and/or field selector.
  35. //
  36. // The authoritative state of an object is what apiservers provide
  37. // access to, and an object goes through a strict sequence of states.
  38. // An object state is either (1) present with a ResourceVersion and
  39. // other appropriate content or (2) "absent".
  40. //
  41. // A SharedInformer maintains a local cache --- exposed by GetStore(),
  42. // by GetIndexer() in the case of an indexed informer, and possibly by
  43. // machinery involved in creating and/or accessing the informer --- of
  44. // the state of each relevant object. This cache is eventually
  45. // consistent with the authoritative state. This means that, unless
  46. // prevented by persistent communication problems, if ever a
  47. // particular object ID X is authoritatively associated with a state S
  48. // then for every SharedInformer I whose collection includes (X, S)
  49. // eventually either (1) I's cache associates X with S or a later
  50. // state of X, (2) I is stopped, or (3) the authoritative state
  51. // service for X terminates. To be formally complete, we say that the
  52. // absent state meets any restriction by label selector or field
  53. // selector.
  54. //
  55. // For a given informer and relevant object ID X, the sequence of
  56. // states that appears in the informer's cache is a subsequence of the
  57. // states authoritatively associated with X. That is, some states
  58. // might never appear in the cache but ordering among the appearing
  59. // states is correct. Note, however, that there is no promise about
  60. // ordering between states seen for different objects.
  61. //
  62. // The local cache starts out empty, and gets populated and updated
  63. // during `Run()`.
  64. //
  65. // As a simple example, if a collection of objects is henceforth
  66. // unchanging, a SharedInformer is created that links to that
  67. // collection, and that SharedInformer is `Run()` then that
  68. // SharedInformer's cache eventually holds an exact copy of that
  69. // collection (unless it is stopped too soon, the authoritative state
  70. // service ends, or communication problems between the two
  71. // persistently thwart achievement).
  72. //
  73. // As another simple example, if the local cache ever holds a
  74. // non-absent state for some object ID and the object is eventually
  75. // removed from the authoritative state then eventually the object is
  76. // removed from the local cache (unless the SharedInformer is stopped
  77. // too soon, the authoritative state service ends, or communication
  78. // problems persistently thwart the desired result).
  79. //
  80. // The keys in the Store are of the form namespace/name for namespaced
  81. // objects, and are simply the name for non-namespaced objects.
  82. // Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
  83. // a given object, and `SplitMetaNamespaceKey(key)` to split a key
  84. // into its constituent parts.
  85. //
  86. // Every query against the local cache is answered entirely from one
  87. // snapshot of the cache's state. Thus, the result of a `List` call
  88. // will not contain two entries with the same namespace and name.
  89. //
  90. // A client is identified here by a ResourceEventHandler. For every
  91. // update to the SharedInformer's local cache and for every client
  92. // added before `Run()`, eventually either the SharedInformer is
  93. // stopped or the client is notified of the update. A client added
  94. // after `Run()` starts gets a startup batch of notifications of
  95. // additions of the objects existing in the cache at the time that
  96. // client was added; also, for every update to the SharedInformer's
  97. // local cache after that client was added, eventually either the
  98. // SharedInformer is stopped or that client is notified of that
  99. // update. Client notifications happen after the corresponding cache
  100. // update and, in the case of a SharedIndexInformer, after the
  101. // corresponding index updates. It is possible that additional cache
  102. // and index updates happen before such a prescribed notification.
  103. // For a given SharedInformer and client, the notifications are
  104. // delivered sequentially. For a given SharedInformer, client, and
  105. // object ID, the notifications are delivered in order. Because
  106. // `ObjectMeta.UID` has no role in identifying objects, it is possible
  107. // that when (1) object O1 with ID (e.g. namespace and name) X and
  108. // `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
  109. // and later (2) another object O2 with ID X and ObjectMeta.UID U2 is
  110. // created the informer's clients are not notified of (1) and (2) but
  111. // rather are notified only of an update from O1 to O2. Clients that
  112. // need to detect such cases might do so by comparing the `ObjectMeta.UID`
  113. // field of the old and the new object in the code that handles update
  114. // notifications (i.e. `OnUpdate` method of ResourceEventHandler).
  115. //
  116. // A client must process each notification promptly; a SharedInformer
  117. // is not engineered to deal well with a large backlog of
  118. // notifications to deliver. Lengthy processing should be passed off
  119. // to something else, for example through a
  120. // `client-go/util/workqueue`.
  121. //
  122. // A delete notification exposes the last locally known non-absent
  123. // state, except that its ResourceVersion is replaced with a
  124. // ResourceVersion in which the object is actually absent.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period. Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs. The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage. Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod. For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer. The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController is deprecated, it does nothing useful.
	//
	// Deprecated: the returned Controller is a stub whose Run is a
	// no-op; the informer itself should be Run instead.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection. This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
	// The WatchErrorHandler is called whenever ListAndWatch drops the
	// connection with an error. After calling this handler, the informer
	// will backoff and retry.
	//
	// The default implementation looks at the error type and tries to log
	// the error message at an appropriate level.
	//
	// There's only one handler, so if you call this multiple times, last one
	// wins; calling after the informer has been started returns an error.
	//
	// The handler is intended for visibility, not to e.g. pause the consumers.
	// The handler should return quickly - any expensive processing should be
	// offloaded.
	SetWatchErrorHandler(handler WatchErrorHandler) error
}
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	// Calling after the informer has started returns an error.
	AddIndexers(indexers Indexers) error
	// GetIndexer returns the informer's local cache as an Indexer.
	GetIndexer() Indexer
}
// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
	// A plain SharedInformer is just a SharedIndexInformer with no indexers.
	return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}
  186. // NewSharedIndexInformer creates a new instance for the listwatcher.
  187. // The created informer will not do resyncs if the given
  188. // defaultEventHandlerResyncPeriod is zero. Otherwise: for each
  189. // handler that with a non-zero requested resync period, whether added
  190. // before or after the informer starts, the nominal resync period is
  191. // the requested resync period rounded up to a multiple of the
  192. // informer's resync checking period. Such an informer's resync
  193. // checking period is established when the informer starts running,
  194. // and is the maximum of (a) the minimum of the resync periods
  195. // requested before the informer starts and the
  196. // defaultEventHandlerResyncPeriod given here and (b) the constant
  197. // `minimumResyncPeriod` defined in this file.
  198. func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  199. realClock := &clock.RealClock{}
  200. sharedIndexInformer := &sharedIndexInformer{
  201. processor: &sharedProcessor{clock: realClock},
  202. indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
  203. listerWatcher: lw,
  204. objectType: exampleObject,
  205. resyncCheckPeriod: defaultEventHandlerResyncPeriod,
  206. defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
  207. cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
  208. clock: realClock,
  209. }
  210. return sharedIndexInformer
  211. }
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
type InformerSynced func() bool

const (
	// syncedPollPeriod controls how often you look at the status of your sync funcs
	syncedPollPeriod = 100 * time.Millisecond

	// initialBufferSize is the initial number of event notifications that can be buffered.
	initialBufferSize = 1024
)
  220. // WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
  221. // indicating that the caller identified by name is waiting for syncs, followed by
  222. // either a successful or failed sync.
  223. func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
  224. klog.Infof("Waiting for caches to sync for %s", controllerName)
  225. if !WaitForCacheSync(stopCh, cacheSyncs...) {
  226. utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
  227. return false
  228. }
  229. klog.Infof("Caches are synced for %s ", controllerName)
  230. return true
  231. }
  232. // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
  233. // if the controller should shutdown
  234. // callers should prefer WaitForNamedCacheSync()
  235. func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
  236. err := wait.PollImmediateUntil(syncedPollPeriod,
  237. func() (bool, error) {
  238. for _, syncFunc := range cacheSyncs {
  239. if !syncFunc() {
  240. return false, nil
  241. }
  242. }
  243. return true, nil
  244. },
  245. stopCh)
  246. if err != nil {
  247. klog.V(2).Infof("stop requested")
  248. return false
  249. }
  250. klog.V(4).Infof("caches populated")
  251. return true
  252. }
// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components. One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
// objects/notifications using the ListerWatcher and pushes them into
// a DeltaFIFO --- whose knownObjects is the informer's local cache
// --- while concurrently Popping Deltas values from that fifo and
// processing them with `sharedIndexInformer::HandleDeltas`. Each
// invocation of HandleDeltas, which is done with the fifo's lock
// held, processes each Delta in turn. For each Delta this both
// updates the local cache and stuffs the relevant notification into
// the sharedProcessor. The third main component is that
// sharedProcessor, which is responsible for relaying those
// notifications to each of the informer's clients.
type sharedIndexInformer struct {
	// indexer is the informer's local cache, exposed via GetStore/GetIndexer.
	indexer Indexer
	// controller is created in Run(); nil until the informer starts.
	controller Controller

	// processor fans notifications out to the registered event handlers.
	processor *sharedProcessor
	// cacheMutationDetector optionally checks that cached objects are not mutated.
	cacheMutationDetector MutationDetector

	// listerWatcher is the source of authoritative state for the controller.
	listerWatcher ListerWatcher

	// objectType is an example object of the type this informer is
	// expected to handle. Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	objectType runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	// started/stopped record lifecycle transitions; both guarded by startedLock.
	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex

	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`. The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
	informer *sharedIndexInformer
}

// Run is a no-op: the wrapped informer is started elsewhere.
func (v *dummyController) Run(stopCh <-chan struct{}) {
}

// HasSynced delegates to the wrapped informer.
func (v *dummyController) HasSynced() bool {
	return v.informer.HasSynced()
}

// LastSyncResourceVersion always returns the empty string; the dummy
// controller does not track resource versions.
func (v *dummyController) LastSyncResourceVersion() string {
	return ""
}
// updateNotification carries the previous and new cached state of an
// object; distributed by HandleDeltas when an object already in the
// indexer is updated.
type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

// addNotification carries the state of a newly cached object; also used
// for the synthetic "Add" events sent to handlers that join late.
type addNotification struct {
	newObj interface{}
}

// deleteNotification carries the last cached state of an object that
// was removed from the indexer.
type deleteNotification struct {
	oldObj interface{}
}
  320. func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
  321. s.startedLock.Lock()
  322. defer s.startedLock.Unlock()
  323. if s.started {
  324. return fmt.Errorf("informer has already started")
  325. }
  326. s.watchErrorHandler = handler
  327. return nil
  328. }
// Run starts the informer's controller and notification machinery and
// blocks until stopCh is closed. The shutdown order (via defers, which
// run in reverse) is: controller stops, processor is told to stop, then
// we wait for the processor's goroutines to finish.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}

	// Create the controller and flip `started` under startedLock so that
	// concurrent AddEventHandler / SetWatchErrorHandler calls see a
	// consistent state.
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	// Runs first among the defers: mark stopped so no new listeners join
	// after the controller returns.
	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}
  366. func (s *sharedIndexInformer) HasSynced() bool {
  367. s.startedLock.Lock()
  368. defer s.startedLock.Unlock()
  369. if s.controller == nil {
  370. return false
  371. }
  372. return s.controller.HasSynced()
  373. }
  374. func (s *sharedIndexInformer) LastSyncResourceVersion() string {
  375. s.startedLock.Lock()
  376. defer s.startedLock.Unlock()
  377. if s.controller == nil {
  378. return ""
  379. }
  380. return s.controller.LastSyncResourceVersion()
  381. }
// GetStore returns the informer's local cache as a Store.
func (s *sharedIndexInformer) GetStore() Store {
	return s.indexer
}

// GetIndexer returns the same local cache as an Indexer.
func (s *sharedIndexInformer) GetIndexer() Indexer {
	return s.indexer
}
  388. func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
  389. s.startedLock.Lock()
  390. defer s.startedLock.Unlock()
  391. if s.started {
  392. return fmt.Errorf("informer has already started")
  393. }
  394. return s.indexer.AddIndexers(indexers)
  395. }
// GetController returns a stub Controller wrapping this informer.
//
// Deprecated: the returned controller's Run is a no-op; run the
// informer itself instead.
func (s *sharedIndexInformer) GetController() Controller {
	return &dummyController{informer: s}
}

// AddEventHandler registers handler with the informer's default resync period.
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
  402. func determineResyncPeriod(desired, check time.Duration) time.Duration {
  403. if desired == 0 {
  404. return desired
  405. }
  406. if check == 0 {
  407. klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
  408. return 0
  409. }
  410. if desired < check {
  411. klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
  412. return check
  413. }
  414. return desired
  415. }
// minimumResyncPeriod is the smallest resync period a handler may use;
// AddEventHandlerWithResyncPeriod bumps smaller requests up to it.
const minimumResyncPeriod = 1 * time.Second
// AddEventHandlerWithResyncPeriod registers handler with the given
// resync period (zero disables resync for this handler). If the
// informer has already started, the new listener joins safely: event
// distribution is paused via blockDeltas while the handler receives
// synthetic Add notifications for everything currently in the cache.
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		// Too late to join; silently drop the handler (logged at V(2)).
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				// Once started the check period is fixed, so the handler's
				// period can only be raised to match it.
				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}
// HandleDeltas applies a Deltas value (oldest first) to the local cache
// and distributes the matching notification to listeners. It holds
// blockDeltas so that AddEventHandlerWithResyncPeriod can pause
// distribution while a late listener joins. The cache is always
// updated before the notification is distributed, preserving the
// "notify after cache update" contract.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				// Object not yet in the cache: treat as an add regardless of delta type.
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
// sharedProcessor has a collection of processorListener and can
// distribute a notification object to its listeners. There are two
// kinds of distribute operations. The sync distributions go to a
// subset of the listeners that (a) is recomputed in the occasional
// calls to shouldResync and (b) every listener is initially put in.
// The non-sync distributions go to every listener.
type sharedProcessor struct {
	// listenersStarted records that run() has launched the listener goroutines;
	// listeners added afterwards are started immediately by addListener.
	listenersStarted bool
	// listenersLock guards listeners, syncingListeners, and listenersStarted.
	listenersLock sync.RWMutex
	// listeners is every registered listener.
	listeners []*processorListener
	// syncingListeners is the subset of listeners due for a resync,
	// rebuilt on each shouldResync call.
	syncingListeners []*processorListener
	clock            clock.Clock
	// wg tracks the per-listener run/pop goroutines.
	wg wait.Group
}
// addListener registers a listener and, if the processor is already
// running, starts its run/pop goroutines immediately.
func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

// addListenerLocked appends listener to both listener sets; every new
// listener initially participates in sync distributions. Caller must
// hold listenersLock.
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}
  528. func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  529. p.listenersLock.RLock()
  530. defer p.listenersLock.RUnlock()
  531. if sync {
  532. for _, listener := range p.syncingListeners {
  533. listener.add(obj)
  534. }
  535. } else {
  536. for _, listener := range p.listeners {
  537. listener.add(obj)
  538. }
  539. }
  540. }
// run starts the run/pop goroutine pair for every registered listener,
// then blocks until stopCh closes. On shutdown it closes each
// listener's addCh — which makes pop() drain and stop, which in turn
// closes nextCh and stops run() — and waits for all of them to finish.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
  559. // shouldResync queries every listener to determine if any of them need a resync, based on each
  560. // listener's resyncPeriod.
  561. func (p *sharedProcessor) shouldResync() bool {
  562. p.listenersLock.Lock()
  563. defer p.listenersLock.Unlock()
  564. p.syncingListeners = []*processorListener{}
  565. resyncNeeded := false
  566. now := p.clock.Now()
  567. for _, listener := range p.listeners {
  568. // need to loop through all the listeners to see if they need to resync so we can prepare any
  569. // listeners that are going to be resyncing.
  570. if listener.shouldResync(now) {
  571. resyncNeeded = true
  572. p.syncingListeners = append(p.syncingListeners, listener)
  573. listener.determineNextResync(now)
  574. }
  575. }
  576. return resyncNeeded
  577. }
  578. func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
  579. p.listenersLock.RLock()
  580. defer p.listenersLock.RUnlock()
  581. for _, listener := range p.listeners {
  582. resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
  583. listener.setResyncPeriod(resyncPeriod)
  584. }
  585. }
// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// channels, and an unbounded ring buffer. The `add(notification)`
// function sends the given notification to `addCh`. One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
// using storage in the ring buffer while `nextCh` is not keeping up.
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
	// nextCh feeds run(); closed by pop() on shutdown.
	nextCh chan interface{}
	// addCh receives from add(); closed by sharedProcessor.run on shutdown.
	addCh chan interface{}

	// handler is the client this listener delivers notifications to.
	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a
	// full resync from the shared informer, but modified by two
	// adjustments. One is imposing a lower bound,
	// `minimumResyncPeriod`. The other is another lower bound, the
	// sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only
	// in AddEventHandlerWithResyncPeriod invocations made after the
	// sharedProcessor starts and (b) only if the informer does
	// resyncs at all.
	requestedResyncPeriod time.Duration
	// resyncPeriod is the threshold that will be used in the logic
	// for this listener. This value differs from
	// requestedResyncPeriod only when the sharedIndexInformer does
	// not do resyncs, in which case the value here is zero. The
	// actual time between resyncs depends on when the
	// sharedProcessor's `shouldResync` function is invoked and when
	// the sharedIndexInformer processes `Sync` type Delta objects.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}
  629. func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
  630. ret := &processorListener{
  631. nextCh: make(chan interface{}),
  632. addCh: make(chan interface{}),
  633. handler: handler,
  634. pendingNotifications: *buffer.NewRingGrowing(bufferSize),
  635. requestedResyncPeriod: requestedResyncPeriod,
  636. resyncPeriod: resyncPeriod,
  637. }
  638. ret.determineNextResync(now)
  639. return ret
  640. }
// add hands a notification to this listener by sending it on addCh. The
// send blocks only until the listener's pop() goroutine receives it; pop()
// then buffers it in pendingNotifications, so a slow handler does not
// (beyond that handoff) block the caller. NOTE(review): if pop() is not
// running, this send blocks indefinitely — callers must start the listener
// first.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}
// pop pumps notifications from addCh to nextCh, spilling into the unbounded
// pendingNotifications ring buffer whenever the run() goroutine is not
// keeping up. It returns when addCh is closed, and on return closes nextCh
// so run() also terminates.
func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	// nextCh is nil (send case disabled) until there is a notification to
	// deliver; notification holds the value currently being offered to run().
	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched; pull the next one from the buffer.
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				// addCh was closed: shut down (deferred close of nextCh fires).
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}
// run receives notifications from nextCh and synchronously invokes the
// matching handler callback for each. It returns once nextCh is drained and
// closed (which pop() does on shutdown).
func (p *processorListener) run() {
	// this call blocks until the channel is closed. When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted. This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		// A panic inside a handler aborts this range loop; wait.Until then
		// re-enters it after 1s since stopCh has not been closed yet.
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}
  695. // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
  696. // this always returns false.
  697. func (p *processorListener) shouldResync(now time.Time) bool {
  698. p.resyncLock.Lock()
  699. defer p.resyncLock.Unlock()
  700. if p.resyncPeriod == 0 {
  701. return false
  702. }
  703. return now.After(p.nextResync) || now.Equal(p.nextResync)
  704. }
  705. func (p *processorListener) determineNextResync(now time.Time) {
  706. p.resyncLock.Lock()
  707. defer p.resyncLock.Unlock()
  708. p.nextResync = now.Add(p.resyncPeriod)
  709. }
  710. func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
  711. p.resyncLock.Lock()
  712. defer p.resyncLock.Unlock()
  713. p.resyncPeriod = resyncPeriod
  714. }