request.go 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rest
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/hex"
  18. "fmt"
  19. "io"
  20. "mime"
  21. "net/http"
  22. "net/http/httptrace"
  23. "net/url"
  24. "os"
  25. "path"
  26. "reflect"
  27. "strconv"
  28. "strings"
  29. "sync"
  30. "time"
  31. "golang.org/x/net/http2"
  32. "k8s.io/apimachinery/pkg/api/errors"
  33. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  34. "k8s.io/apimachinery/pkg/runtime"
  35. "k8s.io/apimachinery/pkg/runtime/schema"
  36. "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
  37. "k8s.io/apimachinery/pkg/util/net"
  38. "k8s.io/apimachinery/pkg/watch"
  39. restclientwatch "k8s.io/client-go/rest/watch"
  40. "k8s.io/client-go/tools/metrics"
  41. "k8s.io/client-go/util/flowcontrol"
  42. "k8s.io/klog/v2"
  43. "k8s.io/utils/clock"
  44. )
  const (
  	// longThrottleLatency is the rate-limiter wait above which a throttled
  	// request gets logged.
  	longThrottleLatency = 50 * time.Millisecond

  	// extraLongThrottleLatency is the rate-limiter wait above which the
  	// request is additionally reported through the throttled logger, which
  	// can emit at log level 2.
  	extraLongThrottleLatency = time.Second
  )
  // HTTPClient abstracts the single call a request object needs from an HTTP
  // client, so that tests can substitute their own implementation.
  type HTTPClient interface {
  	// Do sends request and returns the response or an error.
  	Do(request *http.Request) (*http.Response, error)
  }
  // ResponseWrapper exposes a response in one of two forms: fully buffered in
  // memory as raw bytes, or as a stream the caller reads and closes.
  type ResponseWrapper interface {
  	// DoRaw returns the whole response body as a byte slice.
  	DoRaw(ctx context.Context) ([]byte, error)
  	// Stream returns the response body as a stream.
  	Stream(ctx context.Context) (io.ReadCloser, error)
  }
  // RequestConstructionError reports a failure that happened while a request
  // was being assembled. Err holds the underlying cause.
  type RequestConstructionError struct {
  	Err error
  }

  // Error implements the error interface by embedding the underlying cause in
  // a fixed message.
  func (r *RequestConstructionError) Error() string {
  	const format = "request construction error: '%v'"
  	return fmt.Sprintf(format, r.Err)
  }
  71. var noBackoff = &NoBackoff{}
  72. type requestRetryFunc func(maxRetries int) WithRetry
  73. func defaultRequestRetryFn(maxRetries int) WithRetry {
  74. return &withRetry{maxRetries: maxRetries}
  75. }
  76. // Request allows for building up a request to a server in a chained fashion.
  77. // Any errors are stored until the end of your call, so you only have to
  78. // check once.
  79. type Request struct {
  80. c *RESTClient
  81. contentConfig ClientContentConfig
  82. contentTypeNotSet bool
  83. warningHandler WarningHandlerWithContext
  84. rateLimiter flowcontrol.RateLimiter
  85. backoff BackoffManagerWithContext
  86. timeout time.Duration
  87. maxRetries int
  88. // generic components accessible via method setters
  89. verb string
  90. pathPrefix string
  91. subpath string
  92. params url.Values
  93. headers http.Header
  94. // structural elements of the request that are part of the Kubernetes API conventions
  95. namespace string
  96. namespaceSet bool
  97. resource string
  98. resourceName string
  99. subresource string
  100. // output
  101. err error
  102. // only one of body / bodyBytes may be set. requests using body are not retryable.
  103. body io.Reader
  104. bodyBytes []byte
  105. retryFn requestRetryFunc
  106. }
  107. // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
  108. func NewRequest(c *RESTClient) *Request {
  109. var backoff BackoffManagerWithContext
  110. if c.createBackoffMgr != nil {
  111. backoff = c.createBackoffMgr()
  112. }
  113. if backoff == nil {
  114. backoff = noBackoff
  115. }
  116. var pathPrefix string
  117. if c.base != nil {
  118. pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
  119. } else {
  120. pathPrefix = path.Join("/", c.versionedAPIPath)
  121. }
  122. var timeout time.Duration
  123. if c.Client != nil {
  124. timeout = c.Client.Timeout
  125. }
  126. // A request needs to know whether the content type was explicitly configured or selected by
  127. // default in order to support the per-request Protobuf override used by clients generated
  128. // with --prefers-protobuf.
  129. contentConfig, contentTypeDefaulted := c.content.GetClientContentConfig()
  130. r := &Request{
  131. c: c,
  132. rateLimiter: c.rateLimiter,
  133. backoff: backoff,
  134. timeout: timeout,
  135. pathPrefix: pathPrefix,
  136. maxRetries: 10,
  137. retryFn: defaultRequestRetryFn,
  138. warningHandler: c.warningHandler,
  139. contentConfig: contentConfig,
  140. contentTypeNotSet: contentTypeDefaulted,
  141. }
  142. r.setAcceptHeader()
  143. return r
  144. }
  145. // NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios.
  146. func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request {
  147. return NewRequest(&RESTClient{
  148. base: base,
  149. versionedAPIPath: versionedAPIPath,
  150. content: requestClientContentConfigProvider{base: content},
  151. Client: client,
  152. })
  153. }
  154. func (r *Request) UseProtobufAsDefaultIfPreferred(prefersProtobuf bool) *Request {
  155. if prefersProtobuf {
  156. return r.UseProtobufAsDefault()
  157. }
  158. return r
  159. }
  160. func (r *Request) UseProtobufAsDefault() *Request {
  161. if r.contentTypeNotSet && len(r.contentConfig.AcceptContentTypes) == 0 {
  162. r.contentConfig.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
  163. r.contentConfig.ContentType = "application/vnd.kubernetes.protobuf"
  164. r.setAcceptHeader()
  165. }
  166. return r
  167. }
  168. func (r *Request) setAcceptHeader() {
  169. switch {
  170. case len(r.contentConfig.AcceptContentTypes) > 0:
  171. r.SetHeader("Accept", r.contentConfig.AcceptContentTypes)
  172. case len(r.contentConfig.ContentType) > 0:
  173. r.SetHeader("Accept", r.contentConfig.ContentType+", */*")
  174. }
  175. }
  176. // Verb sets the verb this request will use.
  177. func (r *Request) Verb(verb string) *Request {
  178. r.verb = verb
  179. return r
  180. }
  181. // Prefix adds segments to the relative beginning to the request path. These
  182. // items will be placed before the optional Namespace, Resource, or Name sections.
  183. // Setting AbsPath will clear any previously set Prefix segments
  184. func (r *Request) Prefix(segments ...string) *Request {
  185. if r.err != nil {
  186. return r
  187. }
  188. r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
  189. return r
  190. }
  191. // Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
  192. // Namespace, Resource, or Name sections.
  193. func (r *Request) Suffix(segments ...string) *Request {
  194. if r.err != nil {
  195. return r
  196. }
  197. r.subpath = path.Join(r.subpath, path.Join(segments...))
  198. return r
  199. }
  200. // Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
  201. func (r *Request) Resource(resource string) *Request {
  202. if r.err != nil {
  203. return r
  204. }
  205. if len(r.resource) != 0 {
  206. r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
  207. return r
  208. }
  209. if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
  210. r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
  211. return r
  212. }
  213. r.resource = resource
  214. return r
  215. }
  216. // BackOff sets the request's backoff manager to the one specified,
  217. // or defaults to the stub implementation if nil is provided.
  218. //
  219. // Deprecated: BackoffManager.Sleep ignores the caller's context. Use BackOffWithContext and BackoffManagerWithContext instead.
  220. func (r *Request) BackOff(manager BackoffManager) *Request {
  221. if manager == nil {
  222. r.backoff = &NoBackoff{}
  223. return r
  224. }
  225. r.backoff = &backoffManagerNopContext{BackoffManager: manager}
  226. return r
  227. }
  228. // BackOffWithContext sets the request's backoff manager to the one specified,
  229. // or defaults to the stub implementation if nil is provided.
  230. func (r *Request) BackOffWithContext(manager BackoffManagerWithContext) *Request {
  231. if manager == nil {
  232. r.backoff = &NoBackoff{}
  233. return r
  234. }
  235. r.backoff = manager
  236. return r
  237. }
  238. // WarningHandler sets the handler this client uses when warning headers are encountered.
  239. // If set to nil, this client will use the default warning handler (see [SetDefaultWarningHandler]).
  240. //
  241. //logcheck:context // WarningHandlerWithContext should be used instead of WarningHandler in code which supports contextual logging.
  242. func (r *Request) WarningHandler(handler WarningHandler) *Request {
  243. if handler == nil {
  244. r.warningHandler = nil
  245. return r
  246. }
  247. r.warningHandler = warningLoggerNopContext{l: handler}
  248. return r
  249. }
  250. // WarningHandlerWithContext sets the handler this client uses when warning headers are encountered.
  251. // If set to nil, this client will use the default warning handler (see [SetDefaultWarningHandlerWithContext]).
  252. func (r *Request) WarningHandlerWithContext(handler WarningHandlerWithContext) *Request {
  253. r.warningHandler = handler
  254. return r
  255. }
  256. // Throttle receives a rate-limiter and sets or replaces an existing request limiter
  257. func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
  258. r.rateLimiter = limiter
  259. return r
  260. }
  261. // SubResource sets a sub-resource path which can be multiple segments after the resource
  262. // name but before the suffix.
  263. func (r *Request) SubResource(subresources ...string) *Request {
  264. if r.err != nil {
  265. return r
  266. }
  267. subresource := path.Join(subresources...)
  268. if len(r.subresource) != 0 {
  269. r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.subresource, subresource)
  270. return r
  271. }
  272. for _, s := range subresources {
  273. if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
  274. r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
  275. return r
  276. }
  277. }
  278. r.subresource = subresource
  279. return r
  280. }
  281. // Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
  282. func (r *Request) Name(resourceName string) *Request {
  283. if r.err != nil {
  284. return r
  285. }
  286. if len(resourceName) == 0 {
  287. r.err = fmt.Errorf("resource name may not be empty")
  288. return r
  289. }
  290. if len(r.resourceName) != 0 {
  291. r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
  292. return r
  293. }
  294. if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
  295. r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
  296. return r
  297. }
  298. r.resourceName = resourceName
  299. return r
  300. }
  301. // Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
  302. func (r *Request) Namespace(namespace string) *Request {
  303. if r.err != nil {
  304. return r
  305. }
  306. if r.namespaceSet {
  307. r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
  308. return r
  309. }
  310. if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
  311. r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
  312. return r
  313. }
  314. r.namespaceSet = true
  315. r.namespace = namespace
  316. return r
  317. }
  318. // NamespaceIfScoped is a convenience function to set a namespace if scoped is true
  319. func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
  320. if scoped {
  321. return r.Namespace(namespace)
  322. }
  323. return r
  324. }
  325. // AbsPath overwrites an existing path with the segments provided.
  326. // Trailing slashes are preserved when a single segment is passed.
  327. // Any path in the request's REST client's base URL is preserved as a prefix.
  328. func (r *Request) AbsPath(segments ...string) *Request {
  329. if r.err != nil {
  330. return r
  331. }
  332. r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...))
  333. if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
  334. // preserve any trailing slashes for legacy behavior
  335. r.pathPrefix += "/"
  336. }
  337. return r
  338. }
  339. // RequestURI overwrites existing path and parameters with the value of the provided server relative URI.
  340. // This is equivalent to clearing params, then calling AbsPath() + Param() for each query parameter.
  341. func (r *Request) RequestURI(uri string) *Request {
  342. if r.err != nil {
  343. return r
  344. }
  345. locator, err := url.Parse(uri)
  346. if err != nil {
  347. r.err = err
  348. return r
  349. }
  350. // AbsPath handles prepending r.c.base.Path, if set
  351. r.AbsPath(locator.Path)
  352. if len(locator.Query()) > 0 {
  353. // clear any existing params
  354. r.params = make(url.Values)
  355. for k, v := range locator.Query() {
  356. r.params[k] = v
  357. }
  358. } else {
  359. // clear any existing params
  360. r.params = nil
  361. }
  362. return r
  363. }
  364. // Param creates a query parameter with the given string value.
  365. func (r *Request) Param(paramName, s string) *Request {
  366. if r.err != nil {
  367. return r
  368. }
  369. return r.setParam(paramName, s)
  370. }
  371. // VersionedParams will take the provided object, serialize it to a map[string][]string using the
  372. // implicit RESTClient API version and the default parameter codec, and then add those as parameters
  373. // to the request. Use this to provide versioned query parameters from client libraries.
  374. // VersionedParams will not write query parameters that have omitempty set and are empty. If a
  375. // parameter has already been set it is appended to (Params and VersionedParams are additive).
  376. func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
  377. return r.SpecificallyVersionedParams(obj, codec, r.contentConfig.GroupVersion)
  378. }
  379. func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
  380. if r.err != nil {
  381. return r
  382. }
  383. params, err := codec.EncodeParameters(obj, version)
  384. if err != nil {
  385. r.err = err
  386. return r
  387. }
  388. for k, v := range params {
  389. if r.params == nil {
  390. r.params = make(url.Values)
  391. }
  392. r.params[k] = append(r.params[k], v...)
  393. }
  394. return r
  395. }
  396. func (r *Request) setParam(paramName, value string) *Request {
  397. if r.params == nil {
  398. r.params = make(url.Values)
  399. }
  400. r.params[paramName] = append(r.params[paramName], value)
  401. return r
  402. }
  403. func (r *Request) SetHeader(key string, values ...string) *Request {
  404. if r.headers == nil {
  405. r.headers = http.Header{}
  406. }
  407. r.headers.Del(key)
  408. for _, value := range values {
  409. r.headers.Add(key, value)
  410. }
  411. return r
  412. }
  413. // Timeout makes the request use the given duration as an overall timeout for the
  414. // request. Additionally, if set passes the value as "timeout" parameter in URL.
  415. func (r *Request) Timeout(d time.Duration) *Request {
  416. if r.err != nil {
  417. return r
  418. }
  419. r.timeout = d
  420. return r
  421. }
  422. // MaxRetries makes the request use the given integer as a ceiling of retrying upon receiving
  423. // "Retry-After" headers and 429 status-code in the response. The default is 10 unless this
  424. // function is specifically called with a different value.
  425. // A zero maxRetries prevent it from doing retires and return an error immediately.
  426. func (r *Request) MaxRetries(maxRetries int) *Request {
  427. if maxRetries < 0 {
  428. maxRetries = 0
  429. }
  430. r.maxRetries = maxRetries
  431. return r
  432. }
  433. // Body makes the request use obj as the body. Optional.
  434. // If obj is a string, try to read a file of that name.
  435. // If obj is a []byte, send it directly.
  436. // If obj is an io.Reader, use it directly.
  437. // If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
  438. // If obj is a runtime.Object and nil, do nothing.
  439. // Otherwise, set an error.
  440. func (r *Request) Body(obj interface{}) *Request {
  441. if r.err != nil {
  442. return r
  443. }
  444. switch t := obj.(type) {
  445. case string:
  446. data, err := os.ReadFile(t)
  447. if err != nil {
  448. r.err = err
  449. return r
  450. }
  451. r.body = nil
  452. r.bodyBytes = data
  453. case []byte:
  454. r.body = nil
  455. r.bodyBytes = t
  456. case io.Reader:
  457. r.body = t
  458. r.bodyBytes = nil
  459. case runtime.Object:
  460. // callers may pass typed interface pointers, therefore we must check nil with reflection
  461. if reflect.ValueOf(t).IsNil() {
  462. return r
  463. }
  464. encoder, err := r.contentConfig.Negotiator.Encoder(r.contentConfig.ContentType, nil)
  465. if err != nil {
  466. r.err = err
  467. return r
  468. }
  469. data, err := runtime.Encode(encoder, t)
  470. if err != nil {
  471. r.err = err
  472. return r
  473. }
  474. r.body = nil
  475. r.bodyBytes = data
  476. r.SetHeader("Content-Type", r.contentConfig.ContentType)
  477. default:
  478. r.err = fmt.Errorf("unknown type used for body: %+v", obj)
  479. }
  480. return r
  481. }
  482. // Error returns any error encountered constructing the request, if any.
  483. func (r *Request) Error() error {
  484. return r.err
  485. }
  486. // URL returns the current working URL. Check the result of Error() to ensure
  487. // that the returned URL is valid.
  488. func (r *Request) URL() *url.URL {
  489. p := r.pathPrefix
  490. if r.namespaceSet && len(r.namespace) > 0 {
  491. p = path.Join(p, "namespaces", r.namespace)
  492. }
  493. if len(r.resource) != 0 {
  494. p = path.Join(p, strings.ToLower(r.resource))
  495. }
  496. // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
  497. if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
  498. p = path.Join(p, r.resourceName, r.subresource, r.subpath)
  499. }
  500. finalURL := &url.URL{}
  501. if r.c.base != nil {
  502. *finalURL = *r.c.base
  503. }
  504. finalURL.Path = p
  505. query := url.Values{}
  506. for key, values := range r.params {
  507. for _, value := range values {
  508. query.Add(key, value)
  509. }
  510. }
  511. // timeout is handled specially here.
  512. if r.timeout != 0 {
  513. query.Set("timeout", r.timeout.String())
  514. }
  515. finalURL.RawQuery = query.Encode()
  516. return finalURL
  517. }
  518. // finalURLTemplate is similar to URL(), but will make all specific parameter values equal
  519. // - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
  520. // parameters will be reset. This creates a copy of the url so as not to change the
  521. // underlying object.
  522. func (r Request) finalURLTemplate() url.URL {
  523. newParams := url.Values{}
  524. v := []string{"{value}"}
  525. for k := range r.params {
  526. newParams[k] = v
  527. }
  528. r.params = newParams
  529. u := r.URL()
  530. if u == nil {
  531. return url.URL{}
  532. }
  533. segments := strings.Split(u.Path, "/")
  534. groupIndex := 0
  535. index := 0
  536. trimmedBasePath := ""
  537. if r.c.base != nil && strings.Contains(u.Path, r.c.base.Path) {
  538. p := strings.TrimPrefix(u.Path, r.c.base.Path)
  539. if !strings.HasPrefix(p, "/") {
  540. p = "/" + p
  541. }
  542. // store the base path that we have trimmed so we can append it
  543. // before returning the URL
  544. trimmedBasePath = r.c.base.Path
  545. segments = strings.Split(p, "/")
  546. groupIndex = 1
  547. }
  548. if len(segments) <= 2 {
  549. return *u
  550. }
  551. const CoreGroupPrefix = "api"
  552. const NamedGroupPrefix = "apis"
  553. isCoreGroup := segments[groupIndex] == CoreGroupPrefix
  554. isNamedGroup := segments[groupIndex] == NamedGroupPrefix
  555. if isCoreGroup {
  556. // checking the case of core group with /api/v1/... format
  557. index = groupIndex + 2
  558. } else if isNamedGroup {
  559. // checking the case of named group with /apis/apps/v1/... format
  560. index = groupIndex + 3
  561. } else {
  562. // this should not happen that the only two possibilities are /api... and /apis..., just want to put an
  563. // outlet here in case more API groups are added in future if ever possible:
  564. // https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
  565. // if a wrong API groups name is encountered, return the {prefix} for url.Path
  566. u.Path = "/{prefix}"
  567. u.RawQuery = ""
  568. return *u
  569. }
  570. // switch segLength := len(segments) - index; segLength {
  571. switch {
  572. // case len(segments) - index == 1:
  573. // resource (with no name) do nothing
  574. case len(segments)-index == 2:
  575. // /$RESOURCE/$NAME: replace $NAME with {name}
  576. segments[index+1] = "{name}"
  577. case len(segments)-index == 3:
  578. if segments[index+2] == "finalize" || segments[index+2] == "status" {
  579. // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
  580. segments[index+1] = "{name}"
  581. } else {
  582. // /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
  583. segments[index+1] = "{namespace}"
  584. }
  585. case len(segments)-index >= 4:
  586. segments[index+1] = "{namespace}"
  587. // /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace}, $NAME with {name}
  588. if segments[index+3] != "finalize" && segments[index+3] != "status" {
  589. // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
  590. segments[index+3] = "{name}"
  591. }
  592. }
  593. u.Path = path.Join(trimmedBasePath, path.Join(segments...))
  594. return *u
  595. }
  596. func (r *Request) tryThrottleWithInfo(ctx context.Context, retryInfo string) error {
  597. if r.rateLimiter == nil {
  598. return nil
  599. }
  600. now := time.Now()
  601. err := r.rateLimiter.Wait(ctx)
  602. if err != nil {
  603. err = fmt.Errorf("client rate limiter Wait returned an error: %w", err)
  604. }
  605. latency := time.Since(now)
  606. if latency > longThrottleLatency {
  607. if retryInfo == "" {
  608. retryInfo = "client-side throttling, not priority and fairness"
  609. }
  610. klog.FromContext(ctx).V(3).Info("Waited before sending request", "delay", latency, "reason", retryInfo, "verb", r.verb, "URL", r.URL())
  611. if latency > extraLongThrottleLatency {
  612. // If the rate limiter latency is very high, the log message should be printed at a higher log level,
  613. // but we use a throttled logger to prevent spamming.
  614. globalThrottledLogger.info(klog.FromContext(ctx), "Waited before sending request", "delay", latency, "reason", retryInfo, "verb", r.verb, "URL", r.URL())
  615. }
  616. }
  617. metrics.RateLimiterLatency.Observe(ctx, r.verb, r.finalURLTemplate(), latency)
  618. return err
  619. }
  620. func (r *Request) tryThrottle(ctx context.Context) error {
  621. return r.tryThrottleWithInfo(ctx, "")
  622. }
  // throttleSettings holds the throttling state for one log level.
  type throttleSettings struct {
  	// logLevel is the verbosity these settings apply to.
  	logLevel int
  	// minLogInterval is the minimum time between two messages at logLevel.
  	minLogInterval time.Duration
  	// lastLogTime is when a message was last allowed through; guarded by lock.
  	lastLogTime time.Time
  	lock        sync.RWMutex
  }
  629. type throttledLogger struct {
  630. clock clock.PassiveClock
  631. settings []*throttleSettings
  632. }
  633. var globalThrottledLogger = &throttledLogger{
  634. clock: clock.RealClock{},
  635. settings: []*throttleSettings{
  636. {
  637. logLevel: 2,
  638. minLogInterval: 1 * time.Second,
  639. }, {
  640. logLevel: 0,
  641. minLogInterval: 10 * time.Second,
  642. },
  643. },
  644. }
  645. func (b *throttledLogger) attemptToLog(logger klog.Logger) (int, bool) {
  646. for _, setting := range b.settings {
  647. if bool(logger.V(setting.logLevel).Enabled()) {
  648. // Return early without write locking if possible.
  649. if func() bool {
  650. setting.lock.RLock()
  651. defer setting.lock.RUnlock()
  652. return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
  653. }() {
  654. setting.lock.Lock()
  655. defer setting.lock.Unlock()
  656. if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
  657. setting.lastLogTime = b.clock.Now()
  658. return setting.logLevel, true
  659. }
  660. }
  661. return -1, false
  662. }
  663. }
  664. return -1, false
  665. }
  666. // Infof will write a log message at each logLevel specified by the receiver's throttleSettings
  667. // as long as it hasn't written a log message more recently than minLogInterval.
  668. func (b *throttledLogger) info(logger klog.Logger, message string, kv ...any) {
  669. if logLevel, ok := b.attemptToLog(logger); ok {
  670. logger.V(logLevel).Info(message, kv...)
  671. }
  672. }
  673. // Watch attempts to begin watching the requested location.
  674. // Returns a watch.Interface, or an error.
  675. func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
  676. if r.body == nil {
  677. logBody(klog.FromContext(ctx), 2, "Request Body", r.bodyBytes)
  678. }
  679. // We specifically don't want to rate limit watches, so we
  680. // don't use r.rateLimiter here.
  681. if r.err != nil {
  682. return nil, r.err
  683. }
  684. client := r.c.Client
  685. if client == nil {
  686. client = http.DefaultClient
  687. }
  688. isErrRetryableFunc := func(request *http.Request, err error) bool {
  689. // The watch stream mechanism handles many common partial data errors, so closed
  690. // connections can be retried in many cases.
  691. if net.IsProbableEOF(err) || net.IsTimeout(err) {
  692. return true
  693. }
  694. return false
  695. }
  696. retry := r.retryFn(r.maxRetries)
  697. url := r.URL().String()
  698. for {
  699. if err := retry.Before(ctx, r); err != nil {
  700. return nil, retry.WrapPreviousError(err)
  701. }
  702. req, err := r.newHTTPRequest(ctx)
  703. if err != nil {
  704. return nil, err
  705. }
  706. resp, err := client.Do(req)
  707. retry.After(ctx, r, resp, err)
  708. if err == nil && resp.StatusCode == http.StatusOK {
  709. return r.newStreamWatcher(ctx, resp)
  710. }
  711. done, transformErr := func() (bool, error) {
  712. defer readAndCloseResponseBody(resp)
  713. if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
  714. return false, nil
  715. }
  716. if resp == nil {
  717. // the server must have sent us an error in 'err'
  718. return true, nil
  719. }
  720. result := r.transformResponse(ctx, resp, req)
  721. if err := result.Error(); err != nil {
  722. return true, err
  723. }
  724. return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
  725. }()
  726. if done {
  727. if isErrRetryableFunc(req, err) {
  728. return watch.NewEmptyWatch(), nil
  729. }
  730. if err == nil {
  731. // if the server sent us an HTTP Response object,
  732. // we need to return the error object from that.
  733. err = transformErr
  734. }
  735. return nil, retry.WrapPreviousError(err)
  736. }
  737. }
  738. }
  739. func (r *Request) newStreamWatcher(ctx context.Context, resp *http.Response) (watch.Interface, error) {
  740. contentType := resp.Header.Get("Content-Type")
  741. mediaType, params, err := mime.ParseMediaType(contentType)
  742. if err != nil {
  743. klog.FromContext(ctx).V(4).Info("Unexpected content type from the server", "contentType", contentType, "err", err)
  744. }
  745. objectDecoder, streamingSerializer, framer, err := r.contentConfig.Negotiator.StreamDecoder(mediaType, params)
  746. if err != nil {
  747. return nil, err
  748. }
  749. handleWarnings(ctx, resp.Header, r.warningHandler)
  750. frameReader := framer.NewFrameReader(resp.Body)
  751. watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
  752. return watch.NewStreamWatcherWithLogger(
  753. klog.FromContext(ctx),
  754. restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
  755. // use 500 to indicate that the cause of the error is unknown - other error codes
  756. // are more specific to HTTP interactions, and set a reason
  757. errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
  758. ), nil
  759. }
  760. // updateRequestResultMetric increments the RequestResult metric counter,
  761. // it should be called with the (response, err) tuple from the final
  762. // reply from the server.
  763. func updateRequestResultMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
  764. code, host := sanitize(req, resp, err)
  765. metrics.RequestResult.Increment(ctx, code, req.verb, host)
  766. }
  767. // updateRequestRetryMetric increments the RequestRetry metric counter,
  768. // it should be called with the (response, err) tuple for each retry
  769. // except for the final attempt.
  770. func updateRequestRetryMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
  771. code, host := sanitize(req, resp, err)
  772. metrics.RequestRetry.IncrementRetry(ctx, code, req.verb, host)
  773. }
  774. func sanitize(req *Request, resp *http.Response, err error) (string, string) {
  775. host := "none"
  776. if req.c.base != nil {
  777. host = req.c.base.Host
  778. }
  779. // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
  780. // system so we just report them as `<error>`.
  781. code := "<error>"
  782. if resp != nil {
  783. code = strconv.Itoa(resp.StatusCode)
  784. }
  785. return code, host
  786. }
  787. // Stream formats and executes the request, and offers streaming of the response.
  788. // Returns io.ReadCloser which could be used for streaming of the response, or an error
  789. // Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
  790. // If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
  791. func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
  792. if r.body == nil {
  793. logBody(klog.FromContext(ctx), 2, "Request Body", r.bodyBytes)
  794. }
  795. if r.err != nil {
  796. return nil, r.err
  797. }
  798. if err := r.tryThrottle(ctx); err != nil {
  799. return nil, err
  800. }
  801. client := r.c.Client
  802. if client == nil {
  803. client = http.DefaultClient
  804. }
  805. retry := r.retryFn(r.maxRetries)
  806. url := r.URL().String()
  807. for {
  808. if err := retry.Before(ctx, r); err != nil {
  809. return nil, err
  810. }
  811. req, err := r.newHTTPRequest(ctx)
  812. if err != nil {
  813. return nil, err
  814. }
  815. resp, err := client.Do(req)
  816. retry.After(ctx, r, resp, err)
  817. if err != nil {
  818. // we only retry on an HTTP response with 'Retry-After' header
  819. return nil, err
  820. }
  821. switch {
  822. case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
  823. handleWarnings(ctx, resp.Header, r.warningHandler)
  824. return resp.Body, nil
  825. default:
  826. done, transformErr := func() (bool, error) {
  827. defer resp.Body.Close()
  828. if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
  829. return false, nil
  830. }
  831. result := r.transformResponse(ctx, resp, req)
  832. if err := result.Error(); err != nil {
  833. return true, err
  834. }
  835. return true, fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
  836. }()
  837. if done {
  838. return nil, transformErr
  839. }
  840. }
  841. }
  842. }
  843. // requestPreflightCheck looks for common programmer errors on Request.
  844. //
  845. // We tackle here two programmer mistakes. The first one is to try to create
  846. // something(POST) using an empty string as namespace with namespaceSet as
  847. // true. If namespaceSet is true then namespace should also be defined. The
  848. // second mistake is, when under the same circumstances, the programmer tries
  849. // to GET, PUT or DELETE a named resource(resourceName != ""), again, if
  850. // namespaceSet is true then namespace must not be empty.
  851. func (r *Request) requestPreflightCheck() error {
  852. if !r.namespaceSet {
  853. return nil
  854. }
  855. if len(r.namespace) > 0 {
  856. return nil
  857. }
  858. switch r.verb {
  859. case "POST":
  860. return fmt.Errorf("an empty namespace may not be set during creation")
  861. case "GET", "PUT", "DELETE":
  862. if len(r.resourceName) > 0 {
  863. return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
  864. }
  865. }
  866. return nil
  867. }
  868. func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
  869. var body io.Reader
  870. switch {
  871. case r.body != nil && r.bodyBytes != nil:
  872. return nil, fmt.Errorf("cannot set both body and bodyBytes")
  873. case r.body != nil:
  874. body = r.body
  875. case r.bodyBytes != nil:
  876. // Create a new reader specifically for this request.
  877. // Giving each request a dedicated reader allows retries to avoid races resetting the request body.
  878. body = bytes.NewReader(r.bodyBytes)
  879. }
  880. url := r.URL().String()
  881. req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, newDNSMetricsTrace(ctx)), r.verb, url, body)
  882. if err != nil {
  883. return nil, err
  884. }
  885. req.Header = r.headers
  886. return req, nil
  887. }
  888. // newDNSMetricsTrace returns an HTTP trace that tracks time spent on DNS lookups per host.
  889. // This metric is available in client as "rest_client_dns_resolution_duration_seconds".
  890. func newDNSMetricsTrace(ctx context.Context) *httptrace.ClientTrace {
  891. type dnsMetric struct {
  892. start time.Time
  893. host string
  894. sync.Mutex
  895. }
  896. dns := &dnsMetric{}
  897. return &httptrace.ClientTrace{
  898. DNSStart: func(info httptrace.DNSStartInfo) {
  899. dns.Lock()
  900. defer dns.Unlock()
  901. dns.start = time.Now()
  902. dns.host = info.Host
  903. },
  904. DNSDone: func(info httptrace.DNSDoneInfo) {
  905. dns.Lock()
  906. defer dns.Unlock()
  907. metrics.ResolverLatency.Observe(ctx, dns.host, time.Since(dns.start))
  908. },
  909. }
  910. }
  911. // request connects to the server and invokes the provided function when a server response is
  912. // received. It handles retry behavior and up front validation of requests. It will invoke
  913. // fn at most once. It will return an error if a problem occurred prior to connecting to the
  914. // server - the provided function is responsible for handling server errors.
  915. func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
  916. // Metrics for total request latency
  917. start := time.Now()
  918. defer func() {
  919. metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
  920. }()
  921. if r.err != nil {
  922. klog.FromContext(ctx).V(4).Info("Error in request", "err", r.err)
  923. return r.err
  924. }
  925. if err := r.requestPreflightCheck(); err != nil {
  926. return err
  927. }
  928. client := r.c.Client
  929. if client == nil {
  930. client = http.DefaultClient
  931. }
  932. // Throttle the first try before setting up the timeout configured on the
  933. // client. We don't want a throttled client to return timeouts to callers
  934. // before it makes a single request.
  935. if err := r.tryThrottle(ctx); err != nil {
  936. return err
  937. }
  938. if r.timeout > 0 {
  939. var cancel context.CancelFunc
  940. ctx, cancel = context.WithTimeout(ctx, r.timeout)
  941. defer cancel()
  942. }
  943. isErrRetryableFunc := func(req *http.Request, err error) bool {
  944. // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
  945. // Thus in case of "GET" operations, we simply retry it.
  946. // We are not automatically retrying "write" operations, as they are not idempotent.
  947. if req.Method != "GET" {
  948. return false
  949. }
  950. // For connection errors and apiserver shutdown errors retry.
  951. if net.IsConnectionReset(err) || net.IsProbableEOF(err) || net.IsHTTP2ConnectionLost(err) {
  952. return true
  953. }
  954. return false
  955. }
  956. // Right now we make about ten retry attempts if we get a Retry-After response.
  957. retry := r.retryFn(r.maxRetries)
  958. for {
  959. if err := retry.Before(ctx, r); err != nil {
  960. return retry.WrapPreviousError(err)
  961. }
  962. req, err := r.newHTTPRequest(ctx)
  963. if err != nil {
  964. return err
  965. }
  966. resp, err := client.Do(req)
  967. // The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
  968. // https://pkg.go.dev/net/http#Request
  969. if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
  970. metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
  971. }
  972. if resp != nil && resp.StatusCode == http.StatusUnsupportedMediaType {
  973. r.c.content.UnsupportedMediaType(resp.Request.Header.Get("Content-Type"))
  974. }
  975. retry.After(ctx, r, resp, err)
  976. done := func() bool {
  977. defer readAndCloseResponseBody(resp)
  978. // if the server returns an error in err, the response will be nil.
  979. f := func(req *http.Request, resp *http.Response) {
  980. if resp == nil {
  981. return
  982. }
  983. fn(req, resp)
  984. }
  985. if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
  986. return false
  987. }
  988. f(req, resp)
  989. return true
  990. }()
  991. if done {
  992. return retry.WrapPreviousError(err)
  993. }
  994. }
  995. }
  996. // Do formats and executes the request. Returns a Result object for easy response
  997. // processing.
  998. //
  999. // Error type:
  1000. // - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
  1001. // - http.Client.Do errors are returned directly.
  1002. func (r *Request) Do(ctx context.Context) Result {
  1003. logger := klog.FromContext(ctx)
  1004. if r.body == nil {
  1005. logBody(logger, 2, "Request Body", r.bodyBytes)
  1006. }
  1007. var result Result
  1008. err := r.request(ctx, func(req *http.Request, resp *http.Response) {
  1009. result = r.transformResponse(ctx, resp, req)
  1010. })
  1011. if err != nil {
  1012. return Result{err: err, logger: logger}
  1013. }
  1014. if result.err == nil || len(result.body) > 0 {
  1015. metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
  1016. }
  1017. return result
  1018. }
  1019. // DoRaw executes the request but does not process the response body.
  1020. func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
  1021. logger := klog.FromContext(ctx)
  1022. if r.body == nil {
  1023. logBody(logger, 2, "Request Body", r.bodyBytes)
  1024. }
  1025. var result Result
  1026. err := r.request(ctx, func(req *http.Request, resp *http.Response) {
  1027. result.body, result.err = io.ReadAll(resp.Body)
  1028. logBody(logger, 2, "Response Body", result.body)
  1029. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
  1030. result.err = r.transformUnstructuredResponseError(resp, req, result.body)
  1031. }
  1032. })
  1033. if err != nil {
  1034. return nil, err
  1035. }
  1036. if result.err == nil || len(result.body) > 0 {
  1037. metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
  1038. }
  1039. return result.body, result.err
  1040. }
  1041. // transformResponse converts an API response into a structured API object
  1042. func (r *Request) transformResponse(ctx context.Context, resp *http.Response, req *http.Request) Result {
  1043. logger := klog.FromContext(ctx)
  1044. var body []byte
  1045. if resp.Body != nil {
  1046. data, err := io.ReadAll(resp.Body)
  1047. switch err.(type) {
  1048. case nil:
  1049. body = data
  1050. case http2.StreamError:
  1051. // This is trying to catch the scenario that the server may close the connection when sending the
  1052. // response body. This can be caused by server timeout due to a slow network connection.
  1053. // TODO: Add test for this. Steps may be:
  1054. // 1. client-go (or kubectl) sends a GET request.
  1055. // 2. Apiserver sends back the headers and then part of the body
  1056. // 3. Apiserver closes connection.
  1057. // 4. client-go should catch this and return an error.
  1058. logger.V(2).Info("Stream error when reading response body, may be caused by closed connection", "err", err)
  1059. streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %w", err)
  1060. return Result{
  1061. err: streamErr,
  1062. logger: logger,
  1063. }
  1064. default:
  1065. logger.Error(err, "Unexpected error when reading response body")
  1066. unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %w", err)
  1067. return Result{
  1068. err: unexpectedErr,
  1069. logger: logger,
  1070. }
  1071. }
  1072. }
  1073. // Call depth is tricky. This one is okay for Do and DoRaw.
  1074. logBody(logger, 7, "Response Body", body)
  1075. // verify the content type is accurate
  1076. var decoder runtime.Decoder
  1077. contentType := resp.Header.Get("Content-Type")
  1078. if len(contentType) == 0 {
  1079. contentType = r.contentConfig.ContentType
  1080. }
  1081. if len(contentType) > 0 {
  1082. var err error
  1083. mediaType, params, err := mime.ParseMediaType(contentType)
  1084. if err != nil {
  1085. return Result{err: errors.NewInternalError(err), logger: logger}
  1086. }
  1087. decoder, err = r.contentConfig.Negotiator.Decoder(mediaType, params)
  1088. if err != nil {
  1089. // if we fail to negotiate a decoder, treat this as an unstructured error
  1090. switch {
  1091. case resp.StatusCode == http.StatusSwitchingProtocols:
  1092. // no-op, we've been upgraded
  1093. case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
  1094. return Result{err: r.transformUnstructuredResponseError(resp, req, body), logger: logger}
  1095. }
  1096. return Result{
  1097. body: body,
  1098. contentType: contentType,
  1099. statusCode: resp.StatusCode,
  1100. warnings: handleWarnings(ctx, resp.Header, r.warningHandler),
  1101. logger: logger,
  1102. }
  1103. }
  1104. }
  1105. switch {
  1106. case resp.StatusCode == http.StatusSwitchingProtocols:
  1107. // no-op, we've been upgraded
  1108. case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
  1109. // calculate an unstructured error from the response which the Result object may use if the caller
  1110. // did not return a structured error.
  1111. retryAfter, _ := retryAfterSeconds(resp)
  1112. err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
  1113. return Result{
  1114. body: body,
  1115. contentType: contentType,
  1116. statusCode: resp.StatusCode,
  1117. decoder: decoder,
  1118. err: err,
  1119. warnings: handleWarnings(ctx, resp.Header, r.warningHandler),
  1120. logger: logger,
  1121. }
  1122. }
  1123. return Result{
  1124. body: body,
  1125. contentType: contentType,
  1126. statusCode: resp.StatusCode,
  1127. decoder: decoder,
  1128. warnings: handleWarnings(ctx, resp.Header, r.warningHandler),
  1129. logger: logger,
  1130. }
  1131. }
  1132. // truncateBody decides if the body should be truncated, based on the glog Verbosity.
  1133. func truncateBody(logger klog.Logger, body string) string {
  1134. max := 0
  1135. switch {
  1136. case bool(logger.V(10).Enabled()):
  1137. return body
  1138. case bool(logger.V(9).Enabled()):
  1139. max = 10240
  1140. case bool(logger.V(8).Enabled()):
  1141. max = 1024
  1142. }
  1143. if len(body) <= max {
  1144. return body
  1145. }
  1146. return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
  1147. }
  1148. // logBody logs a body output that could be either JSON or protobuf. It explicitly guards against
  1149. // allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
  1150. // whether the body is printable.
  1151. //
  1152. // It needs to be called by all functions which send or receive the data.
  1153. func logBody(logger klog.Logger, callDepth int, prefix string, body []byte) {
  1154. if loggerV := logger.V(8); loggerV.Enabled() {
  1155. loggerV := loggerV.WithCallDepth(callDepth)
  1156. if bytes.IndexFunc(body, func(r rune) bool {
  1157. return r < 0x0a
  1158. }) != -1 {
  1159. loggerV.Info(prefix, "body", truncateBody(logger, hex.Dump(body)))
  1160. } else {
  1161. loggerV.Info(prefix, "body", truncateBody(logger, string(body)))
  1162. }
  1163. }
  1164. }
  // maxUnstructuredResponseTextBytes caps, in bytes, how much of a response
  // body is included in an unstructured error (2 KiB).
  const maxUnstructuredResponseTextBytes = 2 << 10
  1167. // transformUnstructuredResponseError handles an error from the server that is not in a structured form.
  1168. // It is expected to transform any response that is not recognizable as a clear server sent error from the
  1169. // K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
  1170. // introduce a level of uncertainty to the responses returned by servers that in common use result in
  1171. // unexpected responses. The rough structure is:
  1172. //
  1173. // 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
  1174. // - this is the happy path
  1175. // - when you get this output, trust what the server sends
  1176. // 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
  1177. // generate a reasonable facsimile of the original failure.
  1178. // - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
  1179. // 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
  1180. // 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
  1181. // initial contact, the presence of mismatched body contents from posted content types
  1182. // - Give these a separate distinct error type and capture as much as possible of the original message
  1183. //
  1184. // TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
  1185. func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
  1186. if body == nil && resp.Body != nil {
  1187. if data, err := io.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
  1188. body = data
  1189. }
  1190. }
  1191. retryAfter, _ := retryAfterSeconds(resp)
  1192. return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
  1193. }
  1194. // newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
  1195. func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
  1196. // cap the amount of output we create
  1197. if len(body) > maxUnstructuredResponseTextBytes {
  1198. body = body[:maxUnstructuredResponseTextBytes]
  1199. }
  1200. message := "unknown"
  1201. if isTextResponse {
  1202. message = strings.TrimSpace(string(body))
  1203. }
  1204. var groupResource schema.GroupResource
  1205. if len(r.resource) > 0 {
  1206. groupResource.Group = r.contentConfig.GroupVersion.Group
  1207. groupResource.Resource = r.resource
  1208. }
  1209. return errors.NewGenericServerResponse(
  1210. statusCode,
  1211. method,
  1212. groupResource,
  1213. r.resourceName,
  1214. message,
  1215. retryAfter,
  1216. true,
  1217. )
  1218. }
  // isTextResponse returns true if the response appears to be a textual media type.
  //
  // A response without a Content-Type header counts as text. A header that
  // cannot be parsed as a media type does not; otherwise the media type must
  // start with "text/".
  func isTextResponse(resp *http.Response) bool {
  	header := resp.Header.Get("Content-Type")
  	if header == "" {
  		return true
  	}
  	mediaType, _, err := mime.ParseMediaType(header)
  	return err == nil && strings.HasPrefix(mediaType, "text/")
  }
  // retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
  // the header was missing or not a valid number of seconds.
  //
  // Only the delay-seconds form of the header is understood. A negative value
  // is rejected like any other malformed value, since a delay cannot be
  // negative; the HTTP-date form is likewise reported as (0, false).
  func retryAfterSeconds(resp *http.Response) (int, bool) {
  	h := resp.Header.Get("Retry-After")
  	if len(h) == 0 {
  		return 0, false
  	}
  	seconds, err := strconv.Atoi(h)
  	if err != nil || seconds < 0 {
  		return 0, false
  	}
  	return seconds, true
  }
  1241. // Result contains the result of calling Request.Do().
  1242. type Result struct {
  1243. body []byte
  1244. warnings []net.WarningHeader
  1245. contentType string
  1246. err error
  1247. statusCode int
  1248. logger klog.Logger
  1249. decoder runtime.Decoder
  1250. }
  1251. // Raw returns the raw result.
  1252. func (r Result) Raw() ([]byte, error) {
  1253. return r.body, r.err
  1254. }
  1255. // Get returns the result as an object, which means it passes through the decoder.
  1256. // If the returned object is of type Status and has .Status != StatusSuccess, the
  1257. // additional information in Status will be used to enrich the error.
  1258. func (r Result) Get() (runtime.Object, error) {
  1259. if r.err != nil {
  1260. // Check whether the result has a Status object in the body and prefer that.
  1261. return nil, r.Error()
  1262. }
  1263. if r.decoder == nil {
  1264. return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
  1265. }
  1266. // decode, but if the result is Status return that as an error instead.
  1267. out, _, err := r.decoder.Decode(r.body, nil, nil)
  1268. if err != nil {
  1269. return nil, err
  1270. }
  1271. switch t := out.(type) {
  1272. case *metav1.Status:
  1273. // any status besides StatusSuccess is considered an error.
  1274. if t.Status != metav1.StatusSuccess {
  1275. return nil, errors.FromObject(t)
  1276. }
  1277. }
  1278. return out, nil
  1279. }
  1280. // StatusCode returns the HTTP status code of the request. (Only valid if no
  1281. // error was returned.)
  1282. func (r Result) StatusCode(statusCode *int) Result {
  1283. *statusCode = r.statusCode
  1284. return r
  1285. }
  1286. // ContentType returns the "Content-Type" response header into the passed
  1287. // string, returning the Result for possible chaining. (Only valid if no
  1288. // error code was returned.)
  1289. func (r Result) ContentType(contentType *string) Result {
  1290. *contentType = r.contentType
  1291. return r
  1292. }
  1293. // Into stores the result into obj, if possible. If obj is nil it is ignored.
  1294. // If the returned object is of type Status and has .Status != StatusSuccess, the
  1295. // additional information in Status will be used to enrich the error.
  1296. func (r Result) Into(obj runtime.Object) error {
  1297. if r.err != nil {
  1298. // Check whether the result has a Status object in the body and prefer that.
  1299. return r.Error()
  1300. }
  1301. if r.decoder == nil {
  1302. return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
  1303. }
  1304. if len(r.body) == 0 {
  1305. return fmt.Errorf("0-length response with status code: %d and content type: %s",
  1306. r.statusCode, r.contentType)
  1307. }
  1308. out, _, err := r.decoder.Decode(r.body, nil, obj)
  1309. if err != nil || out == obj {
  1310. return err
  1311. }
  1312. // if a different object is returned, see if it is Status and avoid double decoding
  1313. // the object.
  1314. switch t := out.(type) {
  1315. case *metav1.Status:
  1316. // any status besides StatusSuccess is considered an error.
  1317. if t.Status != metav1.StatusSuccess {
  1318. return errors.FromObject(t)
  1319. }
  1320. }
  1321. return nil
  1322. }
  1323. // WasCreated updates the provided bool pointer to whether the server returned
  1324. // 201 created or a different response.
  1325. func (r Result) WasCreated(wasCreated *bool) Result {
  1326. *wasCreated = r.statusCode == http.StatusCreated
  1327. return r
  1328. }
  1329. // Error returns the error executing the request, nil if no error occurred.
  1330. // If the returned object is of type Status and has Status != StatusSuccess, the
  1331. // additional information in Status will be used to enrich the error.
  1332. // See the Request.Do() comment for what errors you might get.
  1333. func (r Result) Error() error {
  1334. // if we have received an unexpected server error, and we have a body and decoder, we can try to extract
  1335. // a Status object.
  1336. if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
  1337. return r.err
  1338. }
  1339. // attempt to convert the body into a Status object
  1340. // to be backwards compatible with old servers that do not return a version, default to "v1"
  1341. out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
  1342. if err != nil {
  1343. r.logger.V(5).Info("Body was not decodable (unable to check for Status)", "err", err)
  1344. return r.err
  1345. }
  1346. switch t := out.(type) {
  1347. case *metav1.Status:
  1348. // because we default the kind, we *must* check for StatusFailure
  1349. if t.Status == metav1.StatusFailure {
  1350. return errors.FromObject(t)
  1351. }
  1352. }
  1353. return r.err
  1354. }
  1355. // Warnings returns any warning headers received in the response
  1356. func (r Result) Warnings() []net.WarningHeader {
  1357. return r.warnings
  1358. }
  var (
  	// NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
  	NameMayNotBe = []string{".", ".."}

  	// NameMayNotContain specifies substrings that cannot be used in names specified as path segments (like the REST API or etcd store)
  	NameMayNotContain = []string{"/", "%"}
  )
  1363. // IsValidPathSegmentName validates the name can be safely encoded as a path segment
  1364. func IsValidPathSegmentName(name string) []string {
  1365. for _, illegalName := range NameMayNotBe {
  1366. if name == illegalName {
  1367. return []string{fmt.Sprintf(`may not be '%s'`, illegalName)}
  1368. }
  1369. }
  1370. var errors []string
  1371. for _, illegalContent := range NameMayNotContain {
  1372. if strings.Contains(name, illegalContent) {
  1373. errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
  1374. }
  1375. }
  1376. return errors
  1377. }
  1378. // IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment
  1379. // It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid
  1380. func IsValidPathSegmentPrefix(name string) []string {
  1381. var errors []string
  1382. for _, illegalContent := range NameMayNotContain {
  1383. if strings.Contains(name, illegalContent) {
  1384. errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
  1385. }
  1386. }
  1387. return errors
  1388. }
  1389. // ValidatePathSegmentName validates the name can be safely encoded as a path segment
  1390. func ValidatePathSegmentName(name string, prefix bool) []string {
  1391. if prefix {
  1392. return IsValidPathSegmentPrefix(name)
  1393. }
  1394. return IsValidPathSegmentName(name)
  1395. }