| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549 |
- package storage
- // Fork from Thanos Azure Storage Bucket support to reuse configuration options
- // Licensed under the Apache License 2.0
- // https://github.com/thanos-io/objstore/blob/main/providers/azure/azure.go
- import (
- "bytes"
- "context"
- "fmt"
- "io"
- "net/http"
- "os"
- "path/filepath"
- "strings"
- "time"
- "github.com/Azure/azure-sdk-for-go/sdk/azcore"
- "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
- "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
- "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
- "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
- "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
- "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
- "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
- "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/pkg/errors"
- "github.com/prometheus/common/model"
- "gopkg.in/yaml.v2"
- )
// azureDefaultEndpoint is the default Azure blob storage DNS suffix, used
// when no custom endpoint is configured.
const azureDefaultEndpoint = "blob.core.windows.net"
- // Set default retry values to default Azure values. 0 = use Default Azure.
- var defaultAzureConfig = AzureConfig{
- PipelineConfig: PipelineConfig{
- MaxTries: 0,
- TryTimeout: 0,
- RetryDelay: 0,
- MaxRetryDelay: 0,
- },
- ReaderConfig: ReaderConfig{
- MaxRetryRequests: 0,
- },
- HTTPConfig: HTTPConfig{
- IdleConnTimeout: 90 * time.Second,
- ResponseHeaderTimeout: 2 * time.Minute,
- TLSHandshakeTimeout: 10 * time.Second,
- ExpectContinueTimeout: 1 * time.Second,
- MaxIdleConns: 100,
- MaxIdleConnsPerHost: 100,
- MaxConnsPerHost: 0,
- DisableCompression: false,
- },
- }
// AzureConfig Azure storage configuration.
type AzureConfig struct {
	// StorageAccountName is the Azure storage account to connect to (required).
	StorageAccountName string `yaml:"storage_account"`
	// StorageAccountKey authenticates with a shared account key. Mutually
	// exclusive with StorageConnectionString and UserAssignedID (see validate).
	StorageAccountKey string `yaml:"storage_account_key"`
	// StorageConnectionString authenticates with a full connection string.
	// Mutually exclusive with StorageAccountKey and UserAssignedID.
	StorageConnectionString string `yaml:"storage_connection_string"`
	// ContainerName is the blob container to operate on (required).
	ContainerName string `yaml:"container"`
	// Endpoint overrides the blob endpoint DNS suffix; defaults to
	// "blob.core.windows.net" when empty.
	Endpoint string `yaml:"endpoint"`
	// MaxRetries is a legacy, catch-all retry count; when > 0 it seeds any
	// unset PipelineConfig/ReaderConfig retry values (see parseAzureConfig).
	MaxRetries int `yaml:"max_retries"`
	// MSIResource is accepted in config; its use is not visible in this file.
	MSIResource string `yaml:"msi_resource"`
	// UserAssignedID selects a user-assigned managed identity for token auth.
	UserAssignedID string `yaml:"user_assigned_id"`
	// PipelineConfig tunes the Azure SDK retry pipeline (0 = SDK defaults).
	PipelineConfig PipelineConfig `yaml:"pipeline_config"`
	// ReaderConfig tunes download retry behavior.
	ReaderConfig ReaderConfig `yaml:"reader_config"`
	// HTTPConfig tunes the underlying HTTP transport.
	HTTPConfig HTTPConfig `yaml:"http_config"`
}
// ReaderConfig configures download (read-path) retry behavior.
type ReaderConfig struct {
	// MaxRetryRequests is the number of retries for a failing read; passed to
	// the SDK retry reader. 0 = use the Azure SDK default.
	MaxRetryRequests int `yaml:"max_retry_requests"`
}
// PipelineConfig configures the Azure SDK request retry pipeline.
// All zero values mean "use the Azure SDK default".
type PipelineConfig struct {
	// MaxTries is the maximum number of attempts per request.
	MaxTries int32 `yaml:"max_tries"`
	// TryTimeout bounds the duration of a single attempt.
	TryTimeout model.Duration `yaml:"try_timeout"`
	// RetryDelay is the initial delay between attempts.
	RetryDelay model.Duration `yaml:"retry_delay"`
	// MaxRetryDelay caps the (growing) delay between attempts.
	MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
}
// AzureStorage implements the storage.Storage interface against Azure APIs.
type AzureStorage struct {
	// name is the container name, reported by String().
	name string
	// containerClient performs all blob operations against the container.
	containerClient *container.Client
	// config is the validated configuration this storage was built with.
	config *AzureConfig
}
- // Validate checks to see if any of the config options are set.
- func (conf *AzureConfig) validate() error {
- var errMsg []string
- if conf.UserAssignedID != "" && conf.StorageAccountKey != "" {
- errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_account_key authentication")
- }
- if conf.UserAssignedID != "" && conf.StorageConnectionString != "" {
- errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_connection_string authentication")
- }
- if conf.StorageAccountKey != "" && conf.StorageConnectionString != "" {
- errMsg = append(errMsg, "storage_account_key and storage_connection_string cannot both be set")
- }
- if conf.StorageAccountName == "" {
- errMsg = append(errMsg, "storage_account_name is required but not configured")
- }
- if conf.ContainerName == "" {
- errMsg = append(errMsg, "no container specified")
- }
- if conf.PipelineConfig.MaxTries < 0 {
- errMsg = append(errMsg, "The value of max_tries must be greater than or equal to 0 in the config file")
- }
- if conf.ReaderConfig.MaxRetryRequests < 0 {
- errMsg = append(errMsg, "The value of max_retry_requests must be greater than or equal to 0 in the config file")
- }
- if len(errMsg) > 0 {
- return errors.New(strings.Join(errMsg, ", "))
- }
- return nil
- }
- // parseAzureConfig unmarshals a buffer into a Config with default values.
- func parseAzureConfig(conf []byte) (AzureConfig, error) {
- config := defaultAzureConfig
- if err := yaml.UnmarshalStrict(conf, &config); err != nil {
- return AzureConfig{}, err
- }
- // If we don't have config specific retry values but we do have the generic MaxRetries.
- // This is for backwards compatibility but also ease of configuration.
- if config.MaxRetries > 0 {
- if config.PipelineConfig.MaxTries == 0 {
- config.PipelineConfig.MaxTries = int32(config.MaxRetries)
- }
- if config.ReaderConfig.MaxRetryRequests == 0 {
- config.ReaderConfig.MaxRetryRequests = config.MaxRetries
- }
- }
- return config, nil
- }
- // NewAzureStorage returns a new Storage using the provided Azure config.
- func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
- log.Debugf("Creating new Azure Bucket Connection")
- conf, err := parseAzureConfig(azureConfig)
- if err != nil {
- return nil, fmt.Errorf("error parsing azure storage config: %w", err)
- }
- return NewAzureStorageWith(conf)
- }
- // NewAzureStorageWith returns a new Storage using the provided Azure config struct.
- func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
- if err := conf.validate(); err != nil {
- return nil, fmt.Errorf("error validating azure storage config: %w", err)
- }
- containerClient, err := getContainerClient(conf)
- if err != nil {
- return nil, fmt.Errorf("error retrieving container client: %w", err)
- }
- // Check if storage account container already exists, and create one if it does not.
- ctx := context.Background()
- _, err = containerClient.GetProperties(ctx, &container.GetPropertiesOptions{})
- if err != nil {
- if !bloberror.HasCode(err, bloberror.ContainerNotFound) {
- return nil, err
- }
- _, err := containerClient.Create(ctx, nil)
- if err != nil {
- return nil, errors.Wrapf(err, "error creating Azure blob container: %s", conf.ContainerName)
- }
- log.Infof("Azure blob container successfully created %s", conf.ContainerName)
- }
- return &AzureStorage{
- name: conf.ContainerName,
- containerClient: containerClient,
- config: &conf,
- }, nil
- }
// String returns the bucket name for azure storage.
func (as *AzureStorage) String() string {
	return as.name
}
// StorageType returns a string identifier for the type of storage used by the implementation.
func (as *AzureStorage) StorageType() StorageType {
	// Always an Azure bucket for this implementation.
	return StorageTypeBucketAzure
}
- // FullPath returns the storage working path combined with the path provided
- func (as *AzureStorage) FullPath(name string) string {
- name = trimLeading(name)
- return name
- }
- // Stat returns the StorageStats for the specific path.
- func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
- name = trimLeading(name)
- ctx := context.Background()
- blobClient := b.containerClient.NewBlobClient(name)
- props, err := blobClient.GetProperties(ctx, nil)
- if err != nil {
- return nil, fmt.Errorf("error retrieving blob properties: %w", err)
- }
- return &StorageInfo{
- Name: trimName(name),
- Size: *props.ContentLength,
- ModTime: *props.LastModified,
- }, nil
- }
- // Read uses the relative path of the storage combined with the provided path to
- // read the contents.
- func (b *AzureStorage) Read(name string) ([]byte, error) {
- name = trimLeading(name)
- ctx := context.Background()
- log.Debugf("AzureStorage::Read::HTTPS(%s)", name)
- downloadResponse, err := b.containerClient.NewBlobClient(name).DownloadStream(ctx, nil)
- if err != nil {
- return nil, fmt.Errorf("AzureStorage: Read: failed to download blob %q: %w", name, err)
- }
- // NOTE: automatic retries are performed if the connection fails
- retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
- MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
- })
- defer retryReader.Close()
- // read the body into a buffer
- downloadedData := bytes.Buffer{}
- _, err = downloadedData.ReadFrom(retryReader)
- if err != nil {
- return nil, fmt.Errorf("AzureStorage: Read: failed to read downloaded data %w", err)
- }
- return downloadedData.Bytes(), nil
- }
- // ReadStream returns a streaming reader for the specified blob path.
- func (b *AzureStorage) ReadStream(path string) (io.ReadCloser, error) {
- path = trimLeading(path)
- ctx := context.Background()
- log.Debugf("AzureStorage::ReadStream::HTTPS(%s)", path)
- downloadResponse, err := b.containerClient.NewBlobClient(path).DownloadStream(ctx, nil)
- if err != nil {
- if b.IsObjNotFoundErr(err) {
- return nil, DoesNotExistError
- }
- return nil, fmt.Errorf("AzureStorage: ReadStream: failed to download blob %q: %w", path, err)
- }
- retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
- MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
- })
- return retryReader, nil
- }
- // ReadToLocalFile streams the specified blob at path to destPath on the local file system.
- func (b *AzureStorage) ReadToLocalFile(path, destPath string) error {
- path = trimLeading(path)
- ctx := context.Background()
- log.Debugf("AzureStorage::ReadToLocalFile::HTTPS(%s) -> %s", path, destPath)
- downloadResponse, err := b.containerClient.NewBlobClient(path).DownloadStream(ctx, nil)
- if err != nil {
- if b.IsObjNotFoundErr(err) {
- return DoesNotExistError
- }
- return fmt.Errorf("AzureStorage: ReadToLocalFile: failed to download blob %q to %q: %w", path, destPath, err)
- }
- // NOTE: automatic retries are performed if the connection fails.
- retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
- MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
- })
- defer retryReader.Close()
- if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
- return errors.Wrap(err, "creating destination directory")
- }
- f, err := os.Create(destPath)
- if err != nil {
- return errors.Wrapf(err, "creating destination file %s", destPath)
- }
- defer f.Close()
- // Use 1 MB buffer for streaming operations
- buf := make([]byte, 1024*1024)
- if _, err := io.CopyBuffer(f, retryReader, buf); err != nil {
- return errors.Wrapf(err, "streaming %s to %s", path, destPath)
- }
- return nil
- }
- // Write uses the relative path of the storage combined with the provided path
- // to write a new file or overwrite an existing file.
- func (b *AzureStorage) Write(name string, data []byte) error {
- name = trimLeading(name)
- ctx := context.Background()
- log.Debugf("AzureStorage::Write::HTTPS(%s)", name)
- r := bytes.NewReader(data)
- blobClient := b.containerClient.NewBlockBlobClient(name)
- opts := &blockblob.UploadStreamOptions{
- BlockSize: 3 * 1024 * 1024,
- Concurrency: 4,
- }
- if _, err := blobClient.UploadStream(ctx, r, opts); err != nil {
- return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
- }
- return nil
- }
- // Remove uses the relative path of the storage combined with the provided path to
- // remove a file from storage permanently.
- func (b *AzureStorage) Remove(name string) error {
- name = trimLeading(name)
- log.Debugf("AzureStorage::Remove::HTTPS(%s)", name)
- ctx := context.Background()
- blobClient := b.containerClient.NewBlobClient(name)
- opt := &blob.DeleteOptions{
- DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
- }
- if _, err := blobClient.Delete(ctx, opt); err != nil {
- return errors.Wrapf(err, "error deleting blob, address: %s", name)
- }
- return nil
- }
- // Exists uses the relative path of the storage combined with the provided path to
- // determine if the file exists.
- func (b *AzureStorage) Exists(name string) (bool, error) {
- name = trimLeading(name)
- ctx := context.Background()
- blobClient := b.containerClient.NewBlobClient(name)
- if _, err := blobClient.GetProperties(ctx, nil); err != nil {
- if b.IsObjNotFoundErr(err) {
- return false, nil
- }
- return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
- }
- return true, nil
- }
- // List uses the relative path of the storage combined with the provided path to return
- // storage information for the files.
- func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
- path = trimLeading(path)
- log.Debugf("AzureStorage::List::HTTPS(%s)", path)
- ctx := context.Background()
- // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
- // object itself as one prefix item.
- if path != "" {
- path = strings.TrimSuffix(path, DirDelim) + DirDelim
- }
- var stats []*StorageInfo
- list := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
- Prefix: &path,
- })
- for list.More() {
- page, err := list.NextPage(ctx)
- if err != nil {
- return nil, fmt.Errorf("failed to retrieve page: %s", err)
- }
- segment := page.ListBlobsHierarchySegmentResponse.Segment
- if segment == nil {
- continue
- }
- for _, blob := range segment.BlobItems {
- if blob.Name == nil {
- continue
- }
- if blob.Properties == nil {
- continue
- }
- stats = append(stats, &StorageInfo{
- Name: trimName(*blob.Name),
- Size: *blob.Properties.ContentLength,
- ModTime: *blob.Properties.LastModified,
- })
- }
- }
- return stats, nil
- }
- func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
- path = trimLeading(path)
- log.Debugf("AzureStorage::ListDirectories::HTTPS(%s)", path)
- ctx := context.Background()
- // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
- // object itself as one prefix item.
- if path != "" {
- path = strings.TrimSuffix(path, DirDelim) + DirDelim
- }
- var stats []*StorageInfo
- list := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
- Prefix: &path,
- })
- for list.More() {
- page, err := list.NextPage(ctx)
- if err != nil {
- return nil, fmt.Errorf("failed to retrieve page: %s", err)
- }
- segment := page.ListBlobsHierarchySegmentResponse.Segment
- if segment == nil {
- continue
- }
- for _, dir := range segment.BlobPrefixes {
- if dir.Name == nil {
- continue
- }
- stats = append(stats, &StorageInfo{
- Name: *dir.Name,
- })
- }
- }
- return stats, nil
- }
- // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
- func (b *AzureStorage) IsObjNotFoundErr(err error) bool {
- if err == nil {
- return false
- }
- return bloberror.HasCode(err, bloberror.BlobNotFound) || bloberror.HasCode(err, bloberror.InvalidURI)
- }
- // IsAccessDeniedErr returns true if access to object is denied.
- func (b *AzureStorage) IsAccessDeniedErr(err error) bool {
- if err == nil {
- return false
- }
- return bloberror.HasCode(err, bloberror.AuthorizationPermissionMismatch) || bloberror.HasCode(err, bloberror.InsufficientAccountPermissions)
- }
- func getContainerClient(conf AzureConfig) (*container.Client, error) {
- dt, err := conf.HTTPConfig.GetHTTPTransport()
- if err != nil {
- return nil, fmt.Errorf("error creating transport: %w", err)
- }
- opt := &container.ClientOptions{
- ClientOptions: azcore.ClientOptions{
- Retry: policy.RetryOptions{
- MaxRetries: conf.PipelineConfig.MaxTries,
- TryTimeout: time.Duration(conf.PipelineConfig.TryTimeout),
- RetryDelay: time.Duration(conf.PipelineConfig.RetryDelay),
- MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
- },
- Telemetry: policy.TelemetryOptions{
- ApplicationID: "Thanos",
- },
- Transport: &http.Client{Transport: dt},
- },
- }
- // Use connection string if set
- if conf.StorageConnectionString != "" {
- containerClient, err := container.NewClientFromConnectionString(conf.StorageConnectionString, conf.ContainerName, opt)
- if err != nil {
- return nil, fmt.Errorf("error creating client from connection string: %w", err)
- }
- log.Debugf("AzureStorage: New Azure client initialized for container '%s' using connection string", conf.ContainerName)
- return containerClient, nil
- }
- if conf.Endpoint == "" {
- conf.Endpoint = "blob.core.windows.net"
- }
- // HTTPS Protocol Configuration: Azure Storage always uses HTTPS protocol.
- // The containerURL is explicitly constructed with "https://" scheme.
- // All Azure blob operations (read, write, delete, list) use this HTTPS URL.
- containerURL := fmt.Sprintf("https://%s.%s/%s", conf.StorageAccountName, conf.Endpoint, conf.ContainerName)
- log.Debugf("AzureStorage: New Azure client initialized with '%s'", containerURL)
- // Use shared keys if set
- if conf.StorageAccountKey != "" {
- cred, err := container.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
- if err != nil {
- return nil, fmt.Errorf("error getting shared key credential: %w", err)
- }
- containerClient, err := container.NewClientWithSharedKeyCredential(containerURL, cred, opt)
- if err != nil {
- return nil, fmt.Errorf("error creating client with shared key credential: %w", err)
- }
- return containerClient, nil
- }
- // Otherwise use a token credential
- var cred azcore.TokenCredential
- // Use Managed Identity Credential if a user assigned ID is set
- if conf.UserAssignedID != "" {
- msiOpt := &azidentity.ManagedIdentityCredentialOptions{}
- msiOpt.ID = azidentity.ClientID(conf.UserAssignedID)
- cred, err = azidentity.NewManagedIdentityCredential(msiOpt)
- } else {
- // Otherwise use Default Azure Credential
- cred, err = azidentity.NewDefaultAzureCredential(nil)
- }
- if err != nil {
- return nil, fmt.Errorf("error creating token credential: %w", err)
- }
- containerClient, err := container.NewClient(containerURL, cred, opt)
- if err != nil {
- return nil, fmt.Errorf("error creating client from token credential: %w", err)
- }
- return containerClient, nil
- }
|