|
|
@@ -62,6 +62,8 @@ var defaultS3Config = S3Config{
|
|
|
PartSize: 1024 * 1024 * 64, // 64MB.
|
|
|
}
|
|
|
|
|
|
+const defaultS3ReadChunkSize int64 = 8 * 1024 * 1024
|
|
|
+
|
|
|
// Config stores the configuration for s3 bucket.
|
|
|
type S3Config struct {
|
|
|
Bucket string `yaml:"bucket"`
|
|
|
@@ -299,6 +301,32 @@ func (s3 *S3Storage) Read(name string) ([]byte, error) {
|
|
|
|
|
|
}
|
|
|
|
|
|
+// ReadStream returns an io.ReadCloser that incrementally streams an object from S3
|
|
|
+// by issuing byte-range requests under the hood.
|
|
|
+func (s3 *S3Storage) ReadStream(path string) (io.ReadCloser, error) {
|
|
|
+ path = trimLeading(path)
|
|
|
+
|
|
|
+ log.Debugf("S3Storage::ReadStream::%s(%s)", s3.protocol(), path)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ sse, err := s3.getServerSideEncryption(ctx)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ objInfo, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse})
|
|
|
+ if err != nil {
|
|
|
+ if s3.isDoesNotExist(err) || s3.isObjNotFound(err) {
|
|
|
+ return nil, DoesNotExistError
|
|
|
+ }
|
|
|
+ return nil, errors.Wrap(err, "StatObject from S3 failed")
|
|
|
+ }
|
|
|
+
|
|
|
+ return newS3ChunkReader(objInfo.Size, defaultS3ReadChunkSize, func(off, length int64) ([]byte, error) {
|
|
|
+ return s3.getRange(ctx, path, off, length)
|
|
|
+ }), nil
|
|
|
+}
|
|
|
+
|
|
|
// ReadToLocalFile streams the specified object at path to destPath on the local file system.
|
|
|
func (s3 *S3Storage) ReadToLocalFile(path, destPath string) error {
|
|
|
path = trimLeading(path)
|
|
|
@@ -580,15 +608,10 @@ func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int6
|
|
|
}
|
|
|
|
|
|
opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
|
|
|
- if length != -1 {
|
|
|
- if err := opts.SetRange(off, off+length-1); err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- } else if off > 0 {
|
|
|
- if err := opts.SetRange(off, 0); err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
+ if err := setGetObjectRange(opts, off, length); err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
+
|
|
|
r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
|
|
|
if err != nil {
|
|
|
if s3.isObjNotFound(err) {
|
|
|
@@ -596,21 +619,132 @@ func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int6
|
|
|
}
|
|
|
return nil, err
|
|
|
}
|
|
|
- defer r.Close()
|
|
|
-
|
|
|
// NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
|
|
|
// for convenience.
|
|
|
if _, err := r.Read(nil); err != nil {
|
|
|
if s3.isObjNotFound(err) {
|
|
|
+ _ = r.Close()
|
|
|
return nil, DoesNotExistError
|
|
|
}
|
|
|
|
|
|
+ _ = r.Close()
|
|
|
return nil, errors.Wrap(err, "Read from S3 failed")
|
|
|
}
|
|
|
|
|
|
+ defer r.Close()
|
|
|
return io.ReadAll(r)
|
|
|
}
|
|
|
|
|
|
+func setGetObjectRange(opts *minio.GetObjectOptions, off, length int64) error {
|
|
|
+ if off < 0 {
|
|
|
+ return errors.New("range offset must be >= 0")
|
|
|
+ }
|
|
|
+ if length < -1 || length == 0 {
|
|
|
+ return errors.New("range length must be -1 or > 0")
|
|
|
+ }
|
|
|
+
|
|
|
+ if length > 0 {
|
|
|
+ return opts.SetRange(off, off+length-1)
|
|
|
+ }
|
|
|
+ if off > 0 {
|
|
|
+ return opts.SetRange(off, 0)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+type s3ChunkReader struct {
|
|
|
+ size int64
|
|
|
+ chunkSize int64
|
|
|
+ pos int64
|
|
|
+ chunkOff int64
|
|
|
+ chunk []byte
|
|
|
+ closed bool
|
|
|
+ fetch func(off, length int64) ([]byte, error)
|
|
|
+}
|
|
|
+
|
|
|
+func newS3ChunkReader(size, chunkSize int64, fetch func(off, length int64) ([]byte, error)) io.ReadCloser {
|
|
|
+ if chunkSize <= 0 {
|
|
|
+ chunkSize = defaultS3ReadChunkSize
|
|
|
+ }
|
|
|
+ return &s3ChunkReader{
|
|
|
+ size: size,
|
|
|
+ chunkSize: chunkSize,
|
|
|
+ chunkOff: -1,
|
|
|
+ fetch: fetch,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (r *s3ChunkReader) Read(p []byte) (int, error) {
|
|
|
+ if r.closed {
|
|
|
+ return 0, errors.New("s3 chunk reader is closed")
|
|
|
+ }
|
|
|
+ if len(p) == 0 {
|
|
|
+ return 0, nil
|
|
|
+ }
|
|
|
+ if r.pos >= r.size {
|
|
|
+ return 0, io.EOF
|
|
|
+ }
|
|
|
+
|
|
|
+ n := 0
|
|
|
+ for n < len(p) && r.pos < r.size {
|
|
|
+ if !r.hasChunkForPos(r.pos) {
|
|
|
+ if err := r.loadChunk(); err != nil {
|
|
|
+ if err == io.EOF && n > 0 {
|
|
|
+ return n, nil
|
|
|
+ }
|
|
|
+ return n, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ chunkIdx := int(r.pos - r.chunkOff)
|
|
|
+ wrote := copy(p[n:], r.chunk[chunkIdx:])
|
|
|
+ n += wrote
|
|
|
+ r.pos += int64(wrote)
|
|
|
+ }
|
|
|
+
|
|
|
+ if r.pos >= r.size {
|
|
|
+ return n, io.EOF
|
|
|
+ }
|
|
|
+ return n, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (r *s3ChunkReader) Close() error {
|
|
|
+ r.closed = true
|
|
|
+ r.chunk = nil
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (r *s3ChunkReader) hasChunkForPos(pos int64) bool {
|
|
|
+ if len(r.chunk) == 0 || r.chunkOff < 0 {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ return pos >= r.chunkOff && pos < r.chunkOff+int64(len(r.chunk))
|
|
|
+}
|
|
|
+
|
|
|
+func (r *s3ChunkReader) loadChunk() error {
|
|
|
+ if r.pos >= r.size {
|
|
|
+ return io.EOF
|
|
|
+ }
|
|
|
+
|
|
|
+ length := r.chunkSize
|
|
|
+ if remaining := r.size - r.pos; remaining < length {
|
|
|
+ length = remaining
|
|
|
+ }
|
|
|
+
|
|
|
+ chunk, err := r.fetch(r.pos, length)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if len(chunk) == 0 {
|
|
|
+ return io.EOF
|
|
|
+ }
|
|
|
+
|
|
|
+ r.chunk = chunk
|
|
|
+ r.chunkOff = r.pos
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
// awsAuth retrieves credentials from the aws-sdk-go.
|
|
|
type awsAuth struct {
|
|
|
Region string
|