azurestorage.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. package storage
// Forked from the Thanos Azure storage bucket provider to reuse its configuration options.
  3. // Licensed under the Apache License 2.0
  4. // https://github.com/thanos-io/objstore/blob/main/providers/azure/azure.go
  5. import (
  6. "bytes"
  7. "context"
  8. "fmt"
  9. "net"
  10. "net/http"
  11. "strings"
  12. "time"
  13. "github.com/Azure/azure-sdk-for-go/sdk/azcore"
  14. "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
  15. "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
  16. "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
  17. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  18. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
  19. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
  20. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
  21. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
  22. "github.com/opencost/opencost/core/pkg/log"
  23. "github.com/pkg/errors"
  24. "github.com/prometheus/common/model"
  25. "gopkg.in/yaml.v2"
  26. )
  27. const (
  28. azureDefaultEndpoint = "blob.core.windows.net"
  29. )
  30. // Set default retry values to default Azure values. 0 = use Default Azure.
  31. var defaultAzureConfig = AzureConfig{
  32. PipelineConfig: PipelineConfig{
  33. MaxTries: 0,
  34. TryTimeout: 0,
  35. RetryDelay: 0,
  36. MaxRetryDelay: 0,
  37. },
  38. ReaderConfig: ReaderConfig{
  39. MaxRetryRequests: 0,
  40. },
  41. HTTPConfig: AzureHTTPConfig{
  42. IdleConnTimeout: model.Duration(90 * time.Second),
  43. ResponseHeaderTimeout: model.Duration(2 * time.Minute),
  44. TLSHandshakeTimeout: model.Duration(10 * time.Second),
  45. ExpectContinueTimeout: model.Duration(1 * time.Second),
  46. MaxIdleConns: 100,
  47. MaxIdleConnsPerHost: 100,
  48. MaxConnsPerHost: 0,
  49. DisableCompression: false,
  50. },
  51. }
// AzureConfig is the Azure storage configuration, normally decoded from YAML
// by parseAzureConfig.
type AzureConfig struct {
	// StorageAccountName is required by validate and is used to build the
	// container URL when no connection string is configured.
	StorageAccountName string `yaml:"storage_account"`
	// StorageAccountKey selects shared-key authentication when non-empty.
	StorageAccountKey string `yaml:"storage_account_key"`
	// StorageConnectionString, when non-empty, is used by getContainerClient
	// before any other authentication or endpoint setting.
	StorageConnectionString string `yaml:"storage_connection_string"`
	// ContainerName is the blob container; NewAzureStorageWith creates it if
	// it does not exist.
	ContainerName string `yaml:"container"`
	// Endpoint is the blob service domain suffix. When empty,
	// getContainerClient uses "blob.core.windows.net". It is not used when a
	// connection string is set.
	Endpoint string `yaml:"endpoint"`
	// MaxRetries is a generic retry count kept for backwards compatibility.
	// When positive, parseAzureConfig copies it into PipelineConfig.MaxTries
	// and ReaderConfig.MaxRetryRequests if those are still 0.
	MaxRetries int `yaml:"max_retries"`
	// MSIResource is not referenced anywhere in this file.
	// NOTE(review): confirm whether it is still used.
	MSIResource string `yaml:"msi_resource"`
	// UserAssignedID is the client ID of a user-assigned managed identity.
	// It is used when neither a connection string nor an account key is set.
	UserAssignedID string `yaml:"user_assigned_id"`
	// PipelineConfig configures the SDK request retry policy.
	PipelineConfig PipelineConfig `yaml:"pipeline_config"`
	// ReaderConfig configures retries of interrupted downloads in Read.
	ReaderConfig ReaderConfig `yaml:"reader_config"`
	// HTTPConfig configures the HTTP transport built by DefaultAzureTransport.
	HTTPConfig AzureHTTPConfig `yaml:"http_config"`
}
// ReaderConfig controls the retry behaviour of blob downloads.
type ReaderConfig struct {
	// MaxRetryRequests is passed as RetryReaderOptions.MaxRetries in Read.
	// validate rejects negative values.
	MaxRetryRequests int `yaml:"max_retry_requests"`
}
// PipelineConfig is mapped onto the Azure SDK policy.RetryOptions of the
// container client in getContainerClient. Zero values are intended to select
// the SDK defaults.
type PipelineConfig struct {
	// MaxTries is passed as RetryOptions.MaxRetries; validate rejects values < 0.
	MaxTries int32 `yaml:"max_tries"`
	// TryTimeout is passed as RetryOptions.TryTimeout.
	TryTimeout model.Duration `yaml:"try_timeout"`
	// RetryDelay is passed as RetryOptions.RetryDelay.
	RetryDelay model.Duration `yaml:"retry_delay"`
	// MaxRetryDelay is passed as RetryOptions.MaxRetryDelay.
	MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
}
// AzureHTTPConfig holds the settings DefaultAzureTransport copies into the
// http.Transport of the container client built by getContainerClient.
type AzureHTTPConfig struct {
	// IdleConnTimeout is passed to http.Transport.IdleConnTimeout.
	IdleConnTimeout model.Duration `yaml:"idle_conn_timeout"`
	// ResponseHeaderTimeout is passed to http.Transport.ResponseHeaderTimeout.
	ResponseHeaderTimeout model.Duration `yaml:"response_header_timeout"`
	// InsecureSkipVerify, when true, disables TLS certificate verification.
	InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
	// TLSHandshakeTimeout is passed to http.Transport.TLSHandshakeTimeout.
	TLSHandshakeTimeout model.Duration `yaml:"tls_handshake_timeout"`
	// ExpectContinueTimeout is passed to http.Transport.ExpectContinueTimeout.
	ExpectContinueTimeout model.Duration `yaml:"expect_continue_timeout"`
	// MaxIdleConns is passed to http.Transport.MaxIdleConns.
	MaxIdleConns int `yaml:"max_idle_conns"`
	// MaxIdleConnsPerHost is passed to http.Transport.MaxIdleConnsPerHost.
	MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"`
	// MaxConnsPerHost is passed to http.Transport.MaxConnsPerHost.
	MaxConnsPerHost int `yaml:"max_conns_per_host"`
	// DisableCompression is passed to http.Transport.DisableCompression.
	DisableCompression bool `yaml:"disable_compression"`
	// TLSConfig is converted with NewTLSConfig into the transport's TLS config.
	TLSConfig TLSConfig `yaml:"tls_config"`
}
// AzureStorage implements the storage.Storage interface against Azure APIs.
// Instances are built by NewAzureStorage / NewAzureStorageWith.
type AzureStorage struct {
	// name is the configured container name, returned by Name.
	name string
	// containerClient performs all blob operations within the container.
	containerClient *container.Client
	// config is the validated configuration the storage was built from.
	config *AzureConfig
}
  93. // Validate checks to see if any of the config options are set.
  94. func (conf *AzureConfig) validate() error {
  95. var errMsg []string
  96. if conf.UserAssignedID != "" && conf.StorageAccountKey != "" {
  97. errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_account_key authentication")
  98. }
  99. if conf.UserAssignedID != "" && conf.StorageConnectionString != "" {
  100. errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_connection_string authentication")
  101. }
  102. if conf.StorageAccountKey != "" && conf.StorageConnectionString != "" {
  103. errMsg = append(errMsg, "storage_account_key and storage_connection_string cannot both be set")
  104. }
  105. if conf.StorageAccountName == "" {
  106. errMsg = append(errMsg, "storage_account_name is required but not configured")
  107. }
  108. if conf.ContainerName == "" {
  109. errMsg = append(errMsg, "no container specified")
  110. }
  111. if conf.PipelineConfig.MaxTries < 0 {
  112. errMsg = append(errMsg, "The value of max_tries must be greater than or equal to 0 in the config file")
  113. }
  114. if conf.ReaderConfig.MaxRetryRequests < 0 {
  115. errMsg = append(errMsg, "The value of max_retry_requests must be greater than or equal to 0 in the config file")
  116. }
  117. if len(errMsg) > 0 {
  118. return errors.New(strings.Join(errMsg, ", "))
  119. }
  120. return nil
  121. }
  122. // parseAzureConfig unmarshals a buffer into a Config with default values.
  123. func parseAzureConfig(conf []byte) (AzureConfig, error) {
  124. config := defaultAzureConfig
  125. if err := yaml.UnmarshalStrict(conf, &config); err != nil {
  126. return AzureConfig{}, err
  127. }
  128. // If we don't have config specific retry values but we do have the generic MaxRetries.
  129. // This is for backwards compatibility but also ease of configuration.
  130. if config.MaxRetries > 0 {
  131. if config.PipelineConfig.MaxTries == 0 {
  132. config.PipelineConfig.MaxTries = int32(config.MaxRetries)
  133. }
  134. if config.ReaderConfig.MaxRetryRequests == 0 {
  135. config.ReaderConfig.MaxRetryRequests = config.MaxRetries
  136. }
  137. }
  138. return config, nil
  139. }
  140. // NewAzureStorage returns a new Storage using the provided Azure config.
  141. func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
  142. log.Debugf("Creating new Azure Bucket Connection")
  143. conf, err := parseAzureConfig(azureConfig)
  144. if err != nil {
  145. return nil, fmt.Errorf("error parsing azure storage config: %w", err)
  146. }
  147. return NewAzureStorageWith(conf)
  148. }
  149. // NewAzureStorageWith returns a new Storage using the provided Azure config struct.
  150. func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
  151. if err := conf.validate(); err != nil {
  152. return nil, fmt.Errorf("error validating azure storage config: %w", err)
  153. }
  154. containerClient, err := getContainerClient(conf)
  155. if err != nil {
  156. return nil, fmt.Errorf("error retrieving container client: %w", err)
  157. }
  158. // Check if storage account container already exists, and create one if it does not.
  159. ctx := context.Background()
  160. _, err = containerClient.GetProperties(ctx, &container.GetPropertiesOptions{})
  161. if err != nil {
  162. if !bloberror.HasCode(err, bloberror.ContainerNotFound) {
  163. return nil, err
  164. }
  165. _, err := containerClient.Create(ctx, nil)
  166. if err != nil {
  167. return nil, errors.Wrapf(err, "error creating Azure blob container: %s", conf.ContainerName)
  168. }
  169. log.Infof("Azure blob container successfully created %s", conf.ContainerName)
  170. }
  171. return &AzureStorage{
  172. name: conf.ContainerName,
  173. containerClient: containerClient,
  174. config: &conf,
  175. }, nil
  176. }
  177. // Name returns the bucket name for azure storage.
  178. func (as *AzureStorage) Name() string {
  179. return as.name
  180. }
  181. // StorageType returns a string identifier for the type of storage used by the implementation.
  182. func (as *AzureStorage) StorageType() StorageType {
  183. return StorageTypeBucketAzure
  184. }
  185. // FullPath returns the storage working path combined with the path provided
  186. func (as *AzureStorage) FullPath(name string) string {
  187. name = trimLeading(name)
  188. return name
  189. }
  190. // Stat returns the StorageStats for the specific path.
  191. func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
  192. name = trimLeading(name)
  193. ctx := context.Background()
  194. blobClient := b.containerClient.NewBlobClient(name)
  195. props, err := blobClient.GetProperties(ctx, nil)
  196. if err != nil {
  197. return nil, fmt.Errorf("error retrieving blob properties: %w", err)
  198. }
  199. return &StorageInfo{
  200. Name: trimName(name),
  201. Size: *props.ContentLength,
  202. ModTime: *props.LastModified,
  203. }, nil
  204. }
  205. // Read uses the relative path of the storage combined with the provided path to
  206. // read the contents.
  207. func (b *AzureStorage) Read(name string) ([]byte, error) {
  208. name = trimLeading(name)
  209. ctx := context.Background()
  210. log.Debugf("AzureStorage::Read(%s)", name)
  211. downloadResponse, err := b.containerClient.NewBlobClient(name).DownloadStream(ctx, nil)
  212. if err != nil {
  213. return nil, fmt.Errorf("AzureStorage: Read: failed to download %w", err)
  214. }
  215. // NOTE: automatically retries are performed if the connection fails
  216. retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
  217. MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
  218. })
  219. defer retryReader.Close()
  220. // read the body into a buffer
  221. downloadedData := bytes.Buffer{}
  222. _, err = downloadedData.ReadFrom(retryReader)
  223. if err != nil {
  224. return nil, fmt.Errorf("AzureStorage: Read: failed to read downloaded data %w", err)
  225. }
  226. return downloadedData.Bytes(), nil
  227. }
  228. // Write uses the relative path of the storage combined with the provided path
  229. // to write a new file or overwrite an existing file.
  230. func (b *AzureStorage) Write(name string, data []byte) error {
  231. name = trimLeading(name)
  232. ctx := context.Background()
  233. log.Debugf("AzureStorage::Write(%s)", name)
  234. r := bytes.NewReader(data)
  235. blobClient := b.containerClient.NewBlockBlobClient(name)
  236. opts := &blockblob.UploadStreamOptions{
  237. BlockSize: 3 * 1024 * 1024,
  238. Concurrency: 4,
  239. }
  240. if _, err := blobClient.UploadStream(ctx, r, opts); err != nil {
  241. return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
  242. }
  243. return nil
  244. }
  245. // Remove uses the relative path of the storage combined with the provided path to
  246. // remove a file from storage permanently.
  247. func (b *AzureStorage) Remove(name string) error {
  248. name = trimLeading(name)
  249. log.Debugf("AzureStorage::Remove(%s)", name)
  250. ctx := context.Background()
  251. blobClient := b.containerClient.NewBlobClient(name)
  252. opt := &blob.DeleteOptions{
  253. DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
  254. }
  255. if _, err := blobClient.Delete(ctx, opt); err != nil {
  256. return errors.Wrapf(err, "error deleting blob, address: %s", name)
  257. }
  258. return nil
  259. }
  260. // Exists uses the relative path of the storage combined with the provided path to
  261. // determine if the file exists.
  262. func (b *AzureStorage) Exists(name string) (bool, error) {
  263. name = trimLeading(name)
  264. ctx := context.Background()
  265. blobClient := b.containerClient.NewBlobClient(name)
  266. if _, err := blobClient.GetProperties(ctx, nil); err != nil {
  267. if b.IsObjNotFoundErr(err) {
  268. return false, nil
  269. }
  270. return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
  271. }
  272. return true, nil
  273. }
  274. // List uses the relative path of the storage combined with the provided path to return
  275. // storage information for the files.
  276. func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
  277. path = trimLeading(path)
  278. log.Debugf("AzureStorage::List(%s)", path)
  279. ctx := context.Background()
  280. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  281. // object itself as one prefix item.
  282. if path != "" {
  283. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  284. }
  285. var stats []*StorageInfo
  286. list := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
  287. Prefix: &path,
  288. })
  289. for list.More() {
  290. page, err := list.NextPage(ctx)
  291. if err != nil {
  292. return nil, fmt.Errorf("failed to retrieve page: %s", err)
  293. }
  294. segment := page.ListBlobsHierarchySegmentResponse.Segment
  295. if segment == nil {
  296. continue
  297. }
  298. for _, blob := range segment.BlobItems {
  299. if blob.Name == nil {
  300. continue
  301. }
  302. if blob.Properties == nil {
  303. continue
  304. }
  305. stats = append(stats, &StorageInfo{
  306. Name: trimName(*blob.Name),
  307. Size: *blob.Properties.ContentLength,
  308. ModTime: *blob.Properties.LastModified,
  309. })
  310. }
  311. }
  312. return stats, nil
  313. }
  314. func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  315. path = trimLeading(path)
  316. log.Debugf("AzureStorage::ListDirectories(%s)", path)
  317. ctx := context.Background()
  318. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  319. // object itself as one prefix item.
  320. if path != "" {
  321. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  322. }
  323. var stats []*StorageInfo
  324. list := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
  325. Prefix: &path,
  326. })
  327. for list.More() {
  328. page, err := list.NextPage(ctx)
  329. if err != nil {
  330. return nil, fmt.Errorf("failed to retrieve page: %s", err)
  331. }
  332. segment := page.ListBlobsHierarchySegmentResponse.Segment
  333. if segment == nil {
  334. continue
  335. }
  336. for _, dir := range segment.BlobPrefixes {
  337. if dir.Name == nil {
  338. continue
  339. }
  340. stats = append(stats, &StorageInfo{
  341. Name: *dir.Name,
  342. })
  343. }
  344. }
  345. return stats, nil
  346. }
  347. // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
  348. func (b *AzureStorage) IsObjNotFoundErr(err error) bool {
  349. if err == nil {
  350. return false
  351. }
  352. return bloberror.HasCode(err, bloberror.BlobNotFound) || bloberror.HasCode(err, bloberror.InvalidURI)
  353. }
  354. // IsAccessDeniedErr returns true if access to object is denied.
  355. func (b *AzureStorage) IsAccessDeniedErr(err error) bool {
  356. if err == nil {
  357. return false
  358. }
  359. return bloberror.HasCode(err, bloberror.AuthorizationPermissionMismatch) || bloberror.HasCode(err, bloberror.InsufficientAccountPermissions)
  360. }
  361. func DefaultAzureTransport(config AzureConfig) (*http.Transport, error) {
  362. tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
  363. if err != nil {
  364. return nil, fmt.Errorf("error creating TLS config: %w", err)
  365. }
  366. if config.HTTPConfig.InsecureSkipVerify {
  367. tlsConfig.InsecureSkipVerify = true
  368. }
  369. return &http.Transport{
  370. Proxy: http.ProxyFromEnvironment,
  371. DialContext: (&net.Dialer{
  372. Timeout: 30 * time.Second,
  373. KeepAlive: 30 * time.Second,
  374. DualStack: true,
  375. }).DialContext,
  376. MaxIdleConns: config.HTTPConfig.MaxIdleConns,
  377. MaxIdleConnsPerHost: config.HTTPConfig.MaxIdleConnsPerHost,
  378. IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
  379. MaxConnsPerHost: config.HTTPConfig.MaxConnsPerHost,
  380. TLSHandshakeTimeout: time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
  381. ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
  382. ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
  383. DisableCompression: config.HTTPConfig.DisableCompression,
  384. TLSClientConfig: tlsConfig,
  385. }, nil
  386. }
  387. func getContainerClient(conf AzureConfig) (*container.Client, error) {
  388. dt, err := DefaultAzureTransport(conf)
  389. if err != nil {
  390. return nil, fmt.Errorf("error creating default transport: %w", err)
  391. }
  392. opt := &container.ClientOptions{
  393. ClientOptions: azcore.ClientOptions{
  394. Retry: policy.RetryOptions{
  395. MaxRetries: conf.PipelineConfig.MaxTries,
  396. TryTimeout: time.Duration(conf.PipelineConfig.TryTimeout),
  397. RetryDelay: time.Duration(conf.PipelineConfig.RetryDelay),
  398. MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
  399. },
  400. Telemetry: policy.TelemetryOptions{
  401. ApplicationID: "Thanos",
  402. },
  403. Transport: &http.Client{Transport: dt},
  404. },
  405. }
  406. // Use connection string if set
  407. if conf.StorageConnectionString != "" {
  408. containerClient, err := container.NewClientFromConnectionString(conf.StorageConnectionString, conf.ContainerName, opt)
  409. if err != nil {
  410. return nil, fmt.Errorf("error creating client from connection string: %w", err)
  411. }
  412. return containerClient, nil
  413. }
  414. if conf.Endpoint == "" {
  415. conf.Endpoint = "blob.core.windows.net"
  416. }
  417. containerURL := fmt.Sprintf("https://%s.%s/%s", conf.StorageAccountName, conf.Endpoint, conf.ContainerName)
  418. // Use shared keys if set
  419. if conf.StorageAccountKey != "" {
  420. cred, err := container.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
  421. if err != nil {
  422. return nil, fmt.Errorf("error getting shared key credential: %w", err)
  423. }
  424. containerClient, err := container.NewClientWithSharedKeyCredential(containerURL, cred, opt)
  425. if err != nil {
  426. return nil, fmt.Errorf("error creating client with shared key credential: %w", err)
  427. }
  428. return containerClient, nil
  429. }
  430. // Otherwise use a token credential
  431. var cred azcore.TokenCredential
  432. // Use Managed Identity Credential if a user assigned ID is set
  433. if conf.UserAssignedID != "" {
  434. msiOpt := &azidentity.ManagedIdentityCredentialOptions{}
  435. msiOpt.ID = azidentity.ClientID(conf.UserAssignedID)
  436. cred, err = azidentity.NewManagedIdentityCredential(msiOpt)
  437. } else {
  438. // Otherwise use Default Azure Credential
  439. cred, err = azidentity.NewDefaultAzureCredential(nil)
  440. }
  441. if err != nil {
  442. return nil, fmt.Errorf("error creating token credential: %w", err)
  443. }
  444. containerClient, err := container.NewClient(containerURL, cred, opt)
  445. if err != nil {
  446. return nil, fmt.Errorf("error creating client from token credential: %w", err)
  447. }
  448. return containerClient, nil
  449. }