shared_informer.go 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "sync"
  19. "time"
  20. "k8s.io/apimachinery/pkg/api/meta"
  21. "k8s.io/apimachinery/pkg/runtime"
  22. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  23. "k8s.io/apimachinery/pkg/util/wait"
  24. "k8s.io/client-go/tools/cache/synctrack"
  25. "k8s.io/utils/buffer"
  26. "k8s.io/utils/clock"
  27. "k8s.io/utils/ptr"
  28. "k8s.io/klog/v2"
  29. clientgofeaturegate "k8s.io/client-go/features"
  30. )
  31. // SharedInformer provides eventually consistent linkage of its
  32. // clients to the authoritative state of a given collection of
  33. // objects. An object is identified by its API group, kind/resource,
  34. // namespace (if any), and name; the `ObjectMeta.UID` is not part of
  35. // an object's ID as far as this contract is concerned. One
  36. // SharedInformer provides linkage to objects of a particular API
  37. // group and kind/resource. The linked object collection of a
  38. // SharedInformer may be further restricted to one namespace (if
  39. // applicable) and/or by label selector and/or field selector.
  40. //
  41. // The authoritative state of an object is what apiservers provide
  42. // access to, and an object goes through a strict sequence of states.
  43. // An object state is either (1) present with a ResourceVersion and
  44. // other appropriate content or (2) "absent".
  45. //
  46. // A SharedInformer maintains a local cache --- exposed by GetStore(),
  47. // by GetIndexer() in the case of an indexed informer, and possibly by
  48. // machinery involved in creating and/or accessing the informer --- of
  49. // the state of each relevant object. This cache is eventually
  50. // consistent with the authoritative state. This means that, unless
  51. // prevented by persistent communication problems, if ever a
  52. // particular object ID X is authoritatively associated with a state S
  53. // then for every SharedInformer I whose collection includes (X, S)
  54. // eventually either (1) I's cache associates X with S or a later
  55. // state of X, (2) I is stopped, or (3) the authoritative state
  56. // service for X terminates. To be formally complete, we say that the
  57. // absent state meets any restriction by label selector or field
  58. // selector.
  59. //
  60. // For a given informer and relevant object ID X, the sequence of
  61. // states that appears in the informer's cache is a subsequence of the
  62. // states authoritatively associated with X. That is, some states
  63. // might never appear in the cache but ordering among the appearing
  64. // states is correct. Note, however, that there is no promise about
  65. // ordering between states seen for different objects.
  66. //
  67. // The local cache starts out empty, and gets populated and updated
  68. // during `Run()`.
  69. //
  70. // As a simple example, if a collection of objects is henceforth
  71. // unchanging, a SharedInformer is created that links to that
  72. // collection, and that SharedInformer is `Run()` then that
  73. // SharedInformer's cache eventually holds an exact copy of that
  74. // collection (unless it is stopped too soon, the authoritative state
  75. // service ends, or communication problems between the two
  76. // persistently thwart achievement).
  77. //
  78. // As another simple example, if the local cache ever holds a
  79. // non-absent state for some object ID and the object is eventually
  80. // removed from the authoritative state then eventually the object is
  81. // removed from the local cache (unless the SharedInformer is stopped
  82. // too soon, the authoritative state service ends, or communication
  83. // problems persistently thwart the desired result).
  84. //
  85. // The keys in the Store are of the form namespace/name for namespaced
  86. // objects, and are simply the name for non-namespaced objects.
  87. // Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
  88. // a given object, and `SplitMetaNamespaceKey(key)` to split a key
  89. // into its constituent parts.
  90. //
  91. // Every query against the local cache is answered entirely from one
  92. // snapshot of the cache's state. Thus, the result of a `List` call
  93. // will not contain two entries with the same namespace and name.
  94. //
  95. // A client is identified here by a ResourceEventHandler. For every
  96. // update to the SharedInformer's local cache and for every client
  97. // added before `Run()`, eventually either the SharedInformer is
  98. // stopped or the client is notified of the update. A client added
  99. // after `Run()` starts gets a startup batch of notifications of
  100. // additions of the objects existing in the cache at the time that
  101. // client was added; also, for every update to the SharedInformer's
  102. // local cache after that client was added, eventually either the
  103. // SharedInformer is stopped or that client is notified of that
  104. // update. Client notifications happen after the corresponding cache
  105. // update and, in the case of a SharedIndexInformer, after the
  106. // corresponding index updates. It is possible that additional cache
  107. // and index updates happen before such a prescribed notification.
  108. // For a given SharedInformer and client, the notifications are
  109. // delivered sequentially. For a given SharedInformer, client, and
  110. // object ID, the notifications are delivered in order. Because
  111. // `ObjectMeta.UID` has no role in identifying objects, it is possible
  112. // that when (1) object O1 with ID (e.g. namespace and name) X and
  113. // `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
  114. // and later (2) another object O2 with ID X and ObjectMeta.UID U2 is
  115. // created the informer's clients are not notified of (1) and (2) but
  116. // rather are notified only of an update from O1 to O2. Clients that
  117. // need to detect such cases might do so by comparing the `ObjectMeta.UID`
  118. // field of the old and the new object in the code that handles update
  119. // notifications (i.e. `OnUpdate` method of ResourceEventHandler).
  120. //
  121. // A client must process each notification promptly; a SharedInformer
  122. // is not engineered to deal well with a large backlog of
  123. // notifications to deliver. Lengthy processing should be passed off
  124. // to something else, for example through a
  125. // `client-go/util/workqueue`.
  126. //
  127. // A delete notification exposes the last locally known non-absent
  128. // state, except that its ResourceVersion is replaced with a
  129. // ResourceVersion in which the object is actually absent.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using
	// the shared informer's resync period. Events to a single handler are
	// delivered sequentially, but there is no coordination between
	// different handlers.
	// It returns a registration handle for the handler that can be used to
	// remove the handler again, or to tell if the handler is synced (has
	// seen every item in the initial list).
	//
	// Contextual logging: AddEventHandlerWithOptions together with a logger
	// in the options should be used instead of AddEventHandler in code
	// which supports contextual logging.
	AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs. The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage. Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod. For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer. The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
	// It returns a registration handle for the handler that can be used to
	// remove the handler again, and an error if the handler cannot be added.
	//
	// Contextual logging: AddEventHandlerWithOptions together with a logger
	// in the options should be used instead of
	// AddEventHandlerWithResyncPeriod in code which supports contextual
	// logging.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
	// AddEventHandlerWithOptions is a variant of
	// AddEventHandlerWithResyncPeriod where all optional parameters are
	// passed in a struct.
	AddEventHandlerWithOptions(handler ResourceEventHandler, options HandlerOptions) (ResourceEventHandlerRegistration, error)
	// RemoveEventHandler removes a formerly added event handler given by
	// its registration handle.
	// This function is guaranteed to be idempotent and thread-safe.
	RemoveEventHandler(handle ResourceEventHandlerRegistration) error
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController is deprecated; it does nothing useful.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	//
	// Contextual logging: RunWithContext should be used instead of Run in
	// code which uses contextual logging.
	Run(stopCh <-chan struct{})
	// RunWithContext starts and runs the shared informer, returning after
	// it stops. The informer will be stopped when the context is canceled.
	RunWithContext(ctx context.Context)
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection. This is unrelated to "resync".
	//
	// Note that this does not tell you whether an individual handler is
	// synced! For that, call HasSynced on the handle returned by
	// AddEventHandler.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last
	// synced with the underlying store. The value returned is not
	// synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
	// SetWatchErrorHandler sets the WatchErrorHandler, which is called
	// whenever ListAndWatch drops the connection with an error. After
	// calling this handler, the informer will back off and retry.
	//
	// The default implementation looks at the error type and tries to log
	// the error message at an appropriate level.
	//
	// There's only one handler, so if you call this multiple times, the
	// last one wins; calling after the informer has been started returns
	// an error.
	//
	// The handler is intended for visibility, not to e.g. pause the
	// consumers. The handler should return quickly - any expensive
	// processing should be offloaded.
	//
	// Contextual logging: SetWatchErrorHandlerWithContext should be used
	// instead of SetWatchErrorHandler in code which supports contextual
	// logging.
	SetWatchErrorHandler(handler WatchErrorHandler) error
	// SetWatchErrorHandlerWithContext is a variant of SetWatchErrorHandler
	// where the handler is passed an additional context parameter.
	SetWatchErrorHandlerWithContext(handler WatchErrorHandlerWithContext) error
	// SetTransform sets the TransformFunc, which is called for each object
	// that is about to be stored.
	//
	// This function is intended for you to take the opportunity to
	// remove, transform, or normalize fields. One use case is to strip
	// unused metadata fields out of objects to save on RAM cost.
	//
	// Must be set before starting the informer.
	//
	// Please see the comment on TransformFunc for more details.
	SetTransform(handler TransformFunc) error
	// IsStopped reports whether the informer has already been stopped.
	// Adding event handlers to already stopped informers is not possible.
	// An informer already stopped will never be started again.
	IsStopped() bool
}
// ResourceEventHandlerRegistration is an opaque interface representing the
// registration of a ResourceEventHandler for a SharedInformer. It must be
// supplied back to the same SharedInformer's `RemoveEventHandler` to
// unregister the handler.
//
// It is also used to tell if the handler is synced (has had all items in the
// initial list delivered).
type ResourceEventHandlerRegistration interface {
	// HasSynced reports if both the parent has synced and all pre-sync
	// events have been delivered.
	HasSynced() bool
}
// HandlerOptions holds the optional configuration for
// [SharedInformer.AddEventHandlerWithOptions]. It may be left empty.
type HandlerOptions struct {
	// Logger overrides the default klog.Background() logger.
	Logger *klog.Logger
	// ResyncPeriod requests a certain resync period from an informer. Zero
	// means the handler does not care about resyncs. Not all informers do
	// resyncs, even if requested. See
	// [SharedInformer.AddEventHandlerWithResyncPeriod] for details.
	//
	// If nil, the default resync period of the shared informer is used.
	ResyncPeriod *time.Duration
}
// SharedIndexInformer extends SharedInformer with the ability to add
// Indexers and to retrieve the Indexer.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds the given indexers to the informer and returns an
	// error if they could not be added.
	AddIndexers(indexers Indexers) error
	// GetIndexer returns the informer's local cache as an Indexer.
	GetIndexer() Indexer
}
  255. // NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details.
  256. func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
  257. return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
  258. }
  259. // NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See
  260. // NewSharedIndexInformerWithOptions for full details.
  261. func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  262. return NewSharedIndexInformerWithOptions(
  263. lw,
  264. exampleObject,
  265. SharedIndexInformerOptions{
  266. ResyncPeriod: defaultEventHandlerResyncPeriod,
  267. Indexers: indexers,
  268. },
  269. )
  270. }
  271. // NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher.
  272. // The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each
  273. // handler that with a non-zero requested resync period, whether added
  274. // before or after the informer starts, the nominal resync period is
  275. // the requested resync period rounded up to a multiple of the
  276. // informer's resync checking period. Such an informer's resync
  277. // checking period is established when the informer starts running,
  278. // and is the maximum of (a) the minimum of the resync periods
  279. // requested before the informer starts and the
  280. // options.ResyncPeriod given here and (b) the constant
  281. // `minimumResyncPeriod` defined in this file.
  282. func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
  283. realClock := &clock.RealClock{}
  284. return &sharedIndexInformer{
  285. indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
  286. processor: &sharedProcessor{clock: realClock},
  287. listerWatcher: lw,
  288. objectType: exampleObject,
  289. objectDescription: options.ObjectDescription,
  290. resyncCheckPeriod: options.ResyncPeriod,
  291. defaultEventHandlerResyncPeriod: options.ResyncPeriod,
  292. clock: realClock,
  293. cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
  294. }
  295. }
// SharedIndexInformerOptions configures a sharedIndexInformer created by
// NewSharedIndexInformerWithOptions.
type SharedIndexInformerOptions struct {
	// ResyncPeriod is the default event handler resync period and resync check
	// period. If unset/unspecified, these are defaulted to 0 (do not resync).
	ResyncPeriod time.Duration
	// Indexers is the sharedIndexInformer's indexers. If unset/unspecified,
	// no indexers are configured.
	Indexers Indexers
	// ObjectDescription is the sharedIndexInformer's object description.
	// This is passed through to the underlying Reflector's type description.
	ObjectDescription string
}
// InformerSynced is a function that can be used to determine if an informer
// has synced. This is useful for determining if caches have synced; see
// WaitForCacheSync, which polls a set of these functions.
type InformerSynced func() bool
  309. const (
  310. // syncedPollPeriod controls how often you look at the status of your sync funcs
  311. syncedPollPeriod = 100 * time.Millisecond
  312. // initialBufferSize is the initial number of event notifications that can be buffered.
  313. initialBufferSize = 1024
  314. )
  315. // WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
  316. // indicating that the caller identified by name is waiting for syncs, followed by
  317. // either a successful or failed sync.
  318. //
  319. // Contextual logging: WaitForNamedCacheSyncWithContext should be used instead of WaitForNamedCacheSync in code which supports contextual logging.
  320. func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
  321. klog.Background().Info("Waiting for caches to sync", "controller", controllerName)
  322. if !WaitForCacheSync(stopCh, cacheSyncs...) {
  323. utilruntime.HandleErrorWithContext(context.Background(), nil, "Unable to sync caches", "controller", controllerName)
  324. return false
  325. }
  326. klog.Background().Info("Caches are synced", "controller", controllerName)
  327. return true
  328. }
  329. // WaitForNamedCacheSyncWithContext is a wrapper around WaitForCacheSyncWithContext that generates log messages
  330. // indicating that the caller is waiting for syncs, followed by either a successful or failed sync.
  331. //
  332. // Contextual logging can be used to identify the caller in those log messages. The log level is zero,
  333. // the same as in [WaitForNamedCacheSync]. If this is too verbose, then store a logger with an increased
  334. // threshold in the context:
  335. //
  336. // WaitForNamedCacheSyncWithContext(klog.NewContext(ctx, logger.V(5)), ...)
  337. func WaitForNamedCacheSyncWithContext(ctx context.Context, cacheSyncs ...InformerSynced) bool {
  338. logger := klog.FromContext(ctx)
  339. logger.Info("Waiting for caches to sync")
  340. if !WaitForCacheSync(ctx.Done(), cacheSyncs...) {
  341. utilruntime.HandleErrorWithContext(ctx, nil, "Unable to sync caches")
  342. return false
  343. }
  344. logger.Info("Caches are synced")
  345. return true
  346. }
  347. // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
  348. // if the controller should shutdown
  349. // callers should prefer WaitForNamedCacheSync()
  350. func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
  351. err := wait.PollImmediateUntil(syncedPollPeriod,
  352. func() (bool, error) {
  353. for _, syncFunc := range cacheSyncs {
  354. if !syncFunc() {
  355. return false, nil
  356. }
  357. }
  358. return true, nil
  359. },
  360. stopCh)
  361. if err != nil {
  362. return false
  363. }
  364. return true
  365. }
// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components. One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
// objects/notifications using the ListerWatcher and pushes them into
// a DeltaFIFO --- whose knownObjects is the informer's local cache
// --- while concurrently Popping Deltas values from that fifo and
// processing them with `sharedIndexInformer::HandleDeltas`. Each
// invocation of HandleDeltas, which is done with the fifo's lock
// held, processes each Delta in turn. For each Delta this both
// updates the local cache and stuffs the relevant notification into
// the sharedProcessor. The third main component is that
// sharedProcessor, which is responsible for relaying those
// notifications to each of the informer's clients.
type sharedIndexInformer struct {
	// indexer is the indexed local cache returned by GetStore and GetIndexer.
	indexer Indexer
	// controller is created in RunWithContext; it is nil until then.
	controller Controller

	// processor relays notifications to the informer's clients.
	processor *sharedProcessor
	// cacheMutationDetector is run for the lifetime of RunWithContext.
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher

	// objectType is an example object of the type this informer is expected to handle. If set, an event
	// with an object with a mismatching type is dropped instead of being delivered to listeners.
	objectType runtime.Object

	// objectDescription is the description of this informer's objects. It is
	// passed to the controller Config as ObjectDescription when the informer
	// is run.
	objectDescription string

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	// started and stopped are guarded by startedLock.
	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex

	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandlerWithContext

	// transform is the TransformFunc set through SetTransform; it is handed
	// to the queue created in RunWithContext.
	transform TransformFunc
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`. The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
	// informer is the shared informer whose sync state this controller reports.
	informer *sharedIndexInformer
}
  416. func (v *dummyController) RunWithContext(context.Context) {
  417. }
  418. func (v *dummyController) Run(stopCh <-chan struct{}) {
  419. }
  420. func (v *dummyController) HasSynced() bool {
  421. return v.informer.HasSynced()
  422. }
  423. func (v *dummyController) LastSyncResourceVersion() string {
  424. if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) {
  425. return v.informer.LastSyncResourceVersion()
  426. }
  427. return ""
  428. }
// updateNotification pairs the old and the new version of an object for an
// update event.
type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}
// addNotification carries the object of an add event.
type addNotification struct {
	newObj interface{}
	// NOTE(review): presumably true when the object was part of the
	// informer's initial list - confirm against the code that sets it.
	isInInitialList bool
}
// deleteNotification carries the object of a delete event.
type deleteNotification struct {
	oldObj interface{}
}
  440. func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
  441. return s.SetWatchErrorHandlerWithContext(func(_ context.Context, r *Reflector, err error) {
  442. handler(r, err)
  443. })
  444. }
  445. func (s *sharedIndexInformer) SetWatchErrorHandlerWithContext(handler WatchErrorHandlerWithContext) error {
  446. s.startedLock.Lock()
  447. defer s.startedLock.Unlock()
  448. if s.started {
  449. return fmt.Errorf("informer has already started")
  450. }
  451. s.watchErrorHandler = handler
  452. return nil
  453. }
  454. func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
  455. s.startedLock.Lock()
  456. defer s.startedLock.Unlock()
  457. if s.started {
  458. return fmt.Errorf("informer has already started")
  459. }
  460. s.transform = handler
  461. return nil
  462. }
  463. func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  464. s.RunWithContext(wait.ContextForChannel(stopCh))
  465. }
  466. func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
  467. defer utilruntime.HandleCrashWithContext(ctx)
  468. logger := klog.FromContext(ctx)
  469. if s.HasStarted() {
  470. logger.Info("Warning: the sharedIndexInformer has started, run more than once is not allowed")
  471. return
  472. }
  473. func() {
  474. s.startedLock.Lock()
  475. defer s.startedLock.Unlock()
  476. fifo := newQueueFIFO(s.indexer, s.transform)
  477. cfg := &Config{
  478. Queue: fifo,
  479. ListerWatcher: s.listerWatcher,
  480. ObjectType: s.objectType,
  481. ObjectDescription: s.objectDescription,
  482. FullResyncPeriod: s.resyncCheckPeriod,
  483. ShouldResync: s.processor.shouldResync,
  484. Process: s.HandleDeltas,
  485. ProcessBatch: s.HandleBatchDeltas,
  486. WatchErrorHandlerWithContext: s.watchErrorHandler,
  487. }
  488. s.controller = New(cfg)
  489. s.controller.(*controller).clock = s.clock
  490. s.started = true
  491. }()
  492. // Separate stop context because Processor should be stopped strictly after controller.
  493. // Cancelation in the parent context is ignored and all values are passed on,
  494. // including - but not limited to - a logger.
  495. processorStopCtx, stopProcessor := context.WithCancelCause(context.WithoutCancel(ctx))
  496. var wg wait.Group
  497. defer wg.Wait() // Wait for Processor to stop
  498. defer stopProcessor(errors.New("informer is stopping")) // Tell Processor to stop
  499. // TODO: extend the MutationDetector interface so that it optionally
  500. // has a RunWithContext method that we can use here.
  501. wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run)
  502. wg.StartWithContext(processorStopCtx, s.processor.run)
  503. defer func() {
  504. s.startedLock.Lock()
  505. defer s.startedLock.Unlock()
  506. s.stopped = true // Don't want any new listeners
  507. }()
  508. s.controller.RunWithContext(ctx)
  509. }
  510. func (s *sharedIndexInformer) HasStarted() bool {
  511. s.startedLock.Lock()
  512. defer s.startedLock.Unlock()
  513. return s.started
  514. }
  515. func (s *sharedIndexInformer) HasSynced() bool {
  516. s.startedLock.Lock()
  517. defer s.startedLock.Unlock()
  518. if s.controller == nil {
  519. return false
  520. }
  521. return s.controller.HasSynced()
  522. }
  523. func (s *sharedIndexInformer) LastSyncResourceVersion() string {
  524. s.startedLock.Lock()
  525. defer s.startedLock.Unlock()
  526. if s.controller == nil {
  527. return ""
  528. }
  529. return s.controller.LastSyncResourceVersion()
  530. }
  531. func (s *sharedIndexInformer) GetStore() Store {
  532. return s.indexer
  533. }
  534. func (s *sharedIndexInformer) GetIndexer() Indexer {
  535. return s.indexer
  536. }
  537. func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
  538. s.startedLock.Lock()
  539. defer s.startedLock.Unlock()
  540. if s.stopped {
  541. return fmt.Errorf("indexer was not added because it has stopped already")
  542. }
  543. return s.indexer.AddIndexers(indexers)
  544. }
  545. func (s *sharedIndexInformer) GetController() Controller {
  546. return &dummyController{informer: s}
  547. }
  548. func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
  549. return s.AddEventHandlerWithOptions(handler, HandlerOptions{})
  550. }
  551. func determineResyncPeriod(logger klog.Logger, desired, check time.Duration) time.Duration {
  552. if desired == 0 {
  553. return desired
  554. }
  555. if check == 0 {
  556. logger.Info("Warning: the specified resyncPeriod is invalid because this shared informer doesn't support resyncing", "desired", desired)
  557. return 0
  558. }
  559. if desired < check {
  560. logger.Info("Warning: the specified resyncPeriod is being increased to the minimum resyncCheckPeriod", "desired", desired, "resyncCheckPeriod", check)
  561. return check
  562. }
  563. return desired
  564. }
  565. const minimumResyncPeriod = 1 * time.Second
  566. func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
  567. return s.AddEventHandlerWithOptions(handler, HandlerOptions{ResyncPeriod: &resyncPeriod})
  568. }
  569. func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHandler, options HandlerOptions) (ResourceEventHandlerRegistration, error) {
  570. s.startedLock.Lock()
  571. defer s.startedLock.Unlock()
  572. if s.stopped {
  573. return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
  574. }
  575. logger := ptr.Deref(options.Logger, klog.Background())
  576. resyncPeriod := ptr.Deref(options.ResyncPeriod, s.defaultEventHandlerResyncPeriod)
  577. if resyncPeriod > 0 {
  578. if resyncPeriod < minimumResyncPeriod {
  579. logger.Info("Warning: resync period is too small. Changing it to the minimum allowed value", "resyncPeriod", resyncPeriod, "minimumResyncPeriod", minimumResyncPeriod)
  580. resyncPeriod = minimumResyncPeriod
  581. }
  582. if resyncPeriod < s.resyncCheckPeriod {
  583. if s.started {
  584. logger.Info("Warning: resync period is smaller than resync check period and the informer has already started. Changing it to the resync check period", "resyncPeriod", resyncPeriod, "resyncCheckPeriod", s.resyncCheckPeriod)
  585. resyncPeriod = s.resyncCheckPeriod
  586. } else {
  587. // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
  588. // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
  589. // accordingly
  590. s.resyncCheckPeriod = resyncPeriod
  591. s.processor.resyncCheckPeriodChanged(logger, resyncPeriod)
  592. }
  593. }
  594. }
  595. listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
  596. if !s.started {
  597. return s.processor.addListener(listener), nil
  598. }
  599. // in order to safely join, we have to
  600. // 1. stop sending add/update/delete notifications
  601. // 2. do a list against the store
  602. // 3. send synthetic "Add" events to the new handler
  603. // 4. unblock
  604. s.blockDeltas.Lock()
  605. defer s.blockDeltas.Unlock()
  606. handle := s.processor.addListener(listener)
  607. for _, item := range s.indexer.List() {
  608. // Note that we enqueue these notifications with the lock held
  609. // and before returning the handle. That means there is never a
  610. // chance for anyone to call the handle's HasSynced method in a
  611. // state when it would falsely return true (i.e., when the
  612. // shared informer is synced but it has not observed an Add
  613. // with isInitialList being true, nor when the thread
  614. // processing notifications somehow goes faster than this
  615. // thread adding them and the counter is temporarily zero).
  616. listener.add(addNotification{newObj: item, isInInitialList: true})
  617. }
  618. return handle, nil
  619. }
  620. func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
  621. s.blockDeltas.Lock()
  622. defer s.blockDeltas.Unlock()
  623. if deltas, ok := obj.(Deltas); ok {
  624. return processDeltas(s, s.indexer, deltas, isInInitialList)
  625. }
  626. return errors.New("object given as Process argument is not Deltas")
  627. }
  628. func (s *sharedIndexInformer) HandleBatchDeltas(deltas []Delta, isInInitialList bool) error {
  629. s.blockDeltas.Lock()
  630. defer s.blockDeltas.Unlock()
  631. return processDeltasInBatch(s, s.indexer, deltas, isInInitialList)
  632. }
  633. // Conforms to ResourceEventHandler
  634. func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
  635. // Invocation of this function is locked under s.blockDeltas, so it is
  636. // safe to distribute the notification
  637. s.cacheMutationDetector.AddObject(obj)
  638. s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
  639. }
  640. // Conforms to ResourceEventHandler
  641. func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
  642. isSync := false
  643. // If is a Sync event, isSync should be true
  644. // If is a Replaced event, isSync is true if resource version is unchanged.
  645. // If RV is unchanged: this is a Sync/Replaced event, so isSync is true
  646. if accessor, err := meta.Accessor(new); err == nil {
  647. if oldAccessor, err := meta.Accessor(old); err == nil {
  648. // Events that didn't change resourceVersion are treated as resync events
  649. // and only propagated to listeners that requested resync
  650. isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
  651. }
  652. }
  653. // Invocation of this function is locked under s.blockDeltas, so it is
  654. // safe to distribute the notification
  655. s.cacheMutationDetector.AddObject(new)
  656. s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
  657. }
  658. // Conforms to ResourceEventHandler
  659. func (s *sharedIndexInformer) OnDelete(old interface{}) {
  660. // Invocation of this function is locked under s.blockDeltas, so it is
  661. // safe to distribute the notification
  662. s.processor.distribute(deleteNotification{oldObj: old}, false)
  663. }
  664. // IsStopped reports whether the informer has already been stopped
  665. func (s *sharedIndexInformer) IsStopped() bool {
  666. s.startedLock.Lock()
  667. defer s.startedLock.Unlock()
  668. return s.stopped
  669. }
  670. func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error {
  671. s.startedLock.Lock()
  672. defer s.startedLock.Unlock()
  673. // in order to safely remove, we have to
  674. // 1. stop sending add/update/delete notifications
  675. // 2. remove and stop listener
  676. // 3. unblock
  677. s.blockDeltas.Lock()
  678. defer s.blockDeltas.Unlock()
  679. return s.processor.removeListener(handle)
  680. }
  681. // sharedProcessor has a collection of processorListener and can
  682. // distribute a notification object to its listeners. There are two
  683. // kinds of distribute operations. The sync distributions go to a
  684. // subset of the listeners that (a) is recomputed in the occasional
  685. // calls to shouldResync and (b) every listener is initially put in.
  686. // The non-sync distributions go to every listener.
  687. type sharedProcessor struct {
  688. listenersStarted bool
  689. listenersLock sync.RWMutex
  690. // Map from listeners to whether or not they are currently syncing
  691. listeners map[*processorListener]bool
  692. clock clock.Clock
  693. wg wait.Group
  694. }
  695. func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener {
  696. p.listenersLock.RLock()
  697. defer p.listenersLock.RUnlock()
  698. if p.listeners == nil {
  699. return nil
  700. }
  701. if result, ok := registration.(*processorListener); ok {
  702. if _, exists := p.listeners[result]; exists {
  703. return result
  704. }
  705. }
  706. return nil
  707. }
  708. func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
  709. p.listenersLock.Lock()
  710. defer p.listenersLock.Unlock()
  711. if p.listeners == nil {
  712. p.listeners = make(map[*processorListener]bool)
  713. }
  714. p.listeners[listener] = true
  715. if p.listenersStarted {
  716. p.wg.Start(listener.run)
  717. p.wg.Start(listener.pop)
  718. }
  719. return listener
  720. }
  721. func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error {
  722. p.listenersLock.Lock()
  723. defer p.listenersLock.Unlock()
  724. listener, ok := handle.(*processorListener)
  725. if !ok {
  726. return fmt.Errorf("invalid key type %t", handle)
  727. } else if p.listeners == nil {
  728. // No listeners are registered, do nothing
  729. return nil
  730. } else if _, exists := p.listeners[listener]; !exists {
  731. // Listener is not registered, just do nothing
  732. return nil
  733. }
  734. delete(p.listeners, listener)
  735. if p.listenersStarted {
  736. close(listener.addCh)
  737. }
  738. return nil
  739. }
  740. func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  741. p.listenersLock.RLock()
  742. defer p.listenersLock.RUnlock()
  743. for listener, isSyncing := range p.listeners {
  744. switch {
  745. case !sync:
  746. // non-sync messages are delivered to every listener
  747. listener.add(obj)
  748. case isSyncing:
  749. // sync messages are delivered to every syncing listener
  750. listener.add(obj)
  751. default:
  752. // skipping a sync obj for a non-syncing listener
  753. }
  754. }
  755. }
  756. func (p *sharedProcessor) run(ctx context.Context) {
  757. func() {
  758. p.listenersLock.RLock()
  759. defer p.listenersLock.RUnlock()
  760. for listener := range p.listeners {
  761. p.wg.Start(listener.run)
  762. p.wg.Start(listener.pop)
  763. }
  764. p.listenersStarted = true
  765. }()
  766. <-ctx.Done()
  767. p.listenersLock.Lock()
  768. defer p.listenersLock.Unlock()
  769. for listener := range p.listeners {
  770. close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
  771. }
  772. // Wipe out list of listeners since they are now closed
  773. // (processorListener cannot be re-used)
  774. p.listeners = nil
  775. // Reset to false since no listeners are running
  776. p.listenersStarted = false
  777. p.wg.Wait() // Wait for all .pop() and .run() to stop
  778. }
  779. // shouldResync queries every listener to determine if any of them need a resync, based on each
  780. // listener's resyncPeriod.
  781. func (p *sharedProcessor) shouldResync() bool {
  782. p.listenersLock.Lock()
  783. defer p.listenersLock.Unlock()
  784. resyncNeeded := false
  785. now := p.clock.Now()
  786. for listener := range p.listeners {
  787. // need to loop through all the listeners to see if they need to resync so we can prepare any
  788. // listeners that are going to be resyncing.
  789. shouldResync := listener.shouldResync(now)
  790. p.listeners[listener] = shouldResync
  791. if shouldResync {
  792. resyncNeeded = true
  793. listener.determineNextResync(now)
  794. }
  795. }
  796. return resyncNeeded
  797. }
  798. func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncCheckPeriod time.Duration) {
  799. p.listenersLock.RLock()
  800. defer p.listenersLock.RUnlock()
  801. for listener := range p.listeners {
  802. resyncPeriod := determineResyncPeriod(
  803. logger, listener.requestedResyncPeriod, resyncCheckPeriod)
  804. listener.setResyncPeriod(resyncPeriod)
  805. }
  806. }
  807. // processorListener relays notifications from a sharedProcessor to
  808. // one ResourceEventHandler --- using two goroutines, two unbuffered
  809. // channels, and an unbounded ring buffer. The `add(notification)`
  810. // function sends the given notification to `addCh`. One goroutine
  811. // runs `pop()`, which pumps notifications from `addCh` to `nextCh`
  812. // using storage in the ring buffer while `nextCh` is not keeping up.
  813. // Another goroutine runs `run()`, which receives notifications from
  814. // `nextCh` and synchronously invokes the appropriate handler method.
  815. //
  816. // processorListener also keeps track of the adjusted requested resync
  817. // period of the listener.
  818. type processorListener struct {
  819. logger klog.Logger
  820. nextCh chan interface{}
  821. addCh chan interface{}
  822. handler ResourceEventHandler
  823. syncTracker *synctrack.SingleFileTracker
  824. // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
  825. // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
  826. // added until we OOM.
  827. // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
  828. // we should try to do something better.
  829. pendingNotifications buffer.RingGrowing
  830. // requestedResyncPeriod is how frequently the listener wants a
  831. // full resync from the shared informer, but modified by two
  832. // adjustments. One is imposing a lower bound,
  833. // `minimumResyncPeriod`. The other is another lower bound, the
  834. // sharedIndexInformer's `resyncCheckPeriod`, that is imposed (a) only
  835. // in AddEventHandlerWithResyncPeriod invocations made after the
  836. // sharedIndexInformer starts and (b) only if the informer does
  837. // resyncs at all.
  838. requestedResyncPeriod time.Duration
  839. // resyncPeriod is the threshold that will be used in the logic
  840. // for this listener. This value differs from
  841. // requestedResyncPeriod only when the sharedIndexInformer does
  842. // not do resyncs, in which case the value here is zero. The
  843. // actual time between resyncs depends on when the
  844. // sharedProcessor's `shouldResync` function is invoked and when
  845. // the sharedIndexInformer processes `Sync` type Delta objects.
  846. resyncPeriod time.Duration
  847. // nextResync is the earliest time the listener should get a full resync
  848. nextResync time.Time
  849. // resyncLock guards access to resyncPeriod and nextResync
  850. resyncLock sync.Mutex
  851. }
  852. // HasSynced returns true if the source informer has synced, and all
  853. // corresponding events have been delivered.
  854. func (p *processorListener) HasSynced() bool {
  855. return p.syncTracker.HasSynced()
  856. }
  857. func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
  858. ret := &processorListener{
  859. logger: logger,
  860. nextCh: make(chan interface{}),
  861. addCh: make(chan interface{}),
  862. handler: handler,
  863. syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
  864. pendingNotifications: *buffer.NewRingGrowing(bufferSize),
  865. requestedResyncPeriod: requestedResyncPeriod,
  866. resyncPeriod: resyncPeriod,
  867. }
  868. ret.determineNextResync(now)
  869. return ret
  870. }
  871. func (p *processorListener) add(notification interface{}) {
  872. if a, ok := notification.(addNotification); ok && a.isInInitialList {
  873. p.syncTracker.Start()
  874. }
  875. p.addCh <- notification
  876. }
  877. func (p *processorListener) pop() {
  878. defer utilruntime.HandleCrashWithLogger(p.logger)
  879. defer close(p.nextCh) // Tell .run() to stop
  880. var nextCh chan<- interface{}
  881. var notification interface{}
  882. for {
  883. select {
  884. case nextCh <- notification:
  885. // Notification dispatched
  886. var ok bool
  887. notification, ok = p.pendingNotifications.ReadOne()
  888. if !ok { // Nothing to pop
  889. nextCh = nil // Disable this select case
  890. }
  891. case notificationToAdd, ok := <-p.addCh:
  892. if !ok {
  893. return
  894. }
  895. if notification == nil { // No notification to pop (and pendingNotifications is empty)
  896. // Optimize the case - skip adding to pendingNotifications
  897. notification = notificationToAdd
  898. nextCh = p.nextCh
  899. } else { // There is already a notification waiting to be dispatched
  900. p.pendingNotifications.WriteOne(notificationToAdd)
  901. }
  902. }
  903. }
  904. }
  905. func (p *processorListener) run() {
  906. // this call blocks until the channel is closed. When a panic happens during the notification
  907. // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
  908. // the next notification will be attempted. This is usually better than the alternative of never
  909. // delivering again.
  910. //
  911. // This only applies if utilruntime is configured to not panic, which is not the default.
  912. sleepAfterCrash := false
  913. for next := range p.nextCh {
  914. if sleepAfterCrash {
  915. // Sleep before processing the next item.
  916. time.Sleep(time.Second)
  917. }
  918. func() {
  919. // Gets reset below, but only if we get that far.
  920. sleepAfterCrash = true
  921. defer utilruntime.HandleCrashWithLogger(p.logger)
  922. switch notification := next.(type) {
  923. case updateNotification:
  924. p.handler.OnUpdate(notification.oldObj, notification.newObj)
  925. case addNotification:
  926. p.handler.OnAdd(notification.newObj, notification.isInInitialList)
  927. if notification.isInInitialList {
  928. p.syncTracker.Finished()
  929. }
  930. case deleteNotification:
  931. p.handler.OnDelete(notification.oldObj)
  932. default:
  933. utilruntime.HandleErrorWithLogger(p.logger, nil, "unrecognized notification", "notificationType", fmt.Sprintf("%T", next))
  934. }
  935. sleepAfterCrash = false
  936. }()
  937. }
  938. }
  939. // shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
  940. // this always returns false.
  941. func (p *processorListener) shouldResync(now time.Time) bool {
  942. p.resyncLock.Lock()
  943. defer p.resyncLock.Unlock()
  944. if p.resyncPeriod == 0 {
  945. return false
  946. }
  947. return now.After(p.nextResync) || now.Equal(p.nextResync)
  948. }
  949. func (p *processorListener) determineNextResync(now time.Time) {
  950. p.resyncLock.Lock()
  951. defer p.resyncLock.Unlock()
  952. p.nextResync = now.Add(p.resyncPeriod)
  953. }
  954. func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
  955. p.resyncLock.Lock()
  956. defer p.resyncLock.Unlock()
  957. p.resyncPeriod = resyncPeriod
  958. }