Browse source code

Memory tweaks 2 (#3706)

Signed-off-by: Alex Meijer <ameijer@users.noreply.github.com>
Co-authored-by: Matt Bolt <mbolt35@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Alex Meijer 3 weeks ago
parent
commit
781cf45815

+ 65 - 2
core/pkg/storage/azurestorage.go

@@ -8,7 +8,10 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"net/http"
+	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
@@ -235,9 +238,9 @@ func (b *AzureStorage) Read(name string) ([]byte, error) {
 
 	downloadResponse, err := b.containerClient.NewBlobClient(name).DownloadStream(ctx, nil)
 	if err != nil {
-		return nil, fmt.Errorf("AzureStorage: Read: failed to download %w", err)
+		return nil, fmt.Errorf("AzureStorage: Read: failed to download blob %q: %w", name, err)
 	}
-	// NOTE: automatically retries are performed if the connection fails
+	// NOTE: automatic retries are performed if the connection fails
 	retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
 		MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
 	})
@@ -254,6 +257,66 @@ func (b *AzureStorage) Read(name string) ([]byte, error) {
 	return downloadedData.Bytes(), nil
 }
 
+// ReadStream returns a streaming reader for the specified blob path.
+func (b *AzureStorage) ReadStream(path string) (io.ReadCloser, error) {
+	path = trimLeading(path)
+	ctx := context.Background()
+
+	log.Debugf("AzureStorage::ReadStream::HTTPS(%s)", path)
+
+	downloadResponse, err := b.containerClient.NewBlobClient(path).DownloadStream(ctx, nil)
+	if err != nil {
+		if b.IsObjNotFoundErr(err) {
+			return nil, DoesNotExistError
+		}
+		return nil, fmt.Errorf("AzureStorage: ReadStream: failed to download blob %q: %w", path, err)
+	}
+
+	retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
+		MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
+	})
+	return retryReader, nil
+}
+
+// ReadToLocalFile streams the specified blob at path to destPath on the local file system.
+func (b *AzureStorage) ReadToLocalFile(path, destPath string) error {
+	path = trimLeading(path)
+	ctx := context.Background()
+
+	log.Debugf("AzureStorage::ReadToLocalFile::HTTPS(%s) -> %s", path, destPath)
+
+	downloadResponse, err := b.containerClient.NewBlobClient(path).DownloadStream(ctx, nil)
+	if err != nil {
+		if b.IsObjNotFoundErr(err) {
+			return DoesNotExistError
+		}
+		return fmt.Errorf("AzureStorage: ReadToLocalFile: failed to download blob %q to %q: %w", path, destPath, err)
+	}
+	// NOTE: automatic retries are performed if the connection fails.
+	retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
+		MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
+	})
+	defer retryReader.Close()
+
+	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+		return errors.Wrap(err, "creating destination directory")
+	}
+
+	f, err := os.Create(destPath)
+	if err != nil {
+		return errors.Wrapf(err, "creating destination file %s", destPath)
+	}
+	defer f.Close()
+
+	// Use 1 MB buffer for streaming operations
+	buf := make([]byte, 1024*1024)
+	if _, err := io.CopyBuffer(f, retryReader, buf); err != nil {
+		return errors.Wrapf(err, "streaming %s to %s", path, destPath)
+	}
+
+	return nil
+}
+
 // Write uses the relative path of the storage combined with the provided path
 // to write a new file or overwrite an existing file.
 func (b *AzureStorage) Write(name string, data []byte) error {

+ 28 - 0
core/pkg/storage/bucketstorage_test.go

@@ -93,6 +93,34 @@ func TestBucketStorage_Stat(t *testing.T) {
 	TestStorageStat(t, store)
 }
 
+func TestBucketStorage_ReadToLocalFile(t *testing.T) {
+	configPath := os.Getenv("TEST_BUCKET_CONFIG")
+	if configPath == "" {
+		t.Skip("skipping integration test, set environment variable TEST_BUCKET_CONFIG")
+	}
+	store, err := createStorage(configPath)
+	if err != nil {
+		t.Errorf("failed to create storage: %s", err.Error())
+		return
+	}
+
+	TestStorageReadToLocalFile(t, store)
+}
+
+func TestBucketStorage_ReadStream(t *testing.T) {
+	configPath := os.Getenv("TEST_BUCKET_CONFIG")
+	if configPath == "" {
+		t.Skip("skipping integration test, set environment variable TEST_BUCKET_CONFIG")
+	}
+	store, err := createStorage(configPath)
+	if err != nil {
+		t.Errorf("failed to create storage: %s", err.Error())
+		return
+	}
+
+	TestStorageReadStream(t, store)
+}
+
 // We should be able to call validate function with and without write and delete check without any errors
 func TestBucketStorage_Validate(t *testing.T) {
 	configPath := os.Getenv("TEST_BUCKET_CONFIG")

+ 37 - 0
core/pkg/storage/clusterstorage.go

@@ -7,6 +7,8 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
@@ -278,6 +280,41 @@ func (c *ClusterStorage) Read(path string) ([]byte, error) {
 	return jsonResp.Data, nil
 }
 
+// ReadStream returns a reader for the specified object path.
+//
+// Note: ClusterStorage does not currently expose a remote streaming endpoint, so this
+// implementation materializes the response via Read and wraps it as an io.ReadCloser.
+func (c *ClusterStorage) ReadStream(path string) (io.ReadCloser, error) {
+	data, err := c.Read(path)
+	if err != nil {
+		return nil, err
+	}
+	return io.NopCloser(bytes.NewReader(data)), nil
+}
+
+// ReadToLocalFile downloads the specified object at path to destPath on the local file system.
+//
+// Note: ClusterStorage does not currently expose a streaming download endpoint, so this implementation
+// loads the content via Read() and then writes it to destPath.
+func (c *ClusterStorage) ReadToLocalFile(path, destPath string) error {
+	log.Debugf("ClusterStorage::ReadToLocalFile::%s(%s) -> %s", strings.ToUpper(c.scheme()), path, destPath)
+
+	data, err := c.Read(path)
+	if err != nil {
+		return err
+	}
+
+	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+		return fmt.Errorf("ClusterStorage: ReadToLocalFile: creating destination directory: %w", err)
+	}
+
+	if err := os.WriteFile(destPath, data, 0600); err != nil {
+		return fmt.Errorf("ClusterStorage: ReadToLocalFile: writing destination file: %w", err)
+	}
+
+	return nil
+}
+
 func (c *ClusterStorage) Write(path string, data []byte) error {
 	log.Debugf("ClusterStorage::Write::%s(%s)", strings.ToUpper(c.scheme()), path)
 

+ 108 - 0
core/pkg/storage/clusterstorage_test.go

@@ -2,7 +2,14 @@ package storage
 
 import (
 	"crypto/tls"
+	"encoding/json"
+	"io"
 	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"os"
+	"path/filepath"
+	"strconv"
 	"strings"
 	"testing"
 )
@@ -63,3 +70,104 @@ func TestClusterStorage_scheme(t *testing.T) {
 		})
 	}
 }
+
+func TestClusterStorage_ReadToLocalFile(t *testing.T) {
+	expected := []byte("cluster-storage-contents")
+
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/clusterStorage/read" {
+			w.WriteHeader(http.StatusNotFound)
+			return
+		}
+
+		resp := Response[[]byte]{
+			Code: 0,
+			Data: expected,
+		}
+
+		w.Header().Set("Content-Type", "application/json")
+		_ = json.NewEncoder(w).Encode(resp)
+	}))
+	defer srv.Close()
+
+	u, err := url.Parse(srv.URL)
+	if err != nil {
+		t.Fatalf("parsing test server URL: %s", err)
+	}
+
+	port, err := strconv.Atoi(u.Port())
+	if err != nil {
+		t.Fatalf("parsing test server port: %s", err)
+	}
+
+	cs := &ClusterStorage{
+		client: &http.Client{},
+		host:   u.Hostname(),
+		port:   port,
+	}
+
+	destPath := filepath.Join(t.TempDir(), "out.bin")
+	if err := cs.ReadToLocalFile("some/path", destPath); err != nil {
+		t.Fatalf("ReadToLocalFile failed: %s", err)
+	}
+
+	data, err := os.ReadFile(destPath)
+	if err != nil {
+		t.Fatalf("reading destination file: %s", err)
+	}
+
+	if string(data) != string(expected) {
+		t.Fatalf("destination file contents mismatch: got %q want %q", string(data), string(expected))
+	}
+}
+
+func TestClusterStorage_ReadStream(t *testing.T) {
+	expected := []byte("cluster-storage-stream-contents")
+
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/clusterStorage/read" {
+			w.WriteHeader(http.StatusNotFound)
+			return
+		}
+
+		resp := Response[[]byte]{
+			Code: 0,
+			Data: expected,
+		}
+
+		w.Header().Set("Content-Type", "application/json")
+		_ = json.NewEncoder(w).Encode(resp)
+	}))
+	defer srv.Close()
+
+	u, err := url.Parse(srv.URL)
+	if err != nil {
+		t.Fatalf("parsing test server URL: %s", err)
+	}
+
+	port, err := strconv.Atoi(u.Port())
+	if err != nil {
+		t.Fatalf("parsing test server port: %s", err)
+	}
+
+	cs := &ClusterStorage{
+		client: &http.Client{},
+		host:   u.Hostname(),
+		port:   port,
+	}
+
+	r, err := cs.ReadStream("some/path")
+	if err != nil {
+		t.Fatalf("ReadStream failed: %s", err)
+	}
+	defer r.Close()
+
+	data, err := io.ReadAll(r)
+	if err != nil {
+		t.Fatalf("reading stream failed: %s", err)
+	}
+
+	if string(data) != string(expected) {
+		t.Fatalf("stream contents mismatch: got %q want %q", string(data), string(expected))
+	}
+}

+ 51 - 1
core/pkg/storage/filestorage.go

@@ -2,6 +2,7 @@ package storage
 
 import (
 	"fmt"
+	"io"
 	gofs "io/fs"
 	"os"
 	gopath "path"
@@ -104,7 +105,7 @@ func (fs *FileStorage) ListDirectories(path string) ([]*StorageInfo, error) {
 //
 // It takes advantage of flock() based locking to improve safety.
 func (fs *FileStorage) Read(path string) ([]byte, error) {
-	f := gopath.Join(fs.baseDir, path)
+	f := filepath.Join(fs.baseDir, path)
 
 	b, err := fileutil.ReadLocked(f)
 	if err != nil {
@@ -117,6 +118,55 @@ func (fs *FileStorage) Read(path string) ([]byte, error) {
 	return b, nil
 }
 
+// ReadStream returns a streaming reader for the specified file path.
+func (fs *FileStorage) ReadStream(path string) (io.ReadCloser, error) {
+	f := filepath.Join(fs.baseDir, path)
+
+	file, err := os.Open(f)
+	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			return nil, DoesNotExistError
+		}
+		return nil, fmt.Errorf("opening %s: %w", f, err)
+	}
+
+	return file, nil
+}
+
+// ReadToLocalFile streams the specified file at path to destPath on the local file system.
+//
+// For FileStorage, this is implemented as a local file copy.
+func (fs *FileStorage) ReadToLocalFile(path, destPath string) error {
+	src := filepath.Join(fs.baseDir, path)
+
+	in, err := os.Open(src)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return DoesNotExistError
+		}
+		return fmt.Errorf("reading %s: %w", src, err)
+	}
+	defer in.Close()
+
+	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+		return fmt.Errorf("creating destination directory: %w", err)
+	}
+
+	out, err := os.Create(destPath)
+	if err != nil {
+		return fmt.Errorf("creating destination file %s: %w", destPath, err)
+	}
+	defer out.Close()
+
+	// Use 1 MB buffer for file copy operations
+	buf := make([]byte, 1024*1024)
+	if _, err := io.CopyBuffer(out, in, buf); err != nil {
+		return fmt.Errorf("streaming %s to %s: %w", src, destPath, err)
+	}
+
+	return nil
+}
+
 // Write uses the relative path of the storage combined with the provided path
 // to write a new file or overwrite an existing file.
 //

+ 12 - 0
core/pkg/storage/filestorage_test.go

@@ -35,3 +35,15 @@ func TestFileStorageListDirectoriesSymlink(t *testing.T) {
 		t.Errorf("Expected dir.Name to be '%s' but it was '%s'", filepath.Join(subdirName, slFileName), dir.Name)
 	}
 }
+
+func TestFileStorage_ReadToLocalFile(t *testing.T) {
+	storeBaseDir := t.TempDir()
+	store := NewFileStorage(storeBaseDir)
+	TestStorageReadToLocalFile(t, store)
+}
+
+func TestFileStorage_ReadStream(t *testing.T) {
+	storeBaseDir := t.TempDir()
+	store := NewFileStorage(storeBaseDir)
+	TestStorageReadStream(t, store)
+}

+ 70 - 0
core/pkg/storage/gcsstorage.go

@@ -7,12 +7,16 @@ package storage
 import (
 	"context"
 	"io"
+	"net/http"
+	"os"
+	"path/filepath"
 	"strings"
 
 	gcs "cloud.google.com/go/storage"
 	"github.com/opencost/opencost/core/pkg/log"
 	"github.com/pkg/errors"
 	"golang.org/x/oauth2/google"
+	"google.golang.org/api/googleapi"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
 	"gopkg.in/yaml.v2"
@@ -127,8 +131,16 @@ func (gs *GCSStorage) Read(name string) ([]byte, error) {
 	ctx := context.Background()
 	reader, err := gs.bucket.Object(name).NewReader(ctx)
 	if err != nil {
+		// Normalize GCS "object not found" errors to DoesNotExistError for consistency
+		if err == gcs.ErrObjectNotExist {
+			return nil, DoesNotExistError
+		}
+		if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
+			return nil, DoesNotExistError
+		}
 		return nil, err
 	}
+	defer reader.Close()
 
 	data, err := io.ReadAll(reader)
 	if err != nil {
@@ -138,6 +150,64 @@ func (gs *GCSStorage) Read(name string) ([]byte, error) {
 	return data, nil
 }
 
+// ReadStream returns a streaming reader for the specified object path.
+func (gs *GCSStorage) ReadStream(path string) (io.ReadCloser, error) {
+	path = trimLeading(path)
+	log.Debugf("GCSStorage::ReadStream::HTTPS(%s)", path)
+
+	ctx := context.Background()
+	reader, err := gs.bucket.Object(path).NewReader(ctx)
+	if err != nil {
+		if err == gcs.ErrObjectNotExist {
+			return nil, DoesNotExistError
+		}
+		if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
+			return nil, DoesNotExistError
+		}
+		return nil, err
+	}
+
+	return reader, nil
+}
+
+// ReadToLocalFile streams the specified object at path to destPath on the local file system.
+func (gs *GCSStorage) ReadToLocalFile(path, destPath string) error {
+	path = trimLeading(path)
+	log.Debugf("GCSStorage::ReadToLocalFile::HTTPS(%s) -> %s", path, destPath)
+
+	ctx := context.Background()
+	reader, err := gs.bucket.Object(path).NewReader(ctx)
+	if err != nil {
+		// Normalize GCS "object not found" errors to DoesNotExistError for consistency
+		if err == gcs.ErrObjectNotExist {
+			return DoesNotExistError
+		}
+		if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
+			return DoesNotExistError
+		}
+		return err
+	}
+	defer reader.Close()
+
+	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+		return errors.Wrap(err, "creating destination directory")
+	}
+
+	f, err := os.Create(destPath)
+	if err != nil {
+		return errors.Wrapf(err, "creating destination file %s", destPath)
+	}
+	defer f.Close()
+
+	// Use 1 MB buffer for streaming operations
+	buf := make([]byte, 1024*1024)
+	if _, err := io.CopyBuffer(f, reader, buf); err != nil {
+		return errors.Wrapf(err, "streaming %s to %s", path, destPath)
+	}
+
+	return nil
+}
+
 // Write uses the relative path of the storage combined with the provided path
 // to write a new file or overwrite an existing file.
 func (gs *GCSStorage) Write(name string, data []byte) error {

+ 46 - 2
core/pkg/storage/memorystorage.go

@@ -1,7 +1,10 @@
 package storage
 
 import (
+	"bytes"
 	"fmt"
+	"io"
+	"os"
 	"path/filepath"
 	"sync"
 
@@ -54,7 +57,7 @@ func (ms *MemoryStorage) Stat(path string) (*StorageInfo, error) {
 		}, nil
 	}
 
-	return nil, fmt.Errorf("file not found: %s - %w", path, DoesNotExistError)
+	return nil, DoesNotExistError
 }
 
 // Read uses the relative path of the storage combined with the provided path to
@@ -69,7 +72,48 @@ func (ms *MemoryStorage) Read(path string) ([]byte, error) {
 		return file.Contents, nil
 	}
 
-	return nil, fmt.Errorf("file not found: %s - %w", path, DoesNotExistError)
+	return nil, DoesNotExistError
+}
+
+// ReadStream returns a streaming reader for the specified in-memory object.
+func (ms *MemoryStorage) ReadStream(path string) (io.ReadCloser, error) {
+	ms.lock.Lock()
+	defer ms.lock.Unlock()
+
+	path = filepath.Clean(path)
+
+	if file, ok := ms.directPaths[path]; ok {
+		data := append([]byte(nil), file.Contents...)
+		return io.NopCloser(bytes.NewReader(data)), nil
+	}
+
+	return nil, DoesNotExistError
+}
+
+// ReadToLocalFile writes the specified object at path to destPath on the local file system.
+func (ms *MemoryStorage) ReadToLocalFile(path, destPath string) error {
+	ms.lock.Lock()
+	path = filepath.Clean(path)
+
+	file, ok := ms.directPaths[path]
+	if !ok {
+		ms.lock.Unlock()
+		return DoesNotExistError
+	}
+
+	// Copy the contents so we can release the lock before doing potentially slow disk IO.
+	data := append([]byte(nil), file.Contents...)
+	ms.lock.Unlock()
+
+	dir := filepath.Dir(destPath)
+	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+		return fmt.Errorf("MemoryStorage: ReadToLocalFile: creating destination directory: %w", err)
+	}
+	if err := os.WriteFile(destPath, data, 0600); err != nil {
+		return fmt.Errorf("MemoryStorage: ReadToLocalFile: writing destination file: %w", err)
+	}
+
+	return nil
 }
 
 // Write uses the relative path of the storage combined with the provided path

+ 10 - 0
core/pkg/storage/memorystorage_test.go

@@ -380,3 +380,13 @@ func TestMemoryStorage_Stat(t *testing.T) {
 		})
 	}
 }
+
+func TestMemoryStorage_ReadToLocalFile(t *testing.T) {
+	store := NewMemoryStorage()
+	TestStorageReadToLocalFile(t, store)
+}
+
+func TestMemoryStorage_ReadStream(t *testing.T) {
+	store := NewMemoryStorage()
+	TestStorageReadStream(t, store)
+}

+ 11 - 0
core/pkg/storage/prefixedbucketstorage.go

@@ -6,6 +6,7 @@ package storage
 
 import (
 	"fmt"
+	"io"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -81,6 +82,16 @@ func (pbs *PrefixedBucketStorage) Read(name string) ([]byte, error) {
 	return pbs.storage.Read(conditionalPrefix(pbs.prefix, name))
 }
 
+// ReadStream returns a streaming reader for the given object name.
+func (pbs *PrefixedBucketStorage) ReadStream(name string) (io.ReadCloser, error) {
+	return pbs.storage.ReadStream(conditionalPrefix(pbs.prefix, name))
+}
+
+// ReadToLocalFile streams the specified object at path to destPath on the local file system.
+func (pbs *PrefixedBucketStorage) ReadToLocalFile(path, destPath string) error {
+	return pbs.storage.ReadToLocalFile(conditionalPrefix(pbs.prefix, path), destPath)
+}
+
 // Remove deletes the object with the given name.
 func (pbs *PrefixedBucketStorage) Remove(name string) error {
 	return pbs.storage.Remove(conditionalPrefix(pbs.prefix, name))

+ 23 - 0
core/pkg/storage/prefixedbucketstorage_test.go

@@ -0,0 +1,23 @@
+package storage
+
+import "testing"
+
+func TestPrefixedBucketStorage_ReadToLocalFile(t *testing.T) {
+	base := NewMemoryStorage()
+	store, err := NewPrefixedBucketStorage(base, "myprefix")
+	if err != nil {
+		t.Fatalf("failed to create prefixed storage: %s", err)
+	}
+
+	TestStorageReadToLocalFile(t, store)
+}
+
+func TestPrefixedBucketStorage_ReadStream(t *testing.T) {
+	base := NewMemoryStorage()
+	store, err := NewPrefixedBucketStorage(base, "myprefix")
+	if err != nil {
+		t.Fatalf("failed to create prefixed storage: %s", err)
+	}
+
+	TestStorageReadStream(t, store)
+}

+ 137 - 10
core/pkg/storage/s3storage.go

@@ -10,6 +10,7 @@ import (
 	"io"
 	"net/http"
 	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
@@ -298,6 +299,118 @@ func (s3 *S3Storage) Read(name string) ([]byte, error) {
 
 }
 
+// ReadStream returns an io.ReadCloser that streams an object from S3.
+func (s3 *S3Storage) ReadStream(path string) (io.ReadCloser, error) {
+	path = trimLeading(path)
+
+	log.Debugf("S3Storage::ReadStream::%s(%s)", s3.protocol(), path)
+	ctx := context.Background()
+
+	sse, err := s3.getServerSideEncryption(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
+	r, err := s3.client.GetObject(ctx, s3.name, path, *opts)
+	if err != nil {
+		if s3.isObjNotFound(err) {
+			return nil, DoesNotExistError
+		}
+		return nil, err
+	}
+
+	// Force a metadata call and surface "not found" errors early,
+	// matching behavior in getRange().
+	if _, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse}); err != nil {
+		if s3.isObjNotFound(err) || s3.isDoesNotExist(err) {
+			_ = r.Close()
+			return nil, DoesNotExistError
+		}
+
+		_ = r.Close()
+		return nil, errors.Wrap(err, "StatObject from S3 failed")
+	}
+
+	return r, nil
+}
+
+// ReadToLocalFile streams the specified object at path to destPath on the local file system.
+func (s3 *S3Storage) ReadToLocalFile(path, destPath string) error {
+	path = trimLeading(path)
+
+	log.Debugf("S3Storage::ReadToLocalFile::%s(%s) -> %s", s3.protocol(), path, destPath)
+	ctx := context.Background()
+
+	sse, err := s3.getServerSideEncryption(ctx)
+	if err != nil {
+		return err
+	}
+
+	opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
+	r, err := s3.client.GetObject(ctx, s3.name, path, *opts)
+	if err != nil {
+		if s3.isObjNotFound(err) {
+			return DoesNotExistError
+		}
+		return err
+	}
+	defer r.Close()
+
+	// Force a metadata call and surface "not found" errors early,
+	// matching behavior in getRange().
+	if _, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse}); err != nil {
+		if s3.isObjNotFound(err) {
+			return DoesNotExistError
+		}
+		return errors.Wrap(err, "StatObject from S3 failed")
+	}
+
+	dir := filepath.Dir(destPath)
+	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+		return errors.Wrap(err, "creating destination directory")
+	}
+
+	// Write to a temporary file in the same directory to avoid leaving a
+	// partially-written file at destPath on error. Rename atomically on success.
+	tmpFile, err := os.CreateTemp(dir, ".s3-read-*")
+	if err != nil {
+		return errors.Wrapf(err, "creating temporary file in %s", dir)
+	}
+	tmpPath := tmpFile.Name()
+
+	// Ensure temporary file is cleaned up on error.
+	success := false
+	defer func() {
+		if !success {
+			_ = tmpFile.Close()
+			_ = os.Remove(tmpPath)
+		}
+	}()
+
+	// Use 1 MB buffer for streaming operations
+	buf := make([]byte, 1024*1024)
+	if _, err := io.CopyBuffer(tmpFile, r, buf); err != nil {
+		return errors.Wrapf(err, "streaming %s to %s", path, destPath)
+	}
+
+	// Ensure data is flushed to disk before renaming.
+	if err := tmpFile.Sync(); err != nil {
+		return errors.Wrapf(err, "syncing temporary file for %s", destPath)
+	}
+	if err := tmpFile.Close(); err != nil {
+		return errors.Wrapf(err, "closing temporary file for %s", destPath)
+	}
+
+	// Atomically move the fully written temp file into place.
+	if err := os.Rename(tmpPath, destPath); err != nil {
+		return errors.Wrapf(err, "renaming temporary file to %s", destPath)
+	}
+
+	success = true
+	return nil
+}
+
 // Exists checks if the given object exists.
 func (s3 *S3Storage) Exists(name string) (bool, error) {
 	name = trimLeading(name)
@@ -503,15 +616,10 @@ func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int6
 	}
 
 	opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
-	if length != -1 {
-		if err := opts.SetRange(off, off+length-1); err != nil {
-			return nil, err
-		}
-	} else if off > 0 {
-		if err := opts.SetRange(off, 0); err != nil {
-			return nil, err
-		}
+	if err := setGetObjectRange(opts, off, length); err != nil {
+		return nil, err
 	}
+
 	r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
 	if err != nil {
 		if s3.isObjNotFound(err) {
@@ -519,21 +627,40 @@ func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int6
 		}
 		return nil, err
 	}
-	defer r.Close()
-
 	// NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
 	// for convenience.
 	if _, err := r.Read(nil); err != nil {
 		if s3.isObjNotFound(err) {
+			_ = r.Close()
 			return nil, DoesNotExistError
 		}
 
+		_ = r.Close()
 		return nil, errors.Wrap(err, "Read from S3 failed")
 	}
 
+	defer r.Close()
 	return io.ReadAll(r)
 }
 
+func setGetObjectRange(opts *minio.GetObjectOptions, off, length int64) error {
+	if off < 0 {
+		return errors.New("range offset must be >= 0")
+	}
+	if length < -1 || length == 0 {
+		return errors.New("range length must be -1 or > 0")
+	}
+
+	if length > 0 {
+		return opts.SetRange(off, off+length-1)
+	}
+	if off > 0 {
+		return opts.SetRange(off, 0)
+	}
+
+	return nil
+}
+
 // awsAuth retrieves credentials from the aws-sdk-go.
 type awsAuth struct {
 	Region string

+ 61 - 0
core/pkg/storage/s3storage_test.go

@@ -2,6 +2,8 @@ package storage
 
 import (
 	"testing"
+
+	"github.com/minio/minio-go/v7"
 )
 
 // TestS3Storage_protocol tests the protocol() method returns correct values based on insecure flag
@@ -35,3 +37,62 @@ func TestS3Storage_protocol(t *testing.T) {
 		})
 	}
 }
+
+func TestSetGetObjectRange(t *testing.T) {
+	tests := []struct {
+		name      string
+		off       int64
+		length    int64
+		expectErr bool
+	}{
+		{
+			name:      "full object range",
+			off:       0,
+			length:    -1,
+			expectErr: false,
+		},
+		{
+			name:      "offset to EOF range",
+			off:       100,
+			length:    -1,
+			expectErr: false,
+		},
+		{
+			name:      "bounded range",
+			off:       128,
+			length:    4096,
+			expectErr: false,
+		},
+		{
+			name:      "negative offset rejected",
+			off:       -1,
+			length:    -1,
+			expectErr: true,
+		},
+		{
+			name:      "zero length rejected",
+			off:       0,
+			length:    0,
+			expectErr: true,
+		},
+		{
+			name:      "invalid negative length rejected",
+			off:       0,
+			length:    -2,
+			expectErr: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			opts := &minio.GetObjectOptions{}
+			err := setGetObjectRange(opts, tt.off, tt.length)
+			if tt.expectErr && err == nil {
+				t.Fatalf("expected error, got nil")
+			}
+			if !tt.expectErr && err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+		})
+	}
+}

+ 11 - 0
core/pkg/storage/storage.go

@@ -1,6 +1,7 @@
 package storage
 
 import (
+	"io"
 	"os"
 	"strings"
 	"time"
@@ -38,6 +39,16 @@ type Storage interface {
 	// read the contents.
 	Read(path string) ([]byte, error)
 
+	// ReadStream returns an io.ReadCloser for the specified path. Implementations should
+	// stream incrementally when possible to avoid loading entire objects into memory.
+	ReadStream(path string) (io.ReadCloser, error)
+
+	// ReadToLocalFile writes the specified file at path to destPath on the local file system.
+	// Implementations may stream data to minimize RAM usage, but some backends may still buffer
+	// data in memory depending on their capabilities. It is up to the caller to clean up the
+	// local file when finished.
+	ReadToLocalFile(path, destPath string) error
+
 	// Write uses the relative path of the storage combined with the provided path
 	// to write a new file or overwrite an existing file.
 	Write(path string, data []byte) error

+ 147 - 0
core/pkg/storage/test.go

@@ -2,7 +2,10 @@ package storage
 
 import (
 	"fmt"
+	"io"
+	"os"
 	"path"
+	"path/filepath"
 	"strings"
 	"testing"
 
@@ -426,3 +429,147 @@ func TestStorageStat(t *testing.T, store Storage) {
 		})
 	}
 }
+
+func TestStorageReadToLocalFile(t *testing.T, store Storage) {
+	testName := "read_to_local_file"
+
+	fileNames := []string{
+		"/file0.json",
+	}
+
+	err := createFiles(fileNames, testName, store)
+	if err != nil {
+		t.Fatalf("failed to create files: %s", err)
+	}
+
+	defer func() {
+		err = cleanupFiles(fileNames, testName, store)
+		if err != nil {
+			t.Fatalf("failed to clean up files: %s", err)
+		}
+	}()
+
+	testCases := map[string]struct {
+		path      string
+		expectErr bool
+	}{
+		"file exists": {
+			path:      path.Join(testpath, testName, "file0.json"),
+			expectErr: false,
+		},
+		"file does not exist": {
+			path:      path.Join(testpath, testName, "file1.json"),
+			expectErr: true,
+		},
+		"dir does not exist": {
+			path:      path.Join(testpath, testName, "dir0/file.json"),
+			expectErr: true,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			destPath := filepath.Join(t.TempDir(), "out.json")
+
+			err := store.ReadToLocalFile(tc.path, destPath)
+			if tc.expectErr {
+				if err == nil {
+					t.Fatalf("expected error was not thrown")
+				}
+				return
+			}
+
+			if err != nil {
+				t.Fatalf("unexpected error: %s", err.Error())
+			}
+
+			b, err := os.ReadFile(destPath)
+			if err != nil {
+				t.Fatalf("reading destination file: %s", err)
+			}
+
+			var content testFileContent
+			err = json.Unmarshal(b, &content)
+			if err != nil {
+				t.Fatalf("could not unmarshal file content: %s", err)
+			}
+
+			if content != tfc {
+				t.Fatalf("file content did not match written value")
+			}
+		})
+	}
+}
+
+func TestStorageReadStream(t *testing.T, store Storage) {
+	testName := "read_stream"
+
+	fileNames := []string{
+		"/file0.json",
+	}
+
+	err := createFiles(fileNames, testName, store)
+	if err != nil {
+		t.Fatalf("failed to create files: %s", err)
+	}
+
+	defer func() {
+		err = cleanupFiles(fileNames, testName, store)
+		if err != nil {
+			t.Fatalf("failed to clean up files: %s", err)
+		}
+	}()
+
+	testCases := map[string]struct {
+		path      string
+		expectErr bool
+	}{
+		"file exists": {
+			path:      path.Join(testpath, testName, "file0.json"),
+			expectErr: false,
+		},
+		"file does not exist": {
+			path:      path.Join(testpath, testName, "file1.json"),
+			expectErr: true,
+		},
+		"dir does not exist": {
+			path:      path.Join(testpath, testName, "dir0/file.json"),
+			expectErr: true,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			r, err := store.ReadStream(tc.path)
+			if tc.expectErr {
+				if err == nil {
+					if r != nil {
+						_ = r.Close()
+					}
+					t.Fatalf("expected error was not thrown")
+				}
+				return
+			}
+
+			if err != nil {
+				t.Fatalf("unexpected error: %s", err.Error())
+			}
+			defer r.Close()
+
+			b, err := io.ReadAll(r)
+			if err != nil {
+				t.Fatalf("reading stream: %s", err)
+			}
+
+			var content testFileContent
+			err = json.Unmarshal(b, &content)
+			if err != nil {
+				t.Fatalf("could not unmarshal file content: %s", err)
+			}
+
+			if content != tfc {
+				t.Fatalf("file content did not match written value")
+			}
+		})
+	}
+}