| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "math/rand"
- "reflect"
- "strings"
- "sync"
- "time"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/api/meta"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
- metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/util/naming"
- utilnet "k8s.io/apimachinery/pkg/util/net"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apimachinery/pkg/watch"
- clientfeatures "k8s.io/client-go/features"
- "k8s.io/client-go/tools/pager"
- "k8s.io/client-go/util/watchlist"
- "k8s.io/klog/v2"
- "k8s.io/utils/clock"
- "k8s.io/utils/ptr"
- "k8s.io/utils/trace"
- )
- const defaultExpectedTypeName = "<unspecified>"
- // We try to spread the load on apiserver by setting timeouts for
- // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
- var defaultMinWatchTimeout = 5 * time.Minute
- // ReflectorStore is the subset of cache.Store that the reflector uses
- type ReflectorStore interface {
- // Add adds the given object to the accumulator associated with the given object's key
- Add(obj interface{}) error
- // Update updates the given object in the accumulator associated with the given object's key
- Update(obj interface{}) error
- // Delete deletes the given object from the accumulator associated with the given object's key
- Delete(obj interface{}) error
- // Replace will delete the contents of the store, using instead the
- // given list. Store takes ownership of the list, you should not reference
- // it after calling this function.
- Replace([]interface{}, string) error
- // Resync is meaningless in the terms appearing here but has
- // meaning in some implementations that have non-trivial
- // additional behavior (e.g., DeltaFIFO).
- Resync() error
- }
- // TransformingStore is an optional interface that can be implemented by the provided store.
- // If implemented on the provided store reflector will use the same transformer in its internal stores.
- type TransformingStore interface {
- ReflectorStore
- Transformer() TransformFunc
- }
- // Reflector watches a specified resource and causes all changes to be reflected in the given store.
- type Reflector struct {
- // name identifies this reflector. By default, it will be a file:line if possible.
- name string
- // The name of the type we expect to place in the store. The name
- // will be the stringification of expectedGVK if provided, and the
- // stringification of expectedType otherwise. It is for display
- // only, and should not be used for parsing or comparison.
- typeDescription string
- // An example object of the type we expect to place in the store.
- // Only the type needs to be right, except that when that is
- // `unstructured.Unstructured` the object's `"apiVersion"` and
- // `"kind"` must also be right.
- expectedType reflect.Type
- // The GVK of the object we expect to place in the store if unstructured.
- expectedGVK *schema.GroupVersionKind
- // The destination to sync up with the watch source
- store ReflectorStore
- // listerWatcher is used to perform lists and watches.
- listerWatcher ListerWatcherWithContext
- // backoff manages backoff of ListWatch
- backoffManager wait.BackoffManager
- resyncPeriod time.Duration
- // minWatchTimeout defines the minimum timeout for watch requests.
- minWatchTimeout time.Duration
- // clock allows tests to manipulate time
- clock clock.Clock
- // paginatedResult defines whether pagination should be forced for list calls.
- // It is set based on the result of the initial list call.
- paginatedResult bool
- // lastSyncResourceVersion is the resource version token last
- // observed when doing a sync with the underlying store
- // it is thread safe, but not synchronized with the underlying store
- lastSyncResourceVersion string
- // isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
- // lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
- isLastSyncResourceVersionUnavailable bool
- // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
- lastSyncResourceVersionMutex sync.RWMutex
- // Called whenever the ListAndWatch drops the connection with an error.
- watchErrorHandler WatchErrorHandlerWithContext
- // WatchListPageSize is the requested chunk size of initial and resync watch lists.
- // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
- // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
- // it will turn off pagination to allow serving them from watch cache.
- // NOTE: It should be used carefully as paginated lists are always served directly from
- // etcd, which is significantly less efficient and may lead to serious performance and
- // scalability problems.
- WatchListPageSize int64
- // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
- ShouldResync func() bool
- // MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
- MaxInternalErrorRetryDuration time.Duration
- // useWatchList if turned on instructs the reflector to open a stream to bring data from the API server.
- // Streaming has the primary advantage of using fewer server's resources to fetch data.
- //
- // The old behaviour establishes a LIST request which gets data in chunks.
- // Paginated list is less efficient and depending on the actual size of objects
- // might result in an increased memory consumption of the APIServer.
- //
- // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
- useWatchList bool
- }
- func (r *Reflector) Name() string {
- return r.name
- }
- func (r *Reflector) TypeDescription() string {
- return r.typeDescription
- }
- // ResourceVersionUpdater is an interface that allows store implementation to
- // track the current resource version of the reflector. This is especially
- // important if storage bookmarks are enabled.
- type ResourceVersionUpdater interface {
- // UpdateResourceVersion is called each time current resource version of the reflector
- // is updated.
- UpdateResourceVersion(resourceVersion string)
- }
- // The WatchErrorHandler is called whenever ListAndWatch drops the
- // connection with an error. After calling this handler, the informer
- // will backoff and retry.
- //
- // The default implementation looks at the error type and tries to log
- // the error message at an appropriate level.
- //
- // Implementations of this handler may display the error message in other
- // ways. Implementations should return quickly - any expensive processing
- // should be offloaded.
- type WatchErrorHandler func(r *Reflector, err error)
- // The WatchErrorHandler is called whenever ListAndWatch drops the
- // connection with an error. After calling this handler, the informer
- // will backoff and retry.
- //
- // The default implementation looks at the error type and tries to log
- // the error message at an appropriate level.
- //
- // Implementations of this handler may display the error message in other
- // ways. Implementations should return quickly - any expensive processing
- // should be offloaded.
- type WatchErrorHandlerWithContext func(ctx context.Context, r *Reflector, err error)
- // DefaultWatchErrorHandler is the default implementation of WatchErrorHandlerWithContext.
- func DefaultWatchErrorHandler(ctx context.Context, r *Reflector, err error) {
- switch {
- case isExpiredError(err):
- // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
- // has a semantic that it returns data at least as fresh as provided RV.
- // So first try to LIST with setting RV to resource version of last observed object.
- klog.FromContext(ctx).V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err)
- case err == io.EOF:
- // watch closed normally
- case err == io.ErrUnexpectedEOF:
- klog.FromContext(ctx).V(1).Info("Watch closed with unexpected EOF", "reflector", r.name, "type", r.typeDescription, "err", err)
- default:
- utilruntime.HandleErrorWithContext(ctx, err, "Failed to watch", "reflector", r.name, "type", r.typeDescription)
- }
- }
- // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
- // The indexer is configured to key on namespace
- func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
- indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
- reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
- return indexer, reflector
- }
- // NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
- // that is outside this package. See NewReflectorWithOptions for further information.
- func NewReflector(lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector {
- return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
- }
- // NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
- // information.
- func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector {
- return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
- }
- // ReflectorOptions configures a Reflector.
- type ReflectorOptions struct {
- // Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
- // in the call stack that is outside this package.
- Name string
- // TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted
- // using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is
- // "<unspecified>". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields
- // are set, the type description is the string encoding of those. Otherwise, the type description is set to the
- // go type of expectedType..
- TypeDescription string
- // ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
- // (do not resync).
- ResyncPeriod time.Duration
- // MinWatchTimeout, if non-zero, defines the minimum timeout for watch requests send to kube-apiserver.
- // However, values lower than 5m will not be honored to avoid negative performance impact on controlplane.
- MinWatchTimeout time.Duration
- // Clock allows tests to control time. If unset defaults to clock.RealClock{}
- Clock clock.Clock
- }
- // NewReflectorWithOptions creates a new Reflector object which will keep the
- // given store up to date with the server's contents for the given
- // resource. Reflector promises to only put things in the store that
- // have the type of expectedType, unless expectedType is nil. If
- // resyncPeriod is non-zero, then the reflector will periodically
- // consult its ShouldResync function to determine whether to invoke
- // the Store's Resync operation; `ShouldResync==nil` means always
- // "yes". This enables you to use reflectors to periodically process
- // everything as well as incrementally processing the things that
- // change.
- func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store ReflectorStore, options ReflectorOptions) *Reflector {
- reflectorClock := options.Clock
- if reflectorClock == nil {
- reflectorClock = clock.RealClock{}
- }
- minWatchTimeout := defaultMinWatchTimeout
- if options.MinWatchTimeout > defaultMinWatchTimeout {
- minWatchTimeout = options.MinWatchTimeout
- }
- r := &Reflector{
- name: options.Name,
- resyncPeriod: options.ResyncPeriod,
- minWatchTimeout: minWatchTimeout,
- typeDescription: options.TypeDescription,
- listerWatcher: ToListerWatcherWithContext(lw),
- store: store,
- // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
- // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
- // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
- backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
- clock: reflectorClock,
- watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
- expectedType: reflect.TypeOf(expectedType),
- }
- if r.name == "" {
- r.name = naming.GetNameFromCallsite(internalPackages...)
- }
- if r.typeDescription == "" {
- r.typeDescription = getTypeDescriptionFromObject(expectedType)
- }
- if r.expectedGVK == nil {
- r.expectedGVK = getExpectedGVKFromObject(expectedType)
- }
- r.useWatchList = clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient)
- if r.useWatchList && watchlist.DoesClientNotSupportWatchListSemantics(lw) {
- // Using klog.TODO() here because switching to a caller-provided contextual logger
- // would require an API change and updating all existing call sites.
- klog.TODO().V(2).Info(
- "The provided ListWatcher doesn't support WatchList semantics. The feature will be disabled. If you are using a custom client, check the documentation of watchlist.DoesClientNotSupportWatchListSemantics() method",
- "listWatcherType", fmt.Sprintf("%T", lw),
- "feature", clientfeatures.WatchListClient,
- )
- r.useWatchList = false
- }
- return r
- }
- func getTypeDescriptionFromObject(expectedType interface{}) string {
- if expectedType == nil {
- return defaultExpectedTypeName
- }
- reflectDescription := reflect.TypeOf(expectedType).String()
- obj, ok := expectedType.(*unstructured.Unstructured)
- if !ok {
- return reflectDescription
- }
- gvk := obj.GroupVersionKind()
- if gvk.Empty() {
- return reflectDescription
- }
- return gvk.String()
- }
- func getExpectedGVKFromObject(expectedType interface{}) *schema.GroupVersionKind {
- obj, ok := expectedType.(*unstructured.Unstructured)
- if !ok {
- return nil
- }
- gvk := obj.GroupVersionKind()
- if gvk.Empty() {
- return nil
- }
- return &gvk
- }
- // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
- // call chains to NewReflector, so they'd be low entropy names for reflectors
- var internalPackages = []string{"client-go/tools/cache/"}
- // Run repeatedly uses the reflector's ListAndWatch to fetch all the
- // objects and subsequent deltas.
- // Run will exit when stopCh is closed.
- //
- // Contextual logging: RunWithContext should be used instead of Run in code which supports contextual logging.
- func (r *Reflector) Run(stopCh <-chan struct{}) {
- r.RunWithContext(wait.ContextForChannel(stopCh))
- }
- // RunWithContext repeatedly uses the reflector's ListAndWatch to fetch all the
- // objects and subsequent deltas.
- // Run will exit when the context is canceled.
- func (r *Reflector) RunWithContext(ctx context.Context) {
- logger := klog.FromContext(ctx)
- logger.V(3).Info("Starting reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name)
- wait.BackoffUntil(func() {
- if err := r.ListAndWatchWithContext(ctx); err != nil {
- r.watchErrorHandler(ctx, r, err)
- }
- }, r.backoffManager, true, ctx.Done())
- logger.V(3).Info("Stopping reflector", "type", r.typeDescription, "resyncPeriod", r.resyncPeriod, "reflector", r.name)
- }
- var (
- // Used to indicate that watching stopped because of a signal from the stop
- // channel passed in from a client of the reflector.
- errorStopRequested = errors.New("stop requested")
- )
- // resyncChan returns a channel which will receive something when a resync is
- // required, and a cleanup function.
- func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
- if r.resyncPeriod == 0 {
- // nothing will ever be sent down this channel
- return nil, func() bool { return false }
- }
- // The cleanup function is required: imagine the scenario where watches
- // always fail so we end up listing frequently. Then, if we don't
- // manually stop the timer, we could end up with many timers active
- // concurrently.
- t := r.clock.NewTimer(r.resyncPeriod)
- return t.C(), t.Stop
- }
- // ListAndWatch first lists all items and get the resource version at the moment of call,
- // and then use the resource version to watch.
- // It returns error if ListAndWatch didn't even try to initialize watch.
- //
- // Contextual logging: ListAndWatchWithContext should be used instead of ListAndWatch in code which supports contextual logging.
- func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
- return r.ListAndWatchWithContext(wait.ContextForChannel(stopCh))
- }
- // ListAndWatchWithContext first lists all items and get the resource version at the moment of call,
- // and then use the resource version to watch.
- // It returns error if ListAndWatchWithContext didn't even try to initialize watch.
- func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
- logger := klog.FromContext(ctx)
- logger.V(3).Info("Listing and watching", "type", r.typeDescription, "reflector", r.name)
- var err error
- var w watch.Interface
- fallbackToList := !r.useWatchList
- defer func() {
- if w != nil {
- w.Stop()
- }
- }()
- if r.useWatchList {
- w, err = r.watchList(ctx)
- if w == nil && err == nil {
- // stopCh was closed
- return nil
- }
- if err != nil {
- logger.V(4).Info(
- "Data couldn't be fetched in watchlist mode. Falling back to regular list. This is expected if watchlist is not supported or disabled in kube-apiserver.",
- "err", err,
- )
- fallbackToList = true
- // ensure that we won't accidentally pass some garbage down the watch.
- w = nil
- }
- }
- if fallbackToList {
- err = r.list(ctx)
- if err != nil {
- return err
- }
- }
- logger.V(2).Info("Caches populated", "type", r.typeDescription, "reflector", r.name)
- return r.watchWithResync(ctx, w)
- }
- // startResync periodically calls r.store.Resync() method.
- // Note that this method is blocking and should be
- // called in a separate goroutine.
- func (r *Reflector) startResync(ctx context.Context, resyncerrc chan error) {
- logger := klog.FromContext(ctx)
- resyncCh, cleanup := r.resyncChan()
- defer func() {
- cleanup() // Call the last one written into cleanup
- }()
- for {
- select {
- case <-resyncCh:
- case <-ctx.Done():
- return
- }
- if r.ShouldResync == nil || r.ShouldResync() {
- logger.V(4).Info("Forcing resync", "reflector", r.name)
- if err := r.store.Resync(); err != nil {
- resyncerrc <- err
- return
- }
- }
- cleanup()
- resyncCh, cleanup = r.resyncChan()
- }
- }
- // watchWithResync runs watch with startResync in the background.
- func (r *Reflector) watchWithResync(ctx context.Context, w watch.Interface) error {
- resyncerrc := make(chan error, 1)
- cancelCtx, cancel := context.WithCancel(ctx)
- // Waiting for completion of the goroutine is relevant for race detector.
- // Without this, there is a race between "this function returns + code
- // waiting for it" and "goroutine does something".
- var wg wait.Group
- defer func() {
- cancel()
- wg.Wait()
- }()
- wg.Start(func() {
- r.startResync(cancelCtx, resyncerrc)
- })
- return r.watch(ctx, w, resyncerrc)
- }
- // watch starts a watch request with the server, consumes watch events, and
- // restarts the watch until an exit scenario is reached.
- //
- // If a watch is provided, it will be used, otherwise another will be started.
- // If the watcher has started, it will always be stopped before returning.
- func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
- stopCh := ctx.Done()
- logger := klog.FromContext(ctx)
- var err error
- retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
- defer func() {
- if w != nil {
- w.Stop()
- }
- }()
- for {
- // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
- select {
- case <-stopCh:
- // we can only end up here when the stopCh
- // was closed after a successful watchlist or list request
- return nil
- default:
- }
- // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
- start := r.clock.Now()
- if w == nil {
- timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
- options := metav1.ListOptions{
- ResourceVersion: r.LastSyncResourceVersion(),
- // We want to avoid situations of hanging watchers. Stop any watchers that do not
- // receive any events within the timeout window.
- TimeoutSeconds: &timeoutSeconds,
- // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
- // Reflector doesn't assume bookmarks are returned at all (if the server do not support
- // watch bookmarks, it will ignore this field).
- AllowWatchBookmarks: true,
- }
- w, err = r.listerWatcher.WatchWithContext(ctx, options)
- if err != nil {
- if canRetry := isWatchErrorRetriable(err); canRetry {
- logger.V(4).Info("Watch failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
- select {
- case <-stopCh:
- return nil
- case <-r.backoffManager.Backoff().C():
- continue
- }
- }
- return err
- }
- }
- err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
- r.clock, resyncerrc)
- // handleWatch always stops the watcher. So we don't need to here.
- // Just set it to nil to trigger a retry on the next loop.
- w = nil
- retry.After(err)
- if err != nil {
- if !errors.Is(err, errorStopRequested) {
- switch {
- case isExpiredError(err):
- // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
- // has a semantic that it returns data at least as fresh as provided RV.
- // So first try to LIST with setting RV to resource version of last observed object.
- logger.V(4).Info("Watch closed", "reflector", r.name, "type", r.typeDescription, "err", err)
- case apierrors.IsTooManyRequests(err):
- logger.V(2).Info("Watch returned 429 - backing off", "reflector", r.name, "type", r.typeDescription)
- select {
- case <-stopCh:
- return nil
- case <-r.backoffManager.Backoff().C():
- continue
- }
- case apierrors.IsInternalError(err) && retry.ShouldRetry():
- logger.V(2).Info("Retrying watch after internal error", "reflector", r.name, "type", r.typeDescription, "err", err)
- continue
- default:
- logger.Info("Warning: watch ended with error", "reflector", r.name, "type", r.typeDescription, "err", err)
- }
- }
- return nil
- }
- }
- }
- // list simply lists all items and records a resource version obtained from the server at the moment of the call.
- // the resource version can be used for further progress notification (aka. watch).
- func (r *Reflector) list(ctx context.Context) error {
- var resourceVersion string
- options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
- initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
- defer initTrace.LogIfLong(10 * time.Second)
- var list runtime.Object
- var paginatedResult bool
- var err error
- listCh := make(chan struct{}, 1)
- panicCh := make(chan interface{}, 1)
- go func() {
- defer func() {
- if r := recover(); r != nil {
- panicCh <- r
- }
- }()
- // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
- // list request will return the full response.
- pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
- return r.listerWatcher.ListWithContext(ctx, opts)
- }))
- switch {
- case r.WatchListPageSize != 0:
- pager.PageSize = r.WatchListPageSize
- case r.paginatedResult:
- // We got a paginated result initially. Assume this resource and server honor
- // paging requests (i.e. watch cache is probably disabled) and leave the default
- // pager size set.
- case options.ResourceVersion != "" && options.ResourceVersion != "0":
- // User didn't explicitly request pagination.
- //
- // With ResourceVersion != "", we have a possibility to list from watch cache,
- // but we do that (for ResourceVersion != "0") only if Limit is unset.
- // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
- // switch off pagination to force listing from watch cache (if enabled).
- // With the existing semantic of RV (result is at least as fresh as provided RV),
- // this is correct and doesn't lead to going back in time.
- //
- // We also don't turn off pagination for ResourceVersion="0", since watch cache
- // is ignoring Limit in that case anyway, and if watch cache is not enabled
- // we don't introduce regression.
- pager.PageSize = 0
- }
- list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
- if isExpiredError(err) || isTooLargeResourceVersionError(err) {
- r.setIsLastSyncResourceVersionUnavailable(true)
- // Retry immediately if the resource version used to list is unavailable.
- // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
- // continuation pages, but the pager might not be enabled, the full list might fail because the
- // resource version it is listing at is expired or the cache may not yet be synced to the provided
- // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
- // the reflector makes forward progress.
- list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
- }
- close(listCh)
- }()
- select {
- case <-ctx.Done():
- return nil
- case r := <-panicCh:
- panic(r)
- case <-listCh:
- }
- initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
- if err != nil {
- return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
- }
- // We check if the list was paginated and if so set the paginatedResult based on that.
- // However, we want to do that only for the initial list (which is the only case
- // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
- // situations we may force listing directly from etcd (by setting ResourceVersion="")
- // which will return paginated result, even if watch cache is enabled. However, in
- // that case, we still want to prefer sending requests to watch cache if possible.
- //
- // Paginated result returned for request with ResourceVersion="0" mean that watch
- // cache is disabled and there are a lot of objects of a given type. In such case,
- // there is no need to prefer listing from watch cache.
- if options.ResourceVersion == "0" && paginatedResult {
- r.paginatedResult = true
- }
- r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
- listMetaInterface, err := meta.ListAccessor(list)
- if err != nil {
- return fmt.Errorf("unable to understand list result %#v: %v", list, err)
- }
- resourceVersion = listMetaInterface.GetResourceVersion()
- initTrace.Step("Resource version extracted")
- items, err := meta.ExtractListWithAlloc(list)
- if err != nil {
- return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
- }
- initTrace.Step("Objects extracted")
- if err := r.syncWith(items, resourceVersion); err != nil {
- return fmt.Errorf("unable to sync list result: %v", err)
- }
- initTrace.Step("SyncWith done")
- r.setLastSyncResourceVersion(resourceVersion)
- initTrace.Step("Resource version updated")
- return nil
- }
- // watchList establishes a stream to get a consistent snapshot of data
- // from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
- //
- // case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
- // Establishes a consistent stream with the server.
- // That means the returned data is consistent, as if, served directly from etcd via a quorum read.
- // It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
- // It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
- // After receiving a "Bookmark" event the reflector is considered to be synchronized.
- // It replaces its internal store with the collected items and
- // reuses the current watch requests for getting further events.
- //
- // case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
- // Establishes a stream with the server at the provided resource version.
- // To establish the initial state the server begins with synthetic "Added" events.
- // It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
- // After receiving a "Bookmark" event the reflector is considered to be synchronized.
- // It replaces its internal store with the collected items and
- // reuses the current watch requests for getting further events.
- func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
- stopCh := ctx.Done()
- logger := klog.FromContext(ctx)
- var w watch.Interface
- var err error
- var temporaryStore Store
- var resourceVersion string
- // TODO(#115478): see if this function could be turned
- // into a method and see if error handling
- // could be unified with the r.watch method
- isErrorRetriableWithSideEffectsFn := func(err error) bool {
- if canRetry := isWatchErrorRetriable(err); canRetry {
- logger.V(2).Info("watch-list failed - backing off", "reflector", r.name, "type", r.typeDescription, "err", err)
- <-r.backoffManager.Backoff().C()
- return true
- }
- if isExpiredError(err) || isTooLargeResourceVersionError(err) {
- // we tried to re-establish a watch request but the provided RV
- // has either expired or it is greater than the server knows about.
- // In that case we reset the RV and
- // try to get a consistent snapshot from the watch cache (case 1)
- r.setIsLastSyncResourceVersionUnavailable(true)
- return true
- }
- return false
- }
- var transformer TransformFunc
- storeOpts := []StoreOption{}
- if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil {
- transformer = tr.Transformer()
- storeOpts = append(storeOpts, WithTransformer(transformer))
- }
- initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
- defer initTrace.LogIfLong(10 * time.Second)
- for {
- select {
- case <-stopCh:
- return nil, nil
- default:
- }
- resourceVersion = ""
- lastKnownRV := r.rewatchResourceVersion()
- temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc, storeOpts...)
- // TODO(#115478): large "list", slow clients, slow network, p&f
- // might slow down streaming and eventually fail.
- // maybe in such a case we should retry with an increased timeout?
- timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
- options := metav1.ListOptions{
- ResourceVersion: lastKnownRV,
- AllowWatchBookmarks: true,
- SendInitialEvents: ptr.To(true),
- ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
- TimeoutSeconds: &timeoutSeconds,
- }
- start := r.clock.Now()
- w, err = r.listerWatcher.WatchWithContext(ctx, options)
- if err != nil {
- if isErrorRetriableWithSideEffectsFn(err) {
- continue
- }
- return nil, err
- }
- watchListBookmarkReceived, err := handleListWatch(ctx, start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
- func(rv string) { resourceVersion = rv },
- r.clock, make(chan error))
- if err != nil {
- w.Stop() // stop and retry with clean state
- if errors.Is(err, errorStopRequested) {
- return nil, nil
- }
- if isErrorRetriableWithSideEffectsFn(err) {
- continue
- }
- return nil, err
- }
- if watchListBookmarkReceived {
- break
- }
- }
- // We successfully got initial state from watch-list confirmed by the
- // "k8s.io/initial-events-end" bookmark.
- initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
- r.setIsLastSyncResourceVersionUnavailable(false)
- // we utilize the temporaryStore to ensure independence from the current store implementation.
- // as of today, the store is implemented as a queue and will be drained by the higher-level
- // component as soon as it finishes replacing the content.
- checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, transformer, temporaryStore.List)
- if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
- return nil, fmt.Errorf("unable to sync watch-list result: %w", err)
- }
- initTrace.Step("SyncWith done")
- r.setLastSyncResourceVersion(resourceVersion)
- return w, nil
- }
- // syncWith replaces the store's items with the given list.
- func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
- found := make([]interface{}, 0, len(items))
- for _, item := range items {
- found = append(found, item)
- }
- return r.store.Replace(found, resourceVersion)
- }
- // handleListWatch consumes events from w, updates the Store, and records the
- // last seen ResourceVersion, to allow continuing from that ResourceVersion on
- // retry. If successful, the watcher will be left open after receiving the
- // initial set of objects, to allow watching for future events.
- func handleListWatch(
- ctx context.Context,
- start time.Time,
- w watch.Interface,
- store Store,
- expectedType reflect.Type,
- expectedGVK *schema.GroupVersionKind,
- name string,
- expectedTypeName string,
- setLastSyncResourceVersion func(string),
- clock clock.Clock,
- errCh chan error,
- ) (bool, error) {
- exitOnWatchListBookmarkReceived := true
- return handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName,
- setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh)
- }
- // handleListWatch consumes events from w, updates the Store, and records the
- // last seen ResourceVersion, to allow continuing from that ResourceVersion on
- // retry. The watcher will always be stopped on exit.
- func handleWatch(
- ctx context.Context,
- start time.Time,
- w watch.Interface,
- store ReflectorStore,
- expectedType reflect.Type,
- expectedGVK *schema.GroupVersionKind,
- name string,
- expectedTypeName string,
- setLastSyncResourceVersion func(string),
- clock clock.Clock,
- errCh chan error,
- ) error {
- exitOnWatchListBookmarkReceived := false
- _, err := handleAnyWatch(ctx, start, w, store, expectedType, expectedGVK, name, expectedTypeName,
- setLastSyncResourceVersion, exitOnWatchListBookmarkReceived, clock, errCh)
- return err
- }
- // handleAnyWatch consumes events from w, updates the Store, and records the last
- // seen ResourceVersion, to allow continuing from that ResourceVersion on retry.
- // If exitOnWatchListBookmarkReceived is true, the watch events will be consumed
- // until a bookmark event is received with the WatchList annotation present.
- // Returns true (watchListBookmarkReceived) if the WatchList bookmark was
- // received, even if exitOnWatchListBookmarkReceived is false.
- // The watcher will always be stopped, unless exitOnWatchListBookmarkReceived is
- // true and watchListBookmarkReceived is true. This allows the same watch stream
- // to be re-used by the caller to continue watching for new events.
- func handleAnyWatch(
- ctx context.Context,
- start time.Time,
- w watch.Interface,
- store ReflectorStore,
- expectedType reflect.Type,
- expectedGVK *schema.GroupVersionKind,
- name string,
- expectedTypeName string,
- setLastSyncResourceVersion func(string),
- exitOnWatchListBookmarkReceived bool,
- clock clock.Clock,
- errCh chan error,
- ) (bool, error) {
- watchListBookmarkReceived := false
- eventCount := 0
- logger := klog.FromContext(ctx)
- initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived)
- defer initialEventsEndBookmarkWarningTicker.Stop()
- stopWatcher := true
- defer func() {
- if stopWatcher {
- w.Stop()
- }
- }()
- loop:
- for {
- select {
- case <-ctx.Done():
- return watchListBookmarkReceived, errorStopRequested
- case err := <-errCh:
- return watchListBookmarkReceived, err
- case event, ok := <-w.ResultChan():
- if !ok {
- break loop
- }
- if event.Type == watch.Error {
- return watchListBookmarkReceived, apierrors.FromObject(event.Object)
- }
- if expectedType != nil {
- if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
- utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object type", "reflector", name, "expectedType", e, "actualType", a)
- continue
- }
- }
- if expectedGVK != nil {
- if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
- utilruntime.HandleErrorWithContext(ctx, nil, "Unexpected watch event object gvk", "reflector", name, "expectedGVK", e, "actualGVK", a)
- continue
- }
- }
- // For now, let’s block unsupported Table
- // resources for watchlist only
- // see #132926 for more info
- if exitOnWatchListBookmarkReceived {
- if unsupportedGVK := isUnsupportedTableObject(event.Object); unsupportedGVK {
- utilruntime.HandleErrorWithContext(ctx, nil, "Unsupported watch event object gvk", "reflector", name, "actualGVK", event.Object.GetObjectKind().GroupVersionKind())
- continue
- }
- }
- meta, err := meta.Accessor(event.Object)
- if err != nil {
- utilruntime.HandleErrorWithContext(ctx, err, "Unable to understand watch event", "reflector", name, "event", event)
- continue
- }
- resourceVersion := meta.GetResourceVersion()
- switch event.Type {
- case watch.Added:
- err := store.Add(event.Object)
- if err != nil {
- utilruntime.HandleErrorWithContext(ctx, err, "Unable to add watch event object to store", "reflector", name, "object", event.Object)
- }
- case watch.Modified:
- err := store.Update(event.Object)
- if err != nil {
- utilruntime.HandleErrorWithContext(ctx, err, "Unable to update watch event object to store", "reflector", name, "object", event.Object)
- }
- case watch.Deleted:
- // TODO: Will any consumers need access to the "last known
- // state", which is passed in event.Object? If so, may need
- // to change this.
- err := store.Delete(event.Object)
- if err != nil {
- utilruntime.HandleErrorWithContext(ctx, err, "Unable to delete watch event object from store", "reflector", name, "object", event.Object)
- }
- case watch.Bookmark:
- // A `Bookmark` means watch has synced here, just update the resourceVersion
- if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" {
- watchListBookmarkReceived = true
- }
- default:
- utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event)
- }
- setLastSyncResourceVersion(resourceVersion)
- if rvu, ok := store.(ResourceVersionUpdater); ok {
- rvu.UpdateResourceVersion(resourceVersion)
- }
- eventCount++
- if exitOnWatchListBookmarkReceived && watchListBookmarkReceived {
- stopWatcher = false
- watchDuration := clock.Since(start)
- klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration)
- return watchListBookmarkReceived, nil
- }
- initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now())
- case <-initialEventsEndBookmarkWarningTicker.C():
- initialEventsEndBookmarkWarningTicker.warnIfExpired()
- }
- }
- watchDuration := clock.Since(start)
- if watchDuration < 1*time.Second && eventCount == 0 {
- return watchListBookmarkReceived, &VeryShortWatchError{Name: name}
- }
- klog.FromContext(ctx).V(4).Info("Watch close", "reflector", name, "type", expectedTypeName, "totalItems", eventCount)
- return watchListBookmarkReceived, nil
- }
- // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
- // The value returned is not synchronized with access to the underlying store and is not thread-safe
- func (r *Reflector) LastSyncResourceVersion() string {
- r.lastSyncResourceVersionMutex.RLock()
- defer r.lastSyncResourceVersionMutex.RUnlock()
- return r.lastSyncResourceVersion
- }
- func (r *Reflector) setLastSyncResourceVersion(v string) {
- r.lastSyncResourceVersionMutex.Lock()
- defer r.lastSyncResourceVersionMutex.Unlock()
- r.lastSyncResourceVersion = v
- }
- // relistResourceVersion determines the resource version the reflector should list or relist from.
- // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
- // versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
- // in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
- // etcd via a quorum read.
- func (r *Reflector) relistResourceVersion() string {
- r.lastSyncResourceVersionMutex.RLock()
- defer r.lastSyncResourceVersionMutex.RUnlock()
- if r.isLastSyncResourceVersionUnavailable {
- // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
- // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
- // to the latest available ResourceVersion, using a consistent read from etcd.
- return ""
- }
- if r.lastSyncResourceVersion == "" {
- // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
- // be served from the watch cache if it is enabled.
- return "0"
- }
- return r.lastSyncResourceVersion
- }
- // rewatchResourceVersion determines the resource version the reflector should start streaming from.
- func (r *Reflector) rewatchResourceVersion() string {
- r.lastSyncResourceVersionMutex.RLock()
- defer r.lastSyncResourceVersionMutex.RUnlock()
- if r.isLastSyncResourceVersionUnavailable {
- // initial stream should return data at the most recent resource version.
- // the returned data must be consistent i.e. as if served from etcd via a quorum read
- return ""
- }
- return r.lastSyncResourceVersion
- }
- // setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
- // "expired" or "too large resource version" error.
- func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
- r.lastSyncResourceVersionMutex.Lock()
- defer r.lastSyncResourceVersionMutex.Unlock()
- r.isLastSyncResourceVersionUnavailable = isUnavailable
- }
- func isExpiredError(err error) bool {
- // In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
- // apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
- // and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
- // check when we fully drop support for Kubernetes 1.17 servers from reflectors.
- return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
- }
- func isTooLargeResourceVersionError(err error) bool {
- if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
- return true
- }
- // In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
- // metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
- // version is larger than the largest currently available resource version. To ensure backward
- // compatibility with these server versions we also need to detect the error based on the content
- // of the error message field.
- if !apierrors.IsTimeout(err) {
- return false
- }
- apierr, ok := err.(apierrors.APIStatus)
- if !ok || apierr == nil || apierr.Status().Details == nil {
- return false
- }
- for _, cause := range apierr.Status().Details.Causes {
- // Matches the message returned by api server 1.17.0-1.18.5 for this error condition
- if cause.Message == "Too large resource version" {
- return true
- }
- }
- // Matches the message returned by api server before 1.17.0
- if strings.Contains(apierr.Status().Message, "Too large resource version") {
- return true
- }
- return false
- }
- // isWatchErrorRetriable determines if it is safe to retry
- // a watch error retrieved from the server.
- func isWatchErrorRetriable(err error) bool {
- // If this is "connection refused" error, it means that most likely apiserver is not responsive.
- // It doesn't make sense to re-list all objects because most likely we will be able to restart
- // watch where we ended.
- // If that's the case begin exponentially backing off and resend watch request.
- // Do the same for "429" errors.
- if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
- return true
- }
- return false
- }
- // initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event
- // which marks the end of the watch stream, has not been received within the defined tick interval.
- //
- // Note:
- // The methods exposed by this type are not thread-safe.
- type initialEventsEndBookmarkTicker struct {
- clock.Ticker
- clock clock.Clock
- name string
- logger klog.Logger
- watchStart time.Time
- tickInterval time.Duration
- lastEventObserveTime time.Time
- }
- // newInitialEventsEndBookmarkTicker returns a noop ticker if exitOnInitialEventsEndBookmarkRequested is false.
- // Otherwise, it returns a ticker that exposes a method producing a warning if the bookmark event,
- // which marks the end of the watch stream, has not been received within the defined tick interval.
- //
- // Note that the caller controls whether to call t.C() and t.Stop().
- //
- // In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method.
- func newInitialEventsEndBookmarkTicker(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
- return newInitialEventsEndBookmarkTickerInternal(logger, name, c, watchStart, 10*time.Second, exitOnWatchListBookmarkReceived)
- }
- func newInitialEventsEndBookmarkTickerInternal(logger klog.Logger, name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnWatchListBookmarkReceived bool) *initialEventsEndBookmarkTicker {
- clockWithTicker, ok := c.(clock.WithTicker)
- if !ok || !exitOnWatchListBookmarkReceived {
- if exitOnWatchListBookmarkReceived {
- logger.Info("Warning: clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested")
- }
- return &initialEventsEndBookmarkTicker{
- Ticker: &noopTicker{},
- }
- }
- return &initialEventsEndBookmarkTicker{
- Ticker: clockWithTicker.NewTicker(tickInterval),
- clock: c,
- name: name,
- logger: logger,
- watchStart: watchStart,
- tickInterval: tickInterval,
- }
- }
- func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObserveTime time.Time) {
- t.lastEventObserveTime = lastEventObserveTime
- }
- func (t *initialEventsEndBookmarkTicker) warnIfExpired() {
- if err := t.produceWarningIfExpired(); err != nil {
- t.logger.Info("Warning: event bookmark expired", "err", err)
- }
- }
- // produceWarningIfExpired returns an error that represents a warning when
- // the time elapsed since the last received event exceeds the tickInterval.
- //
- // Note that this method should be called when t.C() yields a value.
- func (t *initialEventsEndBookmarkTicker) produceWarningIfExpired() error {
- if _, ok := t.Ticker.(*noopTicker); ok {
- return nil /*noop ticker*/
- }
- if t.lastEventObserveTime.IsZero() {
- return fmt.Errorf("%s: awaiting required bookmark event for initial events stream, no events received for %v", t.name, t.clock.Since(t.watchStart))
- }
- elapsedTime := t.clock.Now().Sub(t.lastEventObserveTime)
- hasBookmarkTimerExpired := elapsedTime >= t.tickInterval
- if !hasBookmarkTimerExpired {
- return nil
- }
- return fmt.Errorf("%s: hasn't received required bookmark event marking the end of initial events stream, received last event %v ago", t.name, elapsedTime)
- }
- var _ clock.Ticker = &noopTicker{}
- // TODO(#115478): move to k8s/utils repo
- type noopTicker struct{}
- func (t *noopTicker) C() <-chan time.Time { return nil }
- func (t *noopTicker) Stop() {}
- // VeryShortWatchError is returned when the watch result channel is closed
- // within one second, without having sent any events.
- type VeryShortWatchError struct {
- // Name of the Reflector
- Name string
- }
- // Error implements the error interface
- func (e *VeryShortWatchError) Error() string {
- return fmt.Sprintf("very short watch: %s: Unexpected watch close - "+
- "watch lasted less than a second and no items received", e.Name)
- }
- var unsupportedTableGVK = map[schema.GroupVersionKind]bool{
- metav1beta1.SchemeGroupVersion.WithKind("Table"): true,
- metav1.SchemeGroupVersion.WithKind("Table"): true,
- }
- // isUnsupportedTableObject checks whether the given runtime.Object
- // is a "Table" object that belongs to a set of well-known unsupported GroupVersionKinds.
- func isUnsupportedTableObject(rawObject runtime.Object) bool {
- unstructuredObj, ok := rawObject.(*unstructured.Unstructured)
- if !ok {
- return false
- }
- if unstructuredObj.GetKind() != "Table" {
- return false
- }
- return unsupportedTableGVK[rawObject.GetObjectKind().GroupVersionKind()]
- }
|