s3storage.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810
  1. package storage
  2. // Fork from Thanos S3 Bucket support to reuse configuration options
  3. // Licensed under the Apache License 2.0
  4. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
  5. import (
  6. "bytes"
  7. "context"
  8. "io"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "time"
  14. "github.com/opencost/opencost/core/pkg/log"
  15. "github.com/aws/aws-sdk-go-v2/aws"
  16. awsconfig "github.com/aws/aws-sdk-go-v2/config"
  17. "github.com/minio/minio-go/v7"
  18. "github.com/minio/minio-go/v7/pkg/credentials"
  19. "github.com/minio/minio-go/v7/pkg/encrypt"
  20. "github.com/pkg/errors"
  21. "gopkg.in/yaml.v2"
  22. )
// ctxKey is the unexported type of the context keys declared in this file.
// Using a dedicated type keeps these keys from colliding with context keys
// defined by other packages.
type ctxKey int
const (
	// SSEKMS is the name of the SSE-KMS method for objstore encryption.
	SSEKMS = "SSE-KMS"
	// SSEC is the name of the SSE-C method for objstore encryption.
	SSEC = "SSE-C"
	// SSES3 is the name of the SSE-S3 method for objstore encryption.
	SSES3 = "SSE-S3"
	// sseConfigKey is the context key used to override the SSE config. This feature is used by
	// downstream projects (e.g. Cortex) to inject a custom SSE config on a per-request basis.
	// Future work or refactoring can introduce breaking changes as long as the functionality
	// is preserved.
	// NOTE: we're using a context value only because it's a very specific S3 option. If SSE
	// becomes available to a wider set of backends we should probably add a variadic option
	// to Get() and Upload().
	sseConfigKey = ctxKey(0)
)
// defaultS3Config holds the values parseS3Config starts from before the YAML
// configuration is applied on top, so options absent from the YAML keep these
// defaults.
var defaultS3Config = S3Config{
	PutUserMetadata: map[string]string{},
	HTTPConfig: HTTPConfig{
		IdleConnTimeout:       90 * time.Second,
		ResponseHeaderTimeout: 2 * time.Minute,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   100,
		MaxConnsPerHost:       0,
		DisableCompression:    true,
	},
	PartSize: 1024 * 1024 * 64, // 64MB.
}
// defaultS3ReadChunkSize is the number of bytes (8 MiB) requested per range
// read by the reader that ReadStream returns. newS3ChunkReader also falls back
// to it when given a non-positive chunk size.
const defaultS3ReadChunkSize int64 = 8 * 1024 * 1024
// S3Config stores the configuration for an s3 bucket.
type S3Config struct {
	Bucket   string `yaml:"bucket"`
	Endpoint string `yaml:"endpoint"`
	Region   string `yaml:"region"`
	// AWSSDKAuth makes NewS3StorageWith obtain credentials through the AWS SDK
	// (see awsAuth). It is mutually exclusive with AccessKey.
	AWSSDKAuth bool   `yaml:"aws_sdk_auth"`
	AccessKey  string `yaml:"access_key"`
	// Insecure disables HTTPS: the client is created with Secure set to !Insecure.
	Insecure bool `yaml:"insecure"`
	// SignatureV2 makes non-anonymous credentials use the Signature V2 signer.
	SignatureV2     bool              `yaml:"signature_version2"`
	SecretKey       string            `yaml:"secret_key"`
	PutUserMetadata map[string]string `yaml:"put_user_metadata"`
	HTTPConfig      HTTPConfig        `yaml:"http_config"`
	TraceConfig     TraceConfig       `yaml:"trace"`
	// ListObjectsVersion must be empty, "v1" or "v2"; "v1" selects the V1 list API.
	ListObjectsVersion string `yaml:"list_objects_version"`
	// PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
	// NOTE we need to make sure this number does not produce more parts than 10 000.
	// NOTE(review): in this file the value is only copied onto S3Storage; Write
	// always passes PartSize 0 to PutObject — confirm whether that is intended.
	PartSize  uint64    `yaml:"part_size"`
	SSEConfig SSEConfig `yaml:"sse_config"`
	// STSEndpoint is passed as the Endpoint of the IAM credentials provider
	// used when neither AWSSDKAuth nor AccessKey is set.
	STSEndpoint string `yaml:"sts_endpoint"`
}
// SSEConfig deals with the configuration of SSE for Minio.
//
// Type selects the method and must be empty (no default SSE) or one of
// SSE-KMS, SSE-C and SSE-S3. KMSKeyID is required for SSE-KMS. EncryptionKey
// is required for SSE-C and is the path of a file whose contents are used as
// the key.
//
// kmsencryptioncontext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context
type SSEConfig struct {
	Type                 string            `yaml:"type"`
	KMSKeyID             string            `yaml:"kms_key_id"`
	KMSEncryptionContext map[string]string `yaml:"kms_encryption_context"`
	EncryptionKey        string            `yaml:"encryption_key"`
}
// TraceConfig is the `trace` section of S3Config.
type TraceConfig struct {
	Enable bool `yaml:"enable"`
}
// S3Storage provides storage via S3
type S3Storage struct {
	// name is the bucket name; every client call in this file targets it.
	name   string
	client *minio.Client
	// defaultSSE is the server-side encryption used when the request context
	// carries no override (see getServerSideEncryption). It may be nil.
	defaultSSE encrypt.ServerSide
	// putUserMetadata is attached as user metadata to every object written.
	putUserMetadata map[string]string
	partSize        uint64
	// listObjectsV1 selects the V1 list API in List and ListDirectories.
	listObjectsV1 bool
	// insecure mirrors S3Config.Insecure and is only used to report the
	// protocol in log messages.
	insecure bool
}
  94. // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
  95. func parseS3Config(conf []byte) (S3Config, error) {
  96. config := defaultS3Config
  97. if err := yaml.Unmarshal(conf, &config); err != nil {
  98. return S3Config{}, err
  99. }
  100. return config, nil
  101. }
  102. // NewBucket returns a new Bucket using the provided s3 config values.
  103. func NewS3Storage(conf []byte) (*S3Storage, error) {
  104. config, err := parseS3Config(conf)
  105. if err != nil {
  106. return nil, err
  107. }
  108. return NewS3StorageWith(config)
  109. }
  110. // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
  111. func NewS3StorageWith(config S3Config) (*S3Storage, error) {
  112. var chain []credentials.Provider
  113. wrapCredentialsProvider := func(p credentials.Provider) credentials.Provider { return p }
  114. if config.SignatureV2 {
  115. wrapCredentialsProvider = func(p credentials.Provider) credentials.Provider {
  116. return &overrideSignerType{Provider: p, signerType: credentials.SignatureV2}
  117. }
  118. }
  119. if err := validate(config); err != nil {
  120. return nil, err
  121. }
  122. if config.AWSSDKAuth {
  123. chain = []credentials.Provider{
  124. wrapCredentialsProvider(&awsAuth{Region: config.Region}),
  125. }
  126. } else if config.AccessKey != "" {
  127. chain = []credentials.Provider{wrapCredentialsProvider(&credentials.Static{
  128. Value: credentials.Value{
  129. AccessKeyID: config.AccessKey,
  130. SecretAccessKey: config.SecretKey,
  131. SignerType: credentials.SignatureV4,
  132. },
  133. })}
  134. } else {
  135. chain = []credentials.Provider{
  136. wrapCredentialsProvider(&credentials.EnvAWS{}),
  137. wrapCredentialsProvider(&credentials.FileAWSCredentials{}),
  138. wrapCredentialsProvider(&credentials.IAM{
  139. Client: &http.Client{
  140. Transport: http.DefaultTransport,
  141. },
  142. Endpoint: config.STSEndpoint,
  143. }),
  144. }
  145. }
  146. rt, err := config.HTTPConfig.GetHTTPTransport()
  147. if err != nil {
  148. return nil, err
  149. }
  150. // HTTPS Protocol Configuration: The 'Secure' option controls whether HTTPS is used.
  151. // By default, config.Insecure is false if not set, so Secure=true.
  152. client, err := minio.New(config.Endpoint, &minio.Options{
  153. Creds: credentials.NewChainCredentials(chain),
  154. Secure: !config.Insecure,
  155. Region: config.Region,
  156. Transport: rt,
  157. })
  158. if err != nil {
  159. return nil, errors.Wrap(err, "initialize s3 client")
  160. }
  161. var sse encrypt.ServerSide
  162. if config.SSEConfig.Type != "" {
  163. switch config.SSEConfig.Type {
  164. case SSEKMS:
  165. // If the KMSEncryptionContext is a nil map the header that is
  166. // constructed by the encrypt.ServerSide object will be base64
  167. // encoded "nil" which is not accepted by AWS.
  168. if config.SSEConfig.KMSEncryptionContext == nil {
  169. config.SSEConfig.KMSEncryptionContext = make(map[string]string)
  170. }
  171. sse, err = encrypt.NewSSEKMS(config.SSEConfig.KMSKeyID, config.SSEConfig.KMSEncryptionContext)
  172. if err != nil {
  173. return nil, errors.Wrap(err, "initialize s3 client SSE-KMS")
  174. }
  175. case SSEC:
  176. key, err := os.ReadFile(config.SSEConfig.EncryptionKey)
  177. if err != nil {
  178. return nil, err
  179. }
  180. sse, err = encrypt.NewSSEC(key)
  181. if err != nil {
  182. return nil, errors.Wrap(err, "initialize s3 client SSE-C")
  183. }
  184. case SSES3:
  185. sse = encrypt.NewSSE()
  186. default:
  187. sseErrMsg := errors.Errorf("Unsupported type %q was provided. Supported types are SSE-S3, SSE-KMS, SSE-C", config.SSEConfig.Type)
  188. return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
  189. }
  190. }
  191. if config.ListObjectsVersion != "" && config.ListObjectsVersion != "v1" && config.ListObjectsVersion != "v2" {
  192. return nil, errors.Errorf("Initialize s3 client list objects version: Unsupported version %q was provided. Supported values are v1, v2", config.ListObjectsVersion)
  193. }
  194. bkt := &S3Storage{
  195. name: config.Bucket,
  196. client: client,
  197. defaultSSE: sse,
  198. putUserMetadata: config.PutUserMetadata,
  199. partSize: config.PartSize,
  200. listObjectsV1: config.ListObjectsVersion == "v1",
  201. insecure: config.Insecure,
  202. }
  203. log.Debugf("S3Storage: New S3 client initialized with '%s://%s/%s'", bkt.protocol(), config.Endpoint, config.Bucket)
  204. return bkt, nil
  205. }
// String returns the name of the S3 bucket backing this storage.
func (s3 *S3Storage) String() string {
	return s3.name
}
// StorageType returns the identifier for the type of storage used by this
// implementation, which is always StorageTypeBucketS3.
func (s3 *S3Storage) StorageType() StorageType {
	return StorageTypeBucketS3
}
  214. // protocol returns the protocol string (HTTP or HTTPS) based on configuration
  215. func (s3 *S3Storage) protocol() string {
  216. if s3.insecure {
  217. return "HTTP"
  218. }
  219. return "HTTPS"
  220. }
  221. // validate checks to see the config options are set.
  222. func validate(conf S3Config) error {
  223. if conf.Endpoint == "" {
  224. return errors.New("no s3 endpoint in config file")
  225. }
  226. if conf.AWSSDKAuth && conf.AccessKey != "" {
  227. return errors.New("aws_sdk_auth and access_key are mutually exclusive configurations")
  228. }
  229. if conf.AccessKey == "" && conf.SecretKey != "" {
  230. return errors.New("no s3 acccess_key specified while secret_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  231. }
  232. if conf.AccessKey != "" && conf.SecretKey == "" {
  233. return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  234. }
  235. if conf.SSEConfig.Type == SSEC && conf.SSEConfig.EncryptionKey == "" {
  236. return errors.New("encryption_key must be set if sse_config.type is set to 'SSE-C'")
  237. }
  238. if conf.SSEConfig.Type == SSEKMS && conf.SSEConfig.KMSKeyID == "" {
  239. return errors.New("kms_key_id must be set if sse_config.type is set to 'SSE-KMS'")
  240. }
  241. return nil
  242. }
  243. // FullPath returns the storage working path combined with the path provided
  244. func (s3 *S3Storage) FullPath(name string) string {
  245. name = trimLeading(name)
  246. return name
  247. }
  248. // Get returns a reader for the given object name.
  249. func (s3 *S3Storage) Read(name string) ([]byte, error) {
  250. name = trimLeading(name)
  251. log.Debugf("S3Storage::Read::%s(%s)", s3.protocol(), name)
  252. ctx := context.Background()
  253. return s3.getRange(ctx, name, 0, -1)
  254. }
  255. // ReadStream returns an io.ReadCloser that incrementally streams an object from S3
  256. // by issuing byte-range requests under the hood.
  257. func (s3 *S3Storage) ReadStream(path string) (io.ReadCloser, error) {
  258. path = trimLeading(path)
  259. log.Debugf("S3Storage::ReadStream::%s(%s)", s3.protocol(), path)
  260. ctx := context.Background()
  261. sse, err := s3.getServerSideEncryption(ctx)
  262. if err != nil {
  263. return nil, err
  264. }
  265. objInfo, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse})
  266. if err != nil {
  267. if s3.isDoesNotExist(err) || s3.isObjNotFound(err) {
  268. return nil, DoesNotExistError
  269. }
  270. return nil, errors.Wrap(err, "StatObject from S3 failed")
  271. }
  272. return newS3ChunkReader(objInfo.Size, defaultS3ReadChunkSize, func(off, length int64) ([]byte, error) {
  273. return s3.getRange(ctx, path, off, length)
  274. }), nil
  275. }
  276. // ReadToLocalFile streams the specified object at path to destPath on the local file system.
  277. func (s3 *S3Storage) ReadToLocalFile(path, destPath string) error {
  278. path = trimLeading(path)
  279. log.Debugf("S3Storage::ReadToLocalFile::%s(%s) -> %s", s3.protocol(), path, destPath)
  280. ctx := context.Background()
  281. sse, err := s3.getServerSideEncryption(ctx)
  282. if err != nil {
  283. return err
  284. }
  285. opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
  286. r, err := s3.client.GetObject(ctx, s3.name, path, *opts)
  287. if err != nil {
  288. if s3.isObjNotFound(err) {
  289. return DoesNotExistError
  290. }
  291. return err
  292. }
  293. defer r.Close()
  294. // Force a metadata call and surface "not found" errors early,
  295. // matching behavior in getRange().
  296. if _, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse}); err != nil {
  297. if s3.isObjNotFound(err) {
  298. return DoesNotExistError
  299. }
  300. return errors.Wrap(err, "StatObject from S3 failed")
  301. }
  302. dir := filepath.Dir(destPath)
  303. if err := os.MkdirAll(dir, os.ModePerm); err != nil {
  304. return errors.Wrap(err, "creating destination directory")
  305. }
  306. // Write to a temporary file in the same directory to avoid leaving a
  307. // partially-written file at destPath on error. Rename atomically on success.
  308. tmpFile, err := os.CreateTemp(dir, ".s3-read-*")
  309. if err != nil {
  310. return errors.Wrapf(err, "creating temporary file in %s", dir)
  311. }
  312. tmpPath := tmpFile.Name()
  313. // Ensure temporary file is cleaned up on error.
  314. success := false
  315. defer func() {
  316. if !success {
  317. _ = tmpFile.Close()
  318. _ = os.Remove(tmpPath)
  319. }
  320. }()
  321. // Use 1 MB buffer for streaming operations
  322. buf := make([]byte, 1024*1024)
  323. if _, err := io.CopyBuffer(tmpFile, r, buf); err != nil {
  324. return errors.Wrapf(err, "streaming %s to %s", path, destPath)
  325. }
  326. // Ensure data is flushed to disk before renaming.
  327. if err := tmpFile.Sync(); err != nil {
  328. return errors.Wrapf(err, "syncing temporary file for %s", destPath)
  329. }
  330. if err := tmpFile.Close(); err != nil {
  331. return errors.Wrapf(err, "closing temporary file for %s", destPath)
  332. }
  333. // Atomically move the fully written temp file into place.
  334. if err := os.Rename(tmpPath, destPath); err != nil {
  335. return errors.Wrapf(err, "renaming temporary file to %s", destPath)
  336. }
  337. success = true
  338. return nil
  339. }
  340. // Exists checks if the given object exists.
  341. func (s3 *S3Storage) Exists(name string) (bool, error) {
  342. name = trimLeading(name)
  343. log.Debugf("S3Storage::Exists::%s(%s)", s3.protocol(), name)
  344. ctx := context.Background()
  345. _, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  346. if err != nil {
  347. if s3.isDoesNotExist(err) {
  348. return false, nil
  349. }
  350. return false, errors.Wrap(err, "stat s3 object")
  351. }
  352. return true, nil
  353. }
  354. // Upload the contents of the reader as an object into the bucket.
  355. func (s3 *S3Storage) Write(name string, data []byte) error {
  356. name = trimLeading(name)
  357. log.Debugf("S3Storage::Write::%s(%s)", s3.protocol(), name)
  358. ctx := context.Background()
  359. sse, err := s3.getServerSideEncryption(ctx)
  360. if err != nil {
  361. return err
  362. }
  363. var size int64 = int64(len(data))
  364. // Set partSize to 0 to write files in one go. This prevents chunking of
  365. // upload into multiple parts, which requires additional memory for buffering
  366. // the sub-parts. To remain consistent with other storage implementations,
  367. // we would rather attempt to lower cost fast upload and fast-fail.
  368. var partSize uint64 = 0
  369. r := bytes.NewReader(data)
  370. _, err = s3.client.PutObject(ctx, s3.name, name, r, int64(size), minio.PutObjectOptions{
  371. PartSize: partSize,
  372. ServerSideEncryption: sse,
  373. UserMetadata: s3.putUserMetadata,
  374. })
  375. if err != nil {
  376. return errors.Wrap(err, "upload s3 object")
  377. }
  378. return nil
  379. }
  380. // Attributes returns information about the specified object.
  381. func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
  382. name = trimLeading(name)
  383. log.Debugf("S3Storage::Stat::%s(%s)", s3.protocol(), name)
  384. ctx := context.Background()
  385. objInfo, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  386. if err != nil {
  387. if s3.isDoesNotExist(err) {
  388. return nil, DoesNotExistError
  389. }
  390. return nil, err
  391. }
  392. return &StorageInfo{
  393. Name: trimName(name),
  394. Size: objInfo.Size,
  395. ModTime: objInfo.LastModified,
  396. }, nil
  397. }
  398. // Delete removes the object with the given name.
  399. func (s3 *S3Storage) Remove(name string) error {
  400. name = trimLeading(name)
  401. log.Debugf("S3Storage::Remove::%s(%s)", s3.protocol(), name)
  402. ctx := context.Background()
  403. return s3.client.RemoveObject(ctx, s3.name, name, minio.RemoveObjectOptions{})
  404. }
  405. func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
  406. path = trimLeading(path)
  407. log.Debugf("S3Storage::List::%s(%s)", s3.protocol(), path)
  408. ctx := context.Background()
  409. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  410. // object itself as one prefix item.
  411. if path != "" {
  412. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  413. }
  414. opts := minio.ListObjectsOptions{
  415. Prefix: path,
  416. Recursive: false,
  417. UseV1: s3.listObjectsV1,
  418. }
  419. var stats []*StorageInfo
  420. for object := range s3.client.ListObjects(ctx, s3.name, opts) {
  421. // Catch the error when failed to list objects.
  422. if object.Err != nil {
  423. return nil, object.Err
  424. }
  425. // The s3 client can also return the directory itself in the ListObjects call above.
  426. if object.Key == path {
  427. continue
  428. }
  429. name := trimName(object.Key)
  430. // This sometimes happens with empty buckets.
  431. if name == "" {
  432. continue
  433. }
  434. stats = append(stats, &StorageInfo{
  435. Name: name,
  436. Size: object.Size,
  437. ModTime: object.LastModified,
  438. })
  439. }
  440. return stats, nil
  441. }
  442. func (s3 *S3Storage) ListDirectories(path string) ([]*StorageInfo, error) {
  443. path = trimLeading(path)
  444. log.Debugf("S3Storage::ListDirectories::%s(%s)", s3.protocol(), path)
  445. ctx := context.Background()
  446. if path != "" {
  447. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  448. }
  449. opts := minio.ListObjectsOptions{
  450. Prefix: path,
  451. Recursive: false,
  452. UseV1: s3.listObjectsV1,
  453. }
  454. var stats []*StorageInfo
  455. for object := range s3.client.ListObjects(ctx, s3.name, opts) {
  456. if object.Err != nil {
  457. return nil, object.Err
  458. }
  459. if object.Key == "" {
  460. continue
  461. }
  462. if object.Key == path {
  463. continue
  464. }
  465. // If trim removes the entire name, it's a directory, ergo we list it
  466. if trimName(object.Key) == "" {
  467. stats = append(stats, &StorageInfo{
  468. Name: object.Key,
  469. Size: object.Size,
  470. ModTime: object.LastModified,
  471. })
  472. }
  473. }
  474. return stats, nil
  475. }
  476. // getServerSideEncryption returns the SSE to use.
  477. func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
  478. if value := ctx.Value(sseConfigKey); value != nil {
  479. if sse, ok := value.(encrypt.ServerSide); ok {
  480. return sse, nil
  481. }
  482. return nil, errors.New("invalid SSE config override provided in the context")
  483. }
  484. return s3.defaultSSE, nil
  485. }
  486. // isDoesNotExist returns true if error means that object key is not found.
  487. func (s3 *S3Storage) isDoesNotExist(err error) bool {
  488. return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey"
  489. }
  490. // isObjNotFound returns true if the error means that the object was not found
  491. func (s3 *S3Storage) isObjNotFound(err error) bool {
  492. return minio.ToErrorResponse(errors.Cause(err)).Code == "NotFoundObject"
  493. }
  494. func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int64) ([]byte, error) {
  495. sse, err := s3.getServerSideEncryption(ctx)
  496. if err != nil {
  497. return nil, err
  498. }
  499. opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
  500. if err := setGetObjectRange(opts, off, length); err != nil {
  501. return nil, err
  502. }
  503. r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
  504. if err != nil {
  505. if s3.isObjNotFound(err) {
  506. return nil, DoesNotExistError
  507. }
  508. return nil, err
  509. }
  510. // NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
  511. // for convenience.
  512. if _, err := r.Read(nil); err != nil {
  513. if s3.isObjNotFound(err) {
  514. _ = r.Close()
  515. return nil, DoesNotExistError
  516. }
  517. _ = r.Close()
  518. return nil, errors.Wrap(err, "Read from S3 failed")
  519. }
  520. defer r.Close()
  521. return io.ReadAll(r)
  522. }
  523. func setGetObjectRange(opts *minio.GetObjectOptions, off, length int64) error {
  524. if off < 0 {
  525. return errors.New("range offset must be >= 0")
  526. }
  527. if length < -1 || length == 0 {
  528. return errors.New("range length must be -1 or > 0")
  529. }
  530. if length > 0 {
  531. return opts.SetRange(off, off+length-1)
  532. }
  533. if off > 0 {
  534. return opts.SetRange(off, 0)
  535. }
  536. return nil
  537. }
  538. type s3ChunkReader struct {
  539. size int64
  540. chunkSize int64
  541. pos int64
  542. chunkOff int64
  543. chunk []byte
  544. closed bool
  545. fetch func(off, length int64) ([]byte, error)
  546. }
  547. func newS3ChunkReader(size, chunkSize int64, fetch func(off, length int64) ([]byte, error)) io.ReadCloser {
  548. if chunkSize <= 0 {
  549. chunkSize = defaultS3ReadChunkSize
  550. }
  551. return &s3ChunkReader{
  552. size: size,
  553. chunkSize: chunkSize,
  554. chunkOff: -1,
  555. fetch: fetch,
  556. }
  557. }
  558. func (r *s3ChunkReader) Read(p []byte) (int, error) {
  559. if r.closed {
  560. return 0, errors.New("s3 chunk reader is closed")
  561. }
  562. if len(p) == 0 {
  563. return 0, nil
  564. }
  565. if r.pos >= r.size {
  566. return 0, io.EOF
  567. }
  568. n := 0
  569. for n < len(p) && r.pos < r.size {
  570. if !r.hasChunkForPos(r.pos) {
  571. if err := r.loadChunk(); err != nil {
  572. if err == io.EOF && n > 0 {
  573. return n, nil
  574. }
  575. return n, err
  576. }
  577. }
  578. chunkIdx := int(r.pos - r.chunkOff)
  579. wrote := copy(p[n:], r.chunk[chunkIdx:])
  580. n += wrote
  581. r.pos += int64(wrote)
  582. }
  583. if r.pos >= r.size {
  584. return n, io.EOF
  585. }
  586. return n, nil
  587. }
  588. func (r *s3ChunkReader) Close() error {
  589. r.closed = true
  590. r.chunk = nil
  591. return nil
  592. }
  593. func (r *s3ChunkReader) hasChunkForPos(pos int64) bool {
  594. if len(r.chunk) == 0 || r.chunkOff < 0 {
  595. return false
  596. }
  597. return pos >= r.chunkOff && pos < r.chunkOff+int64(len(r.chunk))
  598. }
  599. func (r *s3ChunkReader) loadChunk() error {
  600. if r.pos >= r.size {
  601. return io.EOF
  602. }
  603. length := r.chunkSize
  604. if remaining := r.size - r.pos; remaining < length {
  605. length = remaining
  606. }
  607. chunk, err := r.fetch(r.pos, length)
  608. if err != nil {
  609. return err
  610. }
  611. if len(chunk) == 0 {
  612. return io.EOF
  613. }
  614. r.chunk = chunk
  615. r.chunkOff = r.pos
  616. return nil
  617. }
// awsAuth is a credentials provider that obtains credentials through the AWS
// SDK's default configuration for Region.
type awsAuth struct {
	Region string
	// creds holds the credentials returned by the most recent successful
	// Retrieve; IsExpired consults it.
	creds aws.Credentials
}
  623. // Retrieve retrieves the keys from the environment.
  624. func (a *awsAuth) Retrieve() (credentials.Value, error) {
  625. cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(a.Region))
  626. if err != nil {
  627. return credentials.Value{}, errors.Wrap(err, "load AWS SDK config")
  628. }
  629. creds, err := cfg.Credentials.Retrieve(context.TODO())
  630. if err != nil {
  631. return credentials.Value{}, errors.Wrap(err, "retrieve AWS SDK credentials")
  632. }
  633. a.creds = creds
  634. return credentials.Value{
  635. AccessKeyID: creds.AccessKeyID,
  636. SecretAccessKey: creds.SecretAccessKey,
  637. SessionToken: creds.SessionToken,
  638. SignerType: credentials.SignatureV4,
  639. }, nil
  640. }
// RetrieveWithCredContext behaves exactly like Retrieve; the supplied
// credential context is ignored.
func (a *awsAuth) RetrieveWithCredContext(ctx *credentials.CredContext) (credentials.Value, error) {
	return a.Retrieve()
}
// IsExpired reports whether the credentials stored by the most recent Retrieve
// have expired, as determined by their Expired method.
// NOTE(review): before the first Retrieve this consults the zero-value
// credentials — confirm callers always retrieve first.
func (a *awsAuth) IsExpired() bool {
	return a.creds.Expired()
}
// overrideSignerType wraps a credentials provider and replaces the signer type
// of the non-anonymous credentials it returns with signerType.
type overrideSignerType struct {
	credentials.Provider
	signerType credentials.SignatureType
}
  652. func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
  653. v, err := s.Provider.Retrieve()
  654. if err != nil {
  655. return v, err
  656. }
  657. if !v.SignerType.IsAnonymous() {
  658. v.SignerType = s.signerType
  659. }
  660. return v, nil
  661. }
  662. func (s *overrideSignerType) RetrieveWithCredContext(ctx *credentials.CredContext) (credentials.Value, error) {
  663. v, err := s.Provider.RetrieveWithCredContext(ctx)
  664. if err != nil {
  665. return v, err
  666. }
  667. if !v.SignerType.IsAnonymous() {
  668. v.SignerType = s.signerType
  669. }
  670. return v, nil
  671. }