request.go 43 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package rest
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/hex"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "mime"
  22. "net/http"
  23. "net/url"
  24. "path"
  25. "reflect"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "golang.org/x/net/http2"
  31. "k8s.io/apimachinery/pkg/api/errors"
  32. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  33. "k8s.io/apimachinery/pkg/runtime"
  34. "k8s.io/apimachinery/pkg/runtime/schema"
  35. "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
  36. utilclock "k8s.io/apimachinery/pkg/util/clock"
  37. "k8s.io/apimachinery/pkg/util/net"
  38. "k8s.io/apimachinery/pkg/watch"
  39. restclientwatch "k8s.io/client-go/rest/watch"
  40. "k8s.io/client-go/tools/metrics"
  41. "k8s.io/client-go/util/flowcontrol"
  42. "k8s.io/klog/v2"
  43. )
  44. var (
  45. // longThrottleLatency defines threshold for logging requests. All requests being
  46. // throttled (via the provided rateLimiter) for more than longThrottleLatency will
  47. // be logged.
  48. longThrottleLatency = 50 * time.Millisecond
  49. // extraLongThrottleLatency defines the threshold for logging requests at log level 2.
  50. extraLongThrottleLatency = 1 * time.Second
  51. )
  52. // HTTPClient is an interface for testing a request object.
  53. type HTTPClient interface {
  54. Do(req *http.Request) (*http.Response, error)
  55. }
  56. // ResponseWrapper is an interface for getting a response.
  57. // The response may be either accessed as a raw data (the whole output is put into memory) or as a stream.
  58. type ResponseWrapper interface {
  59. DoRaw(context.Context) ([]byte, error)
  60. Stream(context.Context) (io.ReadCloser, error)
  61. }
  62. // RequestConstructionError is returned when there's an error assembling a request.
  63. type RequestConstructionError struct {
  64. Err error
  65. }
  66. // Error returns a textual description of 'r'.
  67. func (r *RequestConstructionError) Error() string {
  68. return fmt.Sprintf("request construction error: '%v'", r.Err)
  69. }
  70. var noBackoff = &NoBackoff{}
  71. // Request allows for building up a request to a server in a chained fashion.
  72. // Any errors are stored until the end of your call, so you only have to
  73. // check once.
  74. type Request struct {
  75. c *RESTClient
  76. warningHandler WarningHandler
  77. rateLimiter flowcontrol.RateLimiter
  78. backoff BackoffManager
  79. timeout time.Duration
  80. maxRetries int
  81. // generic components accessible via method setters
  82. verb string
  83. pathPrefix string
  84. subpath string
  85. params url.Values
  86. headers http.Header
  87. // structural elements of the request that are part of the Kubernetes API conventions
  88. namespace string
  89. namespaceSet bool
  90. resource string
  91. resourceName string
  92. subresource string
  93. // output
  94. err error
  95. body io.Reader
  96. }
  97. // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
  98. func NewRequest(c *RESTClient) *Request {
  99. var backoff BackoffManager
  100. if c.createBackoffMgr != nil {
  101. backoff = c.createBackoffMgr()
  102. }
  103. if backoff == nil {
  104. backoff = noBackoff
  105. }
  106. var pathPrefix string
  107. if c.base != nil {
  108. pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
  109. } else {
  110. pathPrefix = path.Join("/", c.versionedAPIPath)
  111. }
  112. var timeout time.Duration
  113. if c.Client != nil {
  114. timeout = c.Client.Timeout
  115. }
  116. r := &Request{
  117. c: c,
  118. rateLimiter: c.rateLimiter,
  119. backoff: backoff,
  120. timeout: timeout,
  121. pathPrefix: pathPrefix,
  122. maxRetries: 10,
  123. warningHandler: c.warningHandler,
  124. }
  125. switch {
  126. case len(c.content.AcceptContentTypes) > 0:
  127. r.SetHeader("Accept", c.content.AcceptContentTypes)
  128. case len(c.content.ContentType) > 0:
  129. r.SetHeader("Accept", c.content.ContentType+", */*")
  130. }
  131. return r
  132. }
  133. // NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios.
  134. func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request {
  135. return NewRequest(&RESTClient{
  136. base: base,
  137. versionedAPIPath: versionedAPIPath,
  138. content: content,
  139. Client: client,
  140. })
  141. }
  142. // Verb sets the verb this request will use.
  143. func (r *Request) Verb(verb string) *Request {
  144. r.verb = verb
  145. return r
  146. }
  147. // Prefix adds segments to the relative beginning to the request path. These
  148. // items will be placed before the optional Namespace, Resource, or Name sections.
  149. // Setting AbsPath will clear any previously set Prefix segments
  150. func (r *Request) Prefix(segments ...string) *Request {
  151. if r.err != nil {
  152. return r
  153. }
  154. r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
  155. return r
  156. }
  157. // Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
  158. // Namespace, Resource, or Name sections.
  159. func (r *Request) Suffix(segments ...string) *Request {
  160. if r.err != nil {
  161. return r
  162. }
  163. r.subpath = path.Join(r.subpath, path.Join(segments...))
  164. return r
  165. }
  166. // Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
  167. func (r *Request) Resource(resource string) *Request {
  168. if r.err != nil {
  169. return r
  170. }
  171. if len(r.resource) != 0 {
  172. r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
  173. return r
  174. }
  175. if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
  176. r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
  177. return r
  178. }
  179. r.resource = resource
  180. return r
  181. }
  182. // BackOff sets the request's backoff manager to the one specified,
  183. // or defaults to the stub implementation if nil is provided
  184. func (r *Request) BackOff(manager BackoffManager) *Request {
  185. if manager == nil {
  186. r.backoff = &NoBackoff{}
  187. return r
  188. }
  189. r.backoff = manager
  190. return r
  191. }
  192. // WarningHandler sets the handler this client uses when warning headers are encountered.
  193. // If set to nil, this client will use the default warning handler (see SetDefaultWarningHandler).
  194. func (r *Request) WarningHandler(handler WarningHandler) *Request {
  195. r.warningHandler = handler
  196. return r
  197. }
  198. // Throttle receives a rate-limiter and sets or replaces an existing request limiter
  199. func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
  200. r.rateLimiter = limiter
  201. return r
  202. }
  203. // SubResource sets a sub-resource path which can be multiple segments after the resource
  204. // name but before the suffix.
  205. func (r *Request) SubResource(subresources ...string) *Request {
  206. if r.err != nil {
  207. return r
  208. }
  209. subresource := path.Join(subresources...)
  210. if len(r.subresource) != 0 {
  211. r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.subresource, subresource)
  212. return r
  213. }
  214. for _, s := range subresources {
  215. if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
  216. r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
  217. return r
  218. }
  219. }
  220. r.subresource = subresource
  221. return r
  222. }
  223. // Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
  224. func (r *Request) Name(resourceName string) *Request {
  225. if r.err != nil {
  226. return r
  227. }
  228. if len(resourceName) == 0 {
  229. r.err = fmt.Errorf("resource name may not be empty")
  230. return r
  231. }
  232. if len(r.resourceName) != 0 {
  233. r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
  234. return r
  235. }
  236. if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
  237. r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
  238. return r
  239. }
  240. r.resourceName = resourceName
  241. return r
  242. }
  243. // Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
  244. func (r *Request) Namespace(namespace string) *Request {
  245. if r.err != nil {
  246. return r
  247. }
  248. if r.namespaceSet {
  249. r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
  250. return r
  251. }
  252. if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
  253. r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
  254. return r
  255. }
  256. r.namespaceSet = true
  257. r.namespace = namespace
  258. return r
  259. }
  260. // NamespaceIfScoped is a convenience function to set a namespace if scoped is true
  261. func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
  262. if scoped {
  263. return r.Namespace(namespace)
  264. }
  265. return r
  266. }
  267. // AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
  268. // when a single segment is passed.
  269. func (r *Request) AbsPath(segments ...string) *Request {
  270. if r.err != nil {
  271. return r
  272. }
  273. r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...))
  274. if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
  275. // preserve any trailing slashes for legacy behavior
  276. r.pathPrefix += "/"
  277. }
  278. return r
  279. }
  280. // RequestURI overwrites existing path and parameters with the value of the provided server relative
  281. // URI.
  282. func (r *Request) RequestURI(uri string) *Request {
  283. if r.err != nil {
  284. return r
  285. }
  286. locator, err := url.Parse(uri)
  287. if err != nil {
  288. r.err = err
  289. return r
  290. }
  291. r.pathPrefix = locator.Path
  292. if len(locator.Query()) > 0 {
  293. if r.params == nil {
  294. r.params = make(url.Values)
  295. }
  296. for k, v := range locator.Query() {
  297. r.params[k] = v
  298. }
  299. }
  300. return r
  301. }
  302. // Param creates a query parameter with the given string value.
  303. func (r *Request) Param(paramName, s string) *Request {
  304. if r.err != nil {
  305. return r
  306. }
  307. return r.setParam(paramName, s)
  308. }
  309. // VersionedParams will take the provided object, serialize it to a map[string][]string using the
  310. // implicit RESTClient API version and the default parameter codec, and then add those as parameters
  311. // to the request. Use this to provide versioned query parameters from client libraries.
  312. // VersionedParams will not write query parameters that have omitempty set and are empty. If a
  313. // parameter has already been set it is appended to (Params and VersionedParams are additive).
  314. func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
  315. return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion)
  316. }
  317. func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
  318. if r.err != nil {
  319. return r
  320. }
  321. params, err := codec.EncodeParameters(obj, version)
  322. if err != nil {
  323. r.err = err
  324. return r
  325. }
  326. for k, v := range params {
  327. if r.params == nil {
  328. r.params = make(url.Values)
  329. }
  330. r.params[k] = append(r.params[k], v...)
  331. }
  332. return r
  333. }
  334. func (r *Request) setParam(paramName, value string) *Request {
  335. if r.params == nil {
  336. r.params = make(url.Values)
  337. }
  338. r.params[paramName] = append(r.params[paramName], value)
  339. return r
  340. }
  341. func (r *Request) SetHeader(key string, values ...string) *Request {
  342. if r.headers == nil {
  343. r.headers = http.Header{}
  344. }
  345. r.headers.Del(key)
  346. for _, value := range values {
  347. r.headers.Add(key, value)
  348. }
  349. return r
  350. }
  351. // Timeout makes the request use the given duration as an overall timeout for the
  352. // request. Additionally, if set passes the value as "timeout" parameter in URL.
  353. func (r *Request) Timeout(d time.Duration) *Request {
  354. if r.err != nil {
  355. return r
  356. }
  357. r.timeout = d
  358. return r
  359. }
  360. // MaxRetries makes the request use the given integer as a ceiling of retrying upon receiving
  361. // "Retry-After" headers and 429 status-code in the response. The default is 10 unless this
  362. // function is specifically called with a different value.
  363. // A zero maxRetries prevent it from doing retires and return an error immediately.
  364. func (r *Request) MaxRetries(maxRetries int) *Request {
  365. if maxRetries < 0 {
  366. maxRetries = 0
  367. }
  368. r.maxRetries = maxRetries
  369. return r
  370. }
  371. // Body makes the request use obj as the body. Optional.
  372. // If obj is a string, try to read a file of that name.
  373. // If obj is a []byte, send it directly.
  374. // If obj is an io.Reader, use it directly.
  375. // If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
  376. // If obj is a runtime.Object and nil, do nothing.
  377. // Otherwise, set an error.
  378. func (r *Request) Body(obj interface{}) *Request {
  379. if r.err != nil {
  380. return r
  381. }
  382. switch t := obj.(type) {
  383. case string:
  384. data, err := ioutil.ReadFile(t)
  385. if err != nil {
  386. r.err = err
  387. return r
  388. }
  389. glogBody("Request Body", data)
  390. r.body = bytes.NewReader(data)
  391. case []byte:
  392. glogBody("Request Body", t)
  393. r.body = bytes.NewReader(t)
  394. case io.Reader:
  395. r.body = t
  396. case runtime.Object:
  397. // callers may pass typed interface pointers, therefore we must check nil with reflection
  398. if reflect.ValueOf(t).IsNil() {
  399. return r
  400. }
  401. encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil)
  402. if err != nil {
  403. r.err = err
  404. return r
  405. }
  406. data, err := runtime.Encode(encoder, t)
  407. if err != nil {
  408. r.err = err
  409. return r
  410. }
  411. glogBody("Request Body", data)
  412. r.body = bytes.NewReader(data)
  413. r.SetHeader("Content-Type", r.c.content.ContentType)
  414. default:
  415. r.err = fmt.Errorf("unknown type used for body: %+v", obj)
  416. }
  417. return r
  418. }
  419. // URL returns the current working URL.
  420. func (r *Request) URL() *url.URL {
  421. p := r.pathPrefix
  422. if r.namespaceSet && len(r.namespace) > 0 {
  423. p = path.Join(p, "namespaces", r.namespace)
  424. }
  425. if len(r.resource) != 0 {
  426. p = path.Join(p, strings.ToLower(r.resource))
  427. }
  428. // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
  429. if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
  430. p = path.Join(p, r.resourceName, r.subresource, r.subpath)
  431. }
  432. finalURL := &url.URL{}
  433. if r.c.base != nil {
  434. *finalURL = *r.c.base
  435. }
  436. finalURL.Path = p
  437. query := url.Values{}
  438. for key, values := range r.params {
  439. for _, value := range values {
  440. query.Add(key, value)
  441. }
  442. }
  443. // timeout is handled specially here.
  444. if r.timeout != 0 {
  445. query.Set("timeout", r.timeout.String())
  446. }
  447. finalURL.RawQuery = query.Encode()
  448. return finalURL
  449. }
  450. // finalURLTemplate is similar to URL(), but will make all specific parameter values equal
  451. // - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
  452. // parameters will be reset. This creates a copy of the url so as not to change the
  453. // underlying object.
  454. func (r Request) finalURLTemplate() url.URL {
  455. newParams := url.Values{}
  456. v := []string{"{value}"}
  457. for k := range r.params {
  458. newParams[k] = v
  459. }
  460. r.params = newParams
  461. url := r.URL()
  462. segments := strings.Split(url.Path, "/")
  463. groupIndex := 0
  464. index := 0
  465. trimmedBasePath := ""
  466. if url != nil && r.c.base != nil && strings.Contains(url.Path, r.c.base.Path) {
  467. p := strings.TrimPrefix(url.Path, r.c.base.Path)
  468. if !strings.HasPrefix(p, "/") {
  469. p = "/" + p
  470. }
  471. // store the base path that we have trimmed so we can append it
  472. // before returning the URL
  473. trimmedBasePath = r.c.base.Path
  474. segments = strings.Split(p, "/")
  475. groupIndex = 1
  476. }
  477. if len(segments) <= 2 {
  478. return *url
  479. }
  480. const CoreGroupPrefix = "api"
  481. const NamedGroupPrefix = "apis"
  482. isCoreGroup := segments[groupIndex] == CoreGroupPrefix
  483. isNamedGroup := segments[groupIndex] == NamedGroupPrefix
  484. if isCoreGroup {
  485. // checking the case of core group with /api/v1/... format
  486. index = groupIndex + 2
  487. } else if isNamedGroup {
  488. // checking the case of named group with /apis/apps/v1/... format
  489. index = groupIndex + 3
  490. } else {
  491. // this should not happen that the only two possibilities are /api... and /apis..., just want to put an
  492. // outlet here in case more API groups are added in future if ever possible:
  493. // https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
  494. // if a wrong API groups name is encountered, return the {prefix} for url.Path
  495. url.Path = "/{prefix}"
  496. url.RawQuery = ""
  497. return *url
  498. }
  499. //switch segLength := len(segments) - index; segLength {
  500. switch {
  501. // case len(segments) - index == 1:
  502. // resource (with no name) do nothing
  503. case len(segments)-index == 2:
  504. // /$RESOURCE/$NAME: replace $NAME with {name}
  505. segments[index+1] = "{name}"
  506. case len(segments)-index == 3:
  507. if segments[index+2] == "finalize" || segments[index+2] == "status" {
  508. // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
  509. segments[index+1] = "{name}"
  510. } else {
  511. // /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
  512. segments[index+1] = "{namespace}"
  513. }
  514. case len(segments)-index >= 4:
  515. segments[index+1] = "{namespace}"
  516. // /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace}, $NAME with {name}
  517. if segments[index+3] != "finalize" && segments[index+3] != "status" {
  518. // /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
  519. segments[index+3] = "{name}"
  520. }
  521. }
  522. url.Path = path.Join(trimmedBasePath, path.Join(segments...))
  523. return *url
  524. }
  525. func (r *Request) tryThrottleWithInfo(ctx context.Context, retryInfo string) error {
  526. if r.rateLimiter == nil {
  527. return nil
  528. }
  529. now := time.Now()
  530. err := r.rateLimiter.Wait(ctx)
  531. latency := time.Since(now)
  532. var message string
  533. switch {
  534. case len(retryInfo) > 0:
  535. message = fmt.Sprintf("Waited for %v, %s - request: %s:%s", latency, retryInfo, r.verb, r.URL().String())
  536. default:
  537. message = fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s:%s", latency, r.verb, r.URL().String())
  538. }
  539. if latency > longThrottleLatency {
  540. klog.V(3).Info(message)
  541. }
  542. if latency > extraLongThrottleLatency {
  543. // If the rate limiter latency is very high, the log message should be printed at a higher log level,
  544. // but we use a throttled logger to prevent spamming.
  545. globalThrottledLogger.Infof(message)
  546. }
  547. metrics.RateLimiterLatency.Observe(ctx, r.verb, r.finalURLTemplate(), latency)
  548. return err
  549. }
  550. func (r *Request) tryThrottle(ctx context.Context) error {
  551. return r.tryThrottleWithInfo(ctx, "")
  552. }
  553. type throttleSettings struct {
  554. logLevel klog.Level
  555. minLogInterval time.Duration
  556. lastLogTime time.Time
  557. lock sync.RWMutex
  558. }
  559. type throttledLogger struct {
  560. clock utilclock.PassiveClock
  561. settings []*throttleSettings
  562. }
  563. var globalThrottledLogger = &throttledLogger{
  564. clock: utilclock.RealClock{},
  565. settings: []*throttleSettings{
  566. {
  567. logLevel: 2,
  568. minLogInterval: 1 * time.Second,
  569. }, {
  570. logLevel: 0,
  571. minLogInterval: 10 * time.Second,
  572. },
  573. },
  574. }
  575. func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
  576. for _, setting := range b.settings {
  577. if bool(klog.V(setting.logLevel).Enabled()) {
  578. // Return early without write locking if possible.
  579. if func() bool {
  580. setting.lock.RLock()
  581. defer setting.lock.RUnlock()
  582. return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
  583. }() {
  584. setting.lock.Lock()
  585. defer setting.lock.Unlock()
  586. if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
  587. setting.lastLogTime = b.clock.Now()
  588. return setting.logLevel, true
  589. }
  590. }
  591. return -1, false
  592. }
  593. }
  594. return -1, false
  595. }
  596. // Infof will write a log message at each logLevel specified by the receiver's throttleSettings
  597. // as long as it hasn't written a log message more recently than minLogInterval.
  598. func (b *throttledLogger) Infof(message string, args ...interface{}) {
  599. if logLevel, ok := b.attemptToLog(); ok {
  600. klog.V(logLevel).Infof(message, args...)
  601. }
  602. }
  603. // Watch attempts to begin watching the requested location.
  604. // Returns a watch.Interface, or an error.
  605. func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
  606. // We specifically don't want to rate limit watches, so we
  607. // don't use r.rateLimiter here.
  608. if r.err != nil {
  609. return nil, r.err
  610. }
  611. url := r.URL().String()
  612. req, err := http.NewRequest(r.verb, url, r.body)
  613. if err != nil {
  614. return nil, err
  615. }
  616. req = req.WithContext(ctx)
  617. req.Header = r.headers
  618. client := r.c.Client
  619. if client == nil {
  620. client = http.DefaultClient
  621. }
  622. r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
  623. resp, err := client.Do(req)
  624. updateURLMetrics(ctx, r, resp, err)
  625. if r.c.base != nil {
  626. if err != nil {
  627. r.backoff.UpdateBackoff(r.c.base, err, 0)
  628. } else {
  629. r.backoff.UpdateBackoff(r.c.base, err, resp.StatusCode)
  630. }
  631. }
  632. if err != nil {
  633. // The watch stream mechanism handles many common partial data errors, so closed
  634. // connections can be retried in many cases.
  635. if net.IsProbableEOF(err) || net.IsTimeout(err) {
  636. return watch.NewEmptyWatch(), nil
  637. }
  638. return nil, err
  639. }
  640. if resp.StatusCode != http.StatusOK {
  641. defer resp.Body.Close()
  642. if result := r.transformResponse(resp, req); result.err != nil {
  643. return nil, result.err
  644. }
  645. return nil, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
  646. }
  647. contentType := resp.Header.Get("Content-Type")
  648. mediaType, params, err := mime.ParseMediaType(contentType)
  649. if err != nil {
  650. klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
  651. }
  652. objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
  653. if err != nil {
  654. return nil, err
  655. }
  656. handleWarnings(resp.Header, r.warningHandler)
  657. frameReader := framer.NewFrameReader(resp.Body)
  658. watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
  659. return watch.NewStreamWatcher(
  660. restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
  661. // use 500 to indicate that the cause of the error is unknown - other error codes
  662. // are more specific to HTTP interactions, and set a reason
  663. errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
  664. ), nil
  665. }
  666. // updateURLMetrics is a convenience function for pushing metrics.
  667. // It also handles corner cases for incomplete/invalid request data.
  668. func updateURLMetrics(ctx context.Context, req *Request, resp *http.Response, err error) {
  669. url := "none"
  670. if req.c.base != nil {
  671. url = req.c.base.Host
  672. }
  673. // Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
  674. // system so we just report them as `<error>`.
  675. if err != nil {
  676. metrics.RequestResult.Increment(ctx, "<error>", req.verb, url)
  677. } else {
  678. //Metrics for failure codes
  679. metrics.RequestResult.Increment(ctx, strconv.Itoa(resp.StatusCode), req.verb, url)
  680. }
  681. }
  682. // Stream formats and executes the request, and offers streaming of the response.
  683. // Returns io.ReadCloser which could be used for streaming of the response, or an error
  684. // Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
  685. // If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
  686. func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
  687. if r.err != nil {
  688. return nil, r.err
  689. }
  690. if err := r.tryThrottle(ctx); err != nil {
  691. return nil, err
  692. }
  693. url := r.URL().String()
  694. req, err := http.NewRequest(r.verb, url, nil)
  695. if err != nil {
  696. return nil, err
  697. }
  698. if r.body != nil {
  699. req.Body = ioutil.NopCloser(r.body)
  700. }
  701. req = req.WithContext(ctx)
  702. req.Header = r.headers
  703. client := r.c.Client
  704. if client == nil {
  705. client = http.DefaultClient
  706. }
  707. r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
  708. resp, err := client.Do(req)
  709. updateURLMetrics(ctx, r, resp, err)
  710. if r.c.base != nil {
  711. if err != nil {
  712. r.backoff.UpdateBackoff(r.URL(), err, 0)
  713. } else {
  714. r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
  715. }
  716. }
  717. if err != nil {
  718. return nil, err
  719. }
  720. switch {
  721. case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
  722. handleWarnings(resp.Header, r.warningHandler)
  723. return resp.Body, nil
  724. default:
  725. // ensure we close the body before returning the error
  726. defer resp.Body.Close()
  727. result := r.transformResponse(resp, req)
  728. err := result.Error()
  729. if err == nil {
  730. err = fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
  731. }
  732. return nil, err
  733. }
  734. }
  735. // requestPreflightCheck looks for common programmer errors on Request.
  736. //
  737. // We tackle here two programmer mistakes. The first one is to try to create
  738. // something(POST) using an empty string as namespace with namespaceSet as
  739. // true. If namespaceSet is true then namespace should also be defined. The
  740. // second mistake is, when under the same circumstances, the programmer tries
  741. // to GET, PUT or DELETE a named resource(resourceName != ""), again, if
  742. // namespaceSet is true then namespace must not be empty.
  743. func (r *Request) requestPreflightCheck() error {
  744. if !r.namespaceSet {
  745. return nil
  746. }
  747. if len(r.namespace) > 0 {
  748. return nil
  749. }
  750. switch r.verb {
  751. case "POST":
  752. return fmt.Errorf("an empty namespace may not be set during creation")
  753. case "GET", "PUT", "DELETE":
  754. if len(r.resourceName) > 0 {
  755. return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
  756. }
  757. }
  758. return nil
  759. }
  760. // request connects to the server and invokes the provided function when a server response is
  761. // received. It handles retry behavior and up front validation of requests. It will invoke
  762. // fn at most once. It will return an error if a problem occurred prior to connecting to the
  763. // server - the provided function is responsible for handling server errors.
  764. func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
  765. //Metrics for total request latency
  766. start := time.Now()
  767. defer func() {
  768. metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
  769. }()
  770. if r.err != nil {
  771. klog.V(4).Infof("Error in request: %v", r.err)
  772. return r.err
  773. }
  774. if err := r.requestPreflightCheck(); err != nil {
  775. return err
  776. }
  777. client := r.c.Client
  778. if client == nil {
  779. client = http.DefaultClient
  780. }
  781. // Throttle the first try before setting up the timeout configured on the
  782. // client. We don't want a throttled client to return timeouts to callers
  783. // before it makes a single request.
  784. if err := r.tryThrottle(ctx); err != nil {
  785. return err
  786. }
  787. if r.timeout > 0 {
  788. var cancel context.CancelFunc
  789. ctx, cancel = context.WithTimeout(ctx, r.timeout)
  790. defer cancel()
  791. }
  792. // Right now we make about ten retry attempts if we get a Retry-After response.
  793. retries := 0
  794. var retryInfo string
  795. for {
  796. url := r.URL().String()
  797. req, err := http.NewRequest(r.verb, url, r.body)
  798. if err != nil {
  799. return err
  800. }
  801. req = req.WithContext(ctx)
  802. req.Header = r.headers
  803. r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
  804. if retries > 0 {
  805. // We are retrying the request that we already send to apiserver
  806. // at least once before.
  807. // This request should also be throttled with the client-internal rate limiter.
  808. if err := r.tryThrottleWithInfo(ctx, retryInfo); err != nil {
  809. return err
  810. }
  811. retryInfo = ""
  812. }
  813. resp, err := client.Do(req)
  814. updateURLMetrics(ctx, r, resp, err)
  815. if err != nil {
  816. r.backoff.UpdateBackoff(r.URL(), err, 0)
  817. } else {
  818. r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
  819. }
  820. if err != nil {
  821. // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
  822. // Thus in case of "GET" operations, we simply retry it.
  823. // We are not automatically retrying "write" operations, as
  824. // they are not idempotent.
  825. if r.verb != "GET" {
  826. return err
  827. }
  828. // For connection errors and apiserver shutdown errors retry.
  829. if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
  830. // For the purpose of retry, we set the artificial "retry-after" response.
  831. // TODO: Should we clean the original response if it exists?
  832. resp = &http.Response{
  833. StatusCode: http.StatusInternalServerError,
  834. Header: http.Header{"Retry-After": []string{"1"}},
  835. Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
  836. }
  837. } else {
  838. return err
  839. }
  840. }
  841. done := func() bool {
  842. // Ensure the response body is fully read and closed
  843. // before we reconnect, so that we reuse the same TCP
  844. // connection.
  845. defer func() {
  846. const maxBodySlurpSize = 2 << 10
  847. if resp.ContentLength <= maxBodySlurpSize {
  848. io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
  849. }
  850. resp.Body.Close()
  851. }()
  852. retries++
  853. if seconds, wait := checkWait(resp); wait && retries <= r.maxRetries {
  854. retryInfo = getRetryReason(retries, seconds, resp, err)
  855. if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
  856. _, err := seeker.Seek(0, 0)
  857. if err != nil {
  858. klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
  859. fn(req, resp)
  860. return true
  861. }
  862. }
  863. klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
  864. r.backoff.Sleep(time.Duration(seconds) * time.Second)
  865. return false
  866. }
  867. fn(req, resp)
  868. return true
  869. }()
  870. if done {
  871. return nil
  872. }
  873. }
  874. }
  875. // Do formats and executes the request. Returns a Result object for easy response
  876. // processing.
  877. //
  878. // Error type:
  879. // * If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
  880. // * http.Client.Do errors are returned directly.
  881. func (r *Request) Do(ctx context.Context) Result {
  882. var result Result
  883. err := r.request(ctx, func(req *http.Request, resp *http.Response) {
  884. result = r.transformResponse(resp, req)
  885. })
  886. if err != nil {
  887. return Result{err: err}
  888. }
  889. return result
  890. }
  891. // DoRaw executes the request but does not process the response body.
  892. func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
  893. var result Result
  894. err := r.request(ctx, func(req *http.Request, resp *http.Response) {
  895. result.body, result.err = ioutil.ReadAll(resp.Body)
  896. glogBody("Response Body", result.body)
  897. if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
  898. result.err = r.transformUnstructuredResponseError(resp, req, result.body)
  899. }
  900. })
  901. if err != nil {
  902. return nil, err
  903. }
  904. return result.body, result.err
  905. }
  906. // transformResponse converts an API response into a structured API object
  907. func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
  908. var body []byte
  909. if resp.Body != nil {
  910. data, err := ioutil.ReadAll(resp.Body)
  911. switch err.(type) {
  912. case nil:
  913. body = data
  914. case http2.StreamError:
  915. // This is trying to catch the scenario that the server may close the connection when sending the
  916. // response body. This can be caused by server timeout due to a slow network connection.
  917. // TODO: Add test for this. Steps may be:
  918. // 1. client-go (or kubectl) sends a GET request.
  919. // 2. Apiserver sends back the headers and then part of the body
  920. // 3. Apiserver closes connection.
  921. // 4. client-go should catch this and return an error.
  922. klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
  923. streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %v", err)
  924. return Result{
  925. err: streamErr,
  926. }
  927. default:
  928. klog.Errorf("Unexpected error when reading response body: %v", err)
  929. unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %v", err)
  930. return Result{
  931. err: unexpectedErr,
  932. }
  933. }
  934. }
  935. glogBody("Response Body", body)
  936. // verify the content type is accurate
  937. var decoder runtime.Decoder
  938. contentType := resp.Header.Get("Content-Type")
  939. if len(contentType) == 0 {
  940. contentType = r.c.content.ContentType
  941. }
  942. if len(contentType) > 0 {
  943. var err error
  944. mediaType, params, err := mime.ParseMediaType(contentType)
  945. if err != nil {
  946. return Result{err: errors.NewInternalError(err)}
  947. }
  948. decoder, err = r.c.content.Negotiator.Decoder(mediaType, params)
  949. if err != nil {
  950. // if we fail to negotiate a decoder, treat this as an unstructured error
  951. switch {
  952. case resp.StatusCode == http.StatusSwitchingProtocols:
  953. // no-op, we've been upgraded
  954. case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
  955. return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
  956. }
  957. return Result{
  958. body: body,
  959. contentType: contentType,
  960. statusCode: resp.StatusCode,
  961. warnings: handleWarnings(resp.Header, r.warningHandler),
  962. }
  963. }
  964. }
  965. switch {
  966. case resp.StatusCode == http.StatusSwitchingProtocols:
  967. // no-op, we've been upgraded
  968. case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
  969. // calculate an unstructured error from the response which the Result object may use if the caller
  970. // did not return a structured error.
  971. retryAfter, _ := retryAfterSeconds(resp)
  972. err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
  973. return Result{
  974. body: body,
  975. contentType: contentType,
  976. statusCode: resp.StatusCode,
  977. decoder: decoder,
  978. err: err,
  979. warnings: handleWarnings(resp.Header, r.warningHandler),
  980. }
  981. }
  982. return Result{
  983. body: body,
  984. contentType: contentType,
  985. statusCode: resp.StatusCode,
  986. decoder: decoder,
  987. warnings: handleWarnings(resp.Header, r.warningHandler),
  988. }
  989. }
  990. // truncateBody decides if the body should be truncated, based on the glog Verbosity.
  991. func truncateBody(body string) string {
  992. max := 0
  993. switch {
  994. case bool(klog.V(10).Enabled()):
  995. return body
  996. case bool(klog.V(9).Enabled()):
  997. max = 10240
  998. case bool(klog.V(8).Enabled()):
  999. max = 1024
  1000. }
  1001. if len(body) <= max {
  1002. return body
  1003. }
  1004. return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
  1005. }
  1006. // glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
  1007. // allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
  1008. // whether the body is printable.
  1009. func glogBody(prefix string, body []byte) {
  1010. if klog.V(8).Enabled() {
  1011. if bytes.IndexFunc(body, func(r rune) bool {
  1012. return r < 0x0a
  1013. }) != -1 {
  1014. klog.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
  1015. } else {
  1016. klog.Infof("%s: %s", prefix, truncateBody(string(body)))
  1017. }
  1018. }
  1019. }
  1020. // maxUnstructuredResponseTextBytes is an upper bound on how much output to include in the unstructured error.
  1021. const maxUnstructuredResponseTextBytes = 2048
  1022. // transformUnstructuredResponseError handles an error from the server that is not in a structured form.
  1023. // It is expected to transform any response that is not recognizable as a clear server sent error from the
  1024. // K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
  1025. // introduce a level of uncertainty to the responses returned by servers that in common use result in
  1026. // unexpected responses. The rough structure is:
  1027. //
  1028. // 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
  1029. // - this is the happy path
  1030. // - when you get this output, trust what the server sends
  1031. // 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
  1032. // generate a reasonable facsimile of the original failure.
  1033. // - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
  1034. // 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
  1035. // 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
  1036. // initial contact, the presence of mismatched body contents from posted content types
  1037. // - Give these a separate distinct error type and capture as much as possible of the original message
  1038. //
  1039. // TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
  1040. func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
  1041. if body == nil && resp.Body != nil {
  1042. if data, err := ioutil.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
  1043. body = data
  1044. }
  1045. }
  1046. retryAfter, _ := retryAfterSeconds(resp)
  1047. return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
  1048. }
  1049. // newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
  1050. func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
  1051. // cap the amount of output we create
  1052. if len(body) > maxUnstructuredResponseTextBytes {
  1053. body = body[:maxUnstructuredResponseTextBytes]
  1054. }
  1055. message := "unknown"
  1056. if isTextResponse {
  1057. message = strings.TrimSpace(string(body))
  1058. }
  1059. var groupResource schema.GroupResource
  1060. if len(r.resource) > 0 {
  1061. groupResource.Group = r.c.content.GroupVersion.Group
  1062. groupResource.Resource = r.resource
  1063. }
  1064. return errors.NewGenericServerResponse(
  1065. statusCode,
  1066. method,
  1067. groupResource,
  1068. r.resourceName,
  1069. message,
  1070. retryAfter,
  1071. true,
  1072. )
  1073. }
  1074. // isTextResponse returns true if the response appears to be a textual media type.
  1075. func isTextResponse(resp *http.Response) bool {
  1076. contentType := resp.Header.Get("Content-Type")
  1077. if len(contentType) == 0 {
  1078. return true
  1079. }
  1080. media, _, err := mime.ParseMediaType(contentType)
  1081. if err != nil {
  1082. return false
  1083. }
  1084. return strings.HasPrefix(media, "text/")
  1085. }
  1086. // checkWait returns true along with a number of seconds if the server instructed us to wait
  1087. // before retrying.
  1088. func checkWait(resp *http.Response) (int, bool) {
  1089. switch r := resp.StatusCode; {
  1090. // any 500 error code and 429 can trigger a wait
  1091. case r == http.StatusTooManyRequests, r >= 500:
  1092. default:
  1093. return 0, false
  1094. }
  1095. i, ok := retryAfterSeconds(resp)
  1096. return i, ok
  1097. }
  1098. // retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
  1099. // the header was missing or not a valid number.
  1100. func retryAfterSeconds(resp *http.Response) (int, bool) {
  1101. if h := resp.Header.Get("Retry-After"); len(h) > 0 {
  1102. if i, err := strconv.Atoi(h); err == nil {
  1103. return i, true
  1104. }
  1105. }
  1106. return 0, false
  1107. }
  1108. func getRetryReason(retries, seconds int, resp *http.Response, err error) string {
  1109. // priority and fairness sets the UID of the FlowSchema associated with a request
  1110. // in the following response Header.
  1111. const responseHeaderMatchedFlowSchemaUID = "X-Kubernetes-PF-FlowSchema-UID"
  1112. message := fmt.Sprintf("retries: %d, retry-after: %ds", retries, seconds)
  1113. switch {
  1114. case resp.StatusCode == http.StatusTooManyRequests:
  1115. // it is server-side throttling from priority and fairness
  1116. flowSchemaUID := resp.Header.Get(responseHeaderMatchedFlowSchemaUID)
  1117. return fmt.Sprintf("%s - retry-reason: due to server-side throttling, FlowSchema UID: %q", message, flowSchemaUID)
  1118. case err != nil:
  1119. // it's a retriable error
  1120. return fmt.Sprintf("%s - retry-reason: due to retriable error, error: %v", message, err)
  1121. default:
  1122. return fmt.Sprintf("%s - retry-reason: %d", message, resp.StatusCode)
  1123. }
  1124. }
  1125. // Result contains the result of calling Request.Do().
  1126. type Result struct {
  1127. body []byte
  1128. warnings []net.WarningHeader
  1129. contentType string
  1130. err error
  1131. statusCode int
  1132. decoder runtime.Decoder
  1133. }
  1134. // Raw returns the raw result.
  1135. func (r Result) Raw() ([]byte, error) {
  1136. return r.body, r.err
  1137. }
  1138. // Get returns the result as an object, which means it passes through the decoder.
  1139. // If the returned object is of type Status and has .Status != StatusSuccess, the
  1140. // additional information in Status will be used to enrich the error.
  1141. func (r Result) Get() (runtime.Object, error) {
  1142. if r.err != nil {
  1143. // Check whether the result has a Status object in the body and prefer that.
  1144. return nil, r.Error()
  1145. }
  1146. if r.decoder == nil {
  1147. return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
  1148. }
  1149. // decode, but if the result is Status return that as an error instead.
  1150. out, _, err := r.decoder.Decode(r.body, nil, nil)
  1151. if err != nil {
  1152. return nil, err
  1153. }
  1154. switch t := out.(type) {
  1155. case *metav1.Status:
  1156. // any status besides StatusSuccess is considered an error.
  1157. if t.Status != metav1.StatusSuccess {
  1158. return nil, errors.FromObject(t)
  1159. }
  1160. }
  1161. return out, nil
  1162. }
  1163. // StatusCode returns the HTTP status code of the request. (Only valid if no
  1164. // error was returned.)
  1165. func (r Result) StatusCode(statusCode *int) Result {
  1166. *statusCode = r.statusCode
  1167. return r
  1168. }
  1169. // Into stores the result into obj, if possible. If obj is nil it is ignored.
  1170. // If the returned object is of type Status and has .Status != StatusSuccess, the
  1171. // additional information in Status will be used to enrich the error.
  1172. func (r Result) Into(obj runtime.Object) error {
  1173. if r.err != nil {
  1174. // Check whether the result has a Status object in the body and prefer that.
  1175. return r.Error()
  1176. }
  1177. if r.decoder == nil {
  1178. return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
  1179. }
  1180. if len(r.body) == 0 {
  1181. return fmt.Errorf("0-length response with status code: %d and content type: %s",
  1182. r.statusCode, r.contentType)
  1183. }
  1184. out, _, err := r.decoder.Decode(r.body, nil, obj)
  1185. if err != nil || out == obj {
  1186. return err
  1187. }
  1188. // if a different object is returned, see if it is Status and avoid double decoding
  1189. // the object.
  1190. switch t := out.(type) {
  1191. case *metav1.Status:
  1192. // any status besides StatusSuccess is considered an error.
  1193. if t.Status != metav1.StatusSuccess {
  1194. return errors.FromObject(t)
  1195. }
  1196. }
  1197. return nil
  1198. }
  1199. // WasCreated updates the provided bool pointer to whether the server returned
  1200. // 201 created or a different response.
  1201. func (r Result) WasCreated(wasCreated *bool) Result {
  1202. *wasCreated = r.statusCode == http.StatusCreated
  1203. return r
  1204. }
  1205. // Error returns the error executing the request, nil if no error occurred.
  1206. // If the returned object is of type Status and has Status != StatusSuccess, the
  1207. // additional information in Status will be used to enrich the error.
  1208. // See the Request.Do() comment for what errors you might get.
  1209. func (r Result) Error() error {
  1210. // if we have received an unexpected server error, and we have a body and decoder, we can try to extract
  1211. // a Status object.
  1212. if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
  1213. return r.err
  1214. }
  1215. // attempt to convert the body into a Status object
  1216. // to be backwards compatible with old servers that do not return a version, default to "v1"
  1217. out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
  1218. if err != nil {
  1219. klog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
  1220. return r.err
  1221. }
  1222. switch t := out.(type) {
  1223. case *metav1.Status:
  1224. // because we default the kind, we *must* check for StatusFailure
  1225. if t.Status == metav1.StatusFailure {
  1226. return errors.FromObject(t)
  1227. }
  1228. }
  1229. return r.err
  1230. }
  1231. // Warnings returns any warning headers received in the response
  1232. func (r Result) Warnings() []net.WarningHeader {
  1233. return r.warnings
  1234. }
  1235. // NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
  1236. var NameMayNotBe = []string{".", ".."}
  1237. // NameMayNotContain specifies substrings that cannot be used in names specified as path segments (like the REST API or etcd store)
  1238. var NameMayNotContain = []string{"/", "%"}
  1239. // IsValidPathSegmentName validates the name can be safely encoded as a path segment
  1240. func IsValidPathSegmentName(name string) []string {
  1241. for _, illegalName := range NameMayNotBe {
  1242. if name == illegalName {
  1243. return []string{fmt.Sprintf(`may not be '%s'`, illegalName)}
  1244. }
  1245. }
  1246. var errors []string
  1247. for _, illegalContent := range NameMayNotContain {
  1248. if strings.Contains(name, illegalContent) {
  1249. errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
  1250. }
  1251. }
  1252. return errors
  1253. }
  1254. // IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment
  1255. // It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid
  1256. func IsValidPathSegmentPrefix(name string) []string {
  1257. var errors []string
  1258. for _, illegalContent := range NameMayNotContain {
  1259. if strings.Contains(name, illegalContent) {
  1260. errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
  1261. }
  1262. }
  1263. return errors
  1264. }
  1265. // ValidatePathSegmentName validates the name can be safely encoded as a path segment
  1266. func ValidatePathSegmentName(name string, prefix bool) []string {
  1267. if prefix {
  1268. return IsValidPathSegmentPrefix(name)
  1269. }
  1270. return IsValidPathSegmentName(name)
  1271. }