s3storage.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726
  1. package storage
  2. // Fork from Thanos S3 Bucket support to reuse configuration options
  3. // Licensed under the Apache License 2.0
  4. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
  5. import (
  6. "bytes"
  7. "context"
  8. "io"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "time"
  14. "github.com/opencost/opencost/core/pkg/log"
  15. "github.com/aws/aws-sdk-go-v2/aws"
  16. awsconfig "github.com/aws/aws-sdk-go-v2/config"
  17. "github.com/minio/minio-go/v7"
  18. "github.com/minio/minio-go/v7/pkg/credentials"
  19. "github.com/minio/minio-go/v7/pkg/encrypt"
  20. "github.com/pkg/errors"
  21. "gopkg.in/yaml.v2"
  22. )
  23. type ctxKey int
  24. const (
  25. // SSEKMS is the name of the SSE-KMS method for objectstore encryption.
  26. SSEKMS = "SSE-KMS"
  27. // SSEC is the name of the SSE-C method for objstore encryption.
  28. SSEC = "SSE-C"
  29. // SSES3 is the name of the SSE-S3 method for objstore encryption.
  30. SSES3 = "SSE-S3"
  31. // sseConfigKey is the context key to override SSE config. This feature is used by downstream
  32. // projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or
  33. // refactoring can introduce breaking changes as far as the functionality is preserved.
  34. // NOTE: we're using a context value only because it's a very specific S3 option. If SSE will
  35. // be available to wider set of backends we should probably add a variadic option to Get() and Upload().
  36. sseConfigKey = ctxKey(0)
  37. )
  38. var defaultS3Config = S3Config{
  39. PutUserMetadata: map[string]string{},
  40. HTTPConfig: HTTPConfig{
  41. IdleConnTimeout: 90 * time.Second,
  42. ResponseHeaderTimeout: 2 * time.Minute,
  43. TLSHandshakeTimeout: 10 * time.Second,
  44. ExpectContinueTimeout: 1 * time.Second,
  45. MaxIdleConns: 100,
  46. MaxIdleConnsPerHost: 100,
  47. MaxConnsPerHost: 0,
  48. DisableCompression: true,
  49. },
  50. PartSize: 1024 * 1024 * 64, // 64MB.
  51. }
  52. // Config stores the configuration for s3 bucket.
  53. type S3Config struct {
  54. Bucket string `yaml:"bucket"`
  55. Endpoint string `yaml:"endpoint"`
  56. Region string `yaml:"region"`
  57. AWSSDKAuth bool `yaml:"aws_sdk_auth"`
  58. AccessKey string `yaml:"access_key"`
  59. Insecure bool `yaml:"insecure"`
  60. SignatureV2 bool `yaml:"signature_version2"`
  61. SecretKey string `yaml:"secret_key"`
  62. PutUserMetadata map[string]string `yaml:"put_user_metadata"`
  63. HTTPConfig HTTPConfig `yaml:"http_config"`
  64. TraceConfig TraceConfig `yaml:"trace"`
  65. ListObjectsVersion string `yaml:"list_objects_version"`
  66. // PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
  67. // NOTE we need to make sure this number does not produce more parts than 10 000.
  68. PartSize uint64 `yaml:"part_size"`
  69. SSEConfig SSEConfig `yaml:"sse_config"`
  70. STSEndpoint string `yaml:"sts_endpoint"`
  71. }
  72. // SSEConfig deals with the configuration of SSE for Minio. The following options are valid:
  73. // kmsencryptioncontext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context
  74. type SSEConfig struct {
  75. Type string `yaml:"type"`
  76. KMSKeyID string `yaml:"kms_key_id"`
  77. KMSEncryptionContext map[string]string `yaml:"kms_encryption_context"`
  78. EncryptionKey string `yaml:"encryption_key"`
  79. }
  80. type TraceConfig struct {
  81. Enable bool `yaml:"enable"`
  82. }
  83. // S3Storage provides storage via S3
  84. type S3Storage struct {
  85. name string
  86. client *minio.Client
  87. defaultSSE encrypt.ServerSide
  88. putUserMetadata map[string]string
  89. partSize uint64
  90. listObjectsV1 bool
  91. insecure bool
  92. }
  93. // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
  94. func parseS3Config(conf []byte) (S3Config, error) {
  95. config := defaultS3Config
  96. if err := yaml.Unmarshal(conf, &config); err != nil {
  97. return S3Config{}, err
  98. }
  99. return config, nil
  100. }
  101. // NewBucket returns a new Bucket using the provided s3 config values.
  102. func NewS3Storage(conf []byte) (*S3Storage, error) {
  103. config, err := parseS3Config(conf)
  104. if err != nil {
  105. return nil, err
  106. }
  107. return NewS3StorageWith(config)
  108. }
  109. // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
  110. func NewS3StorageWith(config S3Config) (*S3Storage, error) {
  111. var chain []credentials.Provider
  112. wrapCredentialsProvider := func(p credentials.Provider) credentials.Provider { return p }
  113. if config.SignatureV2 {
  114. wrapCredentialsProvider = func(p credentials.Provider) credentials.Provider {
  115. return &overrideSignerType{Provider: p, signerType: credentials.SignatureV2}
  116. }
  117. }
  118. if err := validate(config); err != nil {
  119. return nil, err
  120. }
  121. if config.AWSSDKAuth {
  122. chain = []credentials.Provider{
  123. wrapCredentialsProvider(&awsAuth{Region: config.Region}),
  124. }
  125. } else if config.AccessKey != "" {
  126. chain = []credentials.Provider{wrapCredentialsProvider(&credentials.Static{
  127. Value: credentials.Value{
  128. AccessKeyID: config.AccessKey,
  129. SecretAccessKey: config.SecretKey,
  130. SignerType: credentials.SignatureV4,
  131. },
  132. })}
  133. } else {
  134. chain = []credentials.Provider{
  135. wrapCredentialsProvider(&credentials.EnvAWS{}),
  136. wrapCredentialsProvider(&credentials.FileAWSCredentials{}),
  137. wrapCredentialsProvider(&credentials.IAM{
  138. Client: &http.Client{
  139. Transport: http.DefaultTransport,
  140. },
  141. Endpoint: config.STSEndpoint,
  142. }),
  143. }
  144. }
  145. rt, err := config.HTTPConfig.GetHTTPTransport()
  146. if err != nil {
  147. return nil, err
  148. }
  149. // HTTPS Protocol Configuration: The 'Secure' option controls whether HTTPS is used.
  150. // By default, config.Insecure is false if not set, so Secure=true.
  151. client, err := minio.New(config.Endpoint, &minio.Options{
  152. Creds: credentials.NewChainCredentials(chain),
  153. Secure: !config.Insecure,
  154. Region: config.Region,
  155. Transport: rt,
  156. })
  157. if err != nil {
  158. return nil, errors.Wrap(err, "initialize s3 client")
  159. }
  160. var sse encrypt.ServerSide
  161. if config.SSEConfig.Type != "" {
  162. switch config.SSEConfig.Type {
  163. case SSEKMS:
  164. // If the KMSEncryptionContext is a nil map the header that is
  165. // constructed by the encrypt.ServerSide object will be base64
  166. // encoded "nil" which is not accepted by AWS.
  167. if config.SSEConfig.KMSEncryptionContext == nil {
  168. config.SSEConfig.KMSEncryptionContext = make(map[string]string)
  169. }
  170. sse, err = encrypt.NewSSEKMS(config.SSEConfig.KMSKeyID, config.SSEConfig.KMSEncryptionContext)
  171. if err != nil {
  172. return nil, errors.Wrap(err, "initialize s3 client SSE-KMS")
  173. }
  174. case SSEC:
  175. key, err := os.ReadFile(config.SSEConfig.EncryptionKey)
  176. if err != nil {
  177. return nil, err
  178. }
  179. sse, err = encrypt.NewSSEC(key)
  180. if err != nil {
  181. return nil, errors.Wrap(err, "initialize s3 client SSE-C")
  182. }
  183. case SSES3:
  184. sse = encrypt.NewSSE()
  185. default:
  186. sseErrMsg := errors.Errorf("Unsupported type %q was provided. Supported types are SSE-S3, SSE-KMS, SSE-C", config.SSEConfig.Type)
  187. return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
  188. }
  189. }
  190. if config.ListObjectsVersion != "" && config.ListObjectsVersion != "v1" && config.ListObjectsVersion != "v2" {
  191. return nil, errors.Errorf("Initialize s3 client list objects version: Unsupported version %q was provided. Supported values are v1, v2", config.ListObjectsVersion)
  192. }
  193. bkt := &S3Storage{
  194. name: config.Bucket,
  195. client: client,
  196. defaultSSE: sse,
  197. putUserMetadata: config.PutUserMetadata,
  198. partSize: config.PartSize,
  199. listObjectsV1: config.ListObjectsVersion == "v1",
  200. insecure: config.Insecure,
  201. }
  202. log.Debugf("S3Storage: New S3 client initialized with '%s://%s/%s'", bkt.protocol(), config.Endpoint, config.Bucket)
  203. return bkt, nil
  204. }
  205. // String returns the bucket name for s3.
  206. func (s3 *S3Storage) String() string {
  207. return s3.name
  208. }
  209. // StorageType returns a string identifier for the type of storage used by the implementation.
  210. func (s3 *S3Storage) StorageType() StorageType {
  211. return StorageTypeBucketS3
  212. }
  213. // protocol returns the protocol string (HTTP or HTTPS) based on configuration
  214. func (s3 *S3Storage) protocol() string {
  215. if s3.insecure {
  216. return "HTTP"
  217. }
  218. return "HTTPS"
  219. }
  220. // validate checks to see the config options are set.
  221. func validate(conf S3Config) error {
  222. if conf.Endpoint == "" {
  223. return errors.New("no s3 endpoint in config file")
  224. }
  225. if conf.AWSSDKAuth && conf.AccessKey != "" {
  226. return errors.New("aws_sdk_auth and access_key are mutually exclusive configurations")
  227. }
  228. if conf.AccessKey == "" && conf.SecretKey != "" {
  229. return errors.New("no s3 acccess_key specified while secret_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  230. }
  231. if conf.AccessKey != "" && conf.SecretKey == "" {
  232. return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  233. }
  234. if conf.SSEConfig.Type == SSEC && conf.SSEConfig.EncryptionKey == "" {
  235. return errors.New("encryption_key must be set if sse_config.type is set to 'SSE-C'")
  236. }
  237. if conf.SSEConfig.Type == SSEKMS && conf.SSEConfig.KMSKeyID == "" {
  238. return errors.New("kms_key_id must be set if sse_config.type is set to 'SSE-KMS'")
  239. }
  240. return nil
  241. }
  242. // FullPath returns the storage working path combined with the path provided
  243. func (s3 *S3Storage) FullPath(name string) string {
  244. name = trimLeading(name)
  245. return name
  246. }
  247. // Get returns a reader for the given object name.
  248. func (s3 *S3Storage) Read(name string) ([]byte, error) {
  249. name = trimLeading(name)
  250. log.Debugf("S3Storage::Read::%s(%s)", s3.protocol(), name)
  251. ctx := context.Background()
  252. return s3.getRange(ctx, name, 0, -1)
  253. }
  254. // ReadStream returns an io.ReadCloser that streams an object from S3.
  255. func (s3 *S3Storage) ReadStream(path string) (io.ReadCloser, error) {
  256. path = trimLeading(path)
  257. log.Debugf("S3Storage::ReadStream::%s(%s)", s3.protocol(), path)
  258. ctx := context.Background()
  259. sse, err := s3.getServerSideEncryption(ctx)
  260. if err != nil {
  261. return nil, err
  262. }
  263. opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
  264. r, err := s3.client.GetObject(ctx, s3.name, path, *opts)
  265. if err != nil {
  266. if s3.isObjNotFound(err) {
  267. return nil, DoesNotExistError
  268. }
  269. return nil, err
  270. }
  271. // Force a metadata call and surface "not found" errors early,
  272. // matching behavior in getRange().
  273. if _, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse}); err != nil {
  274. if s3.isObjNotFound(err) || s3.isDoesNotExist(err) {
  275. _ = r.Close()
  276. return nil, DoesNotExistError
  277. }
  278. _ = r.Close()
  279. return nil, errors.Wrap(err, "StatObject from S3 failed")
  280. }
  281. return r, nil
  282. }
  283. // ReadToLocalFile streams the specified object at path to destPath on the local file system.
  284. func (s3 *S3Storage) ReadToLocalFile(path, destPath string) error {
  285. path = trimLeading(path)
  286. log.Debugf("S3Storage::ReadToLocalFile::%s(%s) -> %s", s3.protocol(), path, destPath)
  287. ctx := context.Background()
  288. sse, err := s3.getServerSideEncryption(ctx)
  289. if err != nil {
  290. return err
  291. }
  292. opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
  293. r, err := s3.client.GetObject(ctx, s3.name, path, *opts)
  294. if err != nil {
  295. if s3.isObjNotFound(err) {
  296. return DoesNotExistError
  297. }
  298. return err
  299. }
  300. defer r.Close()
  301. // Force a metadata call and surface "not found" errors early,
  302. // matching behavior in getRange().
  303. if _, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse}); err != nil {
  304. if s3.isObjNotFound(err) {
  305. return DoesNotExistError
  306. }
  307. return errors.Wrap(err, "StatObject from S3 failed")
  308. }
  309. dir := filepath.Dir(destPath)
  310. if err := os.MkdirAll(dir, os.ModePerm); err != nil {
  311. return errors.Wrap(err, "creating destination directory")
  312. }
  313. // Write to a temporary file in the same directory to avoid leaving a
  314. // partially-written file at destPath on error. Rename atomically on success.
  315. tmpFile, err := os.CreateTemp(dir, ".s3-read-*")
  316. if err != nil {
  317. return errors.Wrapf(err, "creating temporary file in %s", dir)
  318. }
  319. tmpPath := tmpFile.Name()
  320. // Ensure temporary file is cleaned up on error.
  321. success := false
  322. defer func() {
  323. if !success {
  324. _ = tmpFile.Close()
  325. _ = os.Remove(tmpPath)
  326. }
  327. }()
  328. // Use 1 MB buffer for streaming operations
  329. buf := make([]byte, 1024*1024)
  330. if _, err := io.CopyBuffer(tmpFile, r, buf); err != nil {
  331. return errors.Wrapf(err, "streaming %s to %s", path, destPath)
  332. }
  333. // Ensure data is flushed to disk before renaming.
  334. if err := tmpFile.Sync(); err != nil {
  335. return errors.Wrapf(err, "syncing temporary file for %s", destPath)
  336. }
  337. if err := tmpFile.Close(); err != nil {
  338. return errors.Wrapf(err, "closing temporary file for %s", destPath)
  339. }
  340. // Atomically move the fully written temp file into place.
  341. if err := os.Rename(tmpPath, destPath); err != nil {
  342. return errors.Wrapf(err, "renaming temporary file to %s", destPath)
  343. }
  344. success = true
  345. return nil
  346. }
  347. // Exists checks if the given object exists.
  348. func (s3 *S3Storage) Exists(name string) (bool, error) {
  349. name = trimLeading(name)
  350. log.Debugf("S3Storage::Exists::%s(%s)", s3.protocol(), name)
  351. ctx := context.Background()
  352. _, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  353. if err != nil {
  354. if s3.isDoesNotExist(err) {
  355. return false, nil
  356. }
  357. return false, errors.Wrap(err, "stat s3 object")
  358. }
  359. return true, nil
  360. }
  361. // Upload the contents of the reader as an object into the bucket.
  362. func (s3 *S3Storage) Write(name string, data []byte) error {
  363. name = trimLeading(name)
  364. log.Debugf("S3Storage::Write::%s(%s)", s3.protocol(), name)
  365. ctx := context.Background()
  366. sse, err := s3.getServerSideEncryption(ctx)
  367. if err != nil {
  368. return err
  369. }
  370. var size int64 = int64(len(data))
  371. // Set partSize to 0 to write files in one go. This prevents chunking of
  372. // upload into multiple parts, which requires additional memory for buffering
  373. // the sub-parts. To remain consistent with other storage implementations,
  374. // we would rather attempt to lower cost fast upload and fast-fail.
  375. var partSize uint64 = 0
  376. r := bytes.NewReader(data)
  377. _, err = s3.client.PutObject(ctx, s3.name, name, r, int64(size), minio.PutObjectOptions{
  378. PartSize: partSize,
  379. ServerSideEncryption: sse,
  380. UserMetadata: s3.putUserMetadata,
  381. })
  382. if err != nil {
  383. return errors.Wrap(err, "upload s3 object")
  384. }
  385. return nil
  386. }
  387. // Attributes returns information about the specified object.
  388. func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
  389. name = trimLeading(name)
  390. log.Debugf("S3Storage::Stat::%s(%s)", s3.protocol(), name)
  391. ctx := context.Background()
  392. objInfo, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  393. if err != nil {
  394. if s3.isDoesNotExist(err) {
  395. return nil, DoesNotExistError
  396. }
  397. return nil, err
  398. }
  399. return &StorageInfo{
  400. Name: trimName(name),
  401. Size: objInfo.Size,
  402. ModTime: objInfo.LastModified,
  403. }, nil
  404. }
  405. // Delete removes the object with the given name.
  406. func (s3 *S3Storage) Remove(name string) error {
  407. name = trimLeading(name)
  408. log.Debugf("S3Storage::Remove::%s(%s)", s3.protocol(), name)
  409. ctx := context.Background()
  410. return s3.client.RemoveObject(ctx, s3.name, name, minio.RemoveObjectOptions{})
  411. }
  412. func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
  413. path = trimLeading(path)
  414. log.Debugf("S3Storage::List::%s(%s)", s3.protocol(), path)
  415. ctx := context.Background()
  416. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  417. // object itself as one prefix item.
  418. if path != "" {
  419. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  420. }
  421. opts := minio.ListObjectsOptions{
  422. Prefix: path,
  423. Recursive: false,
  424. UseV1: s3.listObjectsV1,
  425. }
  426. var stats []*StorageInfo
  427. for object := range s3.client.ListObjects(ctx, s3.name, opts) {
  428. // Catch the error when failed to list objects.
  429. if object.Err != nil {
  430. return nil, object.Err
  431. }
  432. // The s3 client can also return the directory itself in the ListObjects call above.
  433. if object.Key == path {
  434. continue
  435. }
  436. name := trimName(object.Key)
  437. // This sometimes happens with empty buckets.
  438. if name == "" {
  439. continue
  440. }
  441. stats = append(stats, &StorageInfo{
  442. Name: name,
  443. Size: object.Size,
  444. ModTime: object.LastModified,
  445. })
  446. }
  447. return stats, nil
  448. }
  449. func (s3 *S3Storage) ListDirectories(path string) ([]*StorageInfo, error) {
  450. path = trimLeading(path)
  451. log.Debugf("S3Storage::ListDirectories::%s(%s)", s3.protocol(), path)
  452. ctx := context.Background()
  453. if path != "" {
  454. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  455. }
  456. opts := minio.ListObjectsOptions{
  457. Prefix: path,
  458. Recursive: false,
  459. UseV1: s3.listObjectsV1,
  460. }
  461. var stats []*StorageInfo
  462. for object := range s3.client.ListObjects(ctx, s3.name, opts) {
  463. if object.Err != nil {
  464. return nil, object.Err
  465. }
  466. if object.Key == "" {
  467. continue
  468. }
  469. if object.Key == path {
  470. continue
  471. }
  472. // If trim removes the entire name, it's a directory, ergo we list it
  473. if trimName(object.Key) == "" {
  474. stats = append(stats, &StorageInfo{
  475. Name: object.Key,
  476. Size: object.Size,
  477. ModTime: object.LastModified,
  478. })
  479. }
  480. }
  481. return stats, nil
  482. }
  483. // getServerSideEncryption returns the SSE to use.
  484. func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
  485. if value := ctx.Value(sseConfigKey); value != nil {
  486. if sse, ok := value.(encrypt.ServerSide); ok {
  487. return sse, nil
  488. }
  489. return nil, errors.New("invalid SSE config override provided in the context")
  490. }
  491. return s3.defaultSSE, nil
  492. }
  493. // isDoesNotExist returns true if error means that object key is not found.
  494. func (s3 *S3Storage) isDoesNotExist(err error) bool {
  495. return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey"
  496. }
  497. // isObjNotFound returns true if the error means that the object was not found
  498. func (s3 *S3Storage) isObjNotFound(err error) bool {
  499. return minio.ToErrorResponse(errors.Cause(err)).Code == "NotFoundObject"
  500. }
  501. func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int64) ([]byte, error) {
  502. sse, err := s3.getServerSideEncryption(ctx)
  503. if err != nil {
  504. return nil, err
  505. }
  506. opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
  507. if err := setGetObjectRange(opts, off, length); err != nil {
  508. return nil, err
  509. }
  510. r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
  511. if err != nil {
  512. if s3.isObjNotFound(err) {
  513. return nil, DoesNotExistError
  514. }
  515. return nil, err
  516. }
  517. // NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
  518. // for convenience.
  519. if _, err := r.Read(nil); err != nil {
  520. if s3.isObjNotFound(err) {
  521. _ = r.Close()
  522. return nil, DoesNotExistError
  523. }
  524. _ = r.Close()
  525. return nil, errors.Wrap(err, "Read from S3 failed")
  526. }
  527. defer r.Close()
  528. return io.ReadAll(r)
  529. }
  530. func setGetObjectRange(opts *minio.GetObjectOptions, off, length int64) error {
  531. if off < 0 {
  532. return errors.New("range offset must be >= 0")
  533. }
  534. if length < -1 || length == 0 {
  535. return errors.New("range length must be -1 or > 0")
  536. }
  537. if length > 0 {
  538. return opts.SetRange(off, off+length-1)
  539. }
  540. if off > 0 {
  541. return opts.SetRange(off, 0)
  542. }
  543. return nil
  544. }
  545. // awsAuth retrieves credentials from the aws-sdk-go.
  546. type awsAuth struct {
  547. Region string
  548. creds aws.Credentials
  549. }
  550. // Retrieve retrieves the keys from the environment.
  551. func (a *awsAuth) Retrieve() (credentials.Value, error) {
  552. cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(a.Region))
  553. if err != nil {
  554. return credentials.Value{}, errors.Wrap(err, "load AWS SDK config")
  555. }
  556. creds, err := cfg.Credentials.Retrieve(context.TODO())
  557. if err != nil {
  558. return credentials.Value{}, errors.Wrap(err, "retrieve AWS SDK credentials")
  559. }
  560. a.creds = creds
  561. return credentials.Value{
  562. AccessKeyID: creds.AccessKeyID,
  563. SecretAccessKey: creds.SecretAccessKey,
  564. SessionToken: creds.SessionToken,
  565. SignerType: credentials.SignatureV4,
  566. }, nil
  567. }
  568. func (a *awsAuth) RetrieveWithCredContext(ctx *credentials.CredContext) (credentials.Value, error) {
  569. return a.Retrieve()
  570. }
  571. // IsExpired returns if the credentials have been retrieved.
  572. func (a *awsAuth) IsExpired() bool {
  573. return a.creds.Expired()
  574. }
  575. type overrideSignerType struct {
  576. credentials.Provider
  577. signerType credentials.SignatureType
  578. }
  579. func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
  580. v, err := s.Provider.Retrieve()
  581. if err != nil {
  582. return v, err
  583. }
  584. if !v.SignerType.IsAnonymous() {
  585. v.SignerType = s.signerType
  586. }
  587. return v, nil
  588. }
  589. func (s *overrideSignerType) RetrieveWithCredContext(ctx *credentials.CredContext) (credentials.Value, error) {
  590. v, err := s.Provider.RetrieveWithCredContext(ctx)
  591. if err != nil {
  592. return v, err
  593. }
  594. if !v.SignerType.IsAnonymous() {
  595. v.SignerType = s.signerType
  596. }
  597. return v, nil
  598. }