Bläddra i källkod

Added a double-buffered stream reader for azure downloads to stream directly from blob storage instead of downloading to filesystem first

Signed-off-by: Matt Bolt <mbolt35@gmail.com>
Matt Bolt 2 år sedan
förälder
incheckning
69792a7176

+ 3 - 3
pkg/cloud/azure/storagebillingparser.go

@@ -1,7 +1,6 @@
 package azure
 
 import (
-	"bytes"
 	"context"
 	"encoding/csv"
 	"fmt"
@@ -86,12 +85,13 @@ func (asbp *AzureStorageBillingParser) ParseBillingData(start, end time.Time, re
 				return err
 			}
 		} else {
-			blobBytes, err2 := asbp.DownloadBlob(blobName, client, ctx)
+			streamReader, err2 := asbp.StreamBlob(blobName, client)
 			if err2 != nil {
 				asbp.ConnectionStatus = cloud.FailedConnection
 				return err2
 			}
-			err2 = asbp.parseCSV(start, end, csv.NewReader(bytes.NewReader(blobBytes)), resultFn)
+
+			err2 = asbp.parseCSV(start, end, csv.NewReader(streamReader), resultFn)
 			if err2 != nil {
 				asbp.ConnectionStatus = cloud.ParseError
 				return err2

+ 6 - 0
pkg/cloud/azure/storageconnection.go

@@ -70,6 +70,12 @@ func (sc *StorageConnection) DownloadBlob(blobName string, client *azblob.Client
 	return downloadedData.Bytes(), nil
 }
 
+// StreamBlob returns an io.Reader for the given blob which uses a re-usable double buffer approach to stream directly
+// from blob storage.
+func (sc *StorageConnection) StreamBlob(blobName string, client *azblob.Client) (*StreamReader, error) {
+	return NewStreamReader(client, sc.Container, blobName)
+}
+
 // DownloadBlobToFile downloads the Azure Billing CSV to a local file
 func (sc *StorageConnection) DownloadBlobToFile(localFilePath string, blobName string, client *azblob.Client, ctx context.Context) error {
 	// If file exists, don't download it again

+ 247 - 0
pkg/cloud/azure/streamreader.go

@@ -0,0 +1,247 @@
+package azure
+
+import (
+	"bytes"
+	"context"
+	"io"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+)
+
+const defaultBlockSize = int(8 * 1024 * 1024) // 8MB
+
+// StreamReader is a double buffered streaming reader for Azure Blob Storage.
+type StreamReader struct {
+	client    *azblob.Client
+	container string
+	blobName  string
+	block     *bytes.Buffer
+	next      *streamingBlock
+	position  int64
+	size      int64
+}
+
+// NewStreamReader creates a new streaming reader for the specified blob.
+func NewStreamReader(client *azblob.Client, container string, blobName string) (*StreamReader, error) {
+	sar := &StreamReader{
+		client:    client,
+		container: container,
+		blobName:  blobName,
+		block:     nil,
+		next:      nil,
+		position:  0,
+		size:      0,
+	}
+
+	// get the size of the blob
+	blobClient := client.ServiceClient().NewContainerClient(container).NewBlobClient(blobName)
+	gr, err := blobClient.GetProperties(context.Background(), nil)
+	if err != nil {
+		return nil, err
+	}
+
+	sar.size = *gr.ContentLength
+
+	return sar, nil
+}
+
+// See io.Reader.Read
+func (r *StreamReader) Read(p []byte) (n int, err error) {
+	if r.position >= r.size {
+		return 0, io.EOF
+	}
+
+	// fetch the blocks on demand
+	if r.block == nil || r.block.Len() == 0 {
+		err := r.nextBlock()
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	// block.Next() constrains the bytes read even if len(p) is larger
+	// than the rest of the block
+	copied := copy(p, r.block.Next(len(p)))
+	r.position += int64(copied)
+
+	return copied, nil
+}
+
+// nextBlock fetches the next block of data from the blob and starts the download of
+// the next block in the background.
+func (r *StreamReader) nextBlock() error {
+	// if we don't have a block, we need to fetch the first block, and start fetching
+	// the next block in the background
+	if r.block == nil {
+		current := newStreamBlock(
+			r.client,
+			r.container,
+			r.blobName,
+			nil,
+			r.position,
+			int64(defaultBlockSize),
+			r.size,
+		)
+
+		// explicitly wait here for the first block
+		err := current.Wait()
+		if err != nil {
+			return err
+		}
+
+		// set the current block and start the next block download
+		r.block = current.buffer
+
+		// if the block size capacity was reduced to a value different than the default block size,
+		// we can assume there is no more data beyond this block, so we don't need to start the next block
+		if current.capacity != int64(defaultBlockSize) {
+			return nil
+		}
+
+		// start next block stream
+		r.next = newStreamBlock(
+			r.client,
+			r.container,
+			r.blobName,
+			nil,
+			r.position+current.capacity,
+			int64(defaultBlockSize),
+			r.size,
+		)
+
+		return nil
+	}
+
+	// we have a block and a next block, so we need to wait for the next block to finish
+	// buffering, then we can swap current and next buffers
+	err := r.next.Wait()
+	if err != nil {
+		return err
+	}
+
+	// save the current buffer to re-use in the next block and set the current to the next block
+	currentBuffer := r.block
+	r.block = r.next.buffer
+
+	if r.next.capacity != int64(defaultBlockSize) {
+		return nil
+	}
+
+	// start next block stream
+	r.next = newStreamBlock(
+		r.client,
+		r.container,
+		r.blobName,
+		currentBuffer, // recycle the old current buffer as the next buffer
+		r.position+int64(r.block.Cap()),
+		int64(defaultBlockSize),
+		r.size,
+	)
+
+	return nil
+}
+
+// streamingBlock is a buffered block of data that runs in a separate goroutine
+// to allow the next block to download while the current block is being read.
+type streamingBlock struct {
+	client    *azblob.Client
+	container string
+	blob      string
+
+	done   chan struct{}
+	buffer *bytes.Buffer
+	err    error
+
+	start    int64
+	capacity int64
+}
+
+// newStreamBlock creates a new buffered block of data the down the specific
+// range of the blob. While the block download runs in a separate goroutine,
+// we will never attempt to access the passed buffer until after the Wait()
+// returns. This just ensures that we will never attempt to swap buffers
+// mid-download.
+func newStreamBlock(
+	client *azblob.Client,
+	container string,
+	blob string,
+	buffer *bytes.Buffer,
+	start int64,
+	capacity int64,
+	max int64,
+) *streamingBlock {
+	sb := &streamingBlock{
+		client:    client,
+		container: container,
+		blob:      blob,
+		done:      make(chan struct{}),
+		buffer:    buffer,
+		start:     start,
+		capacity:  capacity,
+	}
+
+	// determine if we need to reallocate a new block buffer or if we can re-use the existing storage
+	blockSize := capacity
+	if start+blockSize > max {
+		blockSize = max - start
+	}
+
+	// if the provided buffer is nil or the blockSize is different than the provided capacity, we need to reallocate
+	// reallocation will likely happen once at the end of the stream
+	if sb.buffer == nil || blockSize != capacity {
+		sb.buffer = bytes.NewBuffer(make([]byte, 0, blockSize))
+		sb.capacity = blockSize
+	} else {
+		sb.buffer.Reset()
+	}
+
+	// start a goroutine to fetch the block of data, close the done channel when the block
+	// is fetched or an error occurs
+	go func(block *streamingBlock) {
+		ctx := context.Background()
+
+		opts := azblob.DownloadStreamOptions{
+			Range: azblob.HTTPRange{
+				Offset: block.start,
+				Count:  block.capacity,
+			},
+		}
+
+		resp, err := block.client.DownloadStream(ctx, block.container, block.blob, &opts)
+		if err != nil {
+			block.err = err
+			close(block.done)
+			return
+		}
+
+		retryOpts := &azblob.RetryReaderOptions{
+			MaxRetries: 3,
+		}
+
+		var body io.ReadCloser = resp.NewRetryReader(ctx, retryOpts)
+		_, err = io.Copy(block.buffer, body)
+		if err != nil {
+			block.err = err
+			close(block.done)
+			return
+		}
+
+		err = body.Close()
+		if err != nil {
+			block.err = err
+			close(block.done)
+			return
+		}
+
+		close(block.done)
+	}(sb)
+
+	return sb
+}
+
+// Wait blocks until the block is downloaded and returns any error that occurred.
+func (sb *streamingBlock) Wait() error {
+	<-sb.done
+
+	return sb.err
+}