Explorar el Código

Apply suggestions from code review

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Alex Meijer <ameijer@users.noreply.github.com>
Alex Meijer hace 1 mes
padre
commit
a96b28a8fb

+ 4 - 4
core/pkg/storage/azurestorage.go

@@ -238,9 +238,9 @@ func (b *AzureStorage) Read(name string) ([]byte, error) {
 
 	downloadResponse, err := b.containerClient.NewBlobClient(name).DownloadStream(ctx, nil)
 	if err != nil {
-		return nil, fmt.Errorf("AzureStorage: Read: failed to download %w", err)
+		return nil, fmt.Errorf("AzureStorage: Read: failed to download blob %q: %w", name, err)
 	}
-	// NOTE: automatically retries are performed if the connection fails
+	// NOTE: automatic retries are performed if the connection fails
 	retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
 		MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
 	})
@@ -266,9 +266,9 @@ func (b *AzureStorage) ReadToLocalFile(path, destPath string) error {
 
 	downloadResponse, err := b.containerClient.NewBlobClient(path).DownloadStream(ctx, nil)
 	if err != nil {
-		return fmt.Errorf("AzureStorage: ReadToLocalFile: failed to download %w", err)
+		return fmt.Errorf("AzureStorage: ReadToLocalFile: failed to download blob %q to %q: %w", path, destPath, err)
 	}
-	// NOTE: automatically retries are performed if the connection fails.
+	// NOTE: automatic retries are performed if the connection fails.
 	retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
 		MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
 	})

+ 2 - 2
core/pkg/storage/filestorage.go

@@ -105,7 +105,7 @@ func (fs *FileStorage) ListDirectories(path string) ([]*StorageInfo, error) {
 //
 // It takes advantage of flock() based locking to improve safety.
 func (fs *FileStorage) Read(path string) ([]byte, error) {
-	f := gopath.Join(fs.baseDir, path)
+	f := filepath.Join(fs.baseDir, path)
 
 	b, err := fileutil.ReadLocked(f)
 	if err != nil {
@@ -122,7 +122,7 @@ func (fs *FileStorage) Read(path string) ([]byte, error) {
 //
 // For FileStorage, this is implemented as a local file copy.
 func (fs *FileStorage) ReadToLocalFile(path, destPath string) error {
-	src := gopath.Join(fs.baseDir, path)
+	src := filepath.Join(fs.baseDir, path)
 
 	in, err := os.Open(src)
 	if err != nil {

+ 14 - 0
core/pkg/storage/gcsstorage.go

@@ -129,6 +129,13 @@ func (gs *GCSStorage) Read(name string) ([]byte, error) {
 	ctx := context.Background()
 	reader, err := gs.bucket.Object(name).NewReader(ctx)
 	if err != nil {
+		// Normalize GCS "object not found" errors to DoesNotExistError for consistency
+		if err == gcs.ErrObjectNotExist {
+			return nil, DoesNotExistError
+		}
+		if e, ok := err.(*gcs.Error); ok && e.Code == 404 {
+			return nil, DoesNotExistError
+		}
 		return nil, err
 	}
 
@@ -148,6 +155,13 @@ func (gs *GCSStorage) ReadToLocalFile(path, destPath string) error {
 	ctx := context.Background()
 	reader, err := gs.bucket.Object(path).NewReader(ctx)
 	if err != nil {
+		// Normalize GCS "object not found" errors to DoesNotExistError for consistency
+		if err == gcs.ErrObjectNotExist {
+			return DoesNotExistError
+		}
+		if e, ok := err.(*gcs.Error); ok && e.Code == 404 {
+			return DoesNotExistError
+		}
 		return err
 	}
 	defer reader.Close()

+ 3 - 3
core/pkg/storage/memorystorage.go

@@ -55,7 +55,7 @@ func (ms *MemoryStorage) Stat(path string) (*StorageInfo, error) {
 		}, nil
 	}
 
-	return nil, fmt.Errorf("file not found: %s - %w", path, DoesNotExistError)
+	return nil, DoesNotExistError
 }
 
 // Read uses the relative path of the storage combined with the provided path to
@@ -70,7 +70,7 @@ func (ms *MemoryStorage) Read(path string) ([]byte, error) {
 		return file.Contents, nil
 	}
 
-	return nil, fmt.Errorf("file not found: %s - %w", path, DoesNotExistError)
+	return nil, DoesNotExistError
 }
 
 // ReadToLocalFile writes the specified object at path to destPath on the local file system.
@@ -81,7 +81,7 @@ func (ms *MemoryStorage) ReadToLocalFile(path, destPath string) error {
 	file, ok := ms.directPaths[path]
 	if !ok {
 		ms.lock.Unlock()
-		return fmt.Errorf("file not found: %s - %w", path, DoesNotExistError)
+		return DoesNotExistError
 	}
 
 	// Copy the contents so we can release the lock before doing potentially slow disk IO.

+ 34 - 8
core/pkg/storage/s3storage.go

@@ -321,31 +321,57 @@ func (s3 *S3Storage) ReadToLocalFile(path, destPath string) error {
 	}
 	defer r.Close()
 
-	// Force the initial GetObject call and surface "not found" errors early,
+	// Force a metadata call and surface "not found" errors early,
 	// matching behavior in getRange().
-	if _, err := r.Read(nil); err != nil {
+	if _, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse}); err != nil {
 		if s3.isObjNotFound(err) {
 			return DoesNotExistError
 		}
-		return errors.Wrap(err, "Read from S3 failed")
+		return errors.Wrap(err, "StatObject from S3 failed")
 	}
 
-	if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
+	dir := filepath.Dir(destPath)
+	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
 		return errors.Wrap(err, "creating destination directory")
 	}
 
-	f, err := os.Create(destPath)
+	// Write to a temporary file in the same directory to avoid leaving a
+	// partially-written file at destPath on error. Rename atomically on success.
+	tmpFile, err := os.CreateTemp(dir, ".s3-read-*")
 	if err != nil {
-		return errors.Wrapf(err, "creating destination file %s", destPath)
+		return errors.Wrapf(err, "creating temporary file in %s", dir)
 	}
-	defer f.Close()
+	tmpPath := tmpFile.Name()
+
+	// Ensure temporary file is cleaned up on error.
+	success := false
+	defer func() {
+		if !success {
+			_ = tmpFile.Close()
+			_ = os.Remove(tmpPath)
+		}
+	}()
 
 	// Use 1 MB buffer for streaming operations
 	buf := make([]byte, 1024*1024)
-	if _, err := io.CopyBuffer(f, r, buf); err != nil {
+	if _, err := io.CopyBuffer(tmpFile, r, buf); err != nil {
 		return errors.Wrapf(err, "streaming %s to %s", path, destPath)
 	}
 
+	// Ensure data is flushed to disk before renaming.
+	if err := tmpFile.Sync(); err != nil {
+		return errors.Wrapf(err, "syncing temporary file for %s", destPath)
+	}
+	if err := tmpFile.Close(); err != nil {
+		return errors.Wrapf(err, "closing temporary file for %s", destPath)
+	}
+
+	// Atomically move the fully written temp file into place.
+	if err := os.Rename(tmpPath, destPath); err != nil {
+		return errors.Wrapf(err, "renaming temporary file to %s", destPath)
+	}
+
+	success = true
 	return nil
 }
 

+ 5 - 4
core/pkg/storage/storage.go

@@ -38,10 +38,11 @@ type Storage interface {
 	// read the contents.
 	Read(path string) ([]byte, error)
 
-	// ReadToLocalFile streams the specified file at path to destPath on the local file system
-	// designed for minimal RAM consumption 
-	// note, it is up to caller to clean up file when finished
-	ReadToLocalFile(path, destPath string) (error)
+	// ReadToLocalFile writes the specified file at path to destPath on the local file system.
+	// Implementations may stream data to minimize RAM usage, but some backends may still buffer
+	// data in memory depending on their capabilities. It is up to the caller to clean up the
+	// local file when finished.
+	ReadToLocalFile(path, destPath string) error
 
 	// Write uses the relative path of the storage combined with the provided path
 	// to write a new file or overwrite an existing file.