|
|
@@ -1,27 +1,29 @@
|
|
|
package storage
|
|
|
|
|
|
-// Fork from Thanos S3 Bucket support to reuse configuration options
|
|
|
+// Fork from Thanos Azure Storage Bucket support to reuse configuration options
|
|
|
// Licensed under the Apache License 2.0
|
|
|
-// https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
|
|
|
+// https://github.com/thanos-io/objstore/blob/main/providers/azure/azure.go
|
|
|
|
|
|
import (
|
|
|
"bytes"
|
|
|
"context"
|
|
|
"fmt"
|
|
|
- "io"
|
|
|
"net"
|
|
|
"net/http"
|
|
|
- "net/url"
|
|
|
"strings"
|
|
|
- "sync"
|
|
|
"time"
|
|
|
|
|
|
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
|
|
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
|
|
|
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
|
|
|
+ "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
|
|
|
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
|
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
|
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
|
|
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
|
|
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
|
|
"github.com/opencost/opencost/core/pkg/log"
|
|
|
|
|
|
- "github.com/Azure/azure-pipeline-go/pipeline"
|
|
|
- blob "github.com/Azure/azure-storage-blob-go/azblob"
|
|
|
- "github.com/Azure/go-autorest/autorest/adal"
|
|
|
- "github.com/Azure/go-autorest/autorest/azure/auth"
|
|
|
"github.com/pkg/errors"
|
|
|
"github.com/prometheus/common/model"
|
|
|
"gopkg.in/yaml.v2"
|
|
|
@@ -54,29 +56,19 @@ var defaultAzureConfig = AzureConfig{
|
|
|
},
|
|
|
}
|
|
|
|
|
|
-func init() {
|
|
|
- // Disable `ForceLog` in Azure storage module
|
|
|
- // As the time of this patch, the logging function in the storage module isn't correctly
|
|
|
- // detecting expected REST errors like 404 and so outputs them to syslog along with a stacktrace.
|
|
|
- // https://github.com/Azure/azure-storage-blob-go/issues/214
|
|
|
- //
|
|
|
- // This needs to be done at startup because the underlying variable is not thread safe.
|
|
|
- // https://github.com/Azure/azure-pipeline-go/blob/dc95902f1d32034f8f743ccc6c3f2eb36b84da27/pipeline/core.go#L276-L283
|
|
|
- pipeline.SetForceLogEnabled(false)
|
|
|
-}
|
|
|
-
|
|
|
// AzureConfig Azure storage configuration.
|
|
|
type AzureConfig struct {
|
|
|
- StorageAccountName string `yaml:"storage_account"`
|
|
|
- StorageAccountKey string `yaml:"storage_account_key"`
|
|
|
- ContainerName string `yaml:"container"`
|
|
|
- Endpoint string `yaml:"endpoint"`
|
|
|
- MaxRetries int `yaml:"max_retries"`
|
|
|
- MSIResource string `yaml:"msi_resource"`
|
|
|
- UserAssignedID string `yaml:"user_assigned_id"`
|
|
|
- PipelineConfig PipelineConfig `yaml:"pipeline_config"`
|
|
|
- ReaderConfig ReaderConfig `yaml:"reader_config"`
|
|
|
- HTTPConfig AzureHTTPConfig `yaml:"http_config"`
|
|
|
+ StorageAccountName string `yaml:"storage_account"`
|
|
|
+ StorageAccountKey string `yaml:"storage_account_key"`
|
|
|
+ StorageConnectionString string `yaml:"storage_connection_string"`
|
|
|
+ ContainerName string `yaml:"container"`
|
|
|
+ Endpoint string `yaml:"endpoint"`
|
|
|
+ MaxRetries int `yaml:"max_retries"`
|
|
|
+ MSIResource string `yaml:"msi_resource"`
|
|
|
+ UserAssignedID string `yaml:"user_assigned_id"`
|
|
|
+ PipelineConfig PipelineConfig `yaml:"pipeline_config"`
|
|
|
+ ReaderConfig ReaderConfig `yaml:"reader_config"`
|
|
|
+ HTTPConfig AzureHTTPConfig `yaml:"http_config"`
|
|
|
}
|
|
|
|
|
|
type ReaderConfig struct {
|
|
|
@@ -107,48 +99,32 @@ type AzureHTTPConfig struct {
|
|
|
|
|
|
// AzureStorage implements the storage.Storage interface against Azure APIs.
|
|
|
type AzureStorage struct {
|
|
|
- name string
|
|
|
- containerURL blob.ContainerURL
|
|
|
- config *AzureConfig
|
|
|
+ name string
|
|
|
+ containerClient *container.Client
|
|
|
+ config *AzureConfig
|
|
|
}
|
|
|
|
|
|
// Validate checks to see if any of the config options are set.
|
|
|
func (conf *AzureConfig) validate() error {
|
|
|
var errMsg []string
|
|
|
- if conf.MSIResource == "" {
|
|
|
- if conf.UserAssignedID == "" {
|
|
|
- if conf.StorageAccountName == "" ||
|
|
|
- conf.StorageAccountKey == "" {
|
|
|
- errMsg = append(errMsg, "invalid Azure storage configuration")
|
|
|
- }
|
|
|
- if conf.StorageAccountName == "" && conf.StorageAccountKey != "" {
|
|
|
- errMsg = append(errMsg, "no Azure storage_account specified while storage_account_key is present in config file; both should be present")
|
|
|
- }
|
|
|
- if conf.StorageAccountName != "" && conf.StorageAccountKey == "" {
|
|
|
- errMsg = append(errMsg, "no Azure storage_account_key specified while storage_account is present in config file; both should be present")
|
|
|
- }
|
|
|
- } else {
|
|
|
- if conf.StorageAccountName == "" {
|
|
|
- errMsg = append(errMsg, "UserAssignedID is configured but storage account name is missing")
|
|
|
- }
|
|
|
- if conf.StorageAccountKey != "" {
|
|
|
- errMsg = append(errMsg, "UserAssignedID is configured but storage account key is used")
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- if conf.StorageAccountName == "" {
|
|
|
- errMsg = append(errMsg, "MSI resource is configured but storage account name is missing")
|
|
|
- }
|
|
|
- if conf.StorageAccountKey != "" {
|
|
|
- errMsg = append(errMsg, "MSI resource is configured but storage account key is used")
|
|
|
- }
|
|
|
+ if conf.UserAssignedID != "" && conf.StorageAccountKey != "" {
|
|
|
+ errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_account_key authentication")
|
|
|
}
|
|
|
|
|
|
- if conf.ContainerName == "" {
|
|
|
- errMsg = append(errMsg, "no Azure container specified")
|
|
|
+ if conf.UserAssignedID != "" && conf.StorageConnectionString != "" {
|
|
|
+ errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_connection_string authentication")
|
|
|
}
|
|
|
- if conf.Endpoint == "" {
|
|
|
- conf.Endpoint = azureDefaultEndpoint
|
|
|
+
|
|
|
+ if conf.StorageAccountKey != "" && conf.StorageConnectionString != "" {
|
|
|
+ errMsg = append(errMsg, "storage_account_key and storage_connection_string cannot both be set")
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.StorageAccountName == "" {
|
|
|
+ errMsg = append(errMsg, "storage_account_name is required but not configured")
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.ContainerName == "" {
|
|
|
+ errMsg = append(errMsg, "no container specified")
|
|
|
}
|
|
|
|
|
|
if conf.PipelineConfig.MaxTries < 0 {
|
|
|
@@ -193,7 +169,7 @@ func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
|
|
|
|
|
|
conf, err := parseAzureConfig(azureConfig)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("error parsing azure storage config: %w", err)
|
|
|
}
|
|
|
|
|
|
return NewAzureStorageWith(conf)
|
|
|
@@ -202,33 +178,32 @@ func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
|
|
|
// NewAzureStorageWith returns a new Storage using the provided Azure config struct.
|
|
|
func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
|
|
|
if err := conf.validate(); err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("error validating azure storage config: %w", err)
|
|
|
}
|
|
|
|
|
|
+ containerClient, err := getContainerClient(conf)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("error retrieving container client: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if storage account container already exists, and create one if it does not.
|
|
|
ctx := context.Background()
|
|
|
- container, err := createContainer(ctx, conf)
|
|
|
+ _, err = containerClient.GetProperties(ctx, &container.GetPropertiesOptions{})
|
|
|
if err != nil {
|
|
|
- ret, ok := err.(blob.StorageError)
|
|
|
- if !ok {
|
|
|
- return nil, errors.Wrapf(err, "Azure API return unexpected error: %T\n", err)
|
|
|
+ if !bloberror.HasCode(err, bloberror.ContainerNotFound) {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
- if ret.ServiceCode() == "ContainerAlreadyExists" {
|
|
|
- log.Debugf("Getting connection to existing Azure blob container: %s", conf.ContainerName)
|
|
|
- container, err = getContainer(ctx, conf)
|
|
|
- if err != nil {
|
|
|
- return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container)
|
|
|
- }
|
|
|
- } else {
|
|
|
- return nil, errors.Wrapf(err, "error creating Azure blob container: %s", container)
|
|
|
+ _, err := containerClient.Create(ctx, nil)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrapf(err, "error creating Azure blob container: %s", conf.ContainerName)
|
|
|
}
|
|
|
- } else {
|
|
|
- log.Infof("Azure blob container successfully created. Address: %s", container)
|
|
|
+ log.Infof("Azure blob container successfully created %s", conf.ContainerName)
|
|
|
}
|
|
|
|
|
|
return &AzureStorage{
|
|
|
- name: conf.ContainerName,
|
|
|
- containerURL: container,
|
|
|
- config: &conf,
|
|
|
+ name: conf.ContainerName,
|
|
|
+ containerClient: containerClient,
|
|
|
+ config: &conf,
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
@@ -253,17 +228,16 @@ func (as *AzureStorage) FullPath(name string) string {
|
|
|
func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
|
|
|
name = trimLeading(name)
|
|
|
ctx := context.Background()
|
|
|
-
|
|
|
- blobURL := getBlobURL(name, b.containerURL)
|
|
|
- props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
|
|
|
+ blobClient := b.containerClient.NewBlobClient(name)
|
|
|
+ props, err := blobClient.GetProperties(ctx, nil)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("error retrieving blob properties: %w", err)
|
|
|
}
|
|
|
|
|
|
return &StorageInfo{
|
|
|
Name: trimName(name),
|
|
|
- Size: props.ContentLength(),
|
|
|
- ModTime: props.LastModified(),
|
|
|
+ Size: *props.ContentLength,
|
|
|
+ ModTime: *props.LastModified,
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
@@ -275,17 +249,25 @@ func (b *AzureStorage) Read(name string) ([]byte, error) {
|
|
|
|
|
|
log.Debugf("AzureStorage::Read(%s)", name)
|
|
|
|
|
|
- reader, err := b.getBlobReader(ctx, name, 0, blob.CountToEnd)
|
|
|
+ downloadResponse, err := b.containerClient.NewBlobClient(name).DownloadStream(ctx, nil)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("AzureStorage: Read: failed to download %w", err)
|
|
|
}
|
|
|
+ // NOTE: automatically retries are performed if the connection fails
|
|
|
+ retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
|
|
|
+ MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
|
|
|
+ })
|
|
|
+ defer retryReader.Close()
|
|
|
+
|
|
|
+ // read the body into a buffer
|
|
|
+ downloadedData := bytes.Buffer{}
|
|
|
|
|
|
- data, err := io.ReadAll(reader)
|
|
|
+ _, err = downloadedData.ReadFrom(retryReader)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("AzureStorage: Read: failed to read downloaded data %w", err)
|
|
|
}
|
|
|
|
|
|
- return data, nil
|
|
|
+ return downloadedData.Bytes(), nil
|
|
|
}
|
|
|
|
|
|
// Write uses the relative path of the storage combined with the provided path
|
|
|
@@ -296,14 +278,13 @@ func (b *AzureStorage) Write(name string, data []byte) error {
|
|
|
|
|
|
log.Debugf("AzureStorage::Write(%s)", name)
|
|
|
|
|
|
- blobURL := getBlobURL(name, b.containerURL)
|
|
|
r := bytes.NewReader(data)
|
|
|
- if _, err := blob.UploadStreamToBlockBlob(ctx, r, blobURL,
|
|
|
- blob.UploadStreamToBlockBlobOptions{
|
|
|
- BufferSize: len(data),
|
|
|
- MaxBuffers: 1,
|
|
|
- },
|
|
|
- ); err != nil {
|
|
|
+ blobClient := b.containerClient.NewBlockBlobClient(name)
|
|
|
+ opts := &blockblob.UploadStreamOptions{
|
|
|
+ BlockSize: 3 * 1024 * 1024,
|
|
|
+ Concurrency: 4,
|
|
|
+ }
|
|
|
+ if _, err := blobClient.UploadStream(ctx, r, opts); err != nil {
|
|
|
return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
|
|
|
}
|
|
|
return nil
|
|
|
@@ -317,8 +298,11 @@ func (b *AzureStorage) Remove(name string) error {
|
|
|
log.Debugf("AzureStorage::Remove(%s)", name)
|
|
|
ctx := context.Background()
|
|
|
|
|
|
- blobURL := getBlobURL(name, b.containerURL)
|
|
|
- if _, err := blobURL.Delete(ctx, blob.DeleteSnapshotsOptionInclude, blob.BlobAccessConditions{}); err != nil {
|
|
|
+ blobClient := b.containerClient.NewBlobClient(name)
|
|
|
+ opt := &blob.DeleteOptions{
|
|
|
+ DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
|
|
|
+ }
|
|
|
+ if _, err := blobClient.Delete(ctx, opt); err != nil {
|
|
|
return errors.Wrapf(err, "error deleting blob, address: %s", name)
|
|
|
}
|
|
|
return nil
|
|
|
@@ -329,16 +313,13 @@ func (b *AzureStorage) Remove(name string) error {
|
|
|
func (b *AzureStorage) Exists(name string) (bool, error) {
|
|
|
name = trimLeading(name)
|
|
|
ctx := context.Background()
|
|
|
- blobURL := getBlobURL(name, b.containerURL)
|
|
|
- if _, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{}); err != nil {
|
|
|
- var se blob.StorageError
|
|
|
- if errors.As(err, &se) && se.ServiceCode() == blob.ServiceCodeBlobNotFound {
|
|
|
+ blobClient := b.containerClient.NewBlobClient(name)
|
|
|
+ if _, err := blobClient.GetProperties(ctx, nil); err != nil {
|
|
|
+ if b.IsObjNotFoundErr(err) {
|
|
|
return false, nil
|
|
|
}
|
|
|
-
|
|
|
return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
|
|
|
}
|
|
|
-
|
|
|
return true, nil
|
|
|
}
|
|
|
|
|
|
@@ -356,61 +337,38 @@ func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
|
|
|
path = strings.TrimSuffix(path, DirDelim) + DirDelim
|
|
|
}
|
|
|
|
|
|
- marker := blob.Marker{}
|
|
|
- listOptions := blob.ListBlobsSegmentOptions{Prefix: path}
|
|
|
-
|
|
|
- var names []string
|
|
|
- for i := 1; ; i++ {
|
|
|
- var blobItems []blob.BlobItemInternal
|
|
|
-
|
|
|
- list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
|
|
|
+ var stats []*StorageInfo
|
|
|
+ list := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
|
|
|
+ Prefix: &path,
|
|
|
+ })
|
|
|
+ for list.More() {
|
|
|
+ page, err := list.NextPage(ctx)
|
|
|
if err != nil {
|
|
|
- return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
|
|
|
+ return nil, fmt.Errorf("failed to retrieve page: %s", err)
|
|
|
}
|
|
|
-
|
|
|
- marker = list.NextMarker
|
|
|
- blobItems = list.Segment.BlobItems
|
|
|
-
|
|
|
- for _, blob := range blobItems {
|
|
|
- names = append(names, blob.Name)
|
|
|
+ segment := page.ListBlobsHierarchySegmentResponse.Segment
|
|
|
+ if segment == nil {
|
|
|
+ continue
|
|
|
}
|
|
|
-
|
|
|
- // Continue iterating if we are not done.
|
|
|
- if !marker.NotDone() {
|
|
|
- break
|
|
|
- }
|
|
|
-
|
|
|
- log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(names), i)
|
|
|
- }
|
|
|
-
|
|
|
- // get the storage information for each blob (really unfortunate we have to do this)
|
|
|
- var lock sync.Mutex
|
|
|
- var stats []*StorageInfo
|
|
|
- var wg sync.WaitGroup
|
|
|
- wg.Add(len(names))
|
|
|
-
|
|
|
- for i := 0; i < len(names); i++ {
|
|
|
- go func(n string) {
|
|
|
- defer wg.Done()
|
|
|
-
|
|
|
- stat, err := b.Stat(n)
|
|
|
- if err != nil {
|
|
|
- log.Errorf("Error statting blob %s: %s", n, err)
|
|
|
- } else {
|
|
|
- lock.Lock()
|
|
|
- stats = append(stats, stat)
|
|
|
- lock.Unlock()
|
|
|
+ for _, blob := range segment.BlobItems {
|
|
|
+ if blob.Name == nil {
|
|
|
+ continue
|
|
|
}
|
|
|
- }(names[i])
|
|
|
+ if blob.Properties == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ stats = append(stats, &StorageInfo{
|
|
|
+ Name: trimName(*blob.Name),
|
|
|
+ Size: *blob.Properties.ContentLength,
|
|
|
+ ModTime: *blob.Properties.LastModified,
|
|
|
+ })
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- wg.Wait()
|
|
|
-
|
|
|
return stats, nil
|
|
|
}
|
|
|
|
|
|
func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
|
|
|
-
|
|
|
path = trimLeading(path)
|
|
|
|
|
|
log.Debugf("AzureStorage::ListDirectories(%s)", path)
|
|
|
@@ -422,198 +380,53 @@ func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
|
|
|
path = strings.TrimSuffix(path, DirDelim) + DirDelim
|
|
|
}
|
|
|
|
|
|
- marker := blob.Marker{}
|
|
|
- listOptions := blob.ListBlobsSegmentOptions{Prefix: path}
|
|
|
-
|
|
|
var stats []*StorageInfo
|
|
|
- for i := 1; ; i++ {
|
|
|
- var blobPrefixes []blob.BlobPrefix
|
|
|
-
|
|
|
- list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
|
|
|
+ list := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
|
|
|
+ Prefix: &path,
|
|
|
+ })
|
|
|
+ for list.More() {
|
|
|
+ page, err := list.NextPage(ctx)
|
|
|
if err != nil {
|
|
|
- return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
|
|
|
+ return nil, fmt.Errorf("failed to retrieve page: %s", err)
|
|
|
}
|
|
|
+ segment := page.ListBlobsHierarchySegmentResponse.Segment
|
|
|
+ if segment == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ for _, dir := range segment.BlobPrefixes {
|
|
|
+ if dir.Name == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
|
|
|
- marker = list.NextMarker
|
|
|
- blobPrefixes = list.Segment.BlobPrefixes
|
|
|
-
|
|
|
- for _, prefix := range blobPrefixes {
|
|
|
stats = append(stats, &StorageInfo{
|
|
|
- Name: trimLeading(prefix.Name),
|
|
|
+ Name: *dir.Name,
|
|
|
})
|
|
|
}
|
|
|
-
|
|
|
- // Continue iterating if we are not done.
|
|
|
- if !marker.NotDone() {
|
|
|
- break
|
|
|
- }
|
|
|
-
|
|
|
- log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(stats), i)
|
|
|
}
|
|
|
|
|
|
return stats, nil
|
|
|
}
|
|
|
|
|
|
-func (b *AzureStorage) getBlobReader(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) {
|
|
|
- log.Debugf("Getting blob: %s, offset: %d, length: %d", name, offset, length)
|
|
|
- if name == "" {
|
|
|
- return nil, errors.New("X-Ms-Error-Code: [EmptyContainerName]")
|
|
|
- }
|
|
|
- exists, err := b.Exists(name)
|
|
|
- if err != nil {
|
|
|
- return nil, errors.Wrapf(err, "cannot get blob reader: %s", name)
|
|
|
- }
|
|
|
-
|
|
|
- if !exists {
|
|
|
- return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]")
|
|
|
- }
|
|
|
-
|
|
|
- blobURL := getBlobURL(name, b.containerURL)
|
|
|
- if err != nil {
|
|
|
- return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
|
|
|
- }
|
|
|
- var props *blob.BlobGetPropertiesResponse
|
|
|
- props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
|
|
|
- if err != nil {
|
|
|
- return nil, errors.Wrapf(err, "cannot get properties for container: %s", name)
|
|
|
- }
|
|
|
-
|
|
|
- var size int64
|
|
|
- // If a length is specified and it won't go past the end of the file,
|
|
|
- // then set it as the size.
|
|
|
- if length > 0 && length <= props.ContentLength()-offset {
|
|
|
- size = length
|
|
|
- log.Debugf("set size to length. size: %d, length: %d, offset: %d, name: %s", size, length, offset, name)
|
|
|
- } else {
|
|
|
- size = props.ContentLength() - offset
|
|
|
- log.Debugf("set size to go to EOF. contentlength: %d, size: %d, length: %d, offset: %d, name: %s", props.ContentLength(), size, length, offset, name)
|
|
|
- }
|
|
|
-
|
|
|
- destBuffer := make([]byte, size)
|
|
|
-
|
|
|
- if err := blob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, offset, size,
|
|
|
- destBuffer, blob.DownloadFromBlobOptions{
|
|
|
- BlockSize: blob.BlobDefaultDownloadBlockSize,
|
|
|
- Parallelism: uint16(3),
|
|
|
- Progress: nil,
|
|
|
- RetryReaderOptionsPerBlock: blob.RetryReaderOptions{
|
|
|
- MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests,
|
|
|
- },
|
|
|
- },
|
|
|
- ); err != nil {
|
|
|
- return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobURL.BlobURL)
|
|
|
- }
|
|
|
-
|
|
|
- return io.NopCloser(bytes.NewReader(destBuffer)), nil
|
|
|
-}
|
|
|
-
|
|
|
-func getAzureStorageCredentials(conf AzureConfig) (blob.Credential, error) {
|
|
|
- if conf.MSIResource != "" || conf.UserAssignedID != "" {
|
|
|
- spt, err := getServicePrincipalToken(conf)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- if err := spt.Refresh(); err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
-
|
|
|
- return blob.NewTokenCredential(spt.Token().AccessToken, func(tc blob.TokenCredential) time.Duration {
|
|
|
- err := spt.Refresh()
|
|
|
- if err != nil {
|
|
|
- log.Errorf("could not refresh MSI token. err: %s", err)
|
|
|
- // Retry later as the error can be related to API throttling
|
|
|
- return 30 * time.Second
|
|
|
- }
|
|
|
- tc.SetToken(spt.Token().AccessToken)
|
|
|
- return spt.Token().Expires().Sub(time.Now().Add(2 * time.Minute))
|
|
|
- }), nil
|
|
|
+// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
|
|
|
+func (b *AzureStorage) IsObjNotFoundErr(err error) bool {
|
|
|
+ if err == nil {
|
|
|
+ return false
|
|
|
}
|
|
|
-
|
|
|
- credential, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- return credential, nil
|
|
|
+ return bloberror.HasCode(err, bloberror.BlobNotFound) || bloberror.HasCode(err, bloberror.InvalidURI)
|
|
|
}
|
|
|
|
|
|
-func getServicePrincipalToken(conf AzureConfig) (*adal.ServicePrincipalToken, error) {
|
|
|
- resource := conf.MSIResource
|
|
|
- if resource == "" {
|
|
|
- resource = fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint)
|
|
|
- }
|
|
|
-
|
|
|
- msiConfig := auth.MSIConfig{
|
|
|
- Resource: resource,
|
|
|
+// IsAccessDeniedErr returns true if access to object is denied.
|
|
|
+func (b *AzureStorage) IsAccessDeniedErr(err error) bool {
|
|
|
+ if err == nil {
|
|
|
+ return false
|
|
|
}
|
|
|
-
|
|
|
- if conf.UserAssignedID != "" {
|
|
|
- log.Debugf("using user assigned identity. clientId: %s", conf.UserAssignedID)
|
|
|
- msiConfig.ClientID = conf.UserAssignedID
|
|
|
- } else {
|
|
|
- log.Debugf("using system assigned identity")
|
|
|
- }
|
|
|
-
|
|
|
- return msiConfig.ServicePrincipalToken()
|
|
|
-}
|
|
|
-
|
|
|
-func getContainerURL(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
|
|
|
- credentials, err := getAzureStorageCredentials(conf)
|
|
|
-
|
|
|
- if err != nil {
|
|
|
- return blob.ContainerURL{}, err
|
|
|
- }
|
|
|
-
|
|
|
- retryOptions := blob.RetryOptions{
|
|
|
- MaxTries: conf.PipelineConfig.MaxTries,
|
|
|
- TryTimeout: time.Duration(conf.PipelineConfig.TryTimeout),
|
|
|
- RetryDelay: time.Duration(conf.PipelineConfig.RetryDelay),
|
|
|
- MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
|
|
|
- }
|
|
|
-
|
|
|
- if deadline, ok := ctx.Deadline(); ok {
|
|
|
- retryOptions.TryTimeout = time.Until(deadline)
|
|
|
- }
|
|
|
-
|
|
|
- dt, err := DefaultAzureTransport(conf)
|
|
|
- if err != nil {
|
|
|
- return blob.ContainerURL{}, err
|
|
|
- }
|
|
|
- client := http.Client{
|
|
|
- Transport: dt,
|
|
|
- }
|
|
|
-
|
|
|
- p := blob.NewPipeline(credentials, blob.PipelineOptions{
|
|
|
- Retry: retryOptions,
|
|
|
- Telemetry: blob.TelemetryOptions{Value: "Kubecost"},
|
|
|
- RequestLog: blob.RequestLogOptions{
|
|
|
- // Log a warning if an operation takes longer than the specified duration.
|
|
|
- // (-1=no logging; 0=default 3s threshold)
|
|
|
- LogWarningIfTryOverThreshold: -1,
|
|
|
- },
|
|
|
- Log: pipeline.LogOptions{
|
|
|
- ShouldLog: nil,
|
|
|
- },
|
|
|
- HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
|
|
|
- return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
|
|
- resp, err := client.Do(request.WithContext(ctx))
|
|
|
-
|
|
|
- return pipeline.NewHTTPResponse(resp), err
|
|
|
- }
|
|
|
- }),
|
|
|
- })
|
|
|
- u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
|
|
|
- if err != nil {
|
|
|
- return blob.ContainerURL{}, err
|
|
|
- }
|
|
|
- service := blob.NewServiceURL(*u, p)
|
|
|
-
|
|
|
- return service.NewContainerURL(conf.ContainerName), nil
|
|
|
+ return bloberror.HasCode(err, bloberror.AuthorizationPermissionMismatch) || bloberror.HasCode(err, bloberror.InsufficientAccountPermissions)
|
|
|
}
|
|
|
|
|
|
func DefaultAzureTransport(config AzureConfig) (*http.Transport, error) {
|
|
|
tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
|
|
|
if err != nil {
|
|
|
- return nil, err
|
|
|
+ return nil, fmt.Errorf("error creating TLS config: %w", err)
|
|
|
}
|
|
|
|
|
|
if config.HTTPConfig.InsecureSkipVerify {
|
|
|
@@ -640,28 +453,75 @@ func DefaultAzureTransport(config AzureConfig) (*http.Transport, error) {
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
-func getContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
|
|
|
- c, err := getContainerURL(ctx, conf)
|
|
|
+func getContainerClient(conf AzureConfig) (*container.Client, error) {
|
|
|
+ dt, err := DefaultAzureTransport(conf)
|
|
|
if err != nil {
|
|
|
- return blob.ContainerURL{}, err
|
|
|
+ return nil, fmt.Errorf("error creating default transport: %w", err)
|
|
|
+ }
|
|
|
+ opt := &container.ClientOptions{
|
|
|
+ ClientOptions: azcore.ClientOptions{
|
|
|
+ Retry: policy.RetryOptions{
|
|
|
+ MaxRetries: conf.PipelineConfig.MaxTries,
|
|
|
+ TryTimeout: time.Duration(conf.PipelineConfig.TryTimeout),
|
|
|
+ RetryDelay: time.Duration(conf.PipelineConfig.RetryDelay),
|
|
|
+ MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
|
|
|
+ },
|
|
|
+ Telemetry: policy.TelemetryOptions{
|
|
|
+ ApplicationID: "Thanos",
|
|
|
+ },
|
|
|
+ Transport: &http.Client{Transport: dt},
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ // Use connection string if set
|
|
|
+ if conf.StorageConnectionString != "" {
|
|
|
+ containerClient, err := container.NewClientFromConnectionString(conf.StorageConnectionString, conf.ContainerName, opt)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("error creating client from connection string: %w", err)
|
|
|
+ }
|
|
|
+ return containerClient, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.Endpoint == "" {
|
|
|
+ conf.Endpoint = "blob.core.windows.net"
|
|
|
+ }
|
|
|
+
|
|
|
+ containerURL := fmt.Sprintf("https://%s.%s/%s", conf.StorageAccountName, conf.Endpoint, conf.ContainerName)
|
|
|
+
|
|
|
+ // Use shared keys if set
|
|
|
+ if conf.StorageAccountKey != "" {
|
|
|
+ cred, err := container.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("error getting shared key credential: %w", err)
|
|
|
+ }
|
|
|
+ containerClient, err := container.NewClientWithSharedKeyCredential(containerURL, cred, opt)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("error creating client with shared key credential: %w", err)
|
|
|
+ }
|
|
|
+ return containerClient, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // Otherwise use a token credential
|
|
|
+ var cred azcore.TokenCredential
|
|
|
+
|
|
|
+ // Use Managed Identity Credential if a user assigned ID is set
|
|
|
+ if conf.UserAssignedID != "" {
|
|
|
+ msiOpt := &azidentity.ManagedIdentityCredentialOptions{}
|
|
|
+ msiOpt.ID = azidentity.ClientID(conf.UserAssignedID)
|
|
|
+ cred, err = azidentity.NewManagedIdentityCredential(msiOpt)
|
|
|
+ } else {
|
|
|
+ // Otherwise use Default Azure Credential
|
|
|
+ cred, err = azidentity.NewDefaultAzureCredential(nil)
|
|
|
}
|
|
|
- // Getting container properties to check if it exists or not. Returns error which will be parsed further.
|
|
|
- _, err = c.GetProperties(ctx, blob.LeaseAccessConditions{})
|
|
|
- return c, err
|
|
|
-}
|
|
|
|
|
|
-func createContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
|
|
|
- c, err := getContainerURL(ctx, conf)
|
|
|
if err != nil {
|
|
|
- return blob.ContainerURL{}, err
|
|
|
+ return nil, fmt.Errorf("error creating token credential: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ containerClient, err := container.NewClient(containerURL, cred, opt)
|
|
|
+ if err != nil {
|
|
|
+ return nil, fmt.Errorf("error creating client from token credential: %w", err)
|
|
|
}
|
|
|
- _, err = c.Create(
|
|
|
- ctx,
|
|
|
- blob.Metadata{},
|
|
|
- blob.PublicAccessNone)
|
|
|
- return c, err
|
|
|
-}
|
|
|
|
|
|
-func getBlobURL(blobName string, c blob.ContainerURL) blob.BlockBlobURL {
|
|
|
- return c.NewBlockBlobURL(blobName)
|
|
|
+ return containerClient, nil
|
|
|
}
|