|
@@ -8,7 +8,10 @@ import (
|
|
|
"bytes"
|
|
"bytes"
|
|
|
"context"
|
|
"context"
|
|
|
"fmt"
|
|
"fmt"
|
|
|
|
|
+ "io"
|
|
|
"net/http"
|
|
"net/http"
|
|
|
|
|
+ "os"
|
|
|
|
|
+ "path/filepath"
|
|
|
"strings"
|
|
"strings"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
@@ -235,9 +238,9 @@ func (b *AzureStorage) Read(name string) ([]byte, error) {
|
|
|
|
|
|
|
|
downloadResponse, err := b.containerClient.NewBlobClient(name).DownloadStream(ctx, nil)
|
|
downloadResponse, err := b.containerClient.NewBlobClient(name).DownloadStream(ctx, nil)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- return nil, fmt.Errorf("AzureStorage: Read: failed to download %w", err)
|
|
|
|
|
|
|
+ return nil, fmt.Errorf("AzureStorage: Read: failed to download blob %q: %w", name, err)
|
|
|
}
|
|
}
|
|
|
- // NOTE: automatically retries are performed if the connection fails
|
|
|
|
|
|
|
+ // NOTE: automatic retries are performed if the connection fails
|
|
|
retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
|
|
retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
|
|
|
MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
|
|
MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
|
|
|
})
|
|
})
|
|
@@ -254,6 +257,42 @@ func (b *AzureStorage) Read(name string) ([]byte, error) {
|
|
|
return downloadedData.Bytes(), nil
|
|
return downloadedData.Bytes(), nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// ReadToLocalFile streams the specified blob at path to destPath on the local file system.
|
|
|
|
|
+func (b *AzureStorage) ReadToLocalFile(path, destPath string) error {
|
|
|
|
|
+ path = trimLeading(path)
|
|
|
|
|
+ ctx := context.Background()
|
|
|
|
|
+
|
|
|
|
|
+ log.Debugf("AzureStorage::ReadToLocalFile::HTTPS(%s) -> %s", path, destPath)
|
|
|
|
|
+
|
|
|
|
|
+ downloadResponse, err := b.containerClient.NewBlobClient(path).DownloadStream(ctx, nil)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return fmt.Errorf("AzureStorage: ReadToLocalFile: failed to download blob %q to %q: %w", path, destPath, err)
|
|
|
|
|
+ }
|
|
|
|
|
+ // NOTE: automatic retries are performed if the connection fails.
|
|
|
|
|
+ retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
|
|
|
|
|
+ MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
|
|
|
|
|
+ })
|
|
|
|
|
+ defer retryReader.Close()
|
|
|
|
|
+
|
|
|
|
|
+ if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
|
|
|
|
|
+ return errors.Wrap(err, "creating destination directory")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ f, err := os.Create(destPath)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return errors.Wrapf(err, "creating destination file %s", destPath)
|
|
|
|
|
+ }
|
|
|
|
|
+ defer f.Close()
|
|
|
|
|
+
|
|
|
|
|
+ // Use 1 MB buffer for streaming operations
|
|
|
|
|
+ buf := make([]byte, 1024*1024)
|
|
|
|
|
+ if _, err := io.CopyBuffer(f, retryReader, buf); err != nil {
|
|
|
|
|
+ return errors.Wrapf(err, "streaming %s to %s", path, destPath)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return nil
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// Write uses the relative path of the storage combined with the provided path
|
|
// Write uses the relative path of the storage combined with the provided path
|
|
|
// to write a new file or overwrite an existing file.
|
|
// to write a new file or overwrite an existing file.
|
|
|
func (b *AzureStorage) Write(name string, data []byte) error {
|
|
func (b *AzureStorage) Write(name string, data []byte) error {
|