Browse Source

Add Azure Storage Support, Update TLSConfig Settings

Matt Bolt 4 years ago
parent
commit
1254a28280
7 changed files with 820 additions and 62 deletions
  1. 4 1
      go.mod
  2. 14 0
      go.sum
  3. 642 0
      pkg/storage/azurestorage.go
  4. 28 5
      pkg/storage/bucketstorage.go
  5. 11 11
      pkg/storage/gcsstorage.go
  6. 37 45
      pkg/storage/s3storage.go
  7. 84 0
      pkg/storage/tlsconfig.go

+ 4 - 1
go.mod

@@ -49,6 +49,8 @@ require (
 )
 
 require (
+	github.com/Azure/azure-pipeline-go v0.2.3 // indirect
+	github.com/Azure/azure-storage-blob-go v0.15.0 // indirect
 	github.com/Azure/go-autorest v14.2.0+incompatible // indirect
 	github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect
 	github.com/Azure/go-autorest/autorest/azure/cli v0.4.5 // indirect
@@ -92,6 +94,7 @@ require (
 	github.com/klauspost/compress v1.13.5 // indirect
 	github.com/klauspost/cpuid v1.3.1 // indirect
 	github.com/magiconair/properties v1.8.5 // indirect
+	github.com/mattn/go-ieproxy v0.0.1 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/minio/md5-simd v1.1.0 // indirect
 	github.com/minio/sha256-simd v0.1.1 // indirect
@@ -126,7 +129,7 @@ require (
 	google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect
 	google.golang.org/grpc v1.38.0 // indirect
 	google.golang.org/protobuf v1.26.0 // indirect
-	gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
+	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/ini.v1 v1.62.0 // indirect
 	k8s.io/klog/v2 v2.4.0 // indirect

+ 14 - 0
go.sum

@@ -41,8 +41,12 @@ cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
+github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
+github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
 github.com/Azure/azure-sdk-for-go v61.6.0+incompatible h1:jdHWEqRK9boUrdUPIWDE9dKLmxbHmz+PFk3jRQ9s1C0=
 github.com/Azure/azure-sdk-for-go v61.6.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
+github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
+github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
 github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
 github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
 github.com/Azure/go-autorest/autorest v0.11.1/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
@@ -51,6 +55,7 @@ github.com/Azure/go-autorest/autorest v0.11.27 h1:F3R3q42aWytozkV8ihzcgMO4OA4cuq
 github.com/Azure/go-autorest/autorest v0.11.27/go.mod h1:7l8ybrIdUmGqZMTD0sRtAr8NvbHjfofbf8RSP2q7w7U=
 github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQWWNcK9YU+MH96NgK+jErpbcg=
 github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
+github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
 github.com/Azure/go-autorest/autorest/adal v0.9.18 h1:kLnPsRjzZZUF3K5REu/Kc+qMQrvuza2bwSnNdhmzLfQ=
 github.com/Azure/go-autorest/autorest/adal v0.9.18/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ=
 github.com/Azure/go-autorest/autorest/azure/auth v0.5.11 h1:P6bYXFoao05z5uhOQzbC3Qd8JqF3jUoocoTeIxkp2cA=
@@ -298,6 +303,7 @@ github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLe
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@@ -391,6 +397,7 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
@@ -408,6 +415,8 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN
 github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
 github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
 github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
+github.com/mattn/go-ieproxy v0.0.1 h1:qiyop7gCflfhwCzGyeT0gro3sF9AIg9HU98JORTkqfI=
+github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
 github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
 github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
@@ -599,6 +608,7 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20201216223049-8b5274cf687f/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
@@ -662,6 +672,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -684,6 +695,7 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
@@ -735,6 +747,7 @@ golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -961,6 +974,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=

+ 642 - 0
pkg/storage/azurestorage.go

@@ -0,0 +1,642 @@
+package storage
+
+// Fork from Thanos S3 Bucket support to reuse configuration options
+// Licensed under the Apache License 2.0
+// https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"net/url"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/kubecost/cost-model/pkg/log"
+
+	"github.com/Azure/azure-pipeline-go/pipeline"
+	blob "github.com/Azure/azure-storage-blob-go/azblob"
+	"github.com/Azure/go-autorest/autorest/adal"
+	"github.com/Azure/go-autorest/autorest/azure/auth"
+	"github.com/pkg/errors"
+	"github.com/prometheus/common/model"
+	"gopkg.in/yaml.v2"
+)
+
const (
	// azureDefaultEndpoint is the default host suffix for Azure blob storage;
	// the storage account name is prepended to form the full endpoint.
	azureDefaultEndpoint = "blob.core.windows.net"
)

// errorCodeRegex extracts the Azure service error code (e.g. "BlobNotFound")
// from the "X-Ms-Error-Code: [...]" fragment embedded in SDK error strings.
var errorCodeRegex = regexp.MustCompile(`X-Ms-Error-Code:\D*\[(\w+)\]`)

// Set default retry values to default Azure values. 0 = use Default Azure.
var defaultAzureConfig = AzureConfig{
	PipelineConfig: PipelineConfig{
		MaxTries:      0,
		TryTimeout:    0,
		RetryDelay:    0,
		MaxRetryDelay: 0,
	},
	ReaderConfig: ReaderConfig{
		MaxRetryRequests: 0,
	},
	HTTPConfig: AzureHTTPConfig{
		IdleConnTimeout:       model.Duration(90 * time.Second),
		ResponseHeaderTimeout: model.Duration(2 * time.Minute),
		TLSHandshakeTimeout:   model.Duration(10 * time.Second),
		ExpectContinueTimeout: model.Duration(1 * time.Second),
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   100,
		MaxConnsPerHost:       0,
		DisableCompression:    false,
	},
}
+
// init disables the Azure pipeline's ForceLog before any requests are made.
func init() {
	// Disable `ForceLog` in Azure storage module
	// As the time of this patch, the logging function in the storage module isn't correctly
	// detecting expected REST errors like 404 and so outputs them to syslog along with a stacktrace.
	// https://github.com/Azure/azure-storage-blob-go/issues/214
	//
	// This needs to be done at startup because the underlying variable is not thread safe.
	// https://github.com/Azure/azure-pipeline-go/blob/dc95902f1d32034f8f743ccc6c3f2eb36b84da27/pipeline/core.go#L276-L283
	pipeline.SetForceLogEnabled(false)
}
+
// AzureConfig Azure storage configuration.
type AzureConfig struct {
	StorageAccountName string          `yaml:"storage_account"`     // Azure storage account name
	StorageAccountKey  string          `yaml:"storage_account_key"` // shared key auth; mutually exclusive with MSI / user-assigned identity (see validate)
	ContainerName      string          `yaml:"container"`           // blob container to read/write
	Endpoint           string          `yaml:"endpoint"`            // endpoint suffix; defaults to blob.core.windows.net when empty
	MaxRetries         int             `yaml:"max_retries"`         // generic retry count; seeds pipeline/reader retries when those are 0
	MSIResource        string          `yaml:"msi_resource"`        // resource URI for managed service identity auth
	UserAssignedID     string          `yaml:"user_assigned_id"`    // client ID of a user-assigned managed identity
	PipelineConfig     PipelineConfig  `yaml:"pipeline_config"`
	ReaderConfig       ReaderConfig    `yaml:"reader_config"`
	HTTPConfig         AzureHTTPConfig `yaml:"http_config"`
}

// ReaderConfig controls retry behavior for blob download reads.
type ReaderConfig struct {
	MaxRetryRequests int `yaml:"max_retry_requests"`
}

// PipelineConfig controls the Azure pipeline retry policy.
// Zero values mean "use the Azure SDK defaults".
type PipelineConfig struct {
	MaxTries      int32          `yaml:"max_tries"`
	TryTimeout    model.Duration `yaml:"try_timeout"`
	RetryDelay    model.Duration `yaml:"retry_delay"`
	MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
}

// AzureHTTPConfig configures the HTTP transport used for Azure requests
// (see DefaultAzureTransport).
type AzureHTTPConfig struct {
	IdleConnTimeout       model.Duration `yaml:"idle_conn_timeout"`
	ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"`
	InsecureSkipVerify    bool           `yaml:"insecure_skip_verify"` // disables TLS server certificate verification

	TLSHandshakeTimeout   model.Duration `yaml:"tls_handshake_timeout"`
	ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"`
	MaxIdleConns          int            `yaml:"max_idle_conns"`
	MaxIdleConnsPerHost   int            `yaml:"max_idle_conns_per_host"`
	MaxConnsPerHost       int            `yaml:"max_conns_per_host"`
	DisableCompression    bool           `yaml:"disable_compression"`

	TLSConfig TLSConfig `yaml:"tls_config"`
}
+
// AzureStorage implements the storage.Storage interface against Azure APIs.
type AzureStorage struct {
	name         string            // container name, used as the storage's Name()
	containerURL blob.ContainerURL // authenticated URL for the blob container
	config       *AzureConfig      // config used to build the connection
}
+
// validate checks that the configured credential options form a coherent
// combination: shared key (account name + key together), user-assigned
// identity (name without key), or MSI resource (name without key). A
// container name is always required. As a side effect, it also applies the
// default endpoint when none is configured.
func (conf *AzureConfig) validate() error {
	var errMsg []string
	if conf.MSIResource == "" {
		if conf.UserAssignedID == "" {
			// Shared-key auth: both account name and key must be present.
			if conf.StorageAccountName == "" ||
				conf.StorageAccountKey == "" {
				errMsg = append(errMsg, "invalid Azure storage configuration")
			}
			if conf.StorageAccountName == "" && conf.StorageAccountKey != "" {
				errMsg = append(errMsg, "no Azure storage_account specified while storage_account_key is present in config file; both should be present")
			}
			if conf.StorageAccountName != "" && conf.StorageAccountKey == "" {
				errMsg = append(errMsg, "no Azure storage_account_key specified while storage_account is present in config file; both should be present")
			}
		} else {
			// User-assigned identity: account name required, key must be absent.
			if conf.StorageAccountName == "" {
				errMsg = append(errMsg, "UserAssignedID is configured but storage account name is missing")
			}
			if conf.StorageAccountKey != "" {
				errMsg = append(errMsg, "UserAssignedID is configured but storage account key is used")
			}
		}
	} else {
		// MSI resource: account name required, key must be absent.
		if conf.StorageAccountName == "" {
			errMsg = append(errMsg, "MSI resource is configured but storage account name is missing")
		}
		if conf.StorageAccountKey != "" {
			errMsg = append(errMsg, "MSI resource is configured but storage account key is used")
		}
	}

	if conf.ContainerName == "" {
		errMsg = append(errMsg, "no Azure container specified")
	}
	// Not a validation: fill in the default endpoint when unset.
	if conf.Endpoint == "" {
		conf.Endpoint = azureDefaultEndpoint
	}

	if conf.PipelineConfig.MaxTries < 0 {
		errMsg = append(errMsg, "The value of max_tries must be greater than or equal to 0 in the config file")
	}

	if conf.ReaderConfig.MaxRetryRequests < 0 {
		errMsg = append(errMsg, "The value of max_retry_requests must be greater than or equal to 0 in the config file")
	}

	if len(errMsg) > 0 {
		return errors.New(strings.Join(errMsg, ", "))
	}

	return nil
}
+
// parseAzureConfig unmarshals a buffer into a Config with default values.
// Unknown YAML fields are rejected (strict unmarshal); on error the zero
// AzureConfig is returned.
func parseAzureConfig(conf []byte) (AzureConfig, error) {
	config := defaultAzureConfig
	if err := yaml.UnmarshalStrict(conf, &config); err != nil {
		return AzureConfig{}, err
	}

	// If we don't have config specific retry values but we do have the generic MaxRetries.
	// This is for backwards compatibility but also ease of configuration.
	if config.MaxRetries > 0 {
		if config.PipelineConfig.MaxTries == 0 {
			config.PipelineConfig.MaxTries = int32(config.MaxRetries)
		}
		if config.ReaderConfig.MaxRetryRequests == 0 {
			config.ReaderConfig.MaxRetryRequests = config.MaxRetries
		}
	}

	return config, nil
}
+
+// NewAzureStorage returns a new Storage using the provided Azure config.
+func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
+	log.Debugf("Creating new Azure Bucket Connection")
+
+	conf, err := parseAzureConfig(azureConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	return NewAzureStorageWith(conf)
+}
+
+// NewAzureStorageWith returns a new Storage using the provided Azure config struct.
+func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
+	if err := conf.validate(); err != nil {
+		return nil, err
+	}
+
+	ctx := context.Background()
+	container, err := createContainer(ctx, conf)
+	if err != nil {
+		ret, ok := err.(blob.StorageError)
+		if !ok {
+			return nil, errors.Wrapf(err, "Azure API return unexpected error: %T\n", err)
+		}
+		if ret.ServiceCode() == "ContainerAlreadyExists" {
+			log.Debugf("Getting connection to existing Azure blob container: %s", conf.ContainerName)
+			container, err = getContainer(ctx, conf)
+			if err != nil {
+				return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container)
+			}
+		} else {
+			return nil, errors.Wrapf(err, "error creating Azure blob container: %s", container)
+		}
+	} else {
+		log.Infof("Azure blob container successfully created. Address: %s", container)
+	}
+
+	return &AzureStorage{
+		name:         conf.ContainerName,
+		containerURL: container,
+		config:       &conf,
+	}, nil
+}
+
+// Name returns the bucket name for azure storage.
+func (as *AzureStorage) Name() string {
+	return as.name
+}
+
+// FullPath returns the storage working path combined with the path provided
+func (as *AzureStorage) FullPath(name string) string {
+	name = trimLeading(name)
+
+	return name
+}
+
// Stat returns the StorageStats for the specific path.
// The returned Name is the base name of the blob (directory prefix stripped).
func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
	name = trimLeading(name)
	ctx := context.Background()

	// Fetch blob properties (size, last-modified) without downloading the body.
	blobURL := getBlobURL(name, b.containerURL)
	props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
	if err != nil {
		return nil, err
	}

	return &StorageInfo{
		Name:    trimName(name),
		Size:    props.ContentLength(),
		ModTime: props.LastModified(),
	}, nil
}
+
+// Read uses the relative path of the storage combined with the provided path to
+// read the contents.
+func (b *AzureStorage) Read(name string) ([]byte, error) {
+	name = trimLeading(name)
+	ctx := context.Background()
+
+	log.Infof("AzureStorage::Read(%s)", name)
+
+	reader, err := b.getBlobReader(ctx, name, 0, blob.CountToEnd)
+	if err != nil {
+		return nil, err
+	}
+
+	data, err := io.ReadAll(reader)
+	if err != nil {
+		return nil, err
+	}
+
+	return data, nil
+}
+
+// Write uses the relative path of the storage combined with the provided path
+// to write a new file or overwrite an existing file.
+func (b *AzureStorage) Write(name string, data []byte) error {
+	name = trimLeading(name)
+	ctx := context.Background()
+
+	log.Infof("AzureStorage::Write(%s)", name)
+
+	blobURL := getBlobURL(name, b.containerURL)
+	r := bytes.NewReader(data)
+	if _, err := blob.UploadStreamToBlockBlob(ctx, r, blobURL,
+		blob.UploadStreamToBlockBlobOptions{
+			BufferSize: len(data),
+			MaxBuffers: 1,
+		},
+	); err != nil {
+		return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
+	}
+	return nil
+}
+
+// Remove uses the relative path of the storage combined with the provided path to
+// remove a file from storage permanently.
+func (b *AzureStorage) Remove(name string) error {
+	name = trimLeading(name)
+
+	log.Infof("S3Storage::Remove(%s)", name)
+	ctx := context.Background()
+
+	blobURL := getBlobURL(name, b.containerURL)
+	if _, err := blobURL.Delete(ctx, blob.DeleteSnapshotsOptionInclude, blob.BlobAccessConditions{}); err != nil {
+		return errors.Wrapf(err, "error deleting blob, address: %s", name)
+	}
+	return nil
+}
+
+// Exists uses the relative path of the storage combined with the provided path to
+// determine if the file exists.
+func (b *AzureStorage) Exists(name string) (bool, error) {
+	name = trimLeading(name)
+	ctx := context.Background()
+
+	blobURL := getBlobURL(name, b.containerURL)
+	if _, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{}); err != nil {
+		if b.isObjNotFoundErr(err) {
+			return false, nil
+		}
+		return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
+	}
+
+	return true, nil
+}
+
+// List uses the relative path of the storage combined with the provided path to return
+// storage information for the files.
+func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
+	path = trimLeading(path)
+
+	log.Infof("S3Storage::List(%s)", path)
+	ctx := context.Background()
+
+	// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
+	// object itself as one prefix item.
+	if path != "" {
+		path = strings.TrimSuffix(path, DirDelim) + DirDelim
+	}
+
+	marker := blob.Marker{}
+	listOptions := blob.ListBlobsSegmentOptions{Prefix: path}
+
+	var names []string
+	for i := 1; ; i++ {
+		var blobItems []blob.BlobItemInternal
+
+		list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
+		if err != nil {
+			return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
+		}
+
+		marker = list.NextMarker
+		blobItems = list.Segment.BlobItems
+
+		for _, blob := range blobItems {
+			names = append(names, blob.Name)
+		}
+
+		// Continue iterating if we are not done.
+		if !marker.NotDone() {
+			break
+		}
+
+		log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(names), i)
+	}
+
+	// get the storage information for each blob (really unfortunate we have to do this)
+	var lock sync.Mutex
+	var stats []*StorageInfo
+	var wg sync.WaitGroup
+	wg.Add(len(names))
+
+	for i := 0; i < len(names); i++ {
+		go func(n string) {
+			defer wg.Done()
+
+			stat, err := b.Stat(n)
+			if err != nil {
+				log.Errorf("Error statting blob %s: %s", n, err)
+			} else {
+				lock.Lock()
+				stats = append(stats, stat)
+				lock.Unlock()
+			}
+		}(names[i])
+	}
+
+	wg.Wait()
+
+	return stats, nil
+}
+
+// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
+func (b *AzureStorage) isObjNotFoundErr(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	errorCode := parseError(err.Error())
+	if errorCode == "InvalidUri" || errorCode == "BlobNotFound" {
+		return true
+	}
+
+	return false
+}
+
+func (b *AzureStorage) getBlobReader(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) {
+	log.Debugf("Getting blob: %s, offset: %d, length: %d", name, offset, length)
+	if name == "" {
+		return nil, errors.New("X-Ms-Error-Code: [EmptyContainerName]")
+	}
+	exists, err := b.Exists(name)
+	if err != nil {
+		return nil, errors.Wrapf(err, "cannot get blob reader: %s", name)
+	}
+
+	if !exists {
+		return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]")
+	}
+
+	blobURL := getBlobURL(name, b.containerURL)
+	if err != nil {
+		return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
+	}
+	var props *blob.BlobGetPropertiesResponse
+	props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
+	if err != nil {
+		return nil, errors.Wrapf(err, "cannot get properties for container: %s", name)
+	}
+
+	var size int64
+	// If a length is specified and it won't go past the end of the file,
+	// then set it as the size.
+	if length > 0 && length <= props.ContentLength()-offset {
+		size = length
+		log.Debugf("set size to length. size: %d, length: %d, offset: %d, name: %s", size, length, offset, name)
+	} else {
+		size = props.ContentLength() - offset
+		log.Debugf("set size to go to EOF. contentlength: %d, size: %d, length: %d, offset: %d, name: %s", props.ContentLength(), size, length, offset, name)
+	}
+
+	destBuffer := make([]byte, size)
+
+	if err := blob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, offset, size,
+		destBuffer, blob.DownloadFromBlobOptions{
+			BlockSize:   blob.BlobDefaultDownloadBlockSize,
+			Parallelism: uint16(3),
+			Progress:    nil,
+			RetryReaderOptionsPerBlock: blob.RetryReaderOptions{
+				MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests,
+			},
+		},
+	); err != nil {
+		return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobURL.BlobURL)
+	}
+
+	return ioutil.NopCloser(bytes.NewReader(destBuffer)), nil
+}
+
// getAzureStorageCredentials returns a blob.Credential for the configured
// auth mode: an auto-refreshing token credential when MSI or a user-assigned
// identity is configured, otherwise a shared-key credential built from the
// account name and key.
func getAzureStorageCredentials(conf AzureConfig) (blob.Credential, error) {
	if conf.MSIResource != "" || conf.UserAssignedID != "" {
		spt, err := getServicePrincipalToken(conf)
		if err != nil {
			return nil, err
		}
		// Prime the token before handing it to the credential.
		if err := spt.Refresh(); err != nil {
			return nil, err
		}

		// The callback returns the duration until the next refresh.
		return blob.NewTokenCredential(spt.Token().AccessToken, func(tc blob.TokenCredential) time.Duration {
			err := spt.Refresh()
			if err != nil {
				log.Errorf("could not refresh MSI token. err: %s", err)
				// Retry later as the error can be related to API throttling
				return 30 * time.Second
			}
			tc.SetToken(spt.Token().AccessToken)
			// Schedule the next refresh for 2 minutes before the token expires.
			return spt.Token().Expires().Sub(time.Now().Add(2 * time.Minute))
		}), nil
	}

	credential, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
	if err != nil {
		return nil, err
	}
	return credential, nil
}
+
+func getServicePrincipalToken(conf AzureConfig) (*adal.ServicePrincipalToken, error) {
+	resource := conf.MSIResource
+	if resource == "" {
+		resource = fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint)
+	}
+
+	msiConfig := auth.MSIConfig{
+		Resource: resource,
+	}
+
+	if conf.UserAssignedID != "" {
+		log.Debugf("using user assigned identity. clientId: %s", conf.UserAssignedID)
+		msiConfig.ClientID = conf.UserAssignedID
+	} else {
+		log.Debugf("using system assigned identity")
+	}
+
+	return msiConfig.ServicePrincipalToken()
+}
+
// getContainerURL builds an authenticated blob.ContainerURL for the
// configured account and container, wiring the retry policy, a custom HTTP
// transport, and request-logging options into the Azure pipeline.
func getContainerURL(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
	credentials, err := getAzureStorageCredentials(conf)

	if err != nil {
		return blob.ContainerURL{}, err
	}

	// Zero values fall through to the Azure SDK defaults.
	retryOptions := blob.RetryOptions{
		MaxTries:      conf.PipelineConfig.MaxTries,
		TryTimeout:    time.Duration(conf.PipelineConfig.TryTimeout),
		RetryDelay:    time.Duration(conf.PipelineConfig.RetryDelay),
		MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
	}

	// Cap each try at the caller's deadline, if one is set on the context.
	if deadline, ok := ctx.Deadline(); ok {
		retryOptions.TryTimeout = time.Until(deadline)
	}

	dt, err := DefaultAzureTransport(conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}
	client := http.Client{
		Transport: dt,
	}

	// NOTE(review): the "Thanos" telemetry value is carried over from the
	// Thanos code this file was forked from — confirm whether it should be
	// rebranded for this project.
	p := blob.NewPipeline(credentials, blob.PipelineOptions{
		Retry:     retryOptions,
		Telemetry: blob.TelemetryOptions{Value: "Thanos"},
		RequestLog: blob.RequestLogOptions{
			// Log a warning if an operation takes longer than the specified duration.
			// (-1=no logging; 0=default 3s threshold)
			LogWarningIfTryOverThreshold: -1,
		},
		Log: pipeline.LogOptions{
			ShouldLog: nil,
		},
		// Route every pipeline request through the custom http.Client above.
		HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
			return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
				resp, err := client.Do(request.WithContext(ctx))

				return pipeline.NewHTTPResponse(resp), err
			}
		}),
	})
	u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
	if err != nil {
		return blob.ContainerURL{}, err
	}
	service := blob.NewServiceURL(*u, p)

	return service.NewContainerURL(conf.ContainerName), nil
}
+
// DefaultAzureTransport builds an *http.Transport from the HTTP and TLS
// settings in the given AzureConfig. When InsecureSkipVerify is set, server
// certificate verification is disabled on top of the configured TLS settings.
func DefaultAzureTransport(config AzureConfig) (*http.Transport, error) {
	tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
	if err != nil {
		return nil, err
	}

	if config.HTTPConfig.InsecureSkipVerify {
		tlsConfig.InsecureSkipVerify = true
	}
	return &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
			// NOTE(review): DualStack is deprecated (RFC 6555 fallback is
			// enabled by default since Go 1.12); setting it is redundant but
			// harmless.
			DualStack: true,
		}).DialContext,

		MaxIdleConns:          config.HTTPConfig.MaxIdleConns,
		MaxIdleConnsPerHost:   config.HTTPConfig.MaxIdleConnsPerHost,
		IdleConnTimeout:       time.Duration(config.HTTPConfig.IdleConnTimeout),
		MaxConnsPerHost:       config.HTTPConfig.MaxConnsPerHost,
		TLSHandshakeTimeout:   time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
		ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),

		ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
		DisableCompression:    config.HTTPConfig.DisableCompression,
		TLSClientConfig:       tlsConfig,
	}, nil
}
+
// getContainer returns a ContainerURL for an existing container. The
// GetProperties call surfaces a service error (e.g. not-found) for the caller
// to inspect; the ContainerURL is returned alongside any error.
func getContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
	c, err := getContainerURL(ctx, conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}
	// Getting container properties to check if it exists or not. Returns error which will be parsed further.
	_, err = c.GetProperties(ctx, blob.LeaseAccessConditions{})
	return c, err
}
+
// createContainer creates the configured container with no public access and
// returns its ContainerURL. The creation error (e.g. ContainerAlreadyExists)
// is returned alongside the URL so the caller can inspect the service code.
func createContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
	c, err := getContainerURL(ctx, conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}
	_, err = c.Create(
		ctx,
		blob.Metadata{},
		blob.PublicAccessNone)
	return c, err
}
+
// getBlobURL returns the BlockBlobURL for blobName within the given container.
func getBlobURL(blobName string, c blob.ContainerURL) blob.BlockBlobURL {
	return c.NewBlockBlobURL(blobName)
}
+
+func parseError(errorCode string) string {
+	match := errorCodeRegex.FindStringSubmatch(errorCode)
+	if len(match) == 2 {
+		return match[1]
+	}
+	return errorCode
+}

+ 28 - 5
pkg/storage/bucketstorage.go

@@ -12,9 +12,9 @@ import (
 type StorageProvider string
 
 const (
-	S3  StorageProvider = "S3"
-	GCS StorageProvider = "GCS"
-	// AZURE StorageProvider = "AZURE"
+	S3    StorageProvider = "S3"
+	GCS   StorageProvider = "GCS"
+	AZURE StorageProvider = "AZURE"
 )
 
 // StorageConfig is the configuration type used as the "parent" configuration. It contains a type, which will
@@ -45,8 +45,8 @@ func NewBucketStorage(config []byte) (Storage, error) {
 		storage, err = NewS3Storage(config)
 	case string(GCS):
 		storage, err = NewGCSStorage(config)
-	//case string(AZURE):
-	//	storage, err = NewAzureStorage(config)
+	case string(AZURE):
+		storage, err = NewAzureStorage(config)
 	default:
 		return nil, errors.Errorf("storage with type %s is not supported", storageConfig.Type)
 	}
@@ -56,3 +56,26 @@ func NewBucketStorage(config []byte) (Storage, error) {
 
 	return storage, nil
 }
+
// trimLeading removes a leading / from the file name.
func trimLeading(file string) string {
	return strings.TrimPrefix(file, "/")
}
+
// trimName removes the leading directory prefix, returning the base name
// after the last '/' (or the input unchanged when there is no slash).
func trimName(file string) string {
	if i := strings.LastIndexByte(file, '/'); i >= 0 {
		return file[i+1:]
	}
	return file
}

+ 11 - 11
pkg/storage/gcsstorage.go

@@ -5,7 +5,7 @@ package storage
 
 import (
 	"context"
-	"io/ioutil"
+	"io"
 	"strings"
 
 	gcs "cloud.google.com/go/storage"
@@ -77,14 +77,14 @@ func (gs *GCSStorage) Name() string {
 
 // FullPath returns the storage working path combined with the path provided
 func (gs *GCSStorage) FullPath(name string) string {
-	name = gs.trimLeading(name)
+	name = trimLeading(name)
 
 	return name
 }
 
 // Stat returns the StorageStats for the specific path.
 func (gs *GCSStorage) Stat(name string) (*StorageInfo, error) {
-	name = gs.trimLeading(name)
+	name = trimLeading(name)
 	//log.Infof("GCSStorage::Stat(%s)", name)
 
 	ctx := context.Background()
@@ -97,7 +97,7 @@ func (gs *GCSStorage) Stat(name string) (*StorageInfo, error) {
 	}
 
 	return &StorageInfo{
-		Name:    gs.trimName(attrs.Name),
+		Name:    trimName(attrs.Name),
 		Size:    attrs.Size,
 		ModTime: attrs.Updated,
 	}, nil
@@ -112,7 +112,7 @@ func (gs *GCSStorage) isDoesNotExist(err error) bool {
 // Read uses the relative path of the storage combined with the provided path to
 // read the contents.
 func (gs *GCSStorage) Read(name string) ([]byte, error) {
-	name = gs.trimLeading(name)
+	name = trimLeading(name)
 	log.Infof("GCSStorage::Read(%s)", name)
 
 	ctx := context.Background()
@@ -121,7 +121,7 @@ func (gs *GCSStorage) Read(name string) ([]byte, error) {
 		return nil, err
 	}
 
-	data, err := ioutil.ReadAll(reader)
+	data, err := io.ReadAll(reader)
 	if err != nil {
 		return nil, err
 	}
@@ -132,7 +132,7 @@ func (gs *GCSStorage) Read(name string) ([]byte, error) {
 // Write uses the relative path of the storage combined with the provided path
 // to write a new file or overwrite an existing file.
 func (gs *GCSStorage) Write(name string, data []byte) error {
-	name = gs.trimLeading(name)
+	name = trimLeading(name)
 	log.Infof("GCSStorage::Write(%s)", name)
 
 	ctx := context.Background()
@@ -160,7 +160,7 @@ func (gs *GCSStorage) Write(name string, data []byte) error {
 // Remove uses the relative path of the storage combined with the provided path to
 // remove a file from storage permanently.
 func (gs *GCSStorage) Remove(name string) error {
-	name = gs.trimLeading(name)
+	name = trimLeading(name)
 
 	log.Infof("GCSStorage::Remove(%s)", name)
 	ctx := context.Background()
@@ -171,7 +171,7 @@ func (gs *GCSStorage) Remove(name string) error {
 // Exists uses the relative path of the storage combined with the provided path to
 // determine if the file exists.
 func (gs *GCSStorage) Exists(name string) (bool, error) {
-	name = gs.trimLeading(name)
+	name = trimLeading(name)
 	//log.Infof("GCSStorage::Exists(%s)", name)
 
 	ctx := context.Background()
@@ -189,7 +189,7 @@ func (gs *GCSStorage) Exists(name string) (bool, error) {
 // List uses the relative path of the storage combined with the provided path to return
 // storage information for the files.
 func (gs *GCSStorage) List(path string) ([]*StorageInfo, error) {
-	path = gs.trimLeading(path)
+	path = trimLeading(path)
 
 	log.Infof("GCSStorage::List(%s)", path)
 	ctx := context.Background()
@@ -222,7 +222,7 @@ func (gs *GCSStorage) List(path string) ([]*StorageInfo, error) {
 		}
 
 		stats = append(stats, &StorageInfo{
-			Name:    gs.trimName(attrs.Name),
+			Name:    trimName(attrs.Name),
 			Size:    attrs.Size,
 			ModTime: attrs.Updated,
 		})

+ 37 - 45
pkg/storage/s3storage.go

@@ -1,12 +1,12 @@
+package storage
+
 // Fork from Thanos S3 Bucket support to reuse configuration options
 // Licensed under the Apache License 2.0
 // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
-package storage
 
 import (
 	"bytes"
 	"context"
-	"crypto/tls"
 	"io/ioutil"
 	"net"
 	"net/http"
@@ -46,9 +46,9 @@ const (
 	sseConfigKey = ctxKey(0)
 )
 
-var DefaultConfig = S3Config{
+var defaultS3Config = S3Config{
 	PutUserMetadata: map[string]string{},
-	HTTPConfig: HTTPConfig{
+	HTTPConfig: S3HTTPConfig{
 		IdleConnTimeout:       time.Duration(90 * time.Second),
 		ResponseHeaderTimeout: time.Duration(2 * time.Minute),
 		TLSHandshakeTimeout:   time.Duration(10 * time.Second),
@@ -71,7 +71,7 @@ type S3Config struct {
 	SignatureV2        bool              `yaml:"signature_version2"`
 	SecretKey          string            `yaml:"secret_key"`
 	PutUserMetadata    map[string]string `yaml:"put_user_metadata"`
-	HTTPConfig         HTTPConfig        `yaml:"http_config"`
+	HTTPConfig         S3HTTPConfig      `yaml:"http_config"`
 	TraceConfig        TraceConfig       `yaml:"trace"`
 	ListObjectsVersion string            `yaml:"list_objects_version"`
 	// PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
@@ -95,7 +95,7 @@ type TraceConfig struct {
 }
 
 // HTTPConfig stores the http.Transport configuration for the s3 minio client.
-type HTTPConfig struct {
+type S3HTTPConfig struct {
 	IdleConnTimeout       time.Duration `yaml:"idle_conn_timeout"`
 	ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
 	InsecureSkipVerify    bool          `yaml:"insecure_skip_verify"`
@@ -108,13 +108,24 @@ type HTTPConfig struct {
 
 	// Allow upstream callers to inject a round tripper
 	Transport http.RoundTripper `yaml:"-"`
+
+	TLSConfig TLSConfig `yaml:"tls_config"`
 }
 
 // DefaultS3Transport - this default transport is based on the Minio
 // DefaultTransport up until the following commit:
 // https://github.com/minio/minio-go/commit/008c7aa71fc17e11bf980c209a4f8c4d687fc884
 // The values have since diverged.
-func DefaultTransport(config S3Config) *http.Transport {
+func DefaultS3Transport(config S3Config) (*http.Transport, error) {
+	tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	if config.HTTPConfig.InsecureSkipVerify {
+		tlsConfig.InsecureSkipVerify = true
+	}
+
 	return &http.Transport{
 		Proxy: http.ProxyFromEnvironment,
 		DialContext: (&net.Dialer{
@@ -140,8 +151,8 @@ func DefaultTransport(config S3Config) *http.Transport {
 		// Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
 		DisableCompression: true,
 		// #nosec It's up to the user to decide on TLS configs
-		TLSClientConfig: &tls.Config{InsecureSkipVerify: config.HTTPConfig.InsecureSkipVerify},
-	}
+		TLSClientConfig: tlsConfig,
+	}, nil
 }
 
 // S3Storage provides storage via S3
@@ -155,8 +166,8 @@ type S3Storage struct {
 }
 
 // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
-func parseConfig(conf []byte) (S3Config, error) {
-	config := DefaultConfig
+func parseS3Config(conf []byte) (S3Config, error) {
+	config := defaultS3Config
 	if err := yaml.UnmarshalStrict(conf, &config); err != nil {
 		return S3Config{}, err
 	}
@@ -168,7 +179,7 @@ func parseConfig(conf []byte) (S3Config, error) {
 func NewS3Storage(conf []byte) (*S3Storage, error) {
 	log.Infof("Creating new S3 Storage...")
 
-	config, err := parseConfig(conf)
+	config, err := parseS3Config(conf)
 	if err != nil {
 		return nil, err
 	}
@@ -224,7 +235,11 @@ func NewS3StorageWith(config S3Config) (*S3Storage, error) {
 	if config.HTTPConfig.Transport != nil {
 		rt = config.HTTPConfig.Transport
 	} else {
-		rt = DefaultTransport(config)
+		var err error
+		rt, err = DefaultS3Transport(config)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	client, err := minio.New(config.Endpoint, &minio.Options{
@@ -321,14 +336,14 @@ func validate(conf S3Config) error {
 
 // FullPath returns the storage working path combined with the path provided
 func (s3 *S3Storage) FullPath(name string) string {
-	name = s3.trimLeading(name)
+	name = trimLeading(name)
 
 	return name
 }
 
 // Get returns a reader for the given object name.
 func (s3 *S3Storage) Read(name string) ([]byte, error) {
-	name = s3.trimLeading(name)
+	name = trimLeading(name)
 
 	log.Infof("S3Storage::Read(%s)", name)
 	ctx := context.Background()
@@ -339,7 +354,7 @@ func (s3 *S3Storage) Read(name string) ([]byte, error) {
 
 // Exists checks if the given object exists.
 func (s3 *S3Storage) Exists(name string) (bool, error) {
-	name = s3.trimLeading(name)
+	name = trimLeading(name)
 	//log.Infof("S3Storage::Exists(%s)", name)
 
 	ctx := context.Background()
@@ -357,7 +372,7 @@ func (s3 *S3Storage) Exists(name string) (bool, error) {
 
 // Upload the contents of the reader as an object into the bucket.
 func (s3 *S3Storage) Write(name string, data []byte) error {
-	name = s3.trimLeading(name)
+	name = trimLeading(name)
 
 	log.Infof("S3Storage::Write(%s)", name)
 
@@ -391,7 +406,7 @@ func (s3 *S3Storage) Write(name string, data []byte) error {
 
 // Attributes returns information about the specified object.
 func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
-	name = s3.trimLeading(name)
+	name = trimLeading(name)
 
 	//log.Infof("S3Storage::Stat(%s)", name)
 	ctx := context.Background()
@@ -405,7 +420,7 @@ func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
 	}
 
 	return &StorageInfo{
-		Name:    s3.trimName(name),
+		Name:    trimName(name),
 		Size:    objInfo.Size,
 		ModTime: objInfo.LastModified,
 	}, nil
@@ -413,7 +428,7 @@ func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
 
 // Delete removes the object with the given name.
 func (s3 *S3Storage) Remove(name string) error {
-	name = s3.trimLeading(name)
+	name = trimLeading(name)
 
 	log.Infof("S3Storage::Remove(%s)", name)
 	ctx := context.Background()
@@ -422,7 +437,7 @@ func (s3 *S3Storage) Remove(name string) error {
 }
 
 func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
-	path = s3.trimLeading(path)
+	path = trimLeading(path)
 
 	log.Infof("S3Storage::List(%s)", path)
 	ctx := context.Background()
@@ -455,7 +470,7 @@ func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
 		}
 
 		stats = append(stats, &StorageInfo{
-			Name:    s3.trimName(object.Key),
+			Name:    trimName(object.Key),
 			Size:    object.Size,
 			ModTime: object.LastModified,
 		})
@@ -464,29 +479,6 @@ func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
 	return stats, nil
 }
 
-// trimLeading removes a leading / from the file name
-func (s3 *S3Storage) trimLeading(file string) string {
-	if len(file) == 0 {
-		return file
-	}
-
-	if file[0] == '/' {
-		return file[1:]
-	}
-	return file
-}
-
-// trimName removes the leading directory prefix
-func (s3 *S3Storage) trimName(file string) string {
-	slashIndex := strings.LastIndex(file, "/")
-	if slashIndex < 0 {
-		return file
-	}
-
-	name := file[slashIndex+1:]
-	return name
-}
-
 // getServerSideEncryption returns the SSE to use.
 func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
 	if value := ctx.Value(sseConfigKey); value != nil {

+ 84 - 0
pkg/storage/tlsconfig.go

@@ -0,0 +1,84 @@
+package storage
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"io/ioutil"
+)
+
+// NewTLSConfig creates a new tls.Config from the given TLSConfig.
+func NewTLSConfig(cfg *TLSConfig) (*tls.Config, error) {
+	tlsConfig := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipVerify}
+
+	// If a CA cert is provided then let's read it in.
+	if len(cfg.CAFile) > 0 {
+		b, err := readCAFile(cfg.CAFile)
+		if err != nil {
+			return nil, err
+		}
+		if !updateRootCA(tlsConfig, b) {
+			return nil, fmt.Errorf("unable to use specified CA cert %s", cfg.CAFile)
+		}
+	}
+
+	if len(cfg.ServerName) > 0 {
+		tlsConfig.ServerName = cfg.ServerName
+	}
+	// If a client cert & key is provided then configure TLS config accordingly.
+	if len(cfg.CertFile) > 0 && len(cfg.KeyFile) == 0 {
+		return nil, fmt.Errorf("client cert file %q specified without client key file", cfg.CertFile)
+	} else if len(cfg.KeyFile) > 0 && len(cfg.CertFile) == 0 {
+		return nil, fmt.Errorf("client key file %q specified without client cert file", cfg.KeyFile)
+	} else if len(cfg.CertFile) > 0 && len(cfg.KeyFile) > 0 {
+		// Verify that client cert and key are valid.
+		if _, err := cfg.getClientCertificate(nil); err != nil {
+			return nil, err
+		}
+		tlsConfig.GetClientCertificate = cfg.getClientCertificate
+	}
+
+	return tlsConfig, nil
+}
+
+// readCAFile reads the CA cert file from disk.
+func readCAFile(f string) ([]byte, error) {
+	data, err := ioutil.ReadFile(f)
+	if err != nil {
+		return nil, fmt.Errorf("unable to load specified CA cert %s: %s", f, err)
+	}
+	return data, nil
+}
+
+// updateRootCA parses the given byte slice as a series of PEM encoded certificates and updates tls.Config.RootCAs.
+func updateRootCA(cfg *tls.Config, b []byte) bool {
+	caCertPool := x509.NewCertPool()
+	if !caCertPool.AppendCertsFromPEM(b) {
+		return false
+	}
+	cfg.RootCAs = caCertPool
+	return true
+}
+
+// getClientCertificate reads the pair of client cert and key from disk and returns a tls.Certificate.
+func (c *TLSConfig) getClientCertificate(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
+	cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
+	if err != nil {
+		return nil, fmt.Errorf("unable to use specified client cert (%s) & key (%s): %s", c.CertFile, c.KeyFile, err)
+	}
+	return &cert, nil
+}
+
+// TLSConfig configures the options for TLS connections.
+type TLSConfig struct {
+	// The CA cert to use for the targets.
+	CAFile string `yaml:"ca_file"`
+	// The client cert file for the targets.
+	CertFile string `yaml:"cert_file"`
+	// The client key file for the targets.
+	KeyFile string `yaml:"key_file"`
+	// Used to verify the hostname for the targets.
+	ServerName string `yaml:"server_name"`
+	// Disable target certificate validation.
+	InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
+}