Alex Meijer 4 settimane fa
parent
commit
83788fd83d
2 ha cambiato i file con 17 aggiunte e 171 eliminazioni
  1. 17 107
      core/pkg/storage/s3storage.go
  2. 0 64
      core/pkg/storage/s3storage_test.go

+ 17 - 107
core/pkg/storage/s3storage.go

@@ -62,8 +62,6 @@ var defaultS3Config = S3Config{
 	PartSize: 1024 * 1024 * 64, // 64MB.
 }
 
-const defaultS3ReadChunkSize int64 = 8 * 1024 * 1024
-
 // Config stores the configuration for s3 bucket.
 type S3Config struct {
 	Bucket             string            `yaml:"bucket"`
@@ -301,8 +299,7 @@ func (s3 *S3Storage) Read(name string) ([]byte, error) {
 
 }
 
-// ReadStream returns an io.ReadCloser that incrementally streams an object from S3
-// by issuing byte-range requests under the hood.
+// ReadStream returns an io.ReadCloser that streams an object from S3.
 func (s3 *S3Storage) ReadStream(path string) (io.ReadCloser, error) {
 	path = trimLeading(path)
 
@@ -314,17 +311,28 @@ func (s3 *S3Storage) ReadStream(path string) (io.ReadCloser, error) {
 		return nil, err
 	}
 
-	objInfo, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse})
+	opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
+	r, err := s3.client.GetObject(ctx, s3.name, path, *opts)
 	if err != nil {
-		if s3.isDoesNotExist(err) || s3.isObjNotFound(err) {
+		if s3.isObjNotFound(err) {
+			return nil, DoesNotExistError
+		}
+		return nil, err
+	}
+
+	// Force a metadata call and surface "not found" errors early,
+	// matching behavior in getRange().
+	if _, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse}); err != nil {
+		if s3.isObjNotFound(err) || s3.isDoesNotExist(err) {
+			_ = r.Close()
 			return nil, DoesNotExistError
 		}
+
+		_ = r.Close()
 		return nil, errors.Wrap(err, "StatObject from S3 failed")
 	}
 
-	return newS3ChunkReader(objInfo.Size, defaultS3ReadChunkSize, func(off, length int64) ([]byte, error) {
-		return s3.getRange(ctx, path, off, length)
-	}), nil
+	return r, nil
 }
 
 // ReadToLocalFile streams the specified object at path to destPath on the local file system.
@@ -653,104 +661,6 @@ func setGetObjectRange(opts *minio.GetObjectOptions, off, length int64) error {
 	return nil
 }
 
-type s3ChunkReader struct {
-	size      int64
-	chunkSize int64
-	pos       int64
-	chunkOff  int64
-	chunk     []byte
-	closed    bool
-	fetch     func(off, length int64) ([]byte, error)
-}
-
-
-// s3 chunk reader has following features over minio: 
-// It enforces fixed-size ranged reads (defaultS3ReadChunkSize) instead of one long-lived stream.
-// It keeps error mapping consistent (DoesNotExistError, range validation, SSE options via getRange()).
-// It gives predictable io.Reader semantics around EOF/partial reads while hiding S3 range mechanics.
-//
-func newS3ChunkReader(size, chunkSize int64, fetch func(off, length int64) ([]byte, error)) io.ReadCloser {
-	if chunkSize <= 0 {
-		chunkSize = defaultS3ReadChunkSize
-	}
-	return &s3ChunkReader{
-		size:      size,
-		chunkSize: chunkSize,
-		chunkOff:  -1,
-		fetch:     fetch,
-	}
-}
-
-func (r *s3ChunkReader) Read(p []byte) (int, error) {
-	if r.closed {
-		return 0, errors.New("s3 chunk reader is closed")
-	}
-	if len(p) == 0 {
-		return 0, nil
-	}
-	if r.pos >= r.size {
-		return 0, io.EOF
-	}
-
-	n := 0
-	for n < len(p) && r.pos < r.size {
-		if !r.hasChunkForPos(r.pos) {
-			if err := r.loadChunk(); err != nil {
-				if err == io.EOF && n > 0 {
-					return n, nil
-				}
-				return n, err
-			}
-		}
-
-		chunkIdx := int(r.pos - r.chunkOff)
-		wrote := copy(p[n:], r.chunk[chunkIdx:])
-		n += wrote
-		r.pos += int64(wrote)
-	}
-
-	if r.pos >= r.size {
-		return n, io.EOF
-	}
-	return n, nil
-}
-
-func (r *s3ChunkReader) Close() error {
-	r.closed = true
-	r.chunk = nil
-	return nil
-}
-
-func (r *s3ChunkReader) hasChunkForPos(pos int64) bool {
-	if len(r.chunk) == 0 || r.chunkOff < 0 {
-		return false
-	}
-	return pos >= r.chunkOff && pos < r.chunkOff+int64(len(r.chunk))
-}
-
-func (r *s3ChunkReader) loadChunk() error {
-	if r.pos >= r.size {
-		return io.EOF
-	}
-
-	length := r.chunkSize
-	if remaining := r.size - r.pos; remaining < length {
-		length = remaining
-	}
-
-	chunk, err := r.fetch(r.pos, length)
-	if err != nil {
-		return err
-	}
-	if len(chunk) == 0 {
-		return io.EOF
-	}
-
-	r.chunk = chunk
-	r.chunkOff = r.pos
-	return nil
-}
-
 // awsAuth retrieves credentials from the aws-sdk-go.
 type awsAuth struct {
 	Region string

+ 0 - 64
core/pkg/storage/s3storage_test.go

@@ -1,10 +1,6 @@
 package storage
 
 import (
-	"bytes"
-	"errors"
-	"io"
-	"reflect"
 	"testing"
 
 	"github.com/minio/minio-go/v7"
@@ -101,63 +97,3 @@ func TestSetGetObjectRange(t *testing.T) {
 	}
 }
 
-func TestS3ChunkReader_ReadUsesRanges(t *testing.T) {
-	data := []byte("abcdefghijklmnopqrstuvwxyz")
-	var calls [][2]int64
-
-	reader := newS3ChunkReader(int64(len(data)), 8, func(off, length int64) ([]byte, error) {
-		calls = append(calls, [2]int64{off, length})
-		end := off + length
-		if end > int64(len(data)) {
-			end = int64(len(data))
-		}
-		return data[off:end], nil
-	})
-	defer reader.Close()
-
-	got, err := io.ReadAll(reader)
-	if err != nil {
-		t.Fatalf("reading chunked reader failed: %v", err)
-	}
-	if !bytes.Equal(got, data) {
-		t.Fatalf("data mismatch: got=%q want=%q", string(got), string(data))
-	}
-
-	wantCalls := [][2]int64{
-		{0, 8},
-		{8, 8},
-		{16, 8},
-		{24, 2},
-	}
-	if !reflect.DeepEqual(calls, wantCalls) {
-		t.Fatalf("range calls mismatch: got=%v want=%v", calls, wantCalls)
-	}
-}
-
-func TestS3ChunkReader_Close(t *testing.T) {
-	reader := newS3ChunkReader(10, 4, func(off, length int64) ([]byte, error) {
-		return []byte("xxxx"), nil
-	})
-	if err := reader.Close(); err != nil {
-		t.Fatalf("close failed: %v", err)
-	}
-
-	p := make([]byte, 4)
-	_, err := reader.Read(p)
-	if err == nil {
-		t.Fatal("expected read error after close")
-	}
-}
-
-func TestS3ChunkReader_PropagatesFetchError(t *testing.T) {
-	reader := newS3ChunkReader(10, 4, func(off, length int64) ([]byte, error) {
-		return nil, errors.New("fetch failed")
-	})
-	defer reader.Close()
-
-	p := make([]byte, 4)
-	_, err := reader.Read(p)
-	if err == nil || err.Error() != "fetch failed" {
-		t.Fatalf("expected fetch error, got: %v", err)
-	}
-}