Przeglądaj źródła

impl a streaming storage poc

Alex Meijer 1 miesiąc temu
rodzic
commit
ea5a0e14e3

+ 39 - 0
core/pkg/storage/azurestorage.go

@@ -8,7 +8,10 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"io"
 	"net/http"
+	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
@@ -254,6 +257,42 @@ func (b *AzureStorage) Read(name string) ([]byte, error) {
 	return downloadedData.Bytes(), nil
 }
 
+// ReadToLocalFile streams the specified blob at path to destPath on the local file system.
+func (b *AzureStorage) ReadToLocalFile(path, destPath string) error {
+	path = trimLeading(path)
+	ctx := context.Background()
+
+	log.Debugf("AzureStorage::ReadToLocalFile::HTTPS(%s) -> %s", path, destPath)
+
+	downloadResponse, err := b.containerClient.NewBlobClient(path).DownloadStream(ctx, nil)
+	if err != nil {
+		return fmt.Errorf("AzureStorage: ReadToLocalFile: failed to download %w", err)
+	}
+	// NOTE: automatically retries are performed if the connection fails.
+	retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
+		MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
+	})
+	defer retryReader.Close()
+
+	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+		return errors.Wrap(err, "creating destination directory")
+	}
+
+	f, err := os.Create(destPath)
+	if err != nil {
+		return errors.Wrapf(err, "creating destination file %s", destPath)
+	}
+	defer f.Close()
+
+	// Use 1 MB buffer for streaming operations
+	buf := make([]byte, 1024*1024)
+	if _, err := io.CopyBuffer(f, retryReader, buf); err != nil {
+		return errors.Wrapf(err, "streaming %s to %s", path, destPath)
+	}
+
+	return nil
+}
+
 // Write uses the relative path of the storage combined with the provided path
 // to write a new file or overwrite an existing file.
 func (b *AzureStorage) Write(name string, data []byte) error {

+ 14 - 0
core/pkg/storage/bucketstorage_test.go

@@ -93,6 +93,20 @@ func TestBucketStorage_Stat(t *testing.T) {
 	TestStorageStat(t, store)
 }
 
+func TestBucketStorage_ReadToLocalFile(t *testing.T) {
+	configPath := os.Getenv("TEST_BUCKET_CONFIG")
+	if configPath == "" {
+		t.Skip("skipping integration test, set environment variable TEST_BUCKET_CONFIG")
+	}
+	store, err := createStorage(configPath)
+	if err != nil {
+		t.Errorf("failed to create storage: %s", err.Error())
+		return
+	}
+
+	TestStorageReadToLocalFile(t, store)
+}
+
 // We should be able to call validate function with and without write and delete check without any errors
 func TestBucketStorage_Validate(t *testing.T) {
 	configPath := os.Getenv("TEST_BUCKET_CONFIG")

+ 25 - 0
core/pkg/storage/clusterstorage.go

@@ -7,6 +7,8 @@ import (
 	"net"
 	"net/http"
 	"net/url"
+	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
@@ -278,6 +280,29 @@ func (c *ClusterStorage) Read(path string) ([]byte, error) {
 	return jsonResp.Data, nil
 }
 
+// ReadToLocalFile downloads the specified object at path to destPath on the local file system.
+//
+// Note: ClusterStorage does not currently expose a streaming download endpoint, so this implementation
+// loads the content via Read() and then writes it to destPath.
+func (c *ClusterStorage) ReadToLocalFile(path, destPath string) error {
+	log.Debugf("ClusterStorage::ReadToLocalFile::%s(%s) -> %s", strings.ToUpper(c.scheme()), path, destPath)
+
+	data, err := c.Read(path)
+	if err != nil {
+		return err
+	}
+
+	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+		return fmt.Errorf("ClusterStorage: ReadToLocalFile: creating destination directory: %w", err)
+	}
+
+	if err := os.WriteFile(destPath, data, 0600); err != nil {
+		return fmt.Errorf("ClusterStorage: ReadToLocalFile: writing destination file: %w", err)
+	}
+
+	return nil
+}
+
 func (c *ClusterStorage) Write(path string, data []byte) error {
 	log.Debugf("ClusterStorage::Write::%s(%s)", strings.ToUpper(c.scheme()), path)
 

+ 56 - 0
core/pkg/storage/clusterstorage_test.go

@@ -2,7 +2,13 @@ package storage
 
 import (
 	"crypto/tls"
+	"encoding/json"
+	"net/http/httptest"
 	"net/http"
+	"net/url"
+	"os"
+	"path/filepath"
+	"strconv"
 	"strings"
 	"testing"
 )
@@ -63,3 +69,53 @@ func TestClusterStorage_scheme(t *testing.T) {
 		})
 	}
 }
+
+func TestClusterStorage_ReadToLocalFile(t *testing.T) {
+	expected := []byte("cluster-storage-contents")
+
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/clusterStorage/read" {
+			w.WriteHeader(http.StatusNotFound)
+			return
+		}
+
+		resp := Response[[]byte]{
+			Code: 0,
+			Data: expected,
+		}
+
+		w.Header().Set("Content-Type", "application/json")
+		_ = json.NewEncoder(w).Encode(resp)
+	}))
+	defer srv.Close()
+
+	u, err := url.Parse(srv.URL)
+	if err != nil {
+		t.Fatalf("parsing test server URL: %s", err)
+	}
+
+	port, err := strconv.Atoi(u.Port())
+	if err != nil {
+		t.Fatalf("parsing test server port: %s", err)
+	}
+
+	cs := &ClusterStorage{
+		client: &http.Client{},
+		host:   u.Hostname(),
+		port:   port,
+	}
+
+	destPath := filepath.Join(t.TempDir(), "out.bin")
+	if err := cs.ReadToLocalFile("some/path", destPath); err != nil {
+		t.Fatalf("ReadToLocalFile failed: %s", err)
+	}
+
+	data, err := os.ReadFile(destPath)
+	if err != nil {
+		t.Fatalf("reading destination file: %s", err)
+	}
+
+	if string(data) != string(expected) {
+		t.Fatalf("destination file contents mismatch: got %q want %q", string(data), string(expected))
+	}
+}

+ 35 - 0
core/pkg/storage/filestorage.go

@@ -2,6 +2,7 @@ package storage
 
 import (
 	"fmt"
+	"io"
 	gofs "io/fs"
 	"os"
 	gopath "path"
@@ -117,6 +118,40 @@ func (fs *FileStorage) Read(path string) ([]byte, error) {
 	return b, nil
 }
 
+// ReadToLocalFile streams the specified file at path to destPath on the local file system.
+//
+// For FileStorage, this is implemented as a local file copy.
+func (fs *FileStorage) ReadToLocalFile(path, destPath string) error {
+	src := gopath.Join(fs.baseDir, path)
+
+	in, err := os.Open(src)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return DoesNotExistError
+		}
+		return fmt.Errorf("reading %s: %w", src, err)
+	}
+	defer in.Close()
+
+	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+		return fmt.Errorf("creating destination directory: %w", err)
+	}
+
+	out, err := os.Create(destPath)
+	if err != nil {
+		return fmt.Errorf("creating destination file %s: %w", destPath, err)
+	}
+	defer out.Close()
+
+	// Use 1 MB buffer for file copy operations
+	buf := make([]byte, 1024*1024)
+	if _, err := io.CopyBuffer(out, in, buf); err != nil {
+		return fmt.Errorf("streaming %s to %s: %w", src, destPath, err)
+	}
+
+	return nil
+}
+
 // Write uses the relative path of the storage combined with the provided path
 // to write a new file or overwrite an existing file.
 //

+ 6 - 0
core/pkg/storage/filestorage_test.go

@@ -35,3 +35,9 @@ func TestFileStorageListDirectoriesSymlink(t *testing.T) {
 		t.Errorf("Expected dir.Name to be '%s' but it was '%s'", filepath.Join(subdirName, slFileName), dir.Name)
 	}
 }
+
+func TestFileStorage_ReadToLocalFile(t *testing.T) {
+	storeBaseDir := t.TempDir()
+	store := NewFileStorage(storeBaseDir)
+	TestStorageReadToLocalFile(t, store)
+}

+ 33 - 0
core/pkg/storage/gcsstorage.go

@@ -7,6 +7,8 @@ package storage
 import (
 	"context"
 	"io"
+	"os"
+	"path/filepath"
 	"strings"
 
 	gcs "cloud.google.com/go/storage"
@@ -138,6 +140,37 @@ func (gs *GCSStorage) Read(name string) ([]byte, error) {
 	return data, nil
 }
 
+// ReadToLocalFile streams the specified object at path to destPath on the local file system.
+func (gs *GCSStorage) ReadToLocalFile(path, destPath string) error {
+	path = trimLeading(path)
+	log.Debugf("GCSStorage::ReadToLocalFile::HTTPS(%s) -> %s", path, destPath)
+
+	ctx := context.Background()
+	reader, err := gs.bucket.Object(path).NewReader(ctx)
+	if err != nil {
+		return err
+	}
+	defer reader.Close()
+
+	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+		return errors.Wrap(err, "creating destination directory")
+	}
+
+	f, err := os.Create(destPath)
+	if err != nil {
+		return errors.Wrapf(err, "creating destination file %s", destPath)
+	}
+	defer f.Close()
+
+	// Use 1 MB buffer for streaming operations
+	buf := make([]byte, 1024*1024)
+	if _, err := io.CopyBuffer(f, reader, buf); err != nil {
+		return errors.Wrapf(err, "streaming %s to %s", path, destPath)
+	}
+
+	return nil
+}
+
 // Write uses the relative path of the storage combined with the provided path
 // to write a new file or overwrite an existing file.
 func (gs *GCSStorage) Write(name string, data []byte) error {

+ 27 - 0
core/pkg/storage/memorystorage.go

@@ -2,6 +2,7 @@ package storage
 
 import (
 	"fmt"
+	"os"
 	"path/filepath"
 	"sync"
 
@@ -72,6 +73,32 @@ func (ms *MemoryStorage) Read(path string) ([]byte, error) {
 	return nil, fmt.Errorf("file not found: %s - %w", path, DoesNotExistError)
 }
 
+// ReadToLocalFile writes the specified object at path to destPath on the local file system.
+func (ms *MemoryStorage) ReadToLocalFile(path, destPath string) error {
+	ms.lock.Lock()
+	path = filepath.Clean(path)
+
+	file, ok := ms.directPaths[path]
+	if !ok {
+		ms.lock.Unlock()
+		return fmt.Errorf("file not found: %s - %w", path, DoesNotExistError)
+	}
+
+	// Copy the contents so we can release the lock before doing potentially slow disk IO.
+	data := append([]byte(nil), file.Contents...)
+	ms.lock.Unlock()
+
+	dir := filepath.Dir(destPath)
+	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+		return fmt.Errorf("MemoryStorage: ReadToLocalFile: creating destination directory: %w", err)
+	}
+	if err := os.WriteFile(destPath, data, 0600); err != nil {
+		return fmt.Errorf("MemoryStorage: ReadToLocalFile: writing destination file: %w", err)
+	}
+
+	return nil
+}
+
 // Write uses the relative path of the storage combined with the provided path
 // to write a new file or overwrite an existing file.
 func (ms *MemoryStorage) Write(path string, data []byte) error {

+ 5 - 0
core/pkg/storage/memorystorage_test.go

@@ -380,3 +380,8 @@ func TestMemoryStorage_Stat(t *testing.T) {
 		})
 	}
 }
+
+func TestMemoryStorage_ReadToLocalFile(t *testing.T) {
+	store := NewMemoryStorage()
+	TestStorageReadToLocalFile(t, store)
+}

+ 5 - 0
core/pkg/storage/prefixedbucketstorage.go

@@ -81,6 +81,11 @@ func (pbs *PrefixedBucketStorage) Read(name string) ([]byte, error) {
 	return pbs.storage.Read(conditionalPrefix(pbs.prefix, name))
 }
 
+// ReadToLocalFile streams the specified object at path to destPath on the local file system.
+func (pbs *PrefixedBucketStorage) ReadToLocalFile(path, destPath string) error {
+	return pbs.storage.ReadToLocalFile(conditionalPrefix(pbs.prefix, path), destPath)
+}
+
 // Remove deletes the object with the given name.
 func (pbs *PrefixedBucketStorage) Remove(name string) error {
 	return pbs.storage.Remove(conditionalPrefix(pbs.prefix, name))

+ 14 - 0
core/pkg/storage/prefixedbucketstorage_test.go

@@ -0,0 +1,14 @@
+package storage
+
+import "testing"
+
+func TestPrefixedBucketStorage_ReadToLocalFile(t *testing.T) {
+	base := NewMemoryStorage()
+	store, err := NewPrefixedBucketStorage(base, "myprefix")
+	if err != nil {
+		t.Fatalf("failed to create prefixed storage: %s", err)
+	}
+
+	TestStorageReadToLocalFile(t, store)
+}
+

+ 51 - 0
core/pkg/storage/s3storage.go

@@ -10,6 +10,7 @@ import (
 	"io"
 	"net/http"
 	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
@@ -298,6 +299,56 @@ func (s3 *S3Storage) Read(name string) ([]byte, error) {
 
 }
 
+// ReadToLocalFile streams the specified object at path to destPath on the local file system.
+func (s3 *S3Storage) ReadToLocalFile(path, destPath string) error {
+	path = trimLeading(path)
+
+	log.Debugf("S3Storage::ReadToLocalFile::%s(%s) -> %s", s3.protocol(), path, destPath)
+	ctx := context.Background()
+
+	sse, err := s3.getServerSideEncryption(ctx)
+	if err != nil {
+		return err
+	}
+
+	opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
+	r, err := s3.client.GetObject(ctx, s3.name, path, *opts)
+	if err != nil {
+		if s3.isObjNotFound(err) {
+			return DoesNotExistError
+		}
+		return err
+	}
+	defer r.Close()
+
+	// Force the initial GetObject call and surface "not found" errors early,
+	// matching behavior in getRange().
+	if _, err := r.Read(nil); err != nil {
+		if s3.isObjNotFound(err) {
+			return DoesNotExistError
+		}
+		return errors.Wrap(err, "Read from S3 failed")
+	}
+
+	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+		return errors.Wrap(err, "creating destination directory")
+	}
+
+	f, err := os.Create(destPath)
+	if err != nil {
+		return errors.Wrapf(err, "creating destination file %s", destPath)
+	}
+	defer f.Close()
+
+	// Use 1 MB buffer for streaming operations
+	buf := make([]byte, 1024*1024)
+	if _, err := io.CopyBuffer(f, r, buf); err != nil {
+		return errors.Wrapf(err, "streaming %s to %s", path, destPath)
+	}
+
+	return nil
+}
+
 // Exists checks if the given object exists.
 func (s3 *S3Storage) Exists(name string) (bool, error) {
 	name = trimLeading(name)

+ 5 - 0
core/pkg/storage/storage.go

@@ -38,6 +38,11 @@ type Storage interface {
 	// read the contents.
 	Read(path string) ([]byte, error)
 
+	// ReadToLocalFile streams the specified file at path to destPath on the local file system
+	// designed for minimal RAM consumption 
+	// note, it is up to caller to clean up file when finished
+	ReadToLocalFile(path, destPath string) (error)
+
 	// Write uses the relative path of the storage combined with the provided path
 	// to write a new file or overwrite an existing file.
 	Write(path string, data []byte) error

+ 73 - 0
core/pkg/storage/test.go

@@ -2,7 +2,9 @@ package storage
 
 import (
 	"fmt"
+	"os"
 	"path"
+	"path/filepath"
 	"strings"
 	"testing"
 
@@ -426,3 +428,74 @@ func TestStorageStat(t *testing.T, store Storage) {
 		})
 	}
 }
+
+func TestStorageReadToLocalFile(t *testing.T, store Storage) {
+	testName := "read_to_local_file"
+
+	fileNames := []string{
+		"/file0.json",
+	}
+
+	err := createFiles(fileNames, testName, store)
+	if err != nil {
+		t.Fatalf("failed to create files: %s", err)
+	}
+
+	defer func() {
+		err = cleanupFiles(fileNames, testName, store)
+		if err != nil {
+			t.Fatalf("failed to clean up files: %s", err)
+		}
+	}()
+
+	testCases := map[string]struct {
+		path      string
+		expectErr bool
+	}{
+		"file exists": {
+			path:      path.Join(testpath, testName, "file0.json"),
+			expectErr: false,
+		},
+		"file does not exist": {
+			path:      path.Join(testpath, testName, "file1.json"),
+			expectErr: true,
+		},
+		"dir does not exist": {
+			path:      path.Join(testpath, testName, "dir0/file.json"),
+			expectErr: true,
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			destPath := filepath.Join(t.TempDir(), "out.json")
+
+			err := store.ReadToLocalFile(tc.path, destPath)
+			if tc.expectErr {
+				if err == nil {
+					t.Fatalf("expected error was not thrown")
+				}
+				return
+			}
+
+			if err != nil {
+				t.Fatalf("unexpected error: %s", err.Error())
+			}
+
+			b, err := os.ReadFile(destPath)
+			if err != nil {
+				t.Fatalf("reading destination file: %s", err)
+			}
+
+			var content testFileContent
+			err = json.Unmarshal(b, &content)
+			if err != nil {
+				t.Fatalf("could not unmarshal file content: %s", err)
+			}
+
+			if content != tfc {
+				t.Fatalf("file content did not match written value")
+			}
+		})
+	}
+}