|
|
@@ -0,0 +1,647 @@
|
|
|
+package storage
|
|
|
+
|
|
|
+// Fork from Thanos S3 Bucket support to reuse configuration options
|
|
|
+// Licensed under the Apache License 2.0
|
|
|
+// https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
|
|
|
+
|
|
|
+import (
|
|
|
+ "bytes"
|
|
|
+ "context"
|
|
|
+ "fmt"
|
|
|
+ "io"
|
|
|
+ "io/ioutil"
|
|
|
+ "net"
|
|
|
+ "net/http"
|
|
|
+ "net/url"
|
|
|
+ "regexp"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/kubecost/cost-model/pkg/log"
|
|
|
+
|
|
|
+ "github.com/Azure/azure-pipeline-go/pipeline"
|
|
|
+ blob "github.com/Azure/azure-storage-blob-go/azblob"
|
|
|
+ "github.com/Azure/go-autorest/autorest/adal"
|
|
|
+ "github.com/Azure/go-autorest/autorest/azure/auth"
|
|
|
+ "github.com/pkg/errors"
|
|
|
+ "github.com/prometheus/common/model"
|
|
|
+ "gopkg.in/yaml.v2"
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ azureDefaultEndpoint = "blob.core.windows.net"
|
|
|
+)
|
|
|
+
|
|
|
+var errorCodeRegex = regexp.MustCompile(`X-Ms-Error-Code:\D*\[(\w+)\]`)
|
|
|
+
|
|
|
+// Set default retry values to default Azure values. 0 = use Default Azure.
|
|
|
+var defaultAzureConfig = AzureConfig{
|
|
|
+ PipelineConfig: PipelineConfig{
|
|
|
+ MaxTries: 0,
|
|
|
+ TryTimeout: 0,
|
|
|
+ RetryDelay: 0,
|
|
|
+ MaxRetryDelay: 0,
|
|
|
+ },
|
|
|
+ ReaderConfig: ReaderConfig{
|
|
|
+ MaxRetryRequests: 0,
|
|
|
+ },
|
|
|
+ HTTPConfig: AzureHTTPConfig{
|
|
|
+ IdleConnTimeout: model.Duration(90 * time.Second),
|
|
|
+ ResponseHeaderTimeout: model.Duration(2 * time.Minute),
|
|
|
+ TLSHandshakeTimeout: model.Duration(10 * time.Second),
|
|
|
+ ExpectContinueTimeout: model.Duration(1 * time.Second),
|
|
|
+ MaxIdleConns: 100,
|
|
|
+ MaxIdleConnsPerHost: 100,
|
|
|
+ MaxConnsPerHost: 0,
|
|
|
+ DisableCompression: false,
|
|
|
+ },
|
|
|
+}
|
|
|
+
|
|
|
+func init() {
|
|
|
+ // Disable `ForceLog` in Azure storage module
|
|
|
+ // As the time of this patch, the logging function in the storage module isn't correctly
|
|
|
+ // detecting expected REST errors like 404 and so outputs them to syslog along with a stacktrace.
|
|
|
+ // https://github.com/Azure/azure-storage-blob-go/issues/214
|
|
|
+ //
|
|
|
+ // This needs to be done at startup because the underlying variable is not thread safe.
|
|
|
+ // https://github.com/Azure/azure-pipeline-go/blob/dc95902f1d32034f8f743ccc6c3f2eb36b84da27/pipeline/core.go#L276-L283
|
|
|
+ pipeline.SetForceLogEnabled(false)
|
|
|
+}
|
|
|
+
|
|
|
+// AzureConfig Azure storage configuration.
|
|
|
+type AzureConfig struct {
|
|
|
+ StorageAccountName string `yaml:"storage_account"`
|
|
|
+ StorageAccountKey string `yaml:"storage_account_key"`
|
|
|
+ ContainerName string `yaml:"container"`
|
|
|
+ Endpoint string `yaml:"endpoint"`
|
|
|
+ MaxRetries int `yaml:"max_retries"`
|
|
|
+ MSIResource string `yaml:"msi_resource"`
|
|
|
+ UserAssignedID string `yaml:"user_assigned_id"`
|
|
|
+ PipelineConfig PipelineConfig `yaml:"pipeline_config"`
|
|
|
+ ReaderConfig ReaderConfig `yaml:"reader_config"`
|
|
|
+ HTTPConfig AzureHTTPConfig `yaml:"http_config"`
|
|
|
+}
|
|
|
+
|
|
|
+type ReaderConfig struct {
|
|
|
+ MaxRetryRequests int `yaml:"max_retry_requests"`
|
|
|
+}
|
|
|
+
|
|
|
+type PipelineConfig struct {
|
|
|
+ MaxTries int32 `yaml:"max_tries"`
|
|
|
+ TryTimeout model.Duration `yaml:"try_timeout"`
|
|
|
+ RetryDelay model.Duration `yaml:"retry_delay"`
|
|
|
+ MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
|
|
|
+}
|
|
|
+
|
|
|
+type AzureHTTPConfig struct {
|
|
|
+ IdleConnTimeout model.Duration `yaml:"idle_conn_timeout"`
|
|
|
+ ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"`
|
|
|
+ InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
|
|
|
+
|
|
|
+ TLSHandshakeTimeout model.Duration `yaml:"tls_handshake_timeout"`
|
|
|
+ ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"`
|
|
|
+ MaxIdleConns int `yaml:"max_idle_conns"`
|
|
|
+ MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"`
|
|
|
+ MaxConnsPerHost int `yaml:"max_conns_per_host"`
|
|
|
+ DisableCompression bool `yaml:"disable_compression"`
|
|
|
+
|
|
|
+ TLSConfig TLSConfig `yaml:"tls_config"`
|
|
|
+}
|
|
|
+
|
|
|
+// AzureStorage implements the storage.Storage interface against Azure APIs.
|
|
|
+type AzureStorage struct {
|
|
|
+ name string
|
|
|
+ containerURL blob.ContainerURL
|
|
|
+ config *AzureConfig
|
|
|
+}
|
|
|
+
|
|
|
+// Validate checks to see if any of the config options are set.
|
|
|
+func (conf *AzureConfig) validate() error {
|
|
|
+ var errMsg []string
|
|
|
+ if conf.MSIResource == "" {
|
|
|
+ if conf.UserAssignedID == "" {
|
|
|
+ if conf.StorageAccountName == "" ||
|
|
|
+ conf.StorageAccountKey == "" {
|
|
|
+ errMsg = append(errMsg, "invalid Azure storage configuration")
|
|
|
+ }
|
|
|
+ if conf.StorageAccountName == "" && conf.StorageAccountKey != "" {
|
|
|
+ errMsg = append(errMsg, "no Azure storage_account specified while storage_account_key is present in config file; both should be present")
|
|
|
+ }
|
|
|
+ if conf.StorageAccountName != "" && conf.StorageAccountKey == "" {
|
|
|
+ errMsg = append(errMsg, "no Azure storage_account_key specified while storage_account is present in config file; both should be present")
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if conf.StorageAccountName == "" {
|
|
|
+ errMsg = append(errMsg, "UserAssignedID is configured but storage account name is missing")
|
|
|
+ }
|
|
|
+ if conf.StorageAccountKey != "" {
|
|
|
+ errMsg = append(errMsg, "UserAssignedID is configured but storage account key is used")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if conf.StorageAccountName == "" {
|
|
|
+ errMsg = append(errMsg, "MSI resource is configured but storage account name is missing")
|
|
|
+ }
|
|
|
+ if conf.StorageAccountKey != "" {
|
|
|
+ errMsg = append(errMsg, "MSI resource is configured but storage account key is used")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.ContainerName == "" {
|
|
|
+ errMsg = append(errMsg, "no Azure container specified")
|
|
|
+ }
|
|
|
+ if conf.Endpoint == "" {
|
|
|
+ conf.Endpoint = azureDefaultEndpoint
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.PipelineConfig.MaxTries < 0 {
|
|
|
+ errMsg = append(errMsg, "The value of max_tries must be greater than or equal to 0 in the config file")
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.ReaderConfig.MaxRetryRequests < 0 {
|
|
|
+ errMsg = append(errMsg, "The value of max_retry_requests must be greater than or equal to 0 in the config file")
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(errMsg) > 0 {
|
|
|
+ return errors.New(strings.Join(errMsg, ", "))
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// parseAzureConfig unmarshals a buffer into a Config with default values.
|
|
|
+func parseAzureConfig(conf []byte) (AzureConfig, error) {
|
|
|
+ config := defaultAzureConfig
|
|
|
+ if err := yaml.UnmarshalStrict(conf, &config); err != nil {
|
|
|
+ return AzureConfig{}, err
|
|
|
+ }
|
|
|
+
|
|
|
+ // If we don't have config specific retry values but we do have the generic MaxRetries.
|
|
|
+ // This is for backwards compatibility but also ease of configuration.
|
|
|
+ if config.MaxRetries > 0 {
|
|
|
+ if config.PipelineConfig.MaxTries == 0 {
|
|
|
+ config.PipelineConfig.MaxTries = int32(config.MaxRetries)
|
|
|
+ }
|
|
|
+ if config.ReaderConfig.MaxRetryRequests == 0 {
|
|
|
+ config.ReaderConfig.MaxRetryRequests = config.MaxRetries
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return config, nil
|
|
|
+}
|
|
|
+
|
|
|
+// NewAzureStorage returns a new Storage using the provided Azure config.
|
|
|
+func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
|
|
|
+ log.Debugf("Creating new Azure Bucket Connection")
|
|
|
+
|
|
|
+ conf, err := parseAzureConfig(azureConfig)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return NewAzureStorageWith(conf)
|
|
|
+}
|
|
|
+
|
|
|
+// NewAzureStorageWith returns a new Storage using the provided Azure config struct.
|
|
|
+func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
|
|
|
+ if err := conf.validate(); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+ container, err := createContainer(ctx, conf)
|
|
|
+ if err != nil {
|
|
|
+ ret, ok := err.(blob.StorageError)
|
|
|
+ if !ok {
|
|
|
+ return nil, errors.Wrapf(err, "Azure API return unexpected error: %T\n", err)
|
|
|
+ }
|
|
|
+ if ret.ServiceCode() == "ContainerAlreadyExists" {
|
|
|
+ log.Debugf("Getting connection to existing Azure blob container: %s", conf.ContainerName)
|
|
|
+ container, err = getContainer(ctx, conf)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return nil, errors.Wrapf(err, "error creating Azure blob container: %s", container)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.Infof("Azure blob container successfully created. Address: %s", container)
|
|
|
+ }
|
|
|
+
|
|
|
+ return &AzureStorage{
|
|
|
+ name: conf.ContainerName,
|
|
|
+ containerURL: container,
|
|
|
+ config: &conf,
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Name returns the bucket name for azure storage.
|
|
|
+func (as *AzureStorage) Name() string {
|
|
|
+ return as.name
|
|
|
+}
|
|
|
+
|
|
|
+// StorageType returns a string identifier for the type of storage used by the implementation.
|
|
|
+func (as *AzureStorage) StorageType() StorageType {
|
|
|
+ return StorageTypeBucketAzure
|
|
|
+}
|
|
|
+
|
|
|
+// FullPath returns the storage working path combined with the path provided
|
|
|
+func (as *AzureStorage) FullPath(name string) string {
|
|
|
+ name = trimLeading(name)
|
|
|
+
|
|
|
+ return name
|
|
|
+}
|
|
|
+
|
|
|
+// Stat returns the StorageStats for the specific path.
|
|
|
+func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
|
|
|
+ name = trimLeading(name)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ blobURL := getBlobURL(name, b.containerURL)
|
|
|
+ props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return &StorageInfo{
|
|
|
+ Name: trimName(name),
|
|
|
+ Size: props.ContentLength(),
|
|
|
+ ModTime: props.LastModified(),
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Read uses the relative path of the storage combined with the provided path to
|
|
|
+// read the contents.
|
|
|
+func (b *AzureStorage) Read(name string) ([]byte, error) {
|
|
|
+ name = trimLeading(name)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ log.Debugf("AzureStorage::Read(%s)", name)
|
|
|
+
|
|
|
+ reader, err := b.getBlobReader(ctx, name, 0, blob.CountToEnd)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ data, err := io.ReadAll(reader)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return data, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Write uses the relative path of the storage combined with the provided path
|
|
|
+// to write a new file or overwrite an existing file.
|
|
|
+func (b *AzureStorage) Write(name string, data []byte) error {
|
|
|
+ name = trimLeading(name)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ log.Debugf("AzureStorage::Write(%s)", name)
|
|
|
+
|
|
|
+ blobURL := getBlobURL(name, b.containerURL)
|
|
|
+ r := bytes.NewReader(data)
|
|
|
+ if _, err := blob.UploadStreamToBlockBlob(ctx, r, blobURL,
|
|
|
+ blob.UploadStreamToBlockBlobOptions{
|
|
|
+ BufferSize: len(data),
|
|
|
+ MaxBuffers: 1,
|
|
|
+ },
|
|
|
+ ); err != nil {
|
|
|
+ return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// Remove uses the relative path of the storage combined with the provided path to
|
|
|
+// remove a file from storage permanently.
|
|
|
+func (b *AzureStorage) Remove(name string) error {
|
|
|
+ name = trimLeading(name)
|
|
|
+
|
|
|
+ log.Debugf("AzureStorage::Remove(%s)", name)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ blobURL := getBlobURL(name, b.containerURL)
|
|
|
+ if _, err := blobURL.Delete(ctx, blob.DeleteSnapshotsOptionInclude, blob.BlobAccessConditions{}); err != nil {
|
|
|
+ return errors.Wrapf(err, "error deleting blob, address: %s", name)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// Exists uses the relative path of the storage combined with the provided path to
|
|
|
+// determine if the file exists.
|
|
|
+func (b *AzureStorage) Exists(name string) (bool, error) {
|
|
|
+ name = trimLeading(name)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ blobURL := getBlobURL(name, b.containerURL)
|
|
|
+ if _, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{}); err != nil {
|
|
|
+ if b.isObjNotFoundErr(err) {
|
|
|
+ return false, nil
|
|
|
+ }
|
|
|
+ return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
|
|
|
+ }
|
|
|
+
|
|
|
+ return true, nil
|
|
|
+}
|
|
|
+
|
|
|
+// List uses the relative path of the storage combined with the provided path to return
|
|
|
+// storage information for the files.
|
|
|
+func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
|
|
|
+ path = trimLeading(path)
|
|
|
+
|
|
|
+ log.Debugf("AzureStorage::List(%s)", path)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
|
|
|
+ // object itself as one prefix item.
|
|
|
+ if path != "" {
|
|
|
+ path = strings.TrimSuffix(path, DirDelim) + DirDelim
|
|
|
+ }
|
|
|
+
|
|
|
+ marker := blob.Marker{}
|
|
|
+ listOptions := blob.ListBlobsSegmentOptions{Prefix: path}
|
|
|
+
|
|
|
+ var names []string
|
|
|
+ for i := 1; ; i++ {
|
|
|
+ var blobItems []blob.BlobItemInternal
|
|
|
+
|
|
|
+ list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
|
|
|
+ }
|
|
|
+
|
|
|
+ marker = list.NextMarker
|
|
|
+ blobItems = list.Segment.BlobItems
|
|
|
+
|
|
|
+ for _, blob := range blobItems {
|
|
|
+ names = append(names, blob.Name)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Continue iterating if we are not done.
|
|
|
+ if !marker.NotDone() {
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(names), i)
|
|
|
+ }
|
|
|
+
|
|
|
+ // get the storage information for each blob (really unfortunate we have to do this)
|
|
|
+ var lock sync.Mutex
|
|
|
+ var stats []*StorageInfo
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ wg.Add(len(names))
|
|
|
+
|
|
|
+ for i := 0; i < len(names); i++ {
|
|
|
+ go func(n string) {
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
+ stat, err := b.Stat(n)
|
|
|
+ if err != nil {
|
|
|
+ log.Errorf("Error statting blob %s: %s", n, err)
|
|
|
+ } else {
|
|
|
+ lock.Lock()
|
|
|
+ stats = append(stats, stat)
|
|
|
+ lock.Unlock()
|
|
|
+ }
|
|
|
+ }(names[i])
|
|
|
+ }
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+
|
|
|
+ return stats, nil
|
|
|
+}
|
|
|
+
|
|
|
+// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
|
|
|
+func (b *AzureStorage) isObjNotFoundErr(err error) bool {
|
|
|
+ if err == nil {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ errorCode := parseError(err.Error())
|
|
|
+ if errorCode == "InvalidUri" || errorCode == "BlobNotFound" {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
+func (b *AzureStorage) getBlobReader(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) {
|
|
|
+ log.Debugf("Getting blob: %s, offset: %d, length: %d", name, offset, length)
|
|
|
+ if name == "" {
|
|
|
+ return nil, errors.New("X-Ms-Error-Code: [EmptyContainerName]")
|
|
|
+ }
|
|
|
+ exists, err := b.Exists(name)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrapf(err, "cannot get blob reader: %s", name)
|
|
|
+ }
|
|
|
+
|
|
|
+ if !exists {
|
|
|
+ return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]")
|
|
|
+ }
|
|
|
+
|
|
|
+ blobURL := getBlobURL(name, b.containerURL)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
|
|
|
+ }
|
|
|
+ var props *blob.BlobGetPropertiesResponse
|
|
|
+ props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrapf(err, "cannot get properties for container: %s", name)
|
|
|
+ }
|
|
|
+
|
|
|
+ var size int64
|
|
|
+ // If a length is specified and it won't go past the end of the file,
|
|
|
+ // then set it as the size.
|
|
|
+ if length > 0 && length <= props.ContentLength()-offset {
|
|
|
+ size = length
|
|
|
+ log.Debugf("set size to length. size: %d, length: %d, offset: %d, name: %s", size, length, offset, name)
|
|
|
+ } else {
|
|
|
+ size = props.ContentLength() - offset
|
|
|
+ log.Debugf("set size to go to EOF. contentlength: %d, size: %d, length: %d, offset: %d, name: %s", props.ContentLength(), size, length, offset, name)
|
|
|
+ }
|
|
|
+
|
|
|
+ destBuffer := make([]byte, size)
|
|
|
+
|
|
|
+ if err := blob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, offset, size,
|
|
|
+ destBuffer, blob.DownloadFromBlobOptions{
|
|
|
+ BlockSize: blob.BlobDefaultDownloadBlockSize,
|
|
|
+ Parallelism: uint16(3),
|
|
|
+ Progress: nil,
|
|
|
+ RetryReaderOptionsPerBlock: blob.RetryReaderOptions{
|
|
|
+ MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ ); err != nil {
|
|
|
+ return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobURL.BlobURL)
|
|
|
+ }
|
|
|
+
|
|
|
+ return ioutil.NopCloser(bytes.NewReader(destBuffer)), nil
|
|
|
+}
|
|
|
+
|
|
|
+func getAzureStorageCredentials(conf AzureConfig) (blob.Credential, error) {
|
|
|
+ if conf.MSIResource != "" || conf.UserAssignedID != "" {
|
|
|
+ spt, err := getServicePrincipalToken(conf)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if err := spt.Refresh(); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return blob.NewTokenCredential(spt.Token().AccessToken, func(tc blob.TokenCredential) time.Duration {
|
|
|
+ err := spt.Refresh()
|
|
|
+ if err != nil {
|
|
|
+ log.Errorf("could not refresh MSI token. err: %s", err)
|
|
|
+ // Retry later as the error can be related to API throttling
|
|
|
+ return 30 * time.Second
|
|
|
+ }
|
|
|
+ tc.SetToken(spt.Token().AccessToken)
|
|
|
+ return spt.Token().Expires().Sub(time.Now().Add(2 * time.Minute))
|
|
|
+ }), nil
|
|
|
+ }
|
|
|
+
|
|
|
+ credential, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return credential, nil
|
|
|
+}
|
|
|
+
|
|
|
+func getServicePrincipalToken(conf AzureConfig) (*adal.ServicePrincipalToken, error) {
|
|
|
+ resource := conf.MSIResource
|
|
|
+ if resource == "" {
|
|
|
+ resource = fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint)
|
|
|
+ }
|
|
|
+
|
|
|
+ msiConfig := auth.MSIConfig{
|
|
|
+ Resource: resource,
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.UserAssignedID != "" {
|
|
|
+ log.Debugf("using user assigned identity. clientId: %s", conf.UserAssignedID)
|
|
|
+ msiConfig.ClientID = conf.UserAssignedID
|
|
|
+ } else {
|
|
|
+ log.Debugf("using system assigned identity")
|
|
|
+ }
|
|
|
+
|
|
|
+ return msiConfig.ServicePrincipalToken()
|
|
|
+}
|
|
|
+
|
|
|
+func getContainerURL(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
|
|
|
+ credentials, err := getAzureStorageCredentials(conf)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return blob.ContainerURL{}, err
|
|
|
+ }
|
|
|
+
|
|
|
+ retryOptions := blob.RetryOptions{
|
|
|
+ MaxTries: conf.PipelineConfig.MaxTries,
|
|
|
+ TryTimeout: time.Duration(conf.PipelineConfig.TryTimeout),
|
|
|
+ RetryDelay: time.Duration(conf.PipelineConfig.RetryDelay),
|
|
|
+ MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
|
|
|
+ }
|
|
|
+
|
|
|
+ if deadline, ok := ctx.Deadline(); ok {
|
|
|
+ retryOptions.TryTimeout = time.Until(deadline)
|
|
|
+ }
|
|
|
+
|
|
|
+ dt, err := DefaultAzureTransport(conf)
|
|
|
+ if err != nil {
|
|
|
+ return blob.ContainerURL{}, err
|
|
|
+ }
|
|
|
+ client := http.Client{
|
|
|
+ Transport: dt,
|
|
|
+ }
|
|
|
+
|
|
|
+ p := blob.NewPipeline(credentials, blob.PipelineOptions{
|
|
|
+ Retry: retryOptions,
|
|
|
+ Telemetry: blob.TelemetryOptions{Value: "Kubecost"},
|
|
|
+ RequestLog: blob.RequestLogOptions{
|
|
|
+ // Log a warning if an operation takes longer than the specified duration.
|
|
|
+ // (-1=no logging; 0=default 3s threshold)
|
|
|
+ LogWarningIfTryOverThreshold: -1,
|
|
|
+ },
|
|
|
+ Log: pipeline.LogOptions{
|
|
|
+ ShouldLog: nil,
|
|
|
+ },
|
|
|
+ HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
|
|
|
+ return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
|
|
|
+ resp, err := client.Do(request.WithContext(ctx))
|
|
|
+
|
|
|
+ return pipeline.NewHTTPResponse(resp), err
|
|
|
+ }
|
|
|
+ }),
|
|
|
+ })
|
|
|
+ u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
|
|
|
+ if err != nil {
|
|
|
+ return blob.ContainerURL{}, err
|
|
|
+ }
|
|
|
+ service := blob.NewServiceURL(*u, p)
|
|
|
+
|
|
|
+ return service.NewContainerURL(conf.ContainerName), nil
|
|
|
+}
|
|
|
+
|
|
|
+func DefaultAzureTransport(config AzureConfig) (*http.Transport, error) {
|
|
|
+ tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.HTTPConfig.InsecureSkipVerify {
|
|
|
+ tlsConfig.InsecureSkipVerify = true
|
|
|
+ }
|
|
|
+ return &http.Transport{
|
|
|
+ Proxy: http.ProxyFromEnvironment,
|
|
|
+ DialContext: (&net.Dialer{
|
|
|
+ Timeout: 30 * time.Second,
|
|
|
+ KeepAlive: 30 * time.Second,
|
|
|
+ DualStack: true,
|
|
|
+ }).DialContext,
|
|
|
+
|
|
|
+ MaxIdleConns: config.HTTPConfig.MaxIdleConns,
|
|
|
+ MaxIdleConnsPerHost: config.HTTPConfig.MaxIdleConnsPerHost,
|
|
|
+ IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
|
|
|
+ MaxConnsPerHost: config.HTTPConfig.MaxConnsPerHost,
|
|
|
+ TLSHandshakeTimeout: time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
|
|
|
+ ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
|
|
|
+
|
|
|
+ ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
|
|
|
+ DisableCompression: config.HTTPConfig.DisableCompression,
|
|
|
+ TLSClientConfig: tlsConfig,
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
+func getContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
|
|
|
+ c, err := getContainerURL(ctx, conf)
|
|
|
+ if err != nil {
|
|
|
+ return blob.ContainerURL{}, err
|
|
|
+ }
|
|
|
+ // Getting container properties to check if it exists or not. Returns error which will be parsed further.
|
|
|
+ _, err = c.GetProperties(ctx, blob.LeaseAccessConditions{})
|
|
|
+ return c, err
|
|
|
+}
|
|
|
+
|
|
|
+func createContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
|
|
|
+ c, err := getContainerURL(ctx, conf)
|
|
|
+ if err != nil {
|
|
|
+ return blob.ContainerURL{}, err
|
|
|
+ }
|
|
|
+ _, err = c.Create(
|
|
|
+ ctx,
|
|
|
+ blob.Metadata{},
|
|
|
+ blob.PublicAccessNone)
|
|
|
+ return c, err
|
|
|
+}
|
|
|
+
|
|
|
+func getBlobURL(blobName string, c blob.ContainerURL) blob.BlockBlobURL {
|
|
|
+ return c.NewBlockBlobURL(blobName)
|
|
|
+}
|
|
|
+
|
|
|
+func parseError(errorCode string) string {
|
|
|
+ match := errorCodeRegex.FindStringSubmatch(errorCode)
|
|
|
+ if len(match) == 2 {
|
|
|
+ return match[1]
|
|
|
+ }
|
|
|
+ return errorCode
|
|
|
+}
|