azurestorage.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. package storage
  2. // Fork from Thanos S3 Bucket support to reuse configuration options
  3. // Licensed under the Apache License 2.0
  4. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
  5. import (
  6. "bytes"
  7. "context"
  8. "fmt"
  9. "io"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/opencost/opencost/core/pkg/log"
  17. "github.com/Azure/azure-pipeline-go/pipeline"
  18. blob "github.com/Azure/azure-storage-blob-go/azblob"
  19. "github.com/Azure/go-autorest/autorest/adal"
  20. "github.com/Azure/go-autorest/autorest/azure/auth"
  21. "github.com/pkg/errors"
  22. "github.com/prometheus/common/model"
  23. "gopkg.in/yaml.v2"
  24. )
const (
	// azureDefaultEndpoint is the public Azure Blob Storage DNS suffix used
	// when no explicit endpoint is set in the config (see validate()).
	azureDefaultEndpoint = "blob.core.windows.net"
)
// Set default retry values to default Azure values. 0 = use Default Azure.
var defaultAzureConfig = AzureConfig{
	// Zero values delegate retry behavior to the Azure SDK defaults.
	PipelineConfig: PipelineConfig{
		MaxTries:      0,
		TryTimeout:    0,
		RetryDelay:    0,
		MaxRetryDelay: 0,
	},
	ReaderConfig: ReaderConfig{
		MaxRetryRequests: 0,
	},
	// HTTP transport defaults; these feed DefaultAzureTransport below.
	HTTPConfig: AzureHTTPConfig{
		IdleConnTimeout:       model.Duration(90 * time.Second),
		ResponseHeaderTimeout: model.Duration(2 * time.Minute),
		TLSHandshakeTimeout:   model.Duration(10 * time.Second),
		ExpectContinueTimeout: model.Duration(1 * time.Second),
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   100,
		MaxConnsPerHost:       0, // 0 = unlimited
		DisableCompression:    false,
	},
}
// init disables the Azure pipeline's forced logging once, at package load.
func init() {
	// Disable `ForceLog` in Azure storage module
	// As the time of this patch, the logging function in the storage module isn't correctly
	// detecting expected REST errors like 404 and so outputs them to syslog along with a stacktrace.
	// https://github.com/Azure/azure-storage-blob-go/issues/214
	//
	// This needs to be done at startup because the underlying variable is not thread safe.
	// https://github.com/Azure/azure-pipeline-go/blob/dc95902f1d32034f8f743ccc6c3f2eb36b84da27/pipeline/core.go#L276-L283
	pipeline.SetForceLogEnabled(false)
}
// AzureConfig Azure storage configuration.
type AzureConfig struct {
	// StorageAccountName and StorageAccountKey configure shared-key auth;
	// both must be present together (enforced by validate()).
	StorageAccountName string `yaml:"storage_account"`
	StorageAccountKey  string `yaml:"storage_account_key"`
	// ContainerName is the blob container to create or connect to.
	ContainerName string `yaml:"container"`
	// Endpoint is the storage DNS suffix; defaults to azureDefaultEndpoint.
	Endpoint string `yaml:"endpoint"`
	// MaxRetries is a legacy/general knob; parseAzureConfig copies it into
	// PipelineConfig.MaxTries and ReaderConfig.MaxRetryRequests when those are unset.
	MaxRetries int `yaml:"max_retries"`
	// MSIResource / UserAssignedID select managed-identity auth instead of shared key.
	MSIResource    string         `yaml:"msi_resource"`
	UserAssignedID string         `yaml:"user_assigned_id"`
	PipelineConfig PipelineConfig `yaml:"pipeline_config"`
	ReaderConfig   ReaderConfig   `yaml:"reader_config"`
	HTTPConfig     AzureHTTPConfig `yaml:"http_config"`
}
// ReaderConfig controls per-block download retry behavior (see getBlobReader).
type ReaderConfig struct {
	// MaxRetryRequests is passed to blob.RetryReaderOptions; 0 = Azure SDK default.
	MaxRetryRequests int `yaml:"max_retry_requests"`
}
// PipelineConfig controls the Azure pipeline retry policy (see getContainerURL).
// Zero values mean "use the Azure SDK default".
type PipelineConfig struct {
	MaxTries      int32          `yaml:"max_tries"`
	TryTimeout    model.Duration `yaml:"try_timeout"`
	RetryDelay    model.Duration `yaml:"retry_delay"`
	MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
}
// AzureHTTPConfig mirrors the tunable fields of http.Transport used by
// DefaultAzureTransport, plus TLS settings.
type AzureHTTPConfig struct {
	IdleConnTimeout       model.Duration `yaml:"idle_conn_timeout"`
	ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"`
	// InsecureSkipVerify disables TLS certificate verification — testing only.
	InsecureSkipVerify    bool           `yaml:"insecure_skip_verify"`
	TLSHandshakeTimeout   model.Duration `yaml:"tls_handshake_timeout"`
	ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"`
	MaxIdleConns          int            `yaml:"max_idle_conns"`
	MaxIdleConnsPerHost   int            `yaml:"max_idle_conns_per_host"`
	MaxConnsPerHost       int            `yaml:"max_conns_per_host"`
	DisableCompression    bool           `yaml:"disable_compression"`
	TLSConfig             TLSConfig      `yaml:"tls_config"`
}
// AzureStorage implements the storage.Storage interface against Azure APIs.
type AzureStorage struct {
	// name is the container name, echoed by Name().
	name string
	// containerURL is the authenticated handle all blob operations go through.
	containerURL blob.ContainerURL
	// config retains the parsed settings (e.g. ReaderConfig for downloads).
	config *AzureConfig
}
  100. // Validate checks to see if any of the config options are set.
  101. func (conf *AzureConfig) validate() error {
  102. var errMsg []string
  103. if conf.MSIResource == "" {
  104. if conf.UserAssignedID == "" {
  105. if conf.StorageAccountName == "" ||
  106. conf.StorageAccountKey == "" {
  107. errMsg = append(errMsg, "invalid Azure storage configuration")
  108. }
  109. if conf.StorageAccountName == "" && conf.StorageAccountKey != "" {
  110. errMsg = append(errMsg, "no Azure storage_account specified while storage_account_key is present in config file; both should be present")
  111. }
  112. if conf.StorageAccountName != "" && conf.StorageAccountKey == "" {
  113. errMsg = append(errMsg, "no Azure storage_account_key specified while storage_account is present in config file; both should be present")
  114. }
  115. } else {
  116. if conf.StorageAccountName == "" {
  117. errMsg = append(errMsg, "UserAssignedID is configured but storage account name is missing")
  118. }
  119. if conf.StorageAccountKey != "" {
  120. errMsg = append(errMsg, "UserAssignedID is configured but storage account key is used")
  121. }
  122. }
  123. } else {
  124. if conf.StorageAccountName == "" {
  125. errMsg = append(errMsg, "MSI resource is configured but storage account name is missing")
  126. }
  127. if conf.StorageAccountKey != "" {
  128. errMsg = append(errMsg, "MSI resource is configured but storage account key is used")
  129. }
  130. }
  131. if conf.ContainerName == "" {
  132. errMsg = append(errMsg, "no Azure container specified")
  133. }
  134. if conf.Endpoint == "" {
  135. conf.Endpoint = azureDefaultEndpoint
  136. }
  137. if conf.PipelineConfig.MaxTries < 0 {
  138. errMsg = append(errMsg, "The value of max_tries must be greater than or equal to 0 in the config file")
  139. }
  140. if conf.ReaderConfig.MaxRetryRequests < 0 {
  141. errMsg = append(errMsg, "The value of max_retry_requests must be greater than or equal to 0 in the config file")
  142. }
  143. if len(errMsg) > 0 {
  144. return errors.New(strings.Join(errMsg, ", "))
  145. }
  146. return nil
  147. }
  148. // parseAzureConfig unmarshals a buffer into a Config with default values.
  149. func parseAzureConfig(conf []byte) (AzureConfig, error) {
  150. config := defaultAzureConfig
  151. if err := yaml.UnmarshalStrict(conf, &config); err != nil {
  152. return AzureConfig{}, err
  153. }
  154. // If we don't have config specific retry values but we do have the generic MaxRetries.
  155. // This is for backwards compatibility but also ease of configuration.
  156. if config.MaxRetries > 0 {
  157. if config.PipelineConfig.MaxTries == 0 {
  158. config.PipelineConfig.MaxTries = int32(config.MaxRetries)
  159. }
  160. if config.ReaderConfig.MaxRetryRequests == 0 {
  161. config.ReaderConfig.MaxRetryRequests = config.MaxRetries
  162. }
  163. }
  164. return config, nil
  165. }
  166. // NewAzureStorage returns a new Storage using the provided Azure config.
  167. func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
  168. log.Debugf("Creating new Azure Bucket Connection")
  169. conf, err := parseAzureConfig(azureConfig)
  170. if err != nil {
  171. return nil, err
  172. }
  173. return NewAzureStorageWith(conf)
  174. }
  175. // NewAzureStorageWith returns a new Storage using the provided Azure config struct.
  176. func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
  177. if err := conf.validate(); err != nil {
  178. return nil, err
  179. }
  180. ctx := context.Background()
  181. container, err := createContainer(ctx, conf)
  182. if err != nil {
  183. ret, ok := err.(blob.StorageError)
  184. if !ok {
  185. return nil, errors.Wrapf(err, "Azure API return unexpected error: %T\n", err)
  186. }
  187. if ret.ServiceCode() == "ContainerAlreadyExists" {
  188. log.Debugf("Getting connection to existing Azure blob container: %s", conf.ContainerName)
  189. container, err = getContainer(ctx, conf)
  190. if err != nil {
  191. return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container)
  192. }
  193. } else {
  194. return nil, errors.Wrapf(err, "error creating Azure blob container: %s", container)
  195. }
  196. } else {
  197. log.Infof("Azure blob container successfully created. Address: %s", container)
  198. }
  199. return &AzureStorage{
  200. name: conf.ContainerName,
  201. containerURL: container,
  202. config: &conf,
  203. }, nil
  204. }
// Name returns the bucket name for azure storage (the configured container name).
func (as *AzureStorage) Name() string {
	return as.name
}
// StorageType returns a string identifier for the type of storage used by the implementation.
func (as *AzureStorage) StorageType() StorageType {
	return StorageTypeBucketAzure
}
  213. // FullPath returns the storage working path combined with the path provided
  214. func (as *AzureStorage) FullPath(name string) string {
  215. name = trimLeading(name)
  216. return name
  217. }
  218. // Stat returns the StorageStats for the specific path.
  219. func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
  220. name = trimLeading(name)
  221. ctx := context.Background()
  222. blobURL := getBlobURL(name, b.containerURL)
  223. props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
  224. if err != nil {
  225. return nil, err
  226. }
  227. return &StorageInfo{
  228. Name: trimName(name),
  229. Size: props.ContentLength(),
  230. ModTime: props.LastModified(),
  231. }, nil
  232. }
  233. // Read uses the relative path of the storage combined with the provided path to
  234. // read the contents.
  235. func (b *AzureStorage) Read(name string) ([]byte, error) {
  236. name = trimLeading(name)
  237. ctx := context.Background()
  238. log.Debugf("AzureStorage::Read(%s)", name)
  239. reader, err := b.getBlobReader(ctx, name, 0, blob.CountToEnd)
  240. if err != nil {
  241. return nil, err
  242. }
  243. data, err := io.ReadAll(reader)
  244. if err != nil {
  245. return nil, err
  246. }
  247. return data, nil
  248. }
  249. // Write uses the relative path of the storage combined with the provided path
  250. // to write a new file or overwrite an existing file.
  251. func (b *AzureStorage) Write(name string, data []byte) error {
  252. name = trimLeading(name)
  253. ctx := context.Background()
  254. log.Debugf("AzureStorage::Write(%s)", name)
  255. blobURL := getBlobURL(name, b.containerURL)
  256. r := bytes.NewReader(data)
  257. if _, err := blob.UploadStreamToBlockBlob(ctx, r, blobURL,
  258. blob.UploadStreamToBlockBlobOptions{
  259. BufferSize: len(data),
  260. MaxBuffers: 1,
  261. },
  262. ); err != nil {
  263. return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
  264. }
  265. return nil
  266. }
  267. // Remove uses the relative path of the storage combined with the provided path to
  268. // remove a file from storage permanently.
  269. func (b *AzureStorage) Remove(name string) error {
  270. name = trimLeading(name)
  271. log.Debugf("AzureStorage::Remove(%s)", name)
  272. ctx := context.Background()
  273. blobURL := getBlobURL(name, b.containerURL)
  274. if _, err := blobURL.Delete(ctx, blob.DeleteSnapshotsOptionInclude, blob.BlobAccessConditions{}); err != nil {
  275. return errors.Wrapf(err, "error deleting blob, address: %s", name)
  276. }
  277. return nil
  278. }
  279. // Exists uses the relative path of the storage combined with the provided path to
  280. // determine if the file exists.
  281. func (b *AzureStorage) Exists(name string) (bool, error) {
  282. name = trimLeading(name)
  283. ctx := context.Background()
  284. blobURL := getBlobURL(name, b.containerURL)
  285. if _, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{}); err != nil {
  286. var se blob.StorageError
  287. if errors.As(err, &se) && se.ServiceCode() == blob.ServiceCodeBlobNotFound {
  288. return false, nil
  289. }
  290. return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
  291. }
  292. return true, nil
  293. }
  294. // List uses the relative path of the storage combined with the provided path to return
  295. // storage information for the files.
  296. func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
  297. path = trimLeading(path)
  298. log.Debugf("AzureStorage::List(%s)", path)
  299. ctx := context.Background()
  300. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  301. // object itself as one prefix item.
  302. if path != "" {
  303. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  304. }
  305. marker := blob.Marker{}
  306. listOptions := blob.ListBlobsSegmentOptions{Prefix: path}
  307. var names []string
  308. for i := 1; ; i++ {
  309. var blobItems []blob.BlobItemInternal
  310. list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
  311. if err != nil {
  312. return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
  313. }
  314. marker = list.NextMarker
  315. blobItems = list.Segment.BlobItems
  316. for _, blob := range blobItems {
  317. names = append(names, blob.Name)
  318. }
  319. // Continue iterating if we are not done.
  320. if !marker.NotDone() {
  321. break
  322. }
  323. log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(names), i)
  324. }
  325. // get the storage information for each blob (really unfortunate we have to do this)
  326. var lock sync.Mutex
  327. var stats []*StorageInfo
  328. var wg sync.WaitGroup
  329. wg.Add(len(names))
  330. for i := 0; i < len(names); i++ {
  331. go func(n string) {
  332. defer wg.Done()
  333. stat, err := b.Stat(n)
  334. if err != nil {
  335. log.Errorf("Error statting blob %s: %s", n, err)
  336. } else {
  337. lock.Lock()
  338. stats = append(stats, stat)
  339. lock.Unlock()
  340. }
  341. }(names[i])
  342. }
  343. wg.Wait()
  344. return stats, nil
  345. }
  346. func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  347. path = trimLeading(path)
  348. log.Debugf("AzureStorage::ListDirectories(%s)", path)
  349. ctx := context.Background()
  350. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  351. // object itself as one prefix item.
  352. if path != "" {
  353. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  354. }
  355. marker := blob.Marker{}
  356. listOptions := blob.ListBlobsSegmentOptions{Prefix: path}
  357. var stats []*StorageInfo
  358. for i := 1; ; i++ {
  359. var blobPrefixes []blob.BlobPrefix
  360. list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
  361. if err != nil {
  362. return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
  363. }
  364. marker = list.NextMarker
  365. blobPrefixes = list.Segment.BlobPrefixes
  366. for _, prefix := range blobPrefixes {
  367. stats = append(stats, &StorageInfo{
  368. Name: trimLeading(prefix.Name),
  369. })
  370. }
  371. // Continue iterating if we are not done.
  372. if !marker.NotDone() {
  373. break
  374. }
  375. log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(stats), i)
  376. }
  377. return stats, nil
  378. }
  379. func (b *AzureStorage) getBlobReader(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) {
  380. log.Debugf("Getting blob: %s, offset: %d, length: %d", name, offset, length)
  381. if name == "" {
  382. return nil, errors.New("X-Ms-Error-Code: [EmptyContainerName]")
  383. }
  384. exists, err := b.Exists(name)
  385. if err != nil {
  386. return nil, errors.Wrapf(err, "cannot get blob reader: %s", name)
  387. }
  388. if !exists {
  389. return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]")
  390. }
  391. blobURL := getBlobURL(name, b.containerURL)
  392. if err != nil {
  393. return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
  394. }
  395. var props *blob.BlobGetPropertiesResponse
  396. props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
  397. if err != nil {
  398. return nil, errors.Wrapf(err, "cannot get properties for container: %s", name)
  399. }
  400. var size int64
  401. // If a length is specified and it won't go past the end of the file,
  402. // then set it as the size.
  403. if length > 0 && length <= props.ContentLength()-offset {
  404. size = length
  405. log.Debugf("set size to length. size: %d, length: %d, offset: %d, name: %s", size, length, offset, name)
  406. } else {
  407. size = props.ContentLength() - offset
  408. log.Debugf("set size to go to EOF. contentlength: %d, size: %d, length: %d, offset: %d, name: %s", props.ContentLength(), size, length, offset, name)
  409. }
  410. destBuffer := make([]byte, size)
  411. if err := blob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, offset, size,
  412. destBuffer, blob.DownloadFromBlobOptions{
  413. BlockSize: blob.BlobDefaultDownloadBlockSize,
  414. Parallelism: uint16(3),
  415. Progress: nil,
  416. RetryReaderOptionsPerBlock: blob.RetryReaderOptions{
  417. MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests,
  418. },
  419. },
  420. ); err != nil {
  421. return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobURL.BlobURL)
  422. }
  423. return io.NopCloser(bytes.NewReader(destBuffer)), nil
  424. }
  425. func getAzureStorageCredentials(conf AzureConfig) (blob.Credential, error) {
  426. if conf.MSIResource != "" || conf.UserAssignedID != "" {
  427. spt, err := getServicePrincipalToken(conf)
  428. if err != nil {
  429. return nil, err
  430. }
  431. if err := spt.Refresh(); err != nil {
  432. return nil, err
  433. }
  434. return blob.NewTokenCredential(spt.Token().AccessToken, func(tc blob.TokenCredential) time.Duration {
  435. err := spt.Refresh()
  436. if err != nil {
  437. log.Errorf("could not refresh MSI token. err: %s", err)
  438. // Retry later as the error can be related to API throttling
  439. return 30 * time.Second
  440. }
  441. tc.SetToken(spt.Token().AccessToken)
  442. return spt.Token().Expires().Sub(time.Now().Add(2 * time.Minute))
  443. }), nil
  444. }
  445. credential, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
  446. if err != nil {
  447. return nil, err
  448. }
  449. return credential, nil
  450. }
  451. func getServicePrincipalToken(conf AzureConfig) (*adal.ServicePrincipalToken, error) {
  452. resource := conf.MSIResource
  453. if resource == "" {
  454. resource = fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint)
  455. }
  456. msiConfig := auth.MSIConfig{
  457. Resource: resource,
  458. }
  459. if conf.UserAssignedID != "" {
  460. log.Debugf("using user assigned identity. clientId: %s", conf.UserAssignedID)
  461. msiConfig.ClientID = conf.UserAssignedID
  462. } else {
  463. log.Debugf("using system assigned identity")
  464. }
  465. return msiConfig.ServicePrincipalToken()
  466. }
// getContainerURL builds an authenticated ContainerURL for the configured
// account/container: credentials -> retry options -> custom HTTP transport ->
// blob pipeline -> service URL -> container URL. No network call is made here.
func getContainerURL(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
	credentials, err := getAzureStorageCredentials(conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}
	// Retry options come from PipelineConfig; zero values defer to SDK defaults.
	retryOptions := blob.RetryOptions{
		MaxTries:      conf.PipelineConfig.MaxTries,
		TryTimeout:    time.Duration(conf.PipelineConfig.TryTimeout),
		RetryDelay:    time.Duration(conf.PipelineConfig.RetryDelay),
		MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
	}
	// A context deadline overrides the configured per-try timeout.
	if deadline, ok := ctx.Deadline(); ok {
		retryOptions.TryTimeout = time.Until(deadline)
	}
	dt, err := DefaultAzureTransport(conf)
	if err != nil {
		return blob.ContainerURL{}, err
	}
	client := http.Client{
		Transport: dt,
	}
	p := blob.NewPipeline(credentials, blob.PipelineOptions{
		Retry:     retryOptions,
		Telemetry: blob.TelemetryOptions{Value: "Kubecost"},
		RequestLog: blob.RequestLogOptions{
			// Log a warning if an operation takes longer than the specified duration.
			// (-1=no logging; 0=default 3s threshold)
			LogWarningIfTryOverThreshold: -1,
		},
		Log: pipeline.LogOptions{
			ShouldLog: nil,
		},
		// Route all pipeline requests through our custom http.Client so the
		// transport settings above (timeouts, TLS, pool sizes) take effect.
		HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
			return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
				resp, err := client.Do(request.WithContext(ctx))
				return pipeline.NewHTTPResponse(resp), err
			}
		}),
	})
	u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
	if err != nil {
		return blob.ContainerURL{}, err
	}
	service := blob.NewServiceURL(*u, p)
	return service.NewContainerURL(conf.ContainerName), nil
}
  513. func DefaultAzureTransport(config AzureConfig) (*http.Transport, error) {
  514. tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
  515. if err != nil {
  516. return nil, err
  517. }
  518. if config.HTTPConfig.InsecureSkipVerify {
  519. tlsConfig.InsecureSkipVerify = true
  520. }
  521. return &http.Transport{
  522. Proxy: http.ProxyFromEnvironment,
  523. DialContext: (&net.Dialer{
  524. Timeout: 30 * time.Second,
  525. KeepAlive: 30 * time.Second,
  526. DualStack: true,
  527. }).DialContext,
  528. MaxIdleConns: config.HTTPConfig.MaxIdleConns,
  529. MaxIdleConnsPerHost: config.HTTPConfig.MaxIdleConnsPerHost,
  530. IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
  531. MaxConnsPerHost: config.HTTPConfig.MaxConnsPerHost,
  532. TLSHandshakeTimeout: time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
  533. ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
  534. ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
  535. DisableCompression: config.HTTPConfig.DisableCompression,
  536. TLSClientConfig: tlsConfig,
  537. }, nil
  538. }
  539. func getContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
  540. c, err := getContainerURL(ctx, conf)
  541. if err != nil {
  542. return blob.ContainerURL{}, err
  543. }
  544. // Getting container properties to check if it exists or not. Returns error which will be parsed further.
  545. _, err = c.GetProperties(ctx, blob.LeaseAccessConditions{})
  546. return c, err
  547. }
  548. func createContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
  549. c, err := getContainerURL(ctx, conf)
  550. if err != nil {
  551. return blob.ContainerURL{}, err
  552. }
  553. _, err = c.Create(
  554. ctx,
  555. blob.Metadata{},
  556. blob.PublicAccessNone)
  557. return c, err
  558. }
// getBlobURL builds a block-blob URL for blobName inside container c.
// This is a pure URL construction; no network call is made.
func getBlobURL(blobName string, c blob.ContainerURL) blob.BlockBlobURL {
	return c.NewBlockBlobURL(blobName)
}