pager.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package pager
  14. import (
  15. "context"
  16. "fmt"
  17. "k8s.io/apimachinery/pkg/api/errors"
  18. "k8s.io/apimachinery/pkg/api/meta"
  19. metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
  20. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  21. "k8s.io/apimachinery/pkg/runtime"
  22. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  23. )
  24. const defaultPageSize = 500
  25. const defaultPageBufferSize = 10
  26. // ListPageFunc returns a list object for the given list options.
  27. type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
  28. // SimplePageFunc adapts a context-less list function into one that accepts a context.
  29. func SimplePageFunc(fn func(opts metav1.ListOptions) (runtime.Object, error)) ListPageFunc {
  30. return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
  31. return fn(opts)
  32. }
  33. }
  34. // ListPager assists client code in breaking large list queries into multiple
  35. // smaller chunks of PageSize or smaller. PageFn is expected to accept a
  36. // metav1.ListOptions that supports paging and return a list. The pager does
  37. // not alter the field or label selectors on the initial options list.
  38. type ListPager struct {
  39. PageSize int64
  40. PageFn ListPageFunc
  41. FullListIfExpired bool
  42. // Number of pages to buffer
  43. PageBufferSize int32
  44. }
  45. // New creates a new pager from the provided pager function using the default
  46. // options. It will fall back to a full list if an expiration error is encountered
  47. // as a last resort.
  48. func New(fn ListPageFunc) *ListPager {
  49. return &ListPager{
  50. PageSize: defaultPageSize,
  51. PageFn: fn,
  52. FullListIfExpired: true,
  53. PageBufferSize: defaultPageBufferSize,
  54. }
  55. }
  56. // TODO: introduce other types of paging functions - such as those that retrieve from a list
  57. // of namespaces.
  58. // List returns a single list object, but attempts to retrieve smaller chunks from the
  59. // server to reduce the impact on the server. If the chunk attempt fails, it will load
  60. // the full list instead. The Limit field on options, if unset, will default to the page size.
  61. func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, bool, error) {
  62. if options.Limit == 0 {
  63. options.Limit = p.PageSize
  64. }
  65. requestedResourceVersion := options.ResourceVersion
  66. var list *metainternalversion.List
  67. paginatedResult := false
  68. for {
  69. select {
  70. case <-ctx.Done():
  71. return nil, paginatedResult, ctx.Err()
  72. default:
  73. }
  74. obj, err := p.PageFn(ctx, options)
  75. if err != nil {
  76. // Only fallback to full list if an "Expired" errors is returned, FullListIfExpired is true, and
  77. // the "Expired" error occurred in page 2 or later (since full list is intended to prevent a pager.List from
  78. // failing when the resource versions is established by the first page request falls out of the compaction
  79. // during the subsequent list requests).
  80. if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
  81. return nil, paginatedResult, err
  82. }
  83. // the list expired while we were processing, fall back to a full list at
  84. // the requested ResourceVersion.
  85. options.Limit = 0
  86. options.Continue = ""
  87. options.ResourceVersion = requestedResourceVersion
  88. result, err := p.PageFn(ctx, options)
  89. return result, paginatedResult, err
  90. }
  91. m, err := meta.ListAccessor(obj)
  92. if err != nil {
  93. return nil, paginatedResult, fmt.Errorf("returned object must be a list: %v", err)
  94. }
  95. // exit early and return the object we got if we haven't processed any pages
  96. if len(m.GetContinue()) == 0 && list == nil {
  97. return obj, paginatedResult, nil
  98. }
  99. // initialize the list and fill its contents
  100. if list == nil {
  101. list = &metainternalversion.List{Items: make([]runtime.Object, 0, options.Limit+1)}
  102. list.ResourceVersion = m.GetResourceVersion()
  103. list.SelfLink = m.GetSelfLink()
  104. }
  105. if err := meta.EachListItem(obj, func(obj runtime.Object) error {
  106. list.Items = append(list.Items, obj)
  107. return nil
  108. }); err != nil {
  109. return nil, paginatedResult, err
  110. }
  111. // if we have no more items, return the list
  112. if len(m.GetContinue()) == 0 {
  113. return list, paginatedResult, nil
  114. }
  115. // set the next loop up
  116. options.Continue = m.GetContinue()
  117. // Clear the ResourceVersion on the subsequent List calls to avoid the
  118. // `specifying resource version is not allowed when using continue` error.
  119. // See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
  120. options.ResourceVersion = ""
  121. // At this point, result is already paginated.
  122. paginatedResult = true
  123. }
  124. }
  125. // EachListItem fetches runtime.Object items using this ListPager and invokes fn on each item. If
  126. // fn returns an error, processing stops and that error is returned. If fn does not return an error,
  127. // any error encountered while retrieving the list from the server is returned. If the context
  128. // cancels or times out, the context error is returned. Since the list is retrieved in paginated
  129. // chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the pagination list
  130. // requests exceed the expiration limit of the apiserver being called.
  131. //
  132. // Items are retrieved in chunks from the server to reduce the impact on the server with up to
  133. // ListPager.PageBufferSize chunks buffered concurrently in the background.
  134. func (p *ListPager) EachListItem(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
  135. return p.eachListChunkBuffered(ctx, options, func(obj runtime.Object) error {
  136. return meta.EachListItem(obj, fn)
  137. })
  138. }
  139. // eachListChunkBuffered fetches runtimeObject list chunks using this ListPager and invokes fn on
  140. // each list chunk. If fn returns an error, processing stops and that error is returned. If fn does
  141. // not return an error, any error encountered while retrieving the list from the server is
  142. // returned. If the context cancels or times out, the context error is returned. Since the list is
  143. // retrieved in paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if
  144. // the pagination list requests exceed the expiration limit of the apiserver being called.
  145. //
  146. // Up to ListPager.PageBufferSize chunks are buffered concurrently in the background.
  147. func (p *ListPager) eachListChunkBuffered(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
  148. if p.PageBufferSize < 0 {
  149. return fmt.Errorf("ListPager.PageBufferSize must be >= 0, got %d", p.PageBufferSize)
  150. }
  151. // Ensure background goroutine is stopped if this call exits before all list items are
  152. // processed. Cancelation error from this deferred cancel call is never returned to caller;
  153. // either the list result has already been sent to bgResultC or the fn error is returned and
  154. // the cancelation error is discarded.
  155. ctx, cancel := context.WithCancel(ctx)
  156. defer cancel()
  157. chunkC := make(chan runtime.Object, p.PageBufferSize)
  158. bgResultC := make(chan error, 1)
  159. go func() {
  160. defer utilruntime.HandleCrash()
  161. var err error
  162. defer func() {
  163. close(chunkC)
  164. bgResultC <- err
  165. }()
  166. err = p.eachListChunk(ctx, options, func(chunk runtime.Object) error {
  167. select {
  168. case chunkC <- chunk: // buffer the chunk, this can block
  169. case <-ctx.Done():
  170. return ctx.Err()
  171. }
  172. return nil
  173. })
  174. }()
  175. for o := range chunkC {
  176. err := fn(o)
  177. if err != nil {
  178. return err // any fn error should be returned immediately
  179. }
  180. }
  181. // promote the results of our background goroutine to the foreground
  182. return <-bgResultC
  183. }
  184. // eachListChunk fetches runtimeObject list chunks using this ListPager and invokes fn on each list
  185. // chunk. If fn returns an error, processing stops and that error is returned. If fn does not return
  186. // an error, any error encountered while retrieving the list from the server is returned. If the
  187. // context cancels or times out, the context error is returned. Since the list is retrieved in
  188. // paginated chunks, an "Expired" error (metav1.StatusReasonExpired) may be returned if the
  189. // pagination list requests exceed the expiration limit of the apiserver being called.
  190. func (p *ListPager) eachListChunk(ctx context.Context, options metav1.ListOptions, fn func(obj runtime.Object) error) error {
  191. if options.Limit == 0 {
  192. options.Limit = p.PageSize
  193. }
  194. for {
  195. select {
  196. case <-ctx.Done():
  197. return ctx.Err()
  198. default:
  199. }
  200. obj, err := p.PageFn(ctx, options)
  201. if err != nil {
  202. return err
  203. }
  204. m, err := meta.ListAccessor(obj)
  205. if err != nil {
  206. return fmt.Errorf("returned object must be a list: %v", err)
  207. }
  208. if err := fn(obj); err != nil {
  209. return err
  210. }
  211. // if we have no more items, return.
  212. if len(m.GetContinue()) == 0 {
  213. return nil
  214. }
  215. // set the next loop up
  216. options.Continue = m.GetContinue()
  217. }
  218. }