package storage

// Fork from Thanos S3 Bucket support to reuse configuration options
// Licensed under the Apache License 2.0
// https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/Azure/azure-pipeline-go/pipeline"
	blob "github.com/Azure/azure-storage-blob-go/azblob"
	"github.com/Azure/go-autorest/autorest/adal"
	"github.com/Azure/go-autorest/autorest/azure/auth"
	"github.com/opencost/opencost/core/pkg/log"
	"github.com/pkg/errors"
	"github.com/prometheus/common/model"
	"gopkg.in/yaml.v2"
)
const (
	azureDefaultEndpoint = "blob.core.windows.net"
)

// Default retry values mirror the Azure defaults; 0 means "use the Azure default".
var defaultAzureConfig = AzureConfig{
	PipelineConfig: PipelineConfig{
		MaxTries:      0,
		TryTimeout:    0,
		RetryDelay:    0,
		MaxRetryDelay: 0,
	},
	ReaderConfig: ReaderConfig{
		MaxRetryRequests: 0,
	},
	HTTPConfig: AzureHTTPConfig{
		IdleConnTimeout:       model.Duration(90 * time.Second),
		ResponseHeaderTimeout: model.Duration(2 * time.Minute),
		TLSHandshakeTimeout:   model.Duration(10 * time.Second),
		ExpectContinueTimeout: model.Duration(1 * time.Second),
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   100,
		MaxConnsPerHost:       0,
		DisableCompression:    false,
	},
}
func init() {
	// Disable `ForceLog` in the Azure storage module.
	// At the time of this patch, the logging function in the storage module isn't correctly
	// detecting expected REST errors like 404, so it outputs them to syslog along with a stacktrace.
	// https://github.com/Azure/azure-storage-blob-go/issues/214
	//
	// This needs to be done at startup because the underlying variable is not thread safe.
	// https://github.com/Azure/azure-pipeline-go/blob/dc95902f1d32034f8f743ccc6c3f2eb36b84da27/pipeline/core.go#L276-L283
	pipeline.SetForceLogEnabled(false)
}
// AzureConfig Azure storage configuration.
type AzureConfig struct {
	StorageAccountName string          `yaml:"storage_account"`
	StorageAccountKey  string          `yaml:"storage_account_key"`
	ContainerName      string          `yaml:"container"`
	Endpoint           string          `yaml:"endpoint"`
	MaxRetries         int             `yaml:"max_retries"`
	MSIResource        string          `yaml:"msi_resource"`
	UserAssignedID     string          `yaml:"user_assigned_id"`
	PipelineConfig     PipelineConfig  `yaml:"pipeline_config"`
	ReaderConfig       ReaderConfig    `yaml:"reader_config"`
	HTTPConfig         AzureHTTPConfig `yaml:"http_config"`
}

type ReaderConfig struct {
	MaxRetryRequests int `yaml:"max_retry_requests"`
}

type PipelineConfig struct {
	MaxTries      int32          `yaml:"max_tries"`
	TryTimeout    model.Duration `yaml:"try_timeout"`
	RetryDelay    model.Duration `yaml:"retry_delay"`
	MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
}

type AzureHTTPConfig struct {
	IdleConnTimeout       model.Duration `yaml:"idle_conn_timeout"`
	ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"`
	InsecureSkipVerify    bool           `yaml:"insecure_skip_verify"`
	TLSHandshakeTimeout   model.Duration `yaml:"tls_handshake_timeout"`
	ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"`
	MaxIdleConns          int            `yaml:"max_idle_conns"`
	MaxIdleConnsPerHost   int            `yaml:"max_idle_conns_per_host"`
	MaxConnsPerHost       int            `yaml:"max_conns_per_host"`
	DisableCompression    bool           `yaml:"disable_compression"`
	TLSConfig             TLSConfig      `yaml:"tls_config"`
}
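// An illustrative YAML configuration matching the struct tags above. The
// account, key, and container values below are placeholders, not working
// credentials; keys omitted from the file keep the values in
// defaultAzureConfig (parseAzureConfig unmarshals strictly on top of it).
//
//	storage_account: "mystorageaccount"
//	storage_account_key: "<base64-account-key>"
//	container: "opencost"
//	endpoint: "blob.core.windows.net"
//	max_retries: 3
//	pipeline_config:
//	  try_timeout: 1m
//	http_config:
//	  idle_conn_timeout: 90s
//	  response_header_timeout: 2m
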
// AzureStorage implements the storage.Storage interface against Azure APIs.
type AzureStorage struct {
	name         string
	containerURL blob.ContainerURL
	config       *AzureConfig
}
// validate checks that the required config options are set and consistent.
func (conf *AzureConfig) validate() error {
	var errMsg []string
	if conf.MSIResource == "" {
		if conf.UserAssignedID == "" {
			if conf.StorageAccountName == "" ||
				conf.StorageAccountKey == "" {
				errMsg = append(errMsg, "invalid Azure storage configuration")
			}
			if conf.StorageAccountName == "" && conf.StorageAccountKey != "" {
				errMsg = append(errMsg, "no Azure storage_account specified while storage_account_key is present in config file; both should be present")
			}
			if conf.StorageAccountName != "" && conf.StorageAccountKey == "" {
				errMsg = append(errMsg, "no Azure storage_account_key specified while storage_account is present in config file; both should be present")
			}
		} else {
			if conf.StorageAccountName == "" {
				errMsg = append(errMsg, "UserAssignedID is configured but storage account name is missing")
			}
			if conf.StorageAccountKey != "" {
				errMsg = append(errMsg, "UserAssignedID is configured but storage account key is used")
			}
		}
	} else {
		if conf.StorageAccountName == "" {
			errMsg = append(errMsg, "MSI resource is configured but storage account name is missing")
		}
		if conf.StorageAccountKey != "" {
			errMsg = append(errMsg, "MSI resource is configured but storage account key is used")
		}
	}

	if conf.ContainerName == "" {
		errMsg = append(errMsg, "no Azure container specified")
	}
	if conf.Endpoint == "" {
		conf.Endpoint = azureDefaultEndpoint
	}

	if conf.PipelineConfig.MaxTries < 0 {
		errMsg = append(errMsg, "The value of max_tries must be greater than or equal to 0 in the config file")
	}
	if conf.ReaderConfig.MaxRetryRequests < 0 {
		errMsg = append(errMsg, "The value of max_retry_requests must be greater than or equal to 0 in the config file")
	}

	if len(errMsg) > 0 {
		return errors.New(strings.Join(errMsg, ", "))
	}
	return nil
}
// parseAzureConfig unmarshals a buffer into an AzureConfig with default values.
func parseAzureConfig(conf []byte) (AzureConfig, error) {
	config := defaultAzureConfig
	if err := yaml.UnmarshalStrict(conf, &config); err != nil {
		return AzureConfig{}, err
	}

	// If no config-specific retry values are set but the generic MaxRetries is,
	// use it for both. This preserves backwards compatibility and keeps
	// configuration simple.
	if config.MaxRetries > 0 {
		if config.PipelineConfig.MaxTries == 0 {
			config.PipelineConfig.MaxTries = int32(config.MaxRetries)
		}
		if config.ReaderConfig.MaxRetryRequests == 0 {
			config.ReaderConfig.MaxRetryRequests = config.MaxRetries
		}
	}

	return config, nil
}
// NewAzureStorage returns a new Storage using the provided Azure config.
func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
	log.Debugf("Creating new Azure Bucket Connection")

	conf, err := parseAzureConfig(azureConfig)
	if err != nil {
		return nil, err
	}

	return NewAzureStorageWith(conf)
}
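// exampleAzureRoundTrip is a minimal, illustrative sketch of how a caller might
// wire this storage together; it is not referenced by the package. The account,
// key, container, and blob path below are hypothetical placeholders, and the
// YAML simply mirrors the struct tags on AzureConfig.
func exampleAzureRoundTrip() error {
	confYAML := []byte(`
storage_account: "mystorageaccount"
storage_account_key: "<base64-account-key>"
container: "opencost"
`)
	store, err := NewAzureStorage(confYAML)
	if err != nil {
		return err
	}

	// Write a small payload, confirm it exists, then read it back.
	if err := store.Write("examples/hello.txt", []byte("hello")); err != nil {
		return err
	}
	exists, err := store.Exists("examples/hello.txt")
	if err != nil {
		return err
	}
	if !exists {
		return errors.New("expected examples/hello.txt to exist")
	}
	data, err := store.Read("examples/hello.txt")
	if err != nil {
		return err
	}
	log.Debugf("read %d bytes back from Azure blob storage", len(data))
	return nil
}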
// NewAzureStorageWith returns a new Storage using the provided Azure config struct.
func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
	if err := conf.validate(); err != nil {
		return nil, err
	}

	ctx := context.Background()
	container, err := createContainer(ctx, conf)
	if err != nil {
		ret, ok := err.(blob.StorageError)
		if !ok {
			return nil, errors.Wrapf(err, "Azure API returned unexpected error: %T", err)
		}
		if ret.ServiceCode() == "ContainerAlreadyExists" {
			log.Debugf("Getting connection to existing Azure blob container: %s", conf.ContainerName)
			container, err = getContainer(ctx, conf)
			if err != nil {
				return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container)
			}
		} else {
			return nil, errors.Wrapf(err, "error creating Azure blob container: %s", container)
		}
	} else {
		log.Infof("Azure blob container successfully created. Address: %s", container)
	}

	return &AzureStorage{
		name:         conf.ContainerName,
		containerURL: container,
		config:       &conf,
	}, nil
}
// Name returns the bucket name for azure storage.
func (as *AzureStorage) Name() string {
	return as.name
}

// StorageType returns a string identifier for the type of storage used by the implementation.
func (as *AzureStorage) StorageType() StorageType {
	return StorageTypeBucketAzure
}

// FullPath returns the storage working path combined with the path provided
func (as *AzureStorage) FullPath(name string) string {
	name = trimLeading(name)
	return name
}
// Stat returns the StorageInfo for the specific path.
func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
	name = trimLeading(name)
	ctx := context.Background()

	blobURL := getBlobURL(name, b.containerURL)
	props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
	if err != nil {
		return nil, err
	}

	return &StorageInfo{
		Name:    trimName(name),
		Size:    props.ContentLength(),
		ModTime: props.LastModified(),
	}, nil
}
// Read uses the relative path of the storage combined with the provided path to
// read the contents.
func (b *AzureStorage) Read(name string) ([]byte, error) {
	name = trimLeading(name)
	ctx := context.Background()

	log.Debugf("AzureStorage::Read(%s)", name)

	reader, err := b.getBlobReader(ctx, name, 0, blob.CountToEnd)
	if err != nil {
		return nil, err
	}

	data, err := io.ReadAll(reader)
	if err != nil {
		return nil, err
	}

	return data, nil
}
// Write uses the relative path of the storage combined with the provided path
// to write a new file or overwrite an existing file.
func (b *AzureStorage) Write(name string, data []byte) error {
	name = trimLeading(name)
	ctx := context.Background()

	log.Debugf("AzureStorage::Write(%s)", name)

	blobURL := getBlobURL(name, b.containerURL)
	r := bytes.NewReader(data)

	if _, err := blob.UploadStreamToBlockBlob(ctx, r, blobURL,
		blob.UploadStreamToBlockBlobOptions{
			BufferSize: len(data),
			MaxBuffers: 1,
		},
	); err != nil {
		return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
	}
	return nil
}
// Remove uses the relative path of the storage combined with the provided path to
// remove a file from storage permanently.
func (b *AzureStorage) Remove(name string) error {
	name = trimLeading(name)

	log.Debugf("AzureStorage::Remove(%s)", name)

	ctx := context.Background()
	blobURL := getBlobURL(name, b.containerURL)

	if _, err := blobURL.Delete(ctx, blob.DeleteSnapshotsOptionInclude, blob.BlobAccessConditions{}); err != nil {
		return errors.Wrapf(err, "error deleting blob, address: %s", name)
	}
	return nil
}
// Exists uses the relative path of the storage combined with the provided path to
// determine if the file exists.
func (b *AzureStorage) Exists(name string) (bool, error) {
	name = trimLeading(name)
	ctx := context.Background()

	blobURL := getBlobURL(name, b.containerURL)
	if _, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{}); err != nil {
		var se blob.StorageError
		if errors.As(err, &se) && se.ServiceCode() == blob.ServiceCodeBlobNotFound {
			return false, nil
		}
		return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
	}

	return true, nil
}
// List uses the relative path of the storage combined with the provided path to return
// storage information for the files.
func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
	path = trimLeading(path)

	log.Debugf("AzureStorage::List(%s)", path)

	ctx := context.Background()

	// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
	// object itself as one prefix item.
	if path != "" {
		path = strings.TrimSuffix(path, DirDelim) + DirDelim
	}

	marker := blob.Marker{}
	listOptions := blob.ListBlobsSegmentOptions{Prefix: path}

	var names []string
	for i := 1; ; i++ {
		var blobItems []blob.BlobItemInternal

		list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
		if err != nil {
			return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
		}

		marker = list.NextMarker
		blobItems = list.Segment.BlobItems
		for _, blobItem := range blobItems {
			names = append(names, blobItem.Name)
		}

		// Continue iterating if we are not done.
		if !marker.NotDone() {
			break
		}

		log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(names), i)
	}

	// get the storage information for each blob (really unfortunate we have to do this)
	var lock sync.Mutex
	var stats []*StorageInfo
	var wg sync.WaitGroup
	wg.Add(len(names))

	for i := 0; i < len(names); i++ {
		go func(n string) {
			defer wg.Done()

			stat, err := b.Stat(n)
			if err != nil {
				log.Errorf("Error statting blob %s: %s", n, err)
			} else {
				lock.Lock()
				stats = append(stats, stat)
				lock.Unlock()
			}
		}(names[i])
	}
	wg.Wait()

	return stats, nil
}
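// An illustrative List call, assuming a hypothetical *AzureStorage named store
// with blobs previously written under a "reports/2024/" prefix:
//
//	infos, err := store.List("reports/2024")
//	// Each *StorageInfo carries the trimmed blob name plus the Size and ModTime
//	// obtained from the per-blob GetProperties call issued by Stat.
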
// ListDirectories uses the relative path of the storage combined with the provided
// path to return storage information for the directory-like prefixes beneath it.
func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
	path = trimLeading(path)

	log.Debugf("AzureStorage::ListDirectories(%s)", path)

	ctx := context.Background()

	// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
	// object itself as one prefix item.
	if path != "" {
		path = strings.TrimSuffix(path, DirDelim) + DirDelim
	}

	marker := blob.Marker{}
	listOptions := blob.ListBlobsSegmentOptions{Prefix: path}

	var stats []*StorageInfo
	for i := 1; ; i++ {
		var blobPrefixes []blob.BlobPrefix

		list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
		if err != nil {
			return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
		}

		marker = list.NextMarker
		blobPrefixes = list.Segment.BlobPrefixes
		for _, prefix := range blobPrefixes {
			stats = append(stats, &StorageInfo{
				Name: trimLeading(prefix.Name),
			})
		}

		// Continue iterating if we are not done.
		if !marker.NotDone() {
			break
		}

		log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(stats), i)
	}

	return stats, nil
}
func (b *AzureStorage) getBlobReader(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) {
	log.Debugf("Getting blob: %s, offset: %d, length: %d", name, offset, length)
	if name == "" {
		return nil, errors.New("X-Ms-Error-Code: [EmptyContainerName]")
	}

	exists, err := b.Exists(name)
	if err != nil {
		return nil, errors.Wrapf(err, "cannot get blob reader: %s", name)
	}
	if !exists {
		return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]")
	}

	blobURL := getBlobURL(name, b.containerURL)

	var props *blob.BlobGetPropertiesResponse
	props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
	if err != nil {
		return nil, errors.Wrapf(err, "cannot get properties for Azure blob: %s", name)
	}

	var size int64
	// If a length is specified and it won't go past the end of the file,
	// then set it as the size.
	if length > 0 && length <= props.ContentLength()-offset {
		size = length
		log.Debugf("set size to length. size: %d, length: %d, offset: %d, name: %s", size, length, offset, name)
	} else {
		size = props.ContentLength() - offset
		log.Debugf("set size to go to EOF. contentlength: %d, size: %d, length: %d, offset: %d, name: %s", props.ContentLength(), size, length, offset, name)
	}

	destBuffer := make([]byte, size)

	if err := blob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, offset, size,
		destBuffer, blob.DownloadFromBlobOptions{
			BlockSize:   blob.BlobDefaultDownloadBlockSize,
			Parallelism: uint16(3),
			Progress:    nil,
			RetryReaderOptionsPerBlock: blob.RetryReaderOptions{
				MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests,
			},
		},
	); err != nil {
		return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobURL.BlobURL)
	}

	return io.NopCloser(bytes.NewReader(destBuffer)), nil
}
func getAzureStorageCredentials(conf AzureConfig) (blob.Credential, error) {
	if conf.MSIResource != "" || conf.UserAssignedID != "" {
		spt, err := getServicePrincipalToken(conf)
		if err != nil {
			return nil, err
		}
		if err := spt.Refresh(); err != nil {
			return nil, err
		}

		return blob.NewTokenCredential(spt.Token().AccessToken, func(tc blob.TokenCredential) time.Duration {
			err := spt.Refresh()
			if err != nil {
				log.Errorf("could not refresh MSI token. err: %s", err)
				// Retry later as the error can be related to API throttling
				return 30 * time.Second
			}
			tc.SetToken(spt.Token().AccessToken)
			return spt.Token().Expires().Sub(time.Now().Add(2 * time.Minute))
		}), nil
	}

	credential, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
	if err != nil {
		return nil, err
	}
	return credential, nil
}
func getServicePrincipalToken(conf AzureConfig) (*adal.ServicePrincipalToken, error) {
	resource := conf.MSIResource
	if resource == "" {
		resource = fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint)
	}

	msiConfig := auth.MSIConfig{
		Resource: resource,
	}

	if conf.UserAssignedID != "" {
		log.Debugf("using user assigned identity. clientId: %s", conf.UserAssignedID)
		msiConfig.ClientID = conf.UserAssignedID
	} else {
		log.Debugf("using system assigned identity")
	}

	return msiConfig.ServicePrincipalToken()
}
func getContainerURL(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
	credentials, err := getAzureStorageCredentials(conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}

	retryOptions := blob.RetryOptions{
		MaxTries:      conf.PipelineConfig.MaxTries,
		TryTimeout:    time.Duration(conf.PipelineConfig.TryTimeout),
		RetryDelay:    time.Duration(conf.PipelineConfig.RetryDelay),
		MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
	}
	if deadline, ok := ctx.Deadline(); ok {
		retryOptions.TryTimeout = time.Until(deadline)
	}

	dt, err := DefaultAzureTransport(conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}
	client := http.Client{
		Transport: dt,
	}

	p := blob.NewPipeline(credentials, blob.PipelineOptions{
		Retry:     retryOptions,
		Telemetry: blob.TelemetryOptions{Value: "Kubecost"},
		RequestLog: blob.RequestLogOptions{
			// Log a warning if an operation takes longer than the specified duration.
			// (-1=no logging; 0=default 3s threshold)
			LogWarningIfTryOverThreshold: -1,
		},
		Log: pipeline.LogOptions{
			ShouldLog: nil,
		},
		HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
			return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
				resp, err := client.Do(request.WithContext(ctx))
				return pipeline.NewHTTPResponse(resp), err
			}
		}),
	})

	u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
	if err != nil {
		return blob.ContainerURL{}, err
	}
	service := blob.NewServiceURL(*u, p)

	return service.NewContainerURL(conf.ContainerName), nil
}
func DefaultAzureTransport(config AzureConfig) (*http.Transport, error) {
	tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
	if err != nil {
		return nil, err
	}

	if config.HTTPConfig.InsecureSkipVerify {
		tlsConfig.InsecureSkipVerify = true
	}

	return &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
			DualStack: true,
		}).DialContext,

		MaxIdleConns:          config.HTTPConfig.MaxIdleConns,
		MaxIdleConnsPerHost:   config.HTTPConfig.MaxIdleConnsPerHost,
		IdleConnTimeout:       time.Duration(config.HTTPConfig.IdleConnTimeout),
		MaxConnsPerHost:       config.HTTPConfig.MaxConnsPerHost,
		TLSHandshakeTimeout:   time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
		ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
		ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
		DisableCompression:    config.HTTPConfig.DisableCompression,
		TLSClientConfig:       tlsConfig,
	}, nil
}
func getContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
	c, err := getContainerURL(ctx, conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}

	// Get container properties to check whether it exists; the error is returned
	// for the caller to inspect.
	_, err = c.GetProperties(ctx, blob.LeaseAccessConditions{})
	return c, err
}
func createContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
	c, err := getContainerURL(ctx, conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}

	_, err = c.Create(
		ctx,
		blob.Metadata{},
		blob.PublicAccessNone)
	return c, err
}
func getBlobURL(blobName string, c blob.ContainerURL) blob.BlockBlobURL {
	return c.NewBlockBlobURL(blobName)
}
|