azurestorage.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. package storage
  2. // Fork from Thanos Azure Storage Bucket support to reuse configuration options
  3. // Licensed under the Apache License 2.0
  4. // https://github.com/thanos-io/objstore/blob/main/providers/azure/azure.go
  5. import (
  6. "bytes"
  7. "context"
  8. "fmt"
  9. "io"
  10. "net/http"
  11. "os"
  12. "path/filepath"
  13. "strings"
  14. "time"
  15. "github.com/Azure/azure-sdk-for-go/sdk/azcore"
  16. "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
  17. "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
  18. "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
  19. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  20. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
  21. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
  22. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
  23. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
  24. "github.com/opencost/opencost/core/pkg/log"
  25. "github.com/pkg/errors"
  26. "github.com/prometheus/common/model"
  27. "gopkg.in/yaml.v2"
  28. )
  const (
  	// azureDefaultEndpoint is the default blob service host suffix. It is
  	// the same value getContainerClient falls back to when no endpoint is
  	// configured.
  	azureDefaultEndpoint = "blob.core.windows.net"
  )
  // defaultAzureConfig is the baseline configuration that parseAzureConfig
  // copies before unmarshalling user-supplied YAML on top of it.
  //
  // Retry values are set to the default Azure values: 0 = use Default Azure.
  var defaultAzureConfig = AzureConfig{
  	// Retry settings later handed to the Azure client pipeline.
  	PipelineConfig: PipelineConfig{
  		MaxTries:      0,
  		TryTimeout:    0,
  		RetryDelay:    0,
  		MaxRetryDelay: 0,
  	},
  	// Retry setting used by the download retry reader.
  	ReaderConfig: ReaderConfig{
  		MaxRetryRequests: 0,
  	},
  	// HTTP transport settings; HTTPConfig is declared elsewhere in the package.
  	HTTPConfig: HTTPConfig{
  		IdleConnTimeout:       90 * time.Second,
  		ResponseHeaderTimeout: 2 * time.Minute,
  		TLSHandshakeTimeout:   10 * time.Second,
  		ExpectContinueTimeout: 1 * time.Second,
  		MaxIdleConns:          100,
  		MaxIdleConnsPerHost:   100,
  		MaxConnsPerHost:       0,
  		DisableCompression:    false,
  	},
  }
  // AzureConfig is the Azure storage configuration, populated from YAML.
  type AzureConfig struct {
  	// StorageAccountName is required by validate. It is used to build the
  	// container URL and the shared key credential.
  	StorageAccountName string `yaml:"storage_account"`
  	// StorageAccountKey enables shared key authentication when set. It
  	// cannot be combined with StorageConnectionString or UserAssignedID.
  	StorageAccountKey string `yaml:"storage_account_key"`
  	// StorageConnectionString, when set, is used to create the client and
  	// takes precedence over the other authentication options.
  	StorageConnectionString string `yaml:"storage_connection_string"`
  	// ContainerName is required by validate. NewAzureStorageWith creates
  	// the container if it does not exist.
  	ContainerName string `yaml:"container"`
  	// Endpoint is the host suffix of the container URL. When empty,
  	// getContainerClient uses "blob.core.windows.net".
  	Endpoint string `yaml:"endpoint"`
  	// MaxRetries is a generic retry count. parseAzureConfig copies it into
  	// PipelineConfig.MaxTries and ReaderConfig.MaxRetryRequests when it is
  	// positive and those values are 0.
  	MaxRetries int `yaml:"max_retries"`
  	// MSIResource is accepted from YAML.
  	// NOTE(review): no code in the reviewed portion of this file reads it;
  	// confirm whether it is still needed.
  	MSIResource string `yaml:"msi_resource"`
  	// UserAssignedID, when set, is passed as the client ID of a managed
  	// identity credential.
  	UserAssignedID string `yaml:"user_assigned_id"`
  	// PipelineConfig holds the retry settings for the Azure client.
  	PipelineConfig PipelineConfig `yaml:"pipeline_config"`
  	// ReaderConfig holds the retry settings for download readers.
  	ReaderConfig ReaderConfig `yaml:"reader_config"`
  	// HTTPConfig supplies the HTTP transport used by the client.
  	HTTPConfig HTTPConfig `yaml:"http_config"`
  }
  // ReaderConfig configures the retry reader used when downloading blobs.
  type ReaderConfig struct {
  	// MaxRetryRequests is passed as RetryReaderOptions.MaxRetries. validate
  	// rejects negative values.
  	MaxRetryRequests int `yaml:"max_retry_requests"`
  }
  // PipelineConfig holds the retry settings that getContainerClient copies
  // into the Azure client's policy.RetryOptions.
  type PipelineConfig struct {
  	// MaxTries is passed as RetryOptions.MaxRetries. validate rejects
  	// negative values.
  	MaxTries int32 `yaml:"max_tries"`
  	// TryTimeout is passed as RetryOptions.TryTimeout.
  	TryTimeout model.Duration `yaml:"try_timeout"`
  	// RetryDelay is passed as RetryOptions.RetryDelay.
  	RetryDelay model.Duration `yaml:"retry_delay"`
  	// MaxRetryDelay is passed as RetryOptions.MaxRetryDelay.
  	MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
  }
  // AzureStorage implements the storage.Storage interface against Azure APIs.
  type AzureStorage struct {
  	// name is the container name; String returns it.
  	name string
  	// containerClient performs all blob operations against the container.
  	containerClient *container.Client
  	// config is the configuration the storage was created with.
  	config *AzureConfig
  }
  83. // Validate checks to see if any of the config options are set.
  84. func (conf *AzureConfig) validate() error {
  85. var errMsg []string
  86. if conf.UserAssignedID != "" && conf.StorageAccountKey != "" {
  87. errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_account_key authentication")
  88. }
  89. if conf.UserAssignedID != "" && conf.StorageConnectionString != "" {
  90. errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_connection_string authentication")
  91. }
  92. if conf.StorageAccountKey != "" && conf.StorageConnectionString != "" {
  93. errMsg = append(errMsg, "storage_account_key and storage_connection_string cannot both be set")
  94. }
  95. if conf.StorageAccountName == "" {
  96. errMsg = append(errMsg, "storage_account_name is required but not configured")
  97. }
  98. if conf.ContainerName == "" {
  99. errMsg = append(errMsg, "no container specified")
  100. }
  101. if conf.PipelineConfig.MaxTries < 0 {
  102. errMsg = append(errMsg, "The value of max_tries must be greater than or equal to 0 in the config file")
  103. }
  104. if conf.ReaderConfig.MaxRetryRequests < 0 {
  105. errMsg = append(errMsg, "The value of max_retry_requests must be greater than or equal to 0 in the config file")
  106. }
  107. if len(errMsg) > 0 {
  108. return errors.New(strings.Join(errMsg, ", "))
  109. }
  110. return nil
  111. }
  112. // parseAzureConfig unmarshals a buffer into a Config with default values.
  113. func parseAzureConfig(conf []byte) (AzureConfig, error) {
  114. config := defaultAzureConfig
  115. if err := yaml.UnmarshalStrict(conf, &config); err != nil {
  116. return AzureConfig{}, err
  117. }
  118. // If we don't have config specific retry values but we do have the generic MaxRetries.
  119. // This is for backwards compatibility but also ease of configuration.
  120. if config.MaxRetries > 0 {
  121. if config.PipelineConfig.MaxTries == 0 {
  122. config.PipelineConfig.MaxTries = int32(config.MaxRetries)
  123. }
  124. if config.ReaderConfig.MaxRetryRequests == 0 {
  125. config.ReaderConfig.MaxRetryRequests = config.MaxRetries
  126. }
  127. }
  128. return config, nil
  129. }
  130. // NewAzureStorage returns a new Storage using the provided Azure config.
  131. func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
  132. log.Debugf("Creating new Azure Bucket Connection")
  133. conf, err := parseAzureConfig(azureConfig)
  134. if err != nil {
  135. return nil, fmt.Errorf("error parsing azure storage config: %w", err)
  136. }
  137. return NewAzureStorageWith(conf)
  138. }
  139. // NewAzureStorageWith returns a new Storage using the provided Azure config struct.
  140. func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
  141. if err := conf.validate(); err != nil {
  142. return nil, fmt.Errorf("error validating azure storage config: %w", err)
  143. }
  144. containerClient, err := getContainerClient(conf)
  145. if err != nil {
  146. return nil, fmt.Errorf("error retrieving container client: %w", err)
  147. }
  148. // Check if storage account container already exists, and create one if it does not.
  149. ctx := context.Background()
  150. _, err = containerClient.GetProperties(ctx, &container.GetPropertiesOptions{})
  151. if err != nil {
  152. if !bloberror.HasCode(err, bloberror.ContainerNotFound) {
  153. return nil, err
  154. }
  155. _, err := containerClient.Create(ctx, nil)
  156. if err != nil {
  157. return nil, errors.Wrapf(err, "error creating Azure blob container: %s", conf.ContainerName)
  158. }
  159. log.Infof("Azure blob container successfully created %s", conf.ContainerName)
  160. }
  161. return &AzureStorage{
  162. name: conf.ContainerName,
  163. containerClient: containerClient,
  164. config: &conf,
  165. }, nil
  166. }
  167. // String returns the bucket name for azure storage.
  168. func (as *AzureStorage) String() string {
  169. return as.name
  170. }
  171. // StorageType returns a string identifier for the type of storage used by the implementation.
  172. func (as *AzureStorage) StorageType() StorageType {
  173. return StorageTypeBucketAzure
  174. }
  175. // FullPath returns the storage working path combined with the path provided
  176. func (as *AzureStorage) FullPath(name string) string {
  177. name = trimLeading(name)
  178. return name
  179. }
  180. // Stat returns the StorageStats for the specific path.
  181. func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
  182. name = trimLeading(name)
  183. ctx := context.Background()
  184. blobClient := b.containerClient.NewBlobClient(name)
  185. props, err := blobClient.GetProperties(ctx, nil)
  186. if err != nil {
  187. return nil, fmt.Errorf("error retrieving blob properties: %w", err)
  188. }
  189. return &StorageInfo{
  190. Name: trimName(name),
  191. Size: *props.ContentLength,
  192. ModTime: *props.LastModified,
  193. }, nil
  194. }
  195. // Read uses the relative path of the storage combined with the provided path to
  196. // read the contents.
  197. func (b *AzureStorage) Read(name string) ([]byte, error) {
  198. name = trimLeading(name)
  199. ctx := context.Background()
  200. log.Debugf("AzureStorage::Read::HTTPS(%s)", name)
  201. downloadResponse, err := b.containerClient.NewBlobClient(name).DownloadStream(ctx, nil)
  202. if err != nil {
  203. return nil, fmt.Errorf("AzureStorage: Read: failed to download blob %q: %w", name, err)
  204. }
  205. // NOTE: automatic retries are performed if the connection fails
  206. retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
  207. MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
  208. })
  209. defer retryReader.Close()
  210. // read the body into a buffer
  211. downloadedData := bytes.Buffer{}
  212. _, err = downloadedData.ReadFrom(retryReader)
  213. if err != nil {
  214. return nil, fmt.Errorf("AzureStorage: Read: failed to read downloaded data %w", err)
  215. }
  216. return downloadedData.Bytes(), nil
  217. }
  218. // ReadStream returns a streaming reader for the specified blob path.
  219. func (b *AzureStorage) ReadStream(path string) (io.ReadCloser, error) {
  220. path = trimLeading(path)
  221. ctx := context.Background()
  222. log.Debugf("AzureStorage::ReadStream::HTTPS(%s)", path)
  223. downloadResponse, err := b.containerClient.NewBlobClient(path).DownloadStream(ctx, nil)
  224. if err != nil {
  225. if b.IsObjNotFoundErr(err) {
  226. return nil, DoesNotExistError
  227. }
  228. return nil, fmt.Errorf("AzureStorage: ReadStream: failed to download blob %q: %w", path, err)
  229. }
  230. retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
  231. MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
  232. })
  233. return retryReader, nil
  234. }
  235. // ReadToLocalFile streams the specified blob at path to destPath on the local file system.
  236. func (b *AzureStorage) ReadToLocalFile(path, destPath string) error {
  237. path = trimLeading(path)
  238. ctx := context.Background()
  239. log.Debugf("AzureStorage::ReadToLocalFile::HTTPS(%s) -> %s", path, destPath)
  240. downloadResponse, err := b.containerClient.NewBlobClient(path).DownloadStream(ctx, nil)
  241. if err != nil {
  242. if b.IsObjNotFoundErr(err) {
  243. return DoesNotExistError
  244. }
  245. return fmt.Errorf("AzureStorage: ReadToLocalFile: failed to download blob %q to %q: %w", path, destPath, err)
  246. }
  247. // NOTE: automatic retries are performed if the connection fails.
  248. retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
  249. MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
  250. })
  251. defer retryReader.Close()
  252. if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
  253. return errors.Wrap(err, "creating destination directory")
  254. }
  255. f, err := os.Create(destPath)
  256. if err != nil {
  257. return errors.Wrapf(err, "creating destination file %s", destPath)
  258. }
  259. defer f.Close()
  260. // Use 1 MB buffer for streaming operations
  261. buf := make([]byte, 1024*1024)
  262. if _, err := io.CopyBuffer(f, retryReader, buf); err != nil {
  263. return errors.Wrapf(err, "streaming %s to %s", path, destPath)
  264. }
  265. return nil
  266. }
  267. // Write uses the relative path of the storage combined with the provided path
  268. // to write a new file or overwrite an existing file.
  269. func (b *AzureStorage) Write(name string, data []byte) error {
  270. name = trimLeading(name)
  271. ctx := context.Background()
  272. log.Debugf("AzureStorage::Write::HTTPS(%s)", name)
  273. r := bytes.NewReader(data)
  274. blobClient := b.containerClient.NewBlockBlobClient(name)
  275. opts := &blockblob.UploadStreamOptions{
  276. BlockSize: 3 * 1024 * 1024,
  277. Concurrency: 4,
  278. }
  279. if _, err := blobClient.UploadStream(ctx, r, opts); err != nil {
  280. return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
  281. }
  282. return nil
  283. }
  284. // Remove uses the relative path of the storage combined with the provided path to
  285. // remove a file from storage permanently.
  286. func (b *AzureStorage) Remove(name string) error {
  287. name = trimLeading(name)
  288. log.Debugf("AzureStorage::Remove::HTTPS(%s)", name)
  289. ctx := context.Background()
  290. blobClient := b.containerClient.NewBlobClient(name)
  291. opt := &blob.DeleteOptions{
  292. DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
  293. }
  294. if _, err := blobClient.Delete(ctx, opt); err != nil {
  295. return errors.Wrapf(err, "error deleting blob, address: %s", name)
  296. }
  297. return nil
  298. }
  299. // Exists uses the relative path of the storage combined with the provided path to
  300. // determine if the file exists.
  301. func (b *AzureStorage) Exists(name string) (bool, error) {
  302. name = trimLeading(name)
  303. ctx := context.Background()
  304. blobClient := b.containerClient.NewBlobClient(name)
  305. if _, err := blobClient.GetProperties(ctx, nil); err != nil {
  306. if b.IsObjNotFoundErr(err) {
  307. return false, nil
  308. }
  309. return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
  310. }
  311. return true, nil
  312. }
  313. // List uses the relative path of the storage combined with the provided path to return
  314. // storage information for the files.
  315. func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
  316. path = trimLeading(path)
  317. log.Debugf("AzureStorage::List::HTTPS(%s)", path)
  318. ctx := context.Background()
  319. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  320. // object itself as one prefix item.
  321. if path != "" {
  322. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  323. }
  324. var stats []*StorageInfo
  325. list := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
  326. Prefix: &path,
  327. })
  328. for list.More() {
  329. page, err := list.NextPage(ctx)
  330. if err != nil {
  331. return nil, fmt.Errorf("failed to retrieve page: %s", err)
  332. }
  333. segment := page.ListBlobsHierarchySegmentResponse.Segment
  334. if segment == nil {
  335. continue
  336. }
  337. for _, blob := range segment.BlobItems {
  338. if blob.Name == nil {
  339. continue
  340. }
  341. if blob.Properties == nil {
  342. continue
  343. }
  344. stats = append(stats, &StorageInfo{
  345. Name: trimName(*blob.Name),
  346. Size: *blob.Properties.ContentLength,
  347. ModTime: *blob.Properties.LastModified,
  348. })
  349. }
  350. }
  351. return stats, nil
  352. }
  353. func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  354. path = trimLeading(path)
  355. log.Debugf("AzureStorage::ListDirectories::HTTPS(%s)", path)
  356. ctx := context.Background()
  357. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  358. // object itself as one prefix item.
  359. if path != "" {
  360. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  361. }
  362. var stats []*StorageInfo
  363. list := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
  364. Prefix: &path,
  365. })
  366. for list.More() {
  367. page, err := list.NextPage(ctx)
  368. if err != nil {
  369. return nil, fmt.Errorf("failed to retrieve page: %s", err)
  370. }
  371. segment := page.ListBlobsHierarchySegmentResponse.Segment
  372. if segment == nil {
  373. continue
  374. }
  375. for _, dir := range segment.BlobPrefixes {
  376. if dir.Name == nil {
  377. continue
  378. }
  379. stats = append(stats, &StorageInfo{
  380. Name: *dir.Name,
  381. })
  382. }
  383. }
  384. return stats, nil
  385. }
  386. // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
  387. func (b *AzureStorage) IsObjNotFoundErr(err error) bool {
  388. if err == nil {
  389. return false
  390. }
  391. return bloberror.HasCode(err, bloberror.BlobNotFound) || bloberror.HasCode(err, bloberror.InvalidURI)
  392. }
  393. // IsAccessDeniedErr returns true if access to object is denied.
  394. func (b *AzureStorage) IsAccessDeniedErr(err error) bool {
  395. if err == nil {
  396. return false
  397. }
  398. return bloberror.HasCode(err, bloberror.AuthorizationPermissionMismatch) || bloberror.HasCode(err, bloberror.InsufficientAccountPermissions)
  399. }
  400. func getContainerClient(conf AzureConfig) (*container.Client, error) {
  401. dt, err := conf.HTTPConfig.GetHTTPTransport()
  402. if err != nil {
  403. return nil, fmt.Errorf("error creating transport: %w", err)
  404. }
  405. opt := &container.ClientOptions{
  406. ClientOptions: azcore.ClientOptions{
  407. Retry: policy.RetryOptions{
  408. MaxRetries: conf.PipelineConfig.MaxTries,
  409. TryTimeout: time.Duration(conf.PipelineConfig.TryTimeout),
  410. RetryDelay: time.Duration(conf.PipelineConfig.RetryDelay),
  411. MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
  412. },
  413. Telemetry: policy.TelemetryOptions{
  414. ApplicationID: "Thanos",
  415. },
  416. Transport: &http.Client{Transport: dt},
  417. },
  418. }
  419. // Use connection string if set
  420. if conf.StorageConnectionString != "" {
  421. containerClient, err := container.NewClientFromConnectionString(conf.StorageConnectionString, conf.ContainerName, opt)
  422. if err != nil {
  423. return nil, fmt.Errorf("error creating client from connection string: %w", err)
  424. }
  425. log.Debugf("AzureStorage: New Azure client initialized for container '%s' using connection string", conf.ContainerName)
  426. return containerClient, nil
  427. }
  428. if conf.Endpoint == "" {
  429. conf.Endpoint = "blob.core.windows.net"
  430. }
  431. // HTTPS Protocol Configuration: Azure Storage always uses HTTPS protocol.
  432. // The containerURL is explicitly constructed with "https://" scheme.
  433. // All Azure blob operations (read, write, delete, list) use this HTTPS URL.
  434. containerURL := fmt.Sprintf("https://%s.%s/%s", conf.StorageAccountName, conf.Endpoint, conf.ContainerName)
  435. log.Debugf("AzureStorage: New Azure client initialized with '%s'", containerURL)
  436. // Use shared keys if set
  437. if conf.StorageAccountKey != "" {
  438. cred, err := container.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
  439. if err != nil {
  440. return nil, fmt.Errorf("error getting shared key credential: %w", err)
  441. }
  442. containerClient, err := container.NewClientWithSharedKeyCredential(containerURL, cred, opt)
  443. if err != nil {
  444. return nil, fmt.Errorf("error creating client with shared key credential: %w", err)
  445. }
  446. return containerClient, nil
  447. }
  448. // Otherwise use a token credential
  449. var cred azcore.TokenCredential
  450. // Use Managed Identity Credential if a user assigned ID is set
  451. if conf.UserAssignedID != "" {
  452. msiOpt := &azidentity.ManagedIdentityCredentialOptions{}
  453. msiOpt.ID = azidentity.ClientID(conf.UserAssignedID)
  454. cred, err = azidentity.NewManagedIdentityCredential(msiOpt)
  455. } else {
  456. // Otherwise use Default Azure Credential
  457. cred, err = azidentity.NewDefaultAzureCredential(nil)
  458. }
  459. if err != nil {
  460. return nil, fmt.Errorf("error creating token credential: %w", err)
  461. }
  462. containerClient, err := container.NewClient(containerURL, cred, opt)
  463. if err != nil {
  464. return nil, fmt.Errorf("error creating client from token credential: %w", err)
  465. }
  466. return containerClient, nil
  467. }