azurestorage.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. package storage
  2. // Fork from Thanos Azure Storage Bucket support to reuse configuration options
  3. // Licensed under the Apache License 2.0
  4. // https://github.com/thanos-io/objstore/blob/main/providers/azure/azure.go
  5. import (
  6. "bytes"
  7. "context"
  8. "fmt"
  9. "net/http"
  10. "strings"
  11. "time"
  12. "github.com/Azure/azure-sdk-for-go/sdk/azcore"
  13. "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
  14. "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
  15. "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
  16. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
  17. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
  18. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
  19. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
  20. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
  21. "github.com/opencost/opencost/core/pkg/log"
  22. "github.com/pkg/errors"
  23. "github.com/prometheus/common/model"
  24. "gopkg.in/yaml.v2"
  25. )
  // azureDefaultEndpoint is the default Azure Blob Storage host suffix; a
  // container URL has the form https://<account>.<endpoint>/<container>.
  const azureDefaultEndpoint = "blob.core.windows.net"
  29. // Set default retry values to default Azure values. 0 = use Default Azure.
  30. var defaultAzureConfig = AzureConfig{
  31. PipelineConfig: PipelineConfig{
  32. MaxTries: 0,
  33. TryTimeout: 0,
  34. RetryDelay: 0,
  35. MaxRetryDelay: 0,
  36. },
  37. ReaderConfig: ReaderConfig{
  38. MaxRetryRequests: 0,
  39. },
  40. HTTPConfig: HTTPConfig{
  41. IdleConnTimeout: 90 * time.Second,
  42. ResponseHeaderTimeout: 2 * time.Minute,
  43. TLSHandshakeTimeout: 10 * time.Second,
  44. ExpectContinueTimeout: 1 * time.Second,
  45. MaxIdleConns: 100,
  46. MaxIdleConnsPerHost: 100,
  47. MaxConnsPerHost: 0,
  48. DisableCompression: false,
  49. },
  50. }
  51. // AzureConfig Azure storage configuration.
  52. type AzureConfig struct {
  53. StorageAccountName string `yaml:"storage_account"`
  54. StorageAccountKey string `yaml:"storage_account_key"`
  55. StorageConnectionString string `yaml:"storage_connection_string"`
  56. ContainerName string `yaml:"container"`
  57. Endpoint string `yaml:"endpoint"`
  58. MaxRetries int `yaml:"max_retries"`
  59. MSIResource string `yaml:"msi_resource"`
  60. UserAssignedID string `yaml:"user_assigned_id"`
  61. PipelineConfig PipelineConfig `yaml:"pipeline_config"`
  62. ReaderConfig ReaderConfig `yaml:"reader_config"`
  63. HTTPConfig HTTPConfig `yaml:"http_config"`
  64. }
  // ReaderConfig tunes how downloaded blob bodies are read.
  type ReaderConfig struct {
  	// MaxRetryRequests is passed to the download retry reader as its maximum
  	// retry count. validate rejects negative values; when it is zero and
  	// max_retries is positive, parseAzureConfig substitutes max_retries.
  	MaxRetryRequests int `yaml:"max_retry_requests"`
  }
  68. type PipelineConfig struct {
  69. MaxTries int32 `yaml:"max_tries"`
  70. TryTimeout model.Duration `yaml:"try_timeout"`
  71. RetryDelay model.Duration `yaml:"retry_delay"`
  72. MaxRetryDelay model.Duration `yaml:"max_retry_delay"`
  73. }
  74. // AzureStorage implements the storage.Storage interface against Azure APIs.
  75. type AzureStorage struct {
  76. name string
  77. containerClient *container.Client
  78. config *AzureConfig
  79. }
  // validate checks the configuration for the following problems:
  //   - conflicting authentication settings;
  //   - missing required values;
  //   - negative retry counts.
  //
  // Every problem found is collected and returned as one error, with messages
  // joined by ", ", so a user can fix them all in a single pass. It returns nil
  // when the configuration is usable.
  //
  // Messages refer to YAML keys (e.g. storage_account rather than the Go field
  // name) because those are what users edit.
  func (conf *AzureConfig) validate() error {
  	var errMsg []string

  	// At most one of user-assigned managed identity, shared account key, or
  	// connection string may be configured.
  	if conf.UserAssignedID != "" && conf.StorageAccountKey != "" {
  		errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_account_key authentication")
  	}
  	if conf.UserAssignedID != "" && conf.StorageConnectionString != "" {
  		errMsg = append(errMsg, "user_assigned_id cannot be set when using storage_connection_string authentication")
  	}
  	if conf.StorageAccountKey != "" && conf.StorageConnectionString != "" {
  		errMsg = append(errMsg, "storage_account_key and storage_connection_string cannot both be set")
  	}

  	// Required settings. StorageAccountName is tagged `yaml:"storage_account"`.
  	if conf.StorageAccountName == "" {
  		errMsg = append(errMsg, "storage_account is required but not configured")
  	}
  	if conf.ContainerName == "" {
  		errMsg = append(errMsg, "no container specified")
  	}

  	// Retry counts may be zero (use the default) but never negative.
  	if conf.PipelineConfig.MaxTries < 0 {
  		errMsg = append(errMsg, "the value of max_tries must be greater than or equal to 0 in the config file")
  	}
  	if conf.ReaderConfig.MaxRetryRequests < 0 {
  		errMsg = append(errMsg, "the value of max_retry_requests must be greater than or equal to 0 in the config file")
  	}

  	if len(errMsg) == 0 {
  		return nil
  	}
  	return errors.New(strings.Join(errMsg, ", "))
  }
  109. // parseAzureConfig unmarshals a buffer into a Config with default values.
  110. func parseAzureConfig(conf []byte) (AzureConfig, error) {
  111. config := defaultAzureConfig
  112. if err := yaml.UnmarshalStrict(conf, &config); err != nil {
  113. return AzureConfig{}, err
  114. }
  115. // If we don't have config specific retry values but we do have the generic MaxRetries.
  116. // This is for backwards compatibility but also ease of configuration.
  117. if config.MaxRetries > 0 {
  118. if config.PipelineConfig.MaxTries == 0 {
  119. config.PipelineConfig.MaxTries = int32(config.MaxRetries)
  120. }
  121. if config.ReaderConfig.MaxRetryRequests == 0 {
  122. config.ReaderConfig.MaxRetryRequests = config.MaxRetries
  123. }
  124. }
  125. return config, nil
  126. }
  // NewAzureStorage builds an AzureStorage from raw YAML configuration bytes.
  // It wraps parse failures and hands the parsed config to NewAzureStorageWith,
  // which performs validation and client setup.
  func NewAzureStorage(azureConfig []byte) (*AzureStorage, error) {
  	log.Debugf("Creating new Azure Bucket Connection")
  	conf, err := parseAzureConfig(azureConfig)
  	if err == nil {
  		return NewAzureStorageWith(conf)
  	}
  	return nil, fmt.Errorf("error parsing azure storage config: %w", err)
  }
  // NewAzureStorageWith returns a new AzureStorage for an already-parsed config.
  //
  // It does three things:
  //   - validates conf;
  //   - builds a container client;
  //   - makes sure the container exists, creating it when the service reports
  //     ContainerNotFound.
  //
  // If another process creates the container between the existence check and
  // Create, the resulting ContainerAlreadyExists error is treated as success.
  // Errors from the existence check are wrapped with the container name.
  func NewAzureStorageWith(conf AzureConfig) (*AzureStorage, error) {
  	if err := conf.validate(); err != nil {
  		return nil, fmt.Errorf("error validating azure storage config: %w", err)
  	}
  	containerClient, err := getContainerClient(conf)
  	if err != nil {
  		return nil, fmt.Errorf("error retrieving container client: %w", err)
  	}

  	ctx := context.Background()
  	if _, err := containerClient.GetProperties(ctx, &container.GetPropertiesOptions{}); err != nil {
  		if !bloberror.HasCode(err, bloberror.ContainerNotFound) {
  			return nil, fmt.Errorf("error checking Azure blob container %s: %w", conf.ContainerName, err)
  		}
  		_, err = containerClient.Create(ctx, nil)
  		switch {
  		case err == nil:
  			log.Infof("Azure blob container successfully created %s", conf.ContainerName)
  		case bloberror.HasCode(err, bloberror.ContainerAlreadyExists):
  			// Lost a creation race with another instance; the container exists now.
  			log.Debugf("Azure blob container %s was created concurrently", conf.ContainerName)
  		default:
  			return nil, errors.Wrapf(err, "error creating Azure blob container: %s", conf.ContainerName)
  		}
  	}

  	return &AzureStorage{
  		name:            conf.ContainerName,
  		containerClient: containerClient,
  		config:          &conf,
  	}, nil
  }
  164. // String returns the bucket name for azure storage.
  165. func (as *AzureStorage) String() string {
  166. return as.name
  167. }
  168. // StorageType returns a string identifier for the type of storage used by the implementation.
  169. func (as *AzureStorage) StorageType() StorageType {
  170. return StorageTypeBucketAzure
  171. }
  172. // FullPath returns the storage working path combined with the path provided
  173. func (as *AzureStorage) FullPath(name string) string {
  174. name = trimLeading(name)
  175. return name
  176. }
  // Stat returns the size and last-modified time of the blob at name, after
  // normalizing name with trimLeading. Failures from the service are wrapped
  // with %w.
  //
  // The SDK exposes ContentLength and LastModified as optional pointers. If the
  // response omits one, the corresponding StorageInfo field is left at its zero
  // value instead of panicking.
  func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
  	name = trimLeading(name)
  	ctx := context.Background()

  	props, err := b.containerClient.NewBlobClient(name).GetProperties(ctx, nil)
  	if err != nil {
  		return nil, fmt.Errorf("error retrieving blob properties: %w", err)
  	}

  	info := &StorageInfo{Name: trimName(name)}
  	if props.ContentLength != nil {
  		info.Size = *props.ContentLength
  	}
  	if props.LastModified != nil {
  		info.ModTime = *props.LastModified
  	}
  	return info, nil
  }
  192. // Read uses the relative path of the storage combined with the provided path to
  193. // read the contents.
  194. func (b *AzureStorage) Read(name string) ([]byte, error) {
  195. name = trimLeading(name)
  196. ctx := context.Background()
  197. log.Debugf("AzureStorage::Read::HTTPS(%s)", name)
  198. downloadResponse, err := b.containerClient.NewBlobClient(name).DownloadStream(ctx, nil)
  199. if err != nil {
  200. return nil, fmt.Errorf("AzureStorage: Read: failed to download %w", err)
  201. }
  202. // NOTE: automatically retries are performed if the connection fails
  203. retryReader := downloadResponse.NewRetryReader(ctx, &azblob.RetryReaderOptions{
  204. MaxRetries: int32(b.config.ReaderConfig.MaxRetryRequests),
  205. })
  206. defer retryReader.Close()
  207. // read the body into a buffer
  208. downloadedData := bytes.Buffer{}
  209. _, err = downloadedData.ReadFrom(retryReader)
  210. if err != nil {
  211. return nil, fmt.Errorf("AzureStorage: Read: failed to read downloaded data %w", err)
  212. }
  213. return downloadedData.Bytes(), nil
  214. }
  215. // Write uses the relative path of the storage combined with the provided path
  216. // to write a new file or overwrite an existing file.
  217. func (b *AzureStorage) Write(name string, data []byte) error {
  218. name = trimLeading(name)
  219. ctx := context.Background()
  220. log.Debugf("AzureStorage::Write::HTTPS(%s)", name)
  221. r := bytes.NewReader(data)
  222. blobClient := b.containerClient.NewBlockBlobClient(name)
  223. opts := &blockblob.UploadStreamOptions{
  224. BlockSize: 3 * 1024 * 1024,
  225. Concurrency: 4,
  226. }
  227. if _, err := blobClient.UploadStream(ctx, r, opts); err != nil {
  228. return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
  229. }
  230. return nil
  231. }
  // Remove deletes the blob at name (after trimLeading), requesting that its
  // snapshots be deleted along with it. Failures are wrapped with the blob name.
  func (b *AzureStorage) Remove(name string) error {
  	name = trimLeading(name)
  	log.Debugf("AzureStorage::Remove::HTTPS(%s)", name)
  	ctx := context.Background()

  	client := b.containerClient.NewBlobClient(name)
  	deleteOpts := blob.DeleteOptions{
  		DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
  	}
  	_, err := client.Delete(ctx, &deleteOpts)
  	if err == nil {
  		return nil
  	}
  	return errors.Wrapf(err, "error deleting blob, address: %s", name)
  }
  // Exists reports whether a blob is present at name (after trimLeading). It
  // returns (false, nil) when IsObjNotFoundErr classifies the error as not
  // found; any other failure is returned wrapped with the blob name.
  func (b *AzureStorage) Exists(name string) (bool, error) {
  	name = trimLeading(name)
  	ctx := context.Background()

  	_, err := b.containerClient.NewBlobClient(name).GetProperties(ctx, nil)
  	switch {
  	case err == nil:
  		return true, nil
  	case b.IsObjNotFoundErr(err):
  		return false, nil
  	default:
  		return false, errors.Wrapf(err, "cannot get properties for Azure blob, address: %s", name)
  	}
  }
  // List returns metadata for the blobs stored directly under path. path is
  // normalized with trimLeading and treated as a directory prefix.
  //
  // The listing is hierarchical, split on DirDelim. Only blob items are
  // collected here; grouped prefixes are ignored (ListDirectories returns
  // those). Entries without a name or properties are skipped. A missing size
  // or modification time is reported as zero rather than causing a panic.
  // Paging errors are wrapped with %w.
  func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
  	path = trimLeading(path)
  	log.Debugf("AzureStorage::List::HTTPS(%s)", path)
  	ctx := context.Background()

  	// Terminate a non-empty prefix with DirDelim. Only entries inside that
  	// directory then match, not the path itself or siblings that share its
  	// leading characters.
  	if path != "" {
  		path = strings.TrimSuffix(path, DirDelim) + DirDelim
  	}

  	var stats []*StorageInfo
  	pager := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
  		Prefix: &path,
  	})
  	for pager.More() {
  		page, err := pager.NextPage(ctx)
  		if err != nil {
  			return nil, fmt.Errorf("failed to retrieve page: %w", err)
  		}
  		segment := page.ListBlobsHierarchySegmentResponse.Segment
  		if segment == nil {
  			continue
  		}
  		// The loop variable is named item (not blob) to avoid shadowing the
  		// imported blob package.
  		for _, item := range segment.BlobItems {
  			if item == nil || item.Name == nil || item.Properties == nil {
  				continue
  			}
  			info := &StorageInfo{Name: trimName(*item.Name)}
  			if item.Properties.ContentLength != nil {
  				info.Size = *item.Properties.ContentLength
  			}
  			if item.Properties.LastModified != nil {
  				info.ModTime = *item.Properties.LastModified
  			}
  			stats = append(stats, info)
  		}
  	}
  	return stats, nil
  }
  // ListDirectories returns the immediate sub-directories under path. These are
  // the prefixes that the hierarchical listing groups at the next DirDelim.
  // path is normalized with trimLeading and treated as a directory prefix.
  //
  // Each entry's Name is the prefix exactly as the service returned it (it is
  // not passed through trimName). Size and ModTime are left at their zero
  // values. Paging errors are wrapped with %w.
  func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  	path = trimLeading(path)
  	log.Debugf("AzureStorage::ListDirectories::HTTPS(%s)", path)
  	ctx := context.Background()

  	// Terminate a non-empty prefix with DirDelim. Only entries inside that
  	// directory then match, not the path itself or siblings that share its
  	// leading characters.
  	if path != "" {
  		path = strings.TrimSuffix(path, DirDelim) + DirDelim
  	}

  	var stats []*StorageInfo
  	pager := b.containerClient.NewListBlobsHierarchyPager(DirDelim, &container.ListBlobsHierarchyOptions{
  		Prefix: &path,
  	})
  	for pager.More() {
  		page, err := pager.NextPage(ctx)
  		if err != nil {
  			return nil, fmt.Errorf("failed to retrieve page: %w", err)
  		}
  		segment := page.ListBlobsHierarchySegmentResponse.Segment
  		if segment == nil {
  			continue
  		}
  		for _, prefix := range segment.BlobPrefixes {
  			if prefix == nil || prefix.Name == nil {
  				continue
  			}
  			stats = append(stats, &StorageInfo{Name: *prefix.Name})
  		}
  	}
  	return stats, nil
  }
  334. // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
  335. func (b *AzureStorage) IsObjNotFoundErr(err error) bool {
  336. if err == nil {
  337. return false
  338. }
  339. return bloberror.HasCode(err, bloberror.BlobNotFound) || bloberror.HasCode(err, bloberror.InvalidURI)
  340. }
  341. // IsAccessDeniedErr returns true if access to object is denied.
  342. func (b *AzureStorage) IsAccessDeniedErr(err error) bool {
  343. if err == nil {
  344. return false
  345. }
  346. return bloberror.HasCode(err, bloberror.AuthorizationPermissionMismatch) || bloberror.HasCode(err, bloberror.InsufficientAccountPermissions)
  347. }
  // getContainerClient builds a client for conf.ContainerName.
  //
  // Authentication is chosen in this order:
  //  1. StorageConnectionString, when set;
  //  2. StorageAccountKey (shared key), when set;
  //  3. the user-assigned managed identity UserAssignedID, when set;
  //  4. otherwise azidentity's DefaultAzureCredential.
  //
  // Retry settings come from conf.PipelineConfig and the HTTP transport from
  // conf.HTTPConfig. If conf.Endpoint is empty, azureDefaultEndpoint is used.
  func getContainerClient(conf AzureConfig) (*container.Client, error) {
  	dt, err := conf.HTTPConfig.GetHTTPTransport()
  	if err != nil {
  		return nil, fmt.Errorf("error creating transport: %w", err)
  	}
  	opt := &container.ClientOptions{
  		ClientOptions: azcore.ClientOptions{
  			Retry: policy.RetryOptions{
  				// PipelineConfig.MaxTries is passed through as the SDK retry count.
  				// Zero values fall back to the SDK defaults.
  				MaxRetries:    conf.PipelineConfig.MaxTries,
  				TryTimeout:    time.Duration(conf.PipelineConfig.TryTimeout),
  				RetryDelay:    time.Duration(conf.PipelineConfig.RetryDelay),
  				MaxRetryDelay: time.Duration(conf.PipelineConfig.MaxRetryDelay),
  			},
  			Telemetry: policy.TelemetryOptions{
  				// NOTE(review): value inherited from the Thanos fork; consider an
  				// OpenCost-specific application ID.
  				ApplicationID: "Thanos",
  			},
  			Transport: &http.Client{Transport: dt},
  		},
  	}

  	// A connection string carries its own endpoint and credentials.
  	if conf.StorageConnectionString != "" {
  		containerClient, err := container.NewClientFromConnectionString(conf.StorageConnectionString, conf.ContainerName, opt)
  		if err != nil {
  			return nil, fmt.Errorf("error creating client from connection string: %w", err)
  		}
  		log.Debugf("AzureStorage: New Azure client initialized for container '%s' using connection string", conf.ContainerName)
  		return containerClient, nil
  	}

  	endpoint := conf.Endpoint
  	if endpoint == "" {
  		endpoint = azureDefaultEndpoint
  	}
  	// Azure Storage is always addressed over HTTPS. Every blob operation made
  	// through the returned client uses this URL.
  	containerURL := fmt.Sprintf("https://%s.%s/%s", conf.StorageAccountName, endpoint, conf.ContainerName)

  	// Shared key authentication.
  	if conf.StorageAccountKey != "" {
  		cred, err := container.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
  		if err != nil {
  			return nil, fmt.Errorf("error getting shared key credential: %w", err)
  		}
  		containerClient, err := container.NewClientWithSharedKeyCredential(containerURL, cred, opt)
  		if err != nil {
  			return nil, fmt.Errorf("error creating client with shared key credential: %w", err)
  		}
  		log.Debugf("AzureStorage: New Azure client initialized with '%s'", containerURL)
  		return containerClient, nil
  	}

  	// Token authentication: a user-assigned managed identity if configured,
  	// otherwise the default Azure credential chain.
  	var cred azcore.TokenCredential
  	if conf.UserAssignedID != "" {
  		cred, err = azidentity.NewManagedIdentityCredential(&azidentity.ManagedIdentityCredentialOptions{
  			ID: azidentity.ClientID(conf.UserAssignedID),
  		})
  	} else {
  		cred, err = azidentity.NewDefaultAzureCredential(nil)
  	}
  	if err != nil {
  		return nil, fmt.Errorf("error creating token credential: %w", err)
  	}
  	containerClient, err := container.NewClient(containerURL, cred, opt)
  	if err != nil {
  		return nil, fmt.Errorf("error creating client from token credential: %w", err)
  	}
  	log.Debugf("AzureStorage: New Azure client initialized with '%s'", containerURL)
  	return containerClient, nil
  }