reflector.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "math/rand"
  20. "reflect"
  21. "sync"
  22. "time"
  23. apierrors "k8s.io/apimachinery/pkg/api/errors"
  24. "k8s.io/apimachinery/pkg/api/meta"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  27. "k8s.io/apimachinery/pkg/runtime"
  28. "k8s.io/apimachinery/pkg/runtime/schema"
  29. "k8s.io/apimachinery/pkg/util/clock"
  30. "k8s.io/apimachinery/pkg/util/naming"
  31. utilnet "k8s.io/apimachinery/pkg/util/net"
  32. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  33. "k8s.io/apimachinery/pkg/util/wait"
  34. "k8s.io/apimachinery/pkg/watch"
  35. "k8s.io/client-go/tools/pager"
  36. "k8s.io/klog/v2"
  37. "k8s.io/utils/trace"
  38. )
// defaultExpectedTypeName is the display name recorded in
// Reflector.expectedTypeName when the reflector is created without an
// expected type (i.e. expectedType is nil).
const defaultExpectedTypeName = "<unspecified>"
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	name string

	// The name of the type we expect to place in the store. The name
	// will be the stringification of expectedGVK if provided, and the
	// stringification of expectedType otherwise. It is for display
	// only, and should not be used for parsing or comparison.
	expectedTypeName string
	// An example object of the type we expect to place in the store.
	// Only the type needs to be right, except that when that is
	// `unstructured.Unstructured` the object's `"apiVersion"` and
	// `"kind"` must also be right.
	expectedType reflect.Type
	// The GVK of the object we expect to place in the store if unstructured.
	expectedGVK *schema.GroupVersionKind
	// The destination to sync up with the watch source.
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher

	// backoffManager manages the backoff between successive ListAndWatch
	// attempts made by Run.
	backoffManager wait.BackoffManager
	// initConnBackoffManager manages the backoff of the initial connection
	// with the Watch call of ListAndWatch.
	initConnBackoffManager wait.BackoffManager

	// resyncPeriod is the interval between resync checks; a value of zero
	// disables periodic resync.
	resyncPeriod time.Duration
	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked.
	// A nil ShouldResync is treated as always returning `true`.
	ShouldResync func() bool
	// clock allows tests to manipulate time.
	clock clock.Clock
	// paginatedResult defines whether pagination should be forced for list calls.
	// It is set based on the result of the initial list call.
	paginatedResult bool
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store.
	// It is thread safe, but not synchronized with the underlying store.
	lastSyncResourceVersion string
	// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
	// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
	isLastSyncResourceVersionUnavailable bool
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
	// and isLastSyncResourceVersionUnavailable.
	lastSyncResourceVersionMutex sync.RWMutex
	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64
	// watchErrorHandler is called whenever ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
}
// ResourceVersionUpdater is an interface that allows a store implementation to
// track the current resource version of the reflector. This is especially
// important if storage bookmarks are enabled.
//
// A Store that also implements this interface is notified by the reflector's
// watch handler after each processed watch event.
type ResourceVersionUpdater interface {
	// UpdateResourceVersion is called each time the current resource version
	// of the reflector is updated. resourceVersion is the newly observed value.
	UpdateResourceVersion(resourceVersion string)
}
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
//
// r is the reflector whose ListAndWatch call failed and err is the error
// it returned.
type WatchErrorHandler func(r *Reflector, err error)
  111. // DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
  112. func DefaultWatchErrorHandler(r *Reflector, err error) {
  113. switch {
  114. case isExpiredError(err):
  115. // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
  116. // has a semantic that it returns data at least as fresh as provided RV.
  117. // So first try to LIST with setting RV to resource version of last observed object.
  118. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
  119. case err == io.EOF:
  120. // watch closed normally
  121. case err == io.ErrUnexpectedEOF:
  122. klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
  123. default:
  124. utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
  125. }
  126. }
var (
	// We try to spread the load on apiserver by setting timeouts for
	// watch requests - each watch gets a random timeout in the range
	// [minWatchTimeout, 2*minWatchTimeout).
	minWatchTimeout = 5 * time.Minute
)
  132. // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
  133. // The indexer is configured to key on namespace
  134. func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
  135. indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
  136. reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
  137. return indexer, reflector
  138. }
  139. // NewReflector creates a new Reflector object which will keep the
  140. // given store up to date with the server's contents for the given
  141. // resource. Reflector promises to only put things in the store that
  142. // have the type of expectedType, unless expectedType is nil. If
  143. // resyncPeriod is non-zero, then the reflector will periodically
  144. // consult its ShouldResync function to determine whether to invoke
  145. // the Store's Resync operation; `ShouldResync==nil` means always
  146. // "yes". This enables you to use reflectors to periodically process
  147. // everything as well as incrementally processing the things that
  148. // change.
  149. func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  150. return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
  151. }
  152. // NewNamedReflector same as NewReflector, but with a specified name for logging
  153. func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  154. realClock := &clock.RealClock{}
  155. r := &Reflector{
  156. name: name,
  157. listerWatcher: lw,
  158. store: store,
  159. // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
  160. // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
  161. // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
  162. backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
  163. initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
  164. resyncPeriod: resyncPeriod,
  165. clock: realClock,
  166. watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
  167. }
  168. r.setExpectedType(expectedType)
  169. return r
  170. }
  171. func (r *Reflector) setExpectedType(expectedType interface{}) {
  172. r.expectedType = reflect.TypeOf(expectedType)
  173. if r.expectedType == nil {
  174. r.expectedTypeName = defaultExpectedTypeName
  175. return
  176. }
  177. r.expectedTypeName = r.expectedType.String()
  178. if obj, ok := expectedType.(*unstructured.Unstructured); ok {
  179. // Use gvk to check that watch event objects are of the desired type.
  180. gvk := obj.GroupVersionKind()
  181. if gvk.Empty() {
  182. klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)
  183. return
  184. }
  185. r.expectedGVK = &gvk
  186. r.expectedTypeName = gvk.String()
  187. }
  188. }
// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors.
var internalPackages = []string{"client-go/tools/cache/"}
  192. // Run repeatedly uses the reflector's ListAndWatch to fetch all the
  193. // objects and subsequent deltas.
  194. // Run will exit when stopCh is closed.
  195. func (r *Reflector) Run(stopCh <-chan struct{}) {
  196. klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
  197. wait.BackoffUntil(func() {
  198. if err := r.ListAndWatch(stopCh); err != nil {
  199. r.watchErrorHandler(r, err)
  200. }
  201. }, r.backoffManager, true, stopCh)
  202. klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
  203. }
var (
	// neverExitWatch is a channel on which nothing will ever be sent; it is
	// returned by resyncChan when periodic resync is disabled.
	neverExitWatch <-chan time.Time = make(chan time.Time)

	// errorStopRequested is used to indicate that watching stopped because of
	// a signal from the stop channel passed in from a client of the reflector.
	errorStopRequested = errors.New("Stop requested")
)
  211. // resyncChan returns a channel which will receive something when a resync is
  212. // required, and a cleanup function.
  213. func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
  214. if r.resyncPeriod == 0 {
  215. return neverExitWatch, func() bool { return false }
  216. }
  217. // The cleanup function is required: imagine the scenario where watches
  218. // always fail so we end up listing frequently. Then, if we don't
  219. // manually stop the timer, we could end up with many timers active
  220. // concurrently.
  221. t := r.clock.NewTimer(r.resyncPeriod)
  222. return t.C(), t.Stop
  223. }
  224. // ListAndWatch first lists all items and get the resource version at the moment of call,
  225. // and then use the resource version to watch.
  226. // It returns error if ListAndWatch didn't even try to initialize watch.
  227. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  228. klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
  229. var resourceVersion string
  230. options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
  231. if err := func() error {
  232. initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
  233. defer initTrace.LogIfLong(10 * time.Second)
  234. var list runtime.Object
  235. var paginatedResult bool
  236. var err error
  237. listCh := make(chan struct{}, 1)
  238. panicCh := make(chan interface{}, 1)
  239. go func() {
  240. defer func() {
  241. if r := recover(); r != nil {
  242. panicCh <- r
  243. }
  244. }()
  245. // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
  246. // list request will return the full response.
  247. pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
  248. return r.listerWatcher.List(opts)
  249. }))
  250. switch {
  251. case r.WatchListPageSize != 0:
  252. pager.PageSize = r.WatchListPageSize
  253. case r.paginatedResult:
  254. // We got a paginated result initially. Assume this resource and server honor
  255. // paging requests (i.e. watch cache is probably disabled) and leave the default
  256. // pager size set.
  257. case options.ResourceVersion != "" && options.ResourceVersion != "0":
  258. // User didn't explicitly request pagination.
  259. //
  260. // With ResourceVersion != "", we have a possibility to list from watch cache,
  261. // but we do that (for ResourceVersion != "0") only if Limit is unset.
  262. // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
  263. // switch off pagination to force listing from watch cache (if enabled).
  264. // With the existing semantic of RV (result is at least as fresh as provided RV),
  265. // this is correct and doesn't lead to going back in time.
  266. //
  267. // We also don't turn off pagination for ResourceVersion="0", since watch cache
  268. // is ignoring Limit in that case anyway, and if watch cache is not enabled
  269. // we don't introduce regression.
  270. pager.PageSize = 0
  271. }
  272. list, paginatedResult, err = pager.List(context.Background(), options)
  273. if isExpiredError(err) || isTooLargeResourceVersionError(err) {
  274. r.setIsLastSyncResourceVersionUnavailable(true)
  275. // Retry immediately if the resource version used to list is unavailable.
  276. // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
  277. // continuation pages, but the pager might not be enabled, the full list might fail because the
  278. // resource version it is listing at is expired or the cache may not yet be synced to the provided
  279. // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
  280. // the reflector makes forward progress.
  281. list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
  282. }
  283. close(listCh)
  284. }()
  285. select {
  286. case <-stopCh:
  287. return nil
  288. case r := <-panicCh:
  289. panic(r)
  290. case <-listCh:
  291. }
  292. if err != nil {
  293. return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
  294. }
  295. // We check if the list was paginated and if so set the paginatedResult based on that.
  296. // However, we want to do that only for the initial list (which is the only case
  297. // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
  298. // situations we may force listing directly from etcd (by setting ResourceVersion="")
  299. // which will return paginated result, even if watch cache is enabled. However, in
  300. // that case, we still want to prefer sending requests to watch cache if possible.
  301. //
  302. // Paginated result returned for request with ResourceVersion="0" mean that watch
  303. // cache is disabled and there are a lot of objects of a given type. In such case,
  304. // there is no need to prefer listing from watch cache.
  305. if options.ResourceVersion == "0" && paginatedResult {
  306. r.paginatedResult = true
  307. }
  308. r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
  309. initTrace.Step("Objects listed")
  310. listMetaInterface, err := meta.ListAccessor(list)
  311. if err != nil {
  312. return fmt.Errorf("unable to understand list result %#v: %v", list, err)
  313. }
  314. resourceVersion = listMetaInterface.GetResourceVersion()
  315. initTrace.Step("Resource version extracted")
  316. items, err := meta.ExtractList(list)
  317. if err != nil {
  318. return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
  319. }
  320. initTrace.Step("Objects extracted")
  321. if err := r.syncWith(items, resourceVersion); err != nil {
  322. return fmt.Errorf("unable to sync list result: %v", err)
  323. }
  324. initTrace.Step("SyncWith done")
  325. r.setLastSyncResourceVersion(resourceVersion)
  326. initTrace.Step("Resource version updated")
  327. return nil
  328. }(); err != nil {
  329. return err
  330. }
  331. resyncerrc := make(chan error, 1)
  332. cancelCh := make(chan struct{})
  333. defer close(cancelCh)
  334. go func() {
  335. resyncCh, cleanup := r.resyncChan()
  336. defer func() {
  337. cleanup() // Call the last one written into cleanup
  338. }()
  339. for {
  340. select {
  341. case <-resyncCh:
  342. case <-stopCh:
  343. return
  344. case <-cancelCh:
  345. return
  346. }
  347. if r.ShouldResync == nil || r.ShouldResync() {
  348. klog.V(4).Infof("%s: forcing resync", r.name)
  349. if err := r.store.Resync(); err != nil {
  350. resyncerrc <- err
  351. return
  352. }
  353. }
  354. cleanup()
  355. resyncCh, cleanup = r.resyncChan()
  356. }
  357. }()
  358. for {
  359. // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
  360. select {
  361. case <-stopCh:
  362. return nil
  363. default:
  364. }
  365. timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
  366. options = metav1.ListOptions{
  367. ResourceVersion: resourceVersion,
  368. // We want to avoid situations of hanging watchers. Stop any wachers that do not
  369. // receive any events within the timeout window.
  370. TimeoutSeconds: &timeoutSeconds,
  371. // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
  372. // Reflector doesn't assume bookmarks are returned at all (if the server do not support
  373. // watch bookmarks, it will ignore this field).
  374. AllowWatchBookmarks: true,
  375. }
  376. // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
  377. start := r.clock.Now()
  378. w, err := r.listerWatcher.Watch(options)
  379. if err != nil {
  380. // If this is "connection refused" error, it means that most likely apiserver is not responsive.
  381. // It doesn't make sense to re-list all objects because most likely we will be able to restart
  382. // watch where we ended.
  383. // If that's the case begin exponentially backing off and resend watch request.
  384. if utilnet.IsConnectionRefused(err) {
  385. <-r.initConnBackoffManager.Backoff().C()
  386. continue
  387. }
  388. return err
  389. }
  390. if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
  391. if err != errorStopRequested {
  392. switch {
  393. case isExpiredError(err):
  394. // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
  395. // has a semantic that it returns data at least as fresh as provided RV.
  396. // So first try to LIST with setting RV to resource version of last observed object.
  397. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
  398. default:
  399. klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
  400. }
  401. }
  402. return nil
  403. }
  404. }
  405. }
  406. // syncWith replaces the store's items with the given list.
  407. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
  408. found := make([]interface{}, 0, len(items))
  409. for _, item := range items {
  410. found = append(found, item)
  411. }
  412. return r.store.Replace(found, resourceVersion)
  413. }
  414. // watchHandler watches w and keeps *resourceVersion up to date.
  415. func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
  416. eventCount := 0
  417. // Stopping the watcher should be idempotent and if we return from this function there's no way
  418. // we're coming back in with the same watch interface.
  419. defer w.Stop()
  420. loop:
  421. for {
  422. select {
  423. case <-stopCh:
  424. return errorStopRequested
  425. case err := <-errc:
  426. return err
  427. case event, ok := <-w.ResultChan():
  428. if !ok {
  429. break loop
  430. }
  431. if event.Type == watch.Error {
  432. return apierrors.FromObject(event.Object)
  433. }
  434. if r.expectedType != nil {
  435. if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
  436. utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
  437. continue
  438. }
  439. }
  440. if r.expectedGVK != nil {
  441. if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
  442. utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
  443. continue
  444. }
  445. }
  446. meta, err := meta.Accessor(event.Object)
  447. if err != nil {
  448. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  449. continue
  450. }
  451. newResourceVersion := meta.GetResourceVersion()
  452. switch event.Type {
  453. case watch.Added:
  454. err := r.store.Add(event.Object)
  455. if err != nil {
  456. utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
  457. }
  458. case watch.Modified:
  459. err := r.store.Update(event.Object)
  460. if err != nil {
  461. utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
  462. }
  463. case watch.Deleted:
  464. // TODO: Will any consumers need access to the "last known
  465. // state", which is passed in event.Object? If so, may need
  466. // to change this.
  467. err := r.store.Delete(event.Object)
  468. if err != nil {
  469. utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
  470. }
  471. case watch.Bookmark:
  472. // A `Bookmark` means watch has synced here, just update the resourceVersion
  473. default:
  474. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  475. }
  476. *resourceVersion = newResourceVersion
  477. r.setLastSyncResourceVersion(newResourceVersion)
  478. if rvu, ok := r.store.(ResourceVersionUpdater); ok {
  479. rvu.UpdateResourceVersion(newResourceVersion)
  480. }
  481. eventCount++
  482. }
  483. }
  484. watchDuration := r.clock.Since(start)
  485. if watchDuration < 1*time.Second && eventCount == 0 {
  486. return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
  487. }
  488. klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
  489. return nil
  490. }
  491. // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
  492. // The value returned is not synchronized with access to the underlying store and is not thread-safe
  493. func (r *Reflector) LastSyncResourceVersion() string {
  494. r.lastSyncResourceVersionMutex.RLock()
  495. defer r.lastSyncResourceVersionMutex.RUnlock()
  496. return r.lastSyncResourceVersion
  497. }
  498. func (r *Reflector) setLastSyncResourceVersion(v string) {
  499. r.lastSyncResourceVersionMutex.Lock()
  500. defer r.lastSyncResourceVersionMutex.Unlock()
  501. r.lastSyncResourceVersion = v
  502. }
  503. // relistResourceVersion determines the resource version the reflector should list or relist from.
  504. // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
  505. // versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
  506. // in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
  507. // etcd via a quorum read.
  508. func (r *Reflector) relistResourceVersion() string {
  509. r.lastSyncResourceVersionMutex.RLock()
  510. defer r.lastSyncResourceVersionMutex.RUnlock()
  511. if r.isLastSyncResourceVersionUnavailable {
  512. // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
  513. // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
  514. // to the latest available ResourceVersion, using a consistent read from etcd.
  515. return ""
  516. }
  517. if r.lastSyncResourceVersion == "" {
  518. // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
  519. // be served from the watch cache if it is enabled.
  520. return "0"
  521. }
  522. return r.lastSyncResourceVersion
  523. }
  524. // setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
  525. // "expired" or "too large resource version" error.
  526. func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
  527. r.lastSyncResourceVersionMutex.Lock()
  528. defer r.lastSyncResourceVersionMutex.Unlock()
  529. r.isLastSyncResourceVersionUnavailable = isUnavailable
  530. }
  531. func isExpiredError(err error) bool {
  532. // In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
  533. // apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
  534. // and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
  535. // check when we fully drop support for Kubernetes 1.17 servers from reflectors.
  536. return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
  537. }
  538. func isTooLargeResourceVersionError(err error) bool {
  539. if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
  540. return true
  541. }
  542. // In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
  543. // metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
  544. // version is larger than the largest currently available resource version. To ensure backward
  545. // compatibility with these server versions we also need to detect the error based on the content
  546. // of the error message field.
  547. if !apierrors.IsTimeout(err) {
  548. return false
  549. }
  550. apierr, ok := err.(apierrors.APIStatus)
  551. if !ok || apierr == nil || apierr.Status().Details == nil {
  552. return false
  553. }
  554. for _, cause := range apierr.Status().Details.Causes {
  555. // Matches the message returned by api server 1.17.0-1.18.5 for this error condition
  556. if cause.Message == "Too large resource version" {
  557. return true
  558. }
  559. }
  560. return false
  561. }