stream_reader.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. /*
  2. Copyright 2025 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package yaml
  14. import "io"
  15. // StreamReader is a reader designed for consuming streams of variable-length
  16. // messages. It buffers data until it is explicitly consumed, and can be
  17. // rewound to re-read previous data.
  18. type StreamReader struct {
  19. r io.Reader
  20. buf []byte
  21. head int // current read offset into buf
  22. ttlConsumed int // number of bytes which have been consumed
  23. }
  24. // NewStreamReader creates a new StreamReader wrapping the provided
  25. // io.Reader.
  26. func NewStreamReader(r io.Reader, size int) *StreamReader {
  27. if size == 0 {
  28. size = 4096
  29. }
  30. return &StreamReader{
  31. r: r,
  32. buf: make([]byte, 0, size), // Start with a reasonable capacity
  33. }
  34. }
  35. // Read implements io.Reader. It first returns any buffered data after the
  36. // current offset, and if that's exhausted, reads from the underlying reader
  37. // and buffers the data. The returned data is not considered consumed until the
  38. // Consume method is called.
  39. func (r *StreamReader) Read(p []byte) (n int, err error) {
  40. // If we have buffered data, return it
  41. if r.head < len(r.buf) {
  42. n = copy(p, r.buf[r.head:])
  43. r.head += n
  44. return n, nil
  45. }
  46. // If we've already hit EOF, return it
  47. if r.r == nil {
  48. return 0, io.EOF
  49. }
  50. // Read from the underlying reader
  51. n, err = r.r.Read(p)
  52. if n > 0 {
  53. r.buf = append(r.buf, p[:n]...)
  54. r.head += n
  55. }
  56. if err == nil {
  57. return n, nil
  58. }
  59. if err == io.EOF {
  60. // Store that we've hit EOF by setting r to nil
  61. r.r = nil
  62. }
  63. return n, err
  64. }
  65. // ReadN reads exactly n bytes from the reader, blocking until all bytes are
  66. // read or an error occurs. If an error occurs, the number of bytes read is
  67. // returned along with the error. If EOF is hit before n bytes are read, this
  68. // will return the bytes read so far, along with io.EOF. The returned data is
  69. // not considered consumed until the Consume method is called.
  70. func (r *StreamReader) ReadN(want int) ([]byte, error) {
  71. ret := make([]byte, want)
  72. off := 0
  73. for off < want {
  74. n, err := r.Read(ret[off:])
  75. if err != nil {
  76. return ret[:off+n], err
  77. }
  78. off += n
  79. }
  80. return ret, nil
  81. }
  82. // Peek returns the next n bytes without advancing the reader. The returned
  83. // bytes are valid until the next call to Consume.
  84. func (r *StreamReader) Peek(n int) ([]byte, error) {
  85. buf, err := r.ReadN(n)
  86. r.RewindN(len(buf))
  87. if err != nil {
  88. return buf, err
  89. }
  90. return buf, nil
  91. }
  92. // Rewind resets the reader to the beginning of the buffered data.
  93. func (r *StreamReader) Rewind() {
  94. r.head = 0
  95. }
  96. // RewindN rewinds the reader by n bytes. If n is greater than the current
  97. // buffer, the reader is rewound to the beginning of the buffer.
  98. func (r *StreamReader) RewindN(n int) {
  99. r.head -= min(n, r.head)
  100. }
  101. // Consume discards up to n bytes of previously read data from the beginning of
  102. // the buffer. Once consumed, that data is no longer available for rewinding.
  103. // If n is greater than the current buffer, the buffer is cleared. Consume
  104. // never consume data from the underlying reader.
  105. func (r *StreamReader) Consume(n int) {
  106. n = min(n, len(r.buf))
  107. r.buf = r.buf[n:]
  108. r.head -= n
  109. r.ttlConsumed += n
  110. }
  111. // Consumed returns the number of bytes consumed from the input reader.
  112. func (r *StreamReader) Consumed() int {
  113. return r.ttlConsumed
  114. }