decoder.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package yaml
  14. import (
  15. "bufio"
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "strings"
  21. "unicode"
  22. jsonutil "k8s.io/apimachinery/pkg/util/json"
  23. "sigs.k8s.io/yaml"
  24. )
  25. // Unmarshal unmarshals the given data
  26. // If v is a *map[string]interface{}, *[]interface{}, or *interface{} numbers
  27. // are converted to int64 or float64
  28. func Unmarshal(data []byte, v interface{}) error {
  29. preserveIntFloat := func(d *json.Decoder) *json.Decoder {
  30. d.UseNumber()
  31. return d
  32. }
  33. switch v := v.(type) {
  34. case *map[string]interface{}:
  35. if err := yaml.Unmarshal(data, v, preserveIntFloat); err != nil {
  36. return err
  37. }
  38. return jsonutil.ConvertMapNumbers(*v, 0)
  39. case *[]interface{}:
  40. if err := yaml.Unmarshal(data, v, preserveIntFloat); err != nil {
  41. return err
  42. }
  43. return jsonutil.ConvertSliceNumbers(*v, 0)
  44. case *interface{}:
  45. if err := yaml.Unmarshal(data, v, preserveIntFloat); err != nil {
  46. return err
  47. }
  48. return jsonutil.ConvertInterfaceNumbers(v, 0)
  49. default:
  50. return yaml.Unmarshal(data, v)
  51. }
  52. }
  53. // ToJSON converts a single YAML document into a JSON document
  54. // or returns an error. If the document appears to be JSON the
  55. // YAML decoding path is not used (so that error messages are
  56. // JSON specific).
  57. func ToJSON(data []byte) ([]byte, error) {
  58. if hasJSONPrefix(data) {
  59. return data, nil
  60. }
  61. return yaml.YAMLToJSON(data)
  62. }
  63. // YAMLToJSONDecoder decodes YAML documents from an io.Reader by
  64. // separating individual documents. It first converts the YAML
  65. // body to JSON, then unmarshals the JSON.
  66. type YAMLToJSONDecoder struct {
  67. reader Reader
  68. }
  69. // NewYAMLToJSONDecoder decodes YAML documents from the provided
  70. // stream in chunks by converting each document (as defined by
  71. // the YAML spec) into its own chunk, converting it to JSON via
  72. // yaml.YAMLToJSON, and then passing it to json.Decoder.
  73. func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder {
  74. reader := bufio.NewReader(r)
  75. return &YAMLToJSONDecoder{
  76. reader: NewYAMLReader(reader),
  77. }
  78. }
  79. // Decode reads a YAML document as JSON from the stream or returns
  80. // an error. The decoding rules match json.Unmarshal, not
  81. // yaml.Unmarshal.
  82. func (d *YAMLToJSONDecoder) Decode(into interface{}) error {
  83. bytes, err := d.reader.Read()
  84. if err != nil && err != io.EOF {
  85. return err
  86. }
  87. if len(bytes) != 0 {
  88. err := yaml.Unmarshal(bytes, into)
  89. if err != nil {
  90. return YAMLSyntaxError{err}
  91. }
  92. }
  93. return err
  94. }
  95. // YAMLDecoder reads chunks of objects and returns ErrShortBuffer if
  96. // the data is not sufficient.
  97. type YAMLDecoder struct {
  98. r io.ReadCloser
  99. scanner *bufio.Scanner
  100. remaining []byte
  101. }
  102. // NewDocumentDecoder decodes YAML documents from the provided
  103. // stream in chunks by converting each document (as defined by
  104. // the YAML spec) into its own chunk. io.ErrShortBuffer will be
  105. // returned if the entire buffer could not be read to assist
  106. // the caller in framing the chunk.
  107. func NewDocumentDecoder(r io.ReadCloser) io.ReadCloser {
  108. scanner := bufio.NewScanner(r)
  109. // the size of initial allocation for buffer 4k
  110. buf := make([]byte, 4*1024)
  111. // the maximum size used to buffer a token 5M
  112. scanner.Buffer(buf, 5*1024*1024)
  113. scanner.Split(splitYAMLDocument)
  114. return &YAMLDecoder{
  115. r: r,
  116. scanner: scanner,
  117. }
  118. }
  119. // Read reads the previous slice into the buffer, or attempts to read
  120. // the next chunk.
  121. // TODO: switch to readline approach.
  122. func (d *YAMLDecoder) Read(data []byte) (n int, err error) {
  123. left := len(d.remaining)
  124. if left == 0 {
  125. // return the next chunk from the stream
  126. if !d.scanner.Scan() {
  127. err := d.scanner.Err()
  128. if err == nil {
  129. err = io.EOF
  130. }
  131. return 0, err
  132. }
  133. out := d.scanner.Bytes()
  134. d.remaining = out
  135. left = len(out)
  136. }
  137. // fits within data
  138. if left <= len(data) {
  139. copy(data, d.remaining)
  140. d.remaining = nil
  141. return left, nil
  142. }
  143. // caller will need to reread
  144. copy(data, d.remaining[:len(data)])
  145. d.remaining = d.remaining[len(data):]
  146. return len(data), io.ErrShortBuffer
  147. }
// Close closes the wrapped io.ReadCloser and returns the error, if any,
// that its Close method reports.
func (d *YAMLDecoder) Close() error {
	return d.r.Close()
}
  151. const yamlSeparator = "\n---"
  152. const separator = "---"
  153. // splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents.
  154. func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
  155. if atEOF && len(data) == 0 {
  156. return 0, nil, nil
  157. }
  158. sep := len([]byte(yamlSeparator))
  159. if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 {
  160. // We have a potential document terminator
  161. i += sep
  162. after := data[i:]
  163. if len(after) == 0 {
  164. // we can't read any more characters
  165. if atEOF {
  166. return len(data), data[:len(data)-sep], nil
  167. }
  168. return 0, nil, nil
  169. }
  170. if j := bytes.IndexByte(after, '\n'); j >= 0 {
  171. return i + j + 1, data[0 : i-sep], nil
  172. }
  173. return 0, nil, nil
  174. }
  175. // If we're at EOF, we have a final, non-terminated line. Return it.
  176. if atEOF {
  177. return len(data), data, nil
  178. }
  179. // Request more data.
  180. return 0, nil, nil
  181. }
// decoder is a convenience interface for Decode. It is the type of the
// decoder that YAMLOrJSONDecoder delegates to.
type decoder interface {
	// Decode decodes the next value into into or returns an error.
	Decode(into interface{}) error
}
// YAMLOrJSONDecoder attempts to decode a stream of JSON documents or
// YAML documents by sniffing for a leading { character.
type YAMLOrJSONDecoder struct {
	// r is the stream the documents are read from.
	r io.Reader
	// bufferSize is the size passed to GuessJSONStream when sniffing the
	// start of the stream.
	bufferSize int
	// decoder is nil until the first Decode call selects a JSON or YAML
	// decoder; it is reused for all later calls.
	decoder decoder
}
  193. type JSONSyntaxError struct {
  194. Offset int64
  195. Err error
  196. }
  197. func (e JSONSyntaxError) Error() string {
  198. return fmt.Sprintf("json: offset %d: %s", e.Offset, e.Err.Error())
  199. }
// YAMLSyntaxError wraps an error returned while unmarshalling a YAML
// document so callers can tell it apart from read errors by its type.
type YAMLSyntaxError struct {
	// err is the wrapped error; it must not be nil when Error is called.
	err error
}

// Error implements the error interface by returning the wrapped error's
// message unchanged.
func (e YAMLSyntaxError) Error() string {
	return e.err.Error()
}
  206. // NewYAMLOrJSONDecoder returns a decoder that will process YAML documents
  207. // or JSON documents from the given reader as a stream. bufferSize determines
  208. // how far into the stream the decoder will look to figure out whether this
  209. // is a JSON stream (has whitespace followed by an open brace).
  210. func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder {
  211. return &YAMLOrJSONDecoder{
  212. r: r,
  213. bufferSize: bufferSize,
  214. }
  215. }
  216. // Decode unmarshals the next object from the underlying stream into the
  217. // provide object, or returns an error.
  218. func (d *YAMLOrJSONDecoder) Decode(into interface{}) error {
  219. if d.decoder == nil {
  220. buffer, _, isJSON := GuessJSONStream(d.r, d.bufferSize)
  221. if isJSON {
  222. d.decoder = json.NewDecoder(buffer)
  223. } else {
  224. d.decoder = NewYAMLToJSONDecoder(buffer)
  225. }
  226. }
  227. err := d.decoder.Decode(into)
  228. if syntax, ok := err.(*json.SyntaxError); ok {
  229. return JSONSyntaxError{
  230. Offset: syntax.Offset,
  231. Err: syntax,
  232. }
  233. }
  234. return err
  235. }
  236. type Reader interface {
  237. Read() ([]byte, error)
  238. }
  239. type YAMLReader struct {
  240. reader Reader
  241. }
  242. func NewYAMLReader(r *bufio.Reader) *YAMLReader {
  243. return &YAMLReader{
  244. reader: &LineReader{reader: r},
  245. }
  246. }
  247. // Read returns a full YAML document.
  248. func (r *YAMLReader) Read() ([]byte, error) {
  249. var buffer bytes.Buffer
  250. for {
  251. line, err := r.reader.Read()
  252. if err != nil && err != io.EOF {
  253. return nil, err
  254. }
  255. sep := len([]byte(separator))
  256. if i := bytes.Index(line, []byte(separator)); i == 0 {
  257. // We have a potential document terminator
  258. i += sep
  259. after := line[i:]
  260. if len(strings.TrimRightFunc(string(after), unicode.IsSpace)) == 0 {
  261. if buffer.Len() != 0 {
  262. return buffer.Bytes(), nil
  263. }
  264. if err == io.EOF {
  265. return nil, err
  266. }
  267. }
  268. }
  269. if err == io.EOF {
  270. if buffer.Len() != 0 {
  271. // If we're at EOF, we have a final, non-terminated line. Return it.
  272. return buffer.Bytes(), nil
  273. }
  274. return nil, err
  275. }
  276. buffer.Write(line)
  277. }
  278. }
  279. type LineReader struct {
  280. reader *bufio.Reader
  281. }
  282. // Read returns a single line (with '\n' ended) from the underlying reader.
  283. // An error is returned iff there is an error with the underlying reader.
  284. func (r *LineReader) Read() ([]byte, error) {
  285. var (
  286. isPrefix bool = true
  287. err error = nil
  288. line []byte
  289. buffer bytes.Buffer
  290. )
  291. for isPrefix && err == nil {
  292. line, isPrefix, err = r.reader.ReadLine()
  293. buffer.Write(line)
  294. }
  295. buffer.WriteByte('\n')
  296. return buffer.Bytes(), err
  297. }
  298. // GuessJSONStream scans the provided reader up to size, looking
  299. // for an open brace indicating this is JSON. It will return the
  300. // bufio.Reader it creates for the consumer.
  301. func GuessJSONStream(r io.Reader, size int) (io.Reader, []byte, bool) {
  302. buffer := bufio.NewReaderSize(r, size)
  303. b, _ := buffer.Peek(size)
  304. return buffer, b, hasJSONPrefix(b)
  305. }
// IsJSONBuffer scans the provided buffer, looking
// for an open brace indicating this is JSON. It is the exported
// form of hasJSONPrefix and returns exactly what that returns.
func IsJSONBuffer(buf []byte) bool {
	return hasJSONPrefix(buf)
}
// jsonPrefix is the byte sequence that marks a buffer as JSON.
var jsonPrefix = []byte("{")

// hasJSONPrefix returns true if the provided buffer appears to start with
// a JSON open brace. Leading whitespace is skipped by hasPrefix before
// the comparison.
func hasJSONPrefix(buf []byte) bool {
	return hasPrefix(buf, jsonPrefix)
}
  317. // Return true if the first non-whitespace bytes in buf is
  318. // prefix.
  319. func hasPrefix(buf []byte, prefix []byte) bool {
  320. trim := bytes.TrimLeftFunc(buf, unicode.IsSpace)
  321. return bytes.HasPrefix(trim, prefix)
  322. }