azurestorage.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733
  1. package storage
  2. // Fork from Thanos S3 Bucket support to reuse configuration options
  3. // Licensed under the Apache License 2.0
  4. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
  5. import (
  6. "bytes"
  7. "context"
  8. "fmt"
  9. "io"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "regexp"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/opencost/opencost/pkg/log"
  18. "github.com/Azure/azure-pipeline-go/pipeline"
  19. blob "github.com/Azure/azure-storage-blob-go/azblob"
  20. "github.com/Azure/go-autorest/autorest/adal"
  21. "github.com/Azure/go-autorest/autorest/azure/auth"
  22. "github.com/pkg/errors"
  23. "github.com/prometheus/common/model"
  24. "gopkg.in/yaml.v2"
  25. )
  26. const (
  27. azureDefaultEndpoint = "blob.core.windows.net"
  28. )
  29. var errorCodeRegex = regexp.MustCompile(`X-Ms-Error-Code:\D*\[(\w+)\]`)
  30. // Set default retry values to default Azure values. 0 = use Default Azure.
  31. var defaultAzureConfig = AzureConfig{
  32. PipelineConfig: PipelineConfig{
  33. MaxTries: 0,
  34. TryTimeout: 0,
  35. RetryDelay: 0,
  36. MaxRetryDelay: 0,
  37. },
  38. ReaderConfig: ReaderConfig{
  39. MaxRetryRequests: 0,
  40. },
  41. HTTPConfig: AzureHTTPConfig{
  42. IdleConnTimeout: model.Duration(90 * time.Second),
  43. ResponseHeaderTimeout: model.Duration(2 * time.Minute),
  44. TLSHandshakeTimeout: model.Duration(10 * time.Second),
  45. ExpectContinueTimeout: model.Duration(1 * time.Second),
  46. MaxIdleConns: 100,
  47. MaxIdleConnsPerHost: 100,
  48. MaxConnsPerHost: 0,
  49. DisableCompression: false,
  50. },
  51. }
  52. func init() {
  53. // Disable `ForceLog` in Azure storage module
  54. // As the time of this patch, the logging function in the storage module isn't correctly
  55. // detecting expected REST errors like 404 and so outputs them to syslog along with a stacktrace.
  56. // https://github.com/Azure/azure-storage-blob-go/issues/214
  57. //
  58. // This needs to be done at startup because the underlying variable is not thread safe.
  59. // https://github.com/Azure/azure-pipeline-go/blob/dc95902f1d32034f8f743ccc6c3f2eb36b84da27/pipeline/core.go#L276-L283
  60. pipeline.SetForceLogEnabled(false)
  61. }
  62. // AzureConfig Azure storage configuration.
  63. type AzureConfig struct {
  64. StorageAccountName string `yaml:"storage_account"`
  65. StorageAccountKey string `yaml:"storage_account_key"`
  66. ContainerName string `yaml:"container"`
  67. Endpoint string `yaml:"endpoint"`
  68. MaxRetries int `yaml:"max_retries"`
  69. MSIResource string `yaml:"msi_resource"`
  70. UserAssignedID string `yaml:"user_assigned_id"`
  71. PipelineConfig PipelineConfig `yaml:"pipeline_config"`
  72. ReaderConfig ReaderConfig `yaml:"reader_config"`
  73. HTTPConfig AzureHTTPConfig `yaml:"http_config"`
  74. }
  75. type ReaderConfig struct {
  76. MaxRetryRequests int `yaml:"max_retry_requests"`
  77. }
  78. type PipelineConfig struct {
  79. MaxTries int32 `yaml:"max_tries"`
  80. TryTimeout model.Duration `yaml:"try_timeout"`
  81. RetryDelay model.Duration `yaml:"retry_delay"`
  82. MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
  83. }
  84. type AzureHTTPConfig struct {
  85. IdleConnTimeout model.Duration `yaml:"idle_conn_timeout"`
  86. ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"`
  87. InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
  88. TLSHandshakeTimeout model.Duration `yaml:"tls_handshake_timeout"`
  89. ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"`
  90. MaxIdleConns int `yaml:"max_idle_conns"`
  91. MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"`
  92. MaxConnsPerHost int `yaml:"max_conns_per_host"`
  93. DisableCompression bool `yaml:"disable_compression"`
  94. TLSConfig TLSConfig `yaml:"tls_config"`
  95. }
  96. // AzureStorage implements the storage.Storage interface against Azure APIs.
  97. type AzureStorage struct {
  98. name string
  99. containerURL blob.ContainerURL
  100. config *AzureConfig
  101. }
  102. // Validate checks to see if any of the config options are set.
  103. func (conf *AzureConfig) validate() error {
  104. var errMsg []string
  105. if conf.MSIResource == "" {
  106. if conf.UserAssignedID == "" {
  107. if conf.StorageAccountName == "" ||
  108. conf.StorageAccountKey == "" {
  109. errMsg = append(errMsg, "invalid Azure storage configuration")
  110. }
  111. if conf.StorageAccountName == "" && conf.StorageAccountKey != "" {
  112. errMsg = append(errMsg, "no Azure storage_account specified while storage_account_key is present in config file; both should be present")
  113. }
  114. if conf.StorageAccountName != "" && conf.StorageAccountKey == "" {
  115. errMsg = append(errMsg, "no Azure storage_account_key specified while storage_account is present in config file; both should be present")
  116. }
  117. } else {
  118. if conf.StorageAccountName == "" {
  119. errMsg = append(errMsg, "UserAssignedID is configured but storage account name is missing")
  120. }
  121. if conf.StorageAccountKey != "" {
  122. errMsg = append(errMsg, "UserAssignedID is configured but storage account key is used")
  123. }
  124. }
  125. } else {
  126. if conf.StorageAccountName == "" {
  127. errMsg = append(errMsg, "MSI resource is configured but storage account name is missing")
  128. }
  129. if conf.StorageAccountKey != "" {
  130. errMsg = append(errMsg, "MSI resource is configured but storage account key is used")
  131. }
  132. }
  133. if conf.ContainerName == "" {
  134. errMsg = append(errMsg, "no Azure container specified")
  135. }
  136. if conf.Endpoint == "" {
  137. conf.Endpoint = azureDefaultEndpoint
  138. }
  139. if conf.PipelineConfig.MaxTries < 0 {
  140. errMsg = append(errMsg, "The value of max_tries must be greater than or equal to 0 in the config file")
  141. }
  142. if conf.ReaderConfig.MaxRetryRequests < 0 {
  143. errMsg = append(errMsg, "The value of max_retry_requests must be greater than or equal to 0 in the config file")
  144. }
  145. if len(errMsg) > 0 {
  146. return errors.New(strings.Join(errMsg, ", "))
  147. }
  148. return nil
  149. }
  150. // parseAzureConfig unmarshals a buffer into a Config with default values.
  151. func parseAzureConfig(conf []byte) (AzureConfig, error) {
  152. config := defaultAzureConfig
  153. if err := yaml.UnmarshalStrict(conf, &config); err != nil {
  154. return AzureConfig{}, err
  155. }
  156. // If we don't have config specific retry values but we do have the generic MaxRetries.
  157. // This is for backwards compatibility but also ease of configuration.
  158. if config.MaxRetries > 0 {
  159. if config.PipelineConfig.MaxTries == 0 {
  160. config.PipelineConfig.MaxTries = int32(config.MaxRetries)
  161. }
  162. if config.ReaderConfig.MaxRetryRequests == 0 {
  163. config.ReaderConfig.MaxRetryRequests = config.MaxRetries
  164. }
  165. }
  166. return config, nil
  167. }
  168. // NewAzureStorage returns a new Storage using the provided Azure config.
  169. func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
  170. log.Debugf("Creating new Azure Bucket Connection")
  171. conf, err := parseAzureConfig(azureConfig)
  172. if err != nil {
  173. return nil, err
  174. }
  175. return NewAzureStorageWith(conf)
  176. }
  177. // NewAzureStorageWith returns a new Storage using the provided Azure config struct.
  178. func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
  179. if err := conf.validate(); err != nil {
  180. return nil, err
  181. }
  182. ctx := context.Background()
  183. container, err := createContainer(ctx, conf)
  184. if err != nil {
  185. ret, ok := err.(blob.StorageError)
  186. if !ok {
  187. return nil, errors.Wrapf(err, "Azure API return unexpected error: %T\n", err)
  188. }
  189. if ret.ServiceCode() == "ContainerAlreadyExists" {
  190. log.Debugf("Getting connection to existing Azure blob container: %s", conf.ContainerName)
  191. container, err = getContainer(ctx, conf)
  192. if err != nil {
  193. return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container)
  194. }
  195. } else {
  196. return nil, errors.Wrapf(err, "error creating Azure blob container: %s", container)
  197. }
  198. } else {
  199. log.Infof("Azure blob container successfully created. Address: %s", container)
  200. }
  201. return &AzureStorage{
  202. name: conf.ContainerName,
  203. containerURL: container,
  204. config: &conf,
  205. }, nil
  206. }
  207. // Name returns the bucket name for azure storage.
  208. func (as *AzureStorage) Name() string {
  209. return as.name
  210. }
  211. // StorageType returns a string identifier for the type of storage used by the implementation.
  212. func (as *AzureStorage) StorageType() StorageType {
  213. return StorageTypeBucketAzure
  214. }
  215. // FullPath returns the storage working path combined with the path provided
  216. func (as *AzureStorage) FullPath(name string) string {
  217. name = trimLeading(name)
  218. return name
  219. }
  220. // Stat returns the StorageStats for the specific path.
  221. func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
  222. name = trimLeading(name)
  223. ctx := context.Background()
  224. blobURL := getBlobURL(name, b.containerURL)
  225. props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
  226. if err != nil {
  227. return nil, err
  228. }
  229. return &StorageInfo{
  230. Name: trimName(name),
  231. Size: props.ContentLength(),
  232. ModTime: props.LastModified(),
  233. }, nil
  234. }
  235. func (b *AzureStorage) StatDirectories(name string) (*StorageInfo, error) {
  236. name = trimLeading(name)
  237. ctx := context.Background()
  238. blobURL := getBlobURL(name, b.containerURL)
  239. props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
  240. if err != nil {
  241. return nil, err
  242. }
  243. if trimName(name) == "" {
  244. return &StorageInfo{
  245. Name: trimName(name),
  246. Size: props.ContentLength(),
  247. ModTime: props.LastModified(),
  248. }, nil
  249. } else {
  250. return nil, fmt.Errorf("non-directory in dir")
  251. }
  252. }
  253. // Read uses the relative path of the storage combined with the provided path to
  254. // read the contents.
  255. func (b *AzureStorage) Read(name string) ([]byte, error) {
  256. name = trimLeading(name)
  257. ctx := context.Background()
  258. log.Debugf("AzureStorage::Read(%s)", name)
  259. reader, err := b.getBlobReader(ctx, name, 0, blob.CountToEnd)
  260. if err != nil {
  261. return nil, err
  262. }
  263. data, err := io.ReadAll(reader)
  264. if err != nil {
  265. return nil, err
  266. }
  267. return data, nil
  268. }
  269. // Write uses the relative path of the storage combined with the provided path
  270. // to write a new file or overwrite an existing file.
  271. func (b *AzureStorage) Write(name string, data []byte) error {
  272. name = trimLeading(name)
  273. ctx := context.Background()
  274. log.Debugf("AzureStorage::Write(%s)", name)
  275. blobURL := getBlobURL(name, b.containerURL)
  276. r := bytes.NewReader(data)
  277. if _, err := blob.UploadStreamToBlockBlob(ctx, r, blobURL,
  278. blob.UploadStreamToBlockBlobOptions{
  279. BufferSize: len(data),
  280. MaxBuffers: 1,
  281. },
  282. ); err != nil {
  283. return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
  284. }
  285. return nil
  286. }
  287. // Remove uses the relative path of the storage combined with the provided path to
  288. // remove a file from storage permanently.
  289. func (b *AzureStorage) Remove(name string) error {
  290. name = trimLeading(name)
  291. log.Debugf("AzureStorage::Remove(%s)", name)
  292. ctx := context.Background()
  293. blobURL := getBlobURL(name, b.containerURL)
  294. if _, err := blobURL.Delete(ctx, blob.DeleteSnapshotsOptionInclude, blob.BlobAccessConditions{}); err != nil {
  295. return errors.Wrapf(err, "error deleting blob, address: %s", name)
  296. }
  297. return nil
  298. }
  299. // Exists uses the relative path of the storage combined with the provided path to
  300. // determine if the file exists.
  301. func (b *AzureStorage) Exists(name string) (bool, error) {
  302. name = trimLeading(name)
  303. ctx := context.Background()
  304. blobURL := getBlobURL(name, b.containerURL)
  305. if _, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{}); err != nil {
  306. if b.isObjNotFoundErr(err) {
  307. return false, nil
  308. }
  309. return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
  310. }
  311. return true, nil
  312. }
  313. // List uses the relative path of the storage combined with the provided path to return
  314. // storage information for the files.
  315. func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
  316. path = trimLeading(path)
  317. log.Debugf("AzureStorage::List(%s)", path)
  318. ctx := context.Background()
  319. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  320. // object itself as one prefix item.
  321. if path != "" {
  322. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  323. }
  324. marker := blob.Marker{}
  325. listOptions := blob.ListBlobsSegmentOptions{Prefix: path}
  326. var names []string
  327. for i := 1; ; i++ {
  328. var blobItems []blob.BlobItemInternal
  329. list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
  330. if err != nil {
  331. return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
  332. }
  333. marker = list.NextMarker
  334. blobItems = list.Segment.BlobItems
  335. for _, blob := range blobItems {
  336. names = append(names, blob.Name)
  337. }
  338. // Continue iterating if we are not done.
  339. if !marker.NotDone() {
  340. break
  341. }
  342. log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(names), i)
  343. }
  344. // get the storage information for each blob (really unfortunate we have to do this)
  345. var lock sync.Mutex
  346. var stats []*StorageInfo
  347. var wg sync.WaitGroup
  348. wg.Add(len(names))
  349. for i := 0; i < len(names); i++ {
  350. go func(n string) {
  351. defer wg.Done()
  352. stat, err := b.Stat(n)
  353. if err != nil {
  354. log.Errorf("Error statting blob %s: %s", n, err)
  355. } else {
  356. lock.Lock()
  357. stats = append(stats, stat)
  358. lock.Unlock()
  359. }
  360. }(names[i])
  361. }
  362. wg.Wait()
  363. return stats, nil
  364. }
  365. func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  366. path = trimLeading(path)
  367. log.Debugf("AzureStorage::List(%s)", path)
  368. ctx := context.Background()
  369. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  370. // object itself as one prefix item.
  371. if path != "" {
  372. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  373. }
  374. marker := blob.Marker{}
  375. listOptions := blob.ListBlobsSegmentOptions{Prefix: path}
  376. var names []string
  377. for i := 1; ; i++ {
  378. var blobItems []blob.BlobItemInternal
  379. list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
  380. if err != nil {
  381. return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
  382. }
  383. marker = list.NextMarker
  384. blobItems = list.Segment.BlobItems
  385. for _, blob := range blobItems {
  386. names = append(names, blob.Name)
  387. }
  388. // Continue iterating if we are not done.
  389. if !marker.NotDone() {
  390. break
  391. }
  392. log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(names), i)
  393. }
  394. // get the storage information for each blob (really unfortunate we have to do this)
  395. var lock sync.Mutex
  396. var stats []*StorageInfo
  397. var wg sync.WaitGroup
  398. wg.Add(len(names))
  399. for i := 0; i < len(names); i++ {
  400. go func(n string) {
  401. defer wg.Done()
  402. stat, err := b.StatDirectories(n)
  403. if err != nil {
  404. log.Errorf("Error statting blob %s: %s", n, err)
  405. } else {
  406. lock.Lock()
  407. stats = append(stats, stat)
  408. lock.Unlock()
  409. }
  410. }(names[i])
  411. }
  412. wg.Wait()
  413. return stats, nil
  414. }
  415. // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
  416. func (b *AzureStorage) isObjNotFoundErr(err error) bool {
  417. if err == nil {
  418. return false
  419. }
  420. errorCode := parseError(err.Error())
  421. if errorCode == "InvalidUri" || errorCode == "BlobNotFound" {
  422. return true
  423. }
  424. return false
  425. }
  426. func (b *AzureStorage) getBlobReader(ctx context.Context, name string, offset, length int64) (io.ReadCloser, error) {
  427. log.Debugf("Getting blob: %s, offset: %d, length: %d", name, offset, length)
  428. if name == "" {
  429. return nil, errors.New("X-Ms-Error-Code: [EmptyContainerName]")
  430. }
  431. exists, err := b.Exists(name)
  432. if err != nil {
  433. return nil, errors.Wrapf(err, "cannot get blob reader: %s", name)
  434. }
  435. if !exists {
  436. return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]")
  437. }
  438. blobURL := getBlobURL(name, b.containerURL)
  439. if err != nil {
  440. return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
  441. }
  442. var props *blob.BlobGetPropertiesResponse
  443. props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
  444. if err != nil {
  445. return nil, errors.Wrapf(err, "cannot get properties for container: %s", name)
  446. }
  447. var size int64
  448. // If a length is specified and it won't go past the end of the file,
  449. // then set it as the size.
  450. if length > 0 && length <= props.ContentLength()-offset {
  451. size = length
  452. log.Debugf("set size to length. size: %d, length: %d, offset: %d, name: %s", size, length, offset, name)
  453. } else {
  454. size = props.ContentLength() - offset
  455. log.Debugf("set size to go to EOF. contentlength: %d, size: %d, length: %d, offset: %d, name: %s", props.ContentLength(), size, length, offset, name)
  456. }
  457. destBuffer := make([]byte, size)
  458. if err := blob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, offset, size,
  459. destBuffer, blob.DownloadFromBlobOptions{
  460. BlockSize: blob.BlobDefaultDownloadBlockSize,
  461. Parallelism: uint16(3),
  462. Progress: nil,
  463. RetryReaderOptionsPerBlock: blob.RetryReaderOptions{
  464. MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests,
  465. },
  466. },
  467. ); err != nil {
  468. return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobURL.BlobURL)
  469. }
  470. return io.NopCloser(bytes.NewReader(destBuffer)), nil
  471. }
  472. func getAzureStorageCredentials(conf AzureConfig) (blob.Credential, error) {
  473. if conf.MSIResource != "" || conf.UserAssignedID != "" {
  474. spt, err := getServicePrincipalToken(conf)
  475. if err != nil {
  476. return nil, err
  477. }
  478. if err := spt.Refresh(); err != nil {
  479. return nil, err
  480. }
  481. return blob.NewTokenCredential(spt.Token().AccessToken, func(tc blob.TokenCredential) time.Duration {
  482. err := spt.Refresh()
  483. if err != nil {
  484. log.Errorf("could not refresh MSI token. err: %s", err)
  485. // Retry later as the error can be related to API throttling
  486. return 30 * time.Second
  487. }
  488. tc.SetToken(spt.Token().AccessToken)
  489. return spt.Token().Expires().Sub(time.Now().Add(2 * time.Minute))
  490. }), nil
  491. }
  492. credential, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
  493. if err != nil {
  494. return nil, err
  495. }
  496. return credential, nil
  497. }
  498. func getServicePrincipalToken(conf AzureConfig) (*adal.ServicePrincipalToken, error) {
  499. resource := conf.MSIResource
  500. if resource == "" {
  501. resource = fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint)
  502. }
  503. msiConfig := auth.MSIConfig{
  504. Resource: resource,
  505. }
  506. if conf.UserAssignedID != "" {
  507. log.Debugf("using user assigned identity. clientId: %s", conf.UserAssignedID)
  508. msiConfig.ClientID = conf.UserAssignedID
  509. } else {
  510. log.Debugf("using system assigned identity")
  511. }
  512. return msiConfig.ServicePrincipalToken()
  513. }
  514. func getContainerURL(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
  515. credentials, err := getAzureStorageCredentials(conf)
  516. if err != nil {
  517. return blob.ContainerURL{}, err
  518. }
  519. retryOptions := blob.RetryOptions{
  520. MaxTries: conf.PipelineConfig.MaxTries,
  521. TryTimeout: time.Duration(conf.PipelineConfig.TryTimeout),
  522. RetryDelay: time.Duration(conf.PipelineConfig.RetryDelay),
  523. MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
  524. }
  525. if deadline, ok := ctx.Deadline(); ok {
  526. retryOptions.TryTimeout = time.Until(deadline)
  527. }
  528. dt, err := DefaultAzureTransport(conf)
  529. if err != nil {
  530. return blob.ContainerURL{}, err
  531. }
  532. client := http.Client{
  533. Transport: dt,
  534. }
  535. p := blob.NewPipeline(credentials, blob.PipelineOptions{
  536. Retry: retryOptions,
  537. Telemetry: blob.TelemetryOptions{Value: "Kubecost"},
  538. RequestLog: blob.RequestLogOptions{
  539. // Log a warning if an operation takes longer than the specified duration.
  540. // (-1=no logging; 0=default 3s threshold)
  541. LogWarningIfTryOverThreshold: -1,
  542. },
  543. Log: pipeline.LogOptions{
  544. ShouldLog: nil,
  545. },
  546. HTTPSender: pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
  547. return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) {
  548. resp, err := client.Do(request.WithContext(ctx))
  549. return pipeline.NewHTTPResponse(resp), err
  550. }
  551. }),
  552. })
  553. u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
  554. if err != nil {
  555. return blob.ContainerURL{}, err
  556. }
  557. service := blob.NewServiceURL(*u, p)
  558. return service.NewContainerURL(conf.ContainerName), nil
  559. }
  560. func DefaultAzureTransport(config AzureConfig) (*http.Transport, error) {
  561. tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
  562. if err != nil {
  563. return nil, err
  564. }
  565. if config.HTTPConfig.InsecureSkipVerify {
  566. tlsConfig.InsecureSkipVerify = true
  567. }
  568. return &http.Transport{
  569. Proxy: http.ProxyFromEnvironment,
  570. DialContext: (&net.Dialer{
  571. Timeout: 30 * time.Second,
  572. KeepAlive: 30 * time.Second,
  573. DualStack: true,
  574. }).DialContext,
  575. MaxIdleConns: config.HTTPConfig.MaxIdleConns,
  576. MaxIdleConnsPerHost: config.HTTPConfig.MaxIdleConnsPerHost,
  577. IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
  578. MaxConnsPerHost: config.HTTPConfig.MaxConnsPerHost,
  579. TLSHandshakeTimeout: time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
  580. ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
  581. ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
  582. DisableCompression: config.HTTPConfig.DisableCompression,
  583. TLSClientConfig: tlsConfig,
  584. }, nil
  585. }
  586. func getContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
  587. c, err := getContainerURL(ctx, conf)
  588. if err != nil {
  589. return blob.ContainerURL{}, err
  590. }
  591. // Getting container properties to check if it exists or not. Returns error which will be parsed further.
  592. _, err = c.GetProperties(ctx, blob.LeaseAccessConditions{})
  593. return c, err
  594. }
  595. func createContainer(ctx context.Context, conf AzureConfig) (blob.ContainerURL, error) {
  596. c, err := getContainerURL(ctx, conf)
  597. if err != nil {
  598. return blob.ContainerURL{}, err
  599. }
  600. _, err = c.Create(
  601. ctx,
  602. blob.Metadata{},
  603. blob.PublicAccessNone)
  604. return c, err
  605. }
  606. func getBlobURL(blobName string, c blob.ContainerURL) blob.BlockBlobURL {
  607. return c.NewBlockBlobURL(blobName)
  608. }
  609. func parseError(errorCode string) string {
  610. match := errorCodeRegex.FindStringSubmatch(errorCode)
  611. if len(match) == 2 {
  612. return match[1]
  613. }
  614. return errorCode
  615. }