decoder.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package yaml
  14. import (
  15. "bufio"
  16. "bytes"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "strings"
  22. "unicode"
  23. "unicode/utf8"
  24. jsonutil "k8s.io/apimachinery/pkg/util/json"
  25. "sigs.k8s.io/yaml"
  26. )
  27. // Unmarshal unmarshals the given data
  28. // If v is a *map[string]interface{}, *[]interface{}, or *interface{} numbers
  29. // are converted to int64 or float64
  30. func Unmarshal(data []byte, v interface{}) error {
  31. preserveIntFloat := func(d *json.Decoder) *json.Decoder {
  32. d.UseNumber()
  33. return d
  34. }
  35. switch v := v.(type) {
  36. case *map[string]interface{}:
  37. if err := yaml.Unmarshal(data, v, preserveIntFloat); err != nil {
  38. return err
  39. }
  40. return jsonutil.ConvertMapNumbers(*v, 0)
  41. case *[]interface{}:
  42. if err := yaml.Unmarshal(data, v, preserveIntFloat); err != nil {
  43. return err
  44. }
  45. return jsonutil.ConvertSliceNumbers(*v, 0)
  46. case *interface{}:
  47. if err := yaml.Unmarshal(data, v, preserveIntFloat); err != nil {
  48. return err
  49. }
  50. return jsonutil.ConvertInterfaceNumbers(v, 0)
  51. default:
  52. return yaml.Unmarshal(data, v)
  53. }
  54. }
  55. // UnmarshalStrict unmarshals the given data
  56. // strictly (erroring when there are duplicate fields).
  57. func UnmarshalStrict(data []byte, v interface{}) error {
  58. preserveIntFloat := func(d *json.Decoder) *json.Decoder {
  59. d.UseNumber()
  60. return d
  61. }
  62. switch v := v.(type) {
  63. case *map[string]interface{}:
  64. if err := yaml.UnmarshalStrict(data, v, preserveIntFloat); err != nil {
  65. return err
  66. }
  67. return jsonutil.ConvertMapNumbers(*v, 0)
  68. case *[]interface{}:
  69. if err := yaml.UnmarshalStrict(data, v, preserveIntFloat); err != nil {
  70. return err
  71. }
  72. return jsonutil.ConvertSliceNumbers(*v, 0)
  73. case *interface{}:
  74. if err := yaml.UnmarshalStrict(data, v, preserveIntFloat); err != nil {
  75. return err
  76. }
  77. return jsonutil.ConvertInterfaceNumbers(v, 0)
  78. default:
  79. return yaml.UnmarshalStrict(data, v)
  80. }
  81. }
  82. // ToJSON converts a single YAML document into a JSON document
  83. // or returns an error. If the document appears to be JSON the
  84. // YAML decoding path is not used (so that error messages are
  85. // JSON specific).
  86. func ToJSON(data []byte) ([]byte, error) {
  87. if IsJSONBuffer(data) {
  88. return data, nil
  89. }
  90. return yaml.YAMLToJSON(data)
  91. }
  92. // YAMLToJSONDecoder decodes YAML documents from an io.Reader by
  93. // separating individual documents. It first converts the YAML
  94. // body to JSON, then unmarshals the JSON.
  95. type YAMLToJSONDecoder struct {
  96. reader Reader
  97. inputOffset int
  98. }
  99. // NewYAMLToJSONDecoder decodes YAML documents from the provided
  100. // stream in chunks by converting each document (as defined by
  101. // the YAML spec) into its own chunk, converting it to JSON via
  102. // yaml.YAMLToJSON, and then passing it to json.Decoder.
  103. func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder {
  104. reader := bufio.NewReader(r)
  105. return &YAMLToJSONDecoder{
  106. reader: NewYAMLReader(reader),
  107. }
  108. }
  109. // Decode reads a YAML document as JSON from the stream or returns
  110. // an error. The decoding rules match json.Unmarshal, not
  111. // yaml.Unmarshal.
  112. func (d *YAMLToJSONDecoder) Decode(into interface{}) error {
  113. bytes, err := d.reader.Read()
  114. if err != nil && err != io.EOF { //nolint:errorlint
  115. return err
  116. }
  117. if len(bytes) != 0 {
  118. err := yaml.Unmarshal(bytes, into)
  119. if err != nil {
  120. return YAMLSyntaxError{err}
  121. }
  122. }
  123. d.inputOffset += len(bytes)
  124. return err
  125. }
  126. func (d *YAMLToJSONDecoder) InputOffset() int {
  127. return d.inputOffset
  128. }
  129. // YAMLDecoder reads chunks of objects and returns ErrShortBuffer if
  130. // the data is not sufficient.
  131. type YAMLDecoder struct {
  132. r io.ReadCloser
  133. scanner *bufio.Scanner
  134. remaining []byte
  135. }
  136. // NewDocumentDecoder decodes YAML documents from the provided
  137. // stream in chunks by converting each document (as defined by
  138. // the YAML spec) into its own chunk. io.ErrShortBuffer will be
  139. // returned if the entire buffer could not be read to assist
  140. // the caller in framing the chunk.
  141. func NewDocumentDecoder(r io.ReadCloser) io.ReadCloser {
  142. scanner := bufio.NewScanner(r)
  143. // the size of initial allocation for buffer 4k
  144. buf := make([]byte, 4*1024)
  145. // the maximum size used to buffer a token 5M
  146. scanner.Buffer(buf, 5*1024*1024)
  147. scanner.Split(splitYAMLDocument)
  148. return &YAMLDecoder{
  149. r: r,
  150. scanner: scanner,
  151. }
  152. }
  153. // Read reads the previous slice into the buffer, or attempts to read
  154. // the next chunk.
  155. // TODO: switch to readline approach.
  156. func (d *YAMLDecoder) Read(data []byte) (n int, err error) {
  157. left := len(d.remaining)
  158. if left == 0 {
  159. // return the next chunk from the stream
  160. if !d.scanner.Scan() {
  161. err := d.scanner.Err()
  162. if err == nil {
  163. err = io.EOF
  164. }
  165. return 0, err
  166. }
  167. out := d.scanner.Bytes()
  168. d.remaining = out
  169. left = len(out)
  170. }
  171. // fits within data
  172. if left <= len(data) {
  173. copy(data, d.remaining)
  174. d.remaining = nil
  175. return left, nil
  176. }
  177. // caller will need to reread
  178. copy(data, d.remaining[:len(data)])
  179. d.remaining = d.remaining[len(data):]
  180. return len(data), io.ErrShortBuffer
  181. }
  182. func (d *YAMLDecoder) Close() error {
  183. return d.r.Close()
  184. }
  185. const yamlSeparator = "\n---"
  186. const separator = "---"
  187. // splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents.
  188. func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
  189. if atEOF && len(data) == 0 {
  190. return 0, nil, nil
  191. }
  192. sep := len([]byte(yamlSeparator))
  193. if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 {
  194. // We have a potential document terminator
  195. i += sep
  196. after := data[i:]
  197. if len(after) == 0 {
  198. // we can't read any more characters
  199. if atEOF {
  200. return len(data), data[:len(data)-sep], nil
  201. }
  202. return 0, nil, nil
  203. }
  204. if j := bytes.IndexByte(after, '\n'); j >= 0 {
  205. return i + j + 1, data[0 : i-sep], nil
  206. }
  207. return 0, nil, nil
  208. }
  209. // If we're at EOF, we have a final, non-terminated line. Return it.
  210. if atEOF {
  211. return len(data), data, nil
  212. }
  213. // Request more data.
  214. return 0, nil, nil
  215. }
// YAMLOrJSONDecoder attempts to decode a stream of JSON or YAML documents.
// While JSON is YAML, the way Go's JSON decode defines a multi-document stream
// is a series of JSON objects (e.g. {}{}), but YAML defines a multi-document
// stream as a series of documents separated by "---".
//
// This decoder will attempt to decode the stream as JSON first, and if that
// fails, it will switch to YAML. Once it determines the stream is JSON (by
// finding a non-YAML-delimited series of objects), it will not switch to YAML.
// Once it switches to YAML it will not switch back to JSON.
type YAMLOrJSONDecoder struct {
	// json is the JSON decoder; Decode sets it to nil when it gives up on JSON.
	json         *json.Decoder
	jsonConsumed int64 // of the stream total, how much was JSON?
	// yaml is the YAML decoder; it is nil while the stream is still read as JSON.
	yaml         *YAMLToJSONDecoder
	yamlConsumed int64 // of the stream total, how much was YAML?
	// stream is the reader that both decoders are constructed on.
	stream *StreamReader
	count  int // how many objects have been decoded
}
  233. type JSONSyntaxError struct {
  234. Offset int64
  235. Err error
  236. }
  237. func (e JSONSyntaxError) Error() string {
  238. return fmt.Sprintf("json: offset %d: %s", e.Offset, e.Err.Error())
  239. }
  240. type YAMLSyntaxError struct {
  241. err error
  242. }
  243. func (e YAMLSyntaxError) Error() string {
  244. return e.err.Error()
  245. }
  246. // NewYAMLOrJSONDecoder returns a decoder that will process YAML documents
  247. // or JSON documents from the given reader as a stream. bufferSize determines
  248. // how far into the stream the decoder will look to figure out whether this
  249. // is a JSON stream (has whitespace followed by an open brace).
  250. func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder {
  251. d := &YAMLOrJSONDecoder{}
  252. reader, _, mightBeJSON := GuessJSONStream(r, bufferSize)
  253. d.stream = reader
  254. if mightBeJSON {
  255. d.json = json.NewDecoder(reader)
  256. } else {
  257. d.yaml = NewYAMLToJSONDecoder(reader)
  258. }
  259. return d
  260. }
  261. // Decode unmarshals the next object from the underlying stream into the
  262. // provide object, or returns an error.
  263. func (d *YAMLOrJSONDecoder) Decode(into interface{}) error {
  264. // Because we don't know if this is a JSON or YAML stream, a failure from
  265. // both decoders is ambiguous. When in doubt, it will return the error from
  266. // the JSON decoder. Unfortunately, this means that if the first document
  267. // is invalid YAML, the error won't be awesome.
  268. // TODO: the errors from YAML are not great, we could improve them a lot.
  269. var firstErr error
  270. if d.json != nil {
  271. err := d.json.Decode(into)
  272. if err == nil {
  273. d.count++
  274. consumed := d.json.InputOffset() - d.jsonConsumed
  275. d.stream.Consume(int(consumed))
  276. d.jsonConsumed += consumed
  277. return nil
  278. }
  279. if err == io.EOF { //nolint:errorlint
  280. return err
  281. }
  282. var syntax *json.SyntaxError
  283. if ok := errors.As(err, &syntax); ok {
  284. firstErr = JSONSyntaxError{
  285. Offset: syntax.Offset,
  286. Err: syntax,
  287. }
  288. } else {
  289. firstErr = err
  290. }
  291. if d.count > 1 {
  292. // If we found 0 or 1 JSON object(s), this stream is still
  293. // ambiguous. But if we found more than 1 JSON object, then this
  294. // is an unambiguous JSON stream, and we should not switch to YAML.
  295. return err
  296. }
  297. // If JSON decoding hits the end of one object and then fails on the
  298. // next, it leaves any leading whitespace in the buffer, which can
  299. // confuse the YAML decoder. We just eat any whitespace we find, up to
  300. // and including the first newline.
  301. d.stream.Rewind()
  302. if err := d.consumeWhitespace(); err == nil {
  303. d.yaml = NewYAMLToJSONDecoder(d.stream)
  304. }
  305. d.json = nil
  306. }
  307. if d.yaml != nil {
  308. err := d.yaml.Decode(into)
  309. if err == nil {
  310. d.count++
  311. consumed := int64(d.yaml.InputOffset()) - d.yamlConsumed
  312. d.stream.Consume(int(consumed))
  313. d.yamlConsumed += consumed
  314. return nil
  315. }
  316. if err == io.EOF { //nolint:errorlint
  317. return err
  318. }
  319. if firstErr == nil {
  320. firstErr = err
  321. }
  322. }
  323. if firstErr != nil {
  324. return firstErr
  325. }
  326. return fmt.Errorf("decoding failed as both JSON and YAML")
  327. }
  328. func (d *YAMLOrJSONDecoder) consumeWhitespace() error {
  329. consumed := 0
  330. for {
  331. buf, err := d.stream.ReadN(4)
  332. if err != nil && err == io.EOF { //nolint:errorlint
  333. return err
  334. }
  335. r, sz := utf8.DecodeRune(buf)
  336. if r == utf8.RuneError || sz == 0 {
  337. return fmt.Errorf("invalid utf8 rune")
  338. }
  339. d.stream.RewindN(len(buf) - sz)
  340. if !unicode.IsSpace(r) {
  341. d.stream.RewindN(sz)
  342. d.stream.Consume(consumed)
  343. return nil
  344. }
  345. consumed += sz
  346. if r == '\n' {
  347. d.stream.Consume(consumed)
  348. return nil
  349. }
  350. if err == io.EOF { //nolint:errorlint
  351. break
  352. }
  353. }
  354. return io.EOF
  355. }
  356. type Reader interface {
  357. Read() ([]byte, error)
  358. }
  359. type YAMLReader struct {
  360. reader Reader
  361. }
  362. func NewYAMLReader(r *bufio.Reader) *YAMLReader {
  363. return &YAMLReader{
  364. reader: &LineReader{reader: r},
  365. }
  366. }
  367. // Read returns a full YAML document.
  368. func (r *YAMLReader) Read() ([]byte, error) {
  369. var buffer bytes.Buffer
  370. for {
  371. line, err := r.reader.Read()
  372. if err != nil && err != io.EOF { //nolint:errorlint
  373. return nil, err
  374. }
  375. sep := len([]byte(separator))
  376. if i := bytes.Index(line, []byte(separator)); i == 0 {
  377. // We have a potential document terminator
  378. i += sep
  379. trimmed := strings.TrimSpace(string(line[i:]))
  380. // We only allow comments and spaces following the yaml doc separator, otherwise we'll return an error
  381. if len(trimmed) > 0 && string(trimmed[0]) != "#" {
  382. return nil, YAMLSyntaxError{
  383. err: fmt.Errorf("invalid Yaml document separator: %s", trimmed),
  384. }
  385. }
  386. if buffer.Len() != 0 {
  387. return buffer.Bytes(), nil
  388. }
  389. if err == io.EOF { //nolint:errorlint
  390. return nil, err
  391. }
  392. }
  393. if err == io.EOF { //nolint:errorlint
  394. if buffer.Len() != 0 {
  395. // If we're at EOF, we have a final, non-terminated line. Return it.
  396. return buffer.Bytes(), nil
  397. }
  398. return nil, err
  399. }
  400. buffer.Write(line)
  401. }
  402. }
  403. type LineReader struct {
  404. reader *bufio.Reader
  405. }
  406. // Read returns a single line (with '\n' ended) from the underlying reader.
  407. // An error is returned iff there is an error with the underlying reader.
  408. func (r *LineReader) Read() ([]byte, error) {
  409. var (
  410. isPrefix bool = true
  411. err error = nil
  412. line []byte
  413. buffer bytes.Buffer
  414. )
  415. for isPrefix && err == nil {
  416. line, isPrefix, err = r.reader.ReadLine()
  417. buffer.Write(line)
  418. }
  419. buffer.WriteByte('\n')
  420. return buffer.Bytes(), err
  421. }
  422. // GuessJSONStream scans the provided reader up to size, looking
  423. // for an open brace indicating this is JSON. It will return the
  424. // bufio.Reader it creates for the consumer.
  425. func GuessJSONStream(r io.Reader, size int) (*StreamReader, []byte, bool) {
  426. buffer := NewStreamReader(r, size)
  427. b, _ := buffer.Peek(size)
  428. return buffer, b, IsJSONBuffer(b)
  429. }
  430. // IsJSONBuffer scans the provided buffer, looking
  431. // for an open brace indicating this is JSON.
  432. func IsJSONBuffer(buf []byte) bool {
  433. return hasPrefix(buf, jsonPrefix)
  434. }
  435. var jsonPrefix = []byte("{")
  436. // Return true if the first non-whitespace bytes in buf is
  437. // prefix.
  438. func hasPrefix(buf []byte, prefix []byte) bool {
  439. trim := bytes.TrimLeftFunc(buf, unicode.IsSpace)
  440. return bytes.HasPrefix(trim, prefix)
  441. }