|
|
@@ -0,0 +1,510 @@
|
|
|
+// Fork from Thanos S3 Bucket support to reuse configuration options
|
|
|
+// Licensed under the Apache License 2.0
|
|
|
+// https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
|
|
|
+package storage
|
|
|
+
|
|
|
+import (
|
|
|
+ "bytes"
|
|
|
+ "context"
|
|
|
+ "crypto/tls"
|
|
|
+ "io/ioutil"
|
|
|
+ "net"
|
|
|
+ "net/http"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/kubecost/cost-model/pkg/log"
|
|
|
+
|
|
|
+ "github.com/minio/minio-go/v7"
|
|
|
+ "github.com/minio/minio-go/v7/pkg/credentials"
|
|
|
+ "github.com/minio/minio-go/v7/pkg/encrypt"
|
|
|
+ "github.com/pkg/errors"
|
|
|
+
|
|
|
+ "gopkg.in/yaml.v2"
|
|
|
+)
|
|
|
+
|
|
|
+type ctxKey int
|
|
|
+
|
|
|
+const (
|
|
|
+ // DirDelim is the delimiter used to model a directory structure in an object store bucket.
|
|
|
+ DirDelim = "/"
|
|
|
+
|
|
|
+ // SSEKMS is the name of the SSE-KMS method for objectstore encryption.
|
|
|
+ SSEKMS = "SSE-KMS"
|
|
|
+
|
|
|
+ // SSEC is the name of the SSE-C method for objstore encryption.
|
|
|
+ SSEC = "SSE-C"
|
|
|
+
|
|
|
+ // SSES3 is the name of the SSE-S3 method for objstore encryption.
|
|
|
+ SSES3 = "SSE-S3"
|
|
|
+
|
|
|
+ // sseConfigKey is the context key to override SSE config. This feature is used by downstream
|
|
|
+ // projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or
|
|
|
+ // refactoring can introduce breaking changes as far as the functionality is preserved.
|
|
|
+ // NOTE: we're using a context value only because it's a very specific S3 option. If SSE will
|
|
|
+ // be available to wider set of backends we should probably add a variadic option to Get() and Upload().
|
|
|
+ sseConfigKey = ctxKey(0)
|
|
|
+)
|
|
|
+
|
|
|
+var DefaultConfig = S3Config{
|
|
|
+ PutUserMetadata: map[string]string{},
|
|
|
+ HTTPConfig: HTTPConfig{
|
|
|
+ IdleConnTimeout: time.Duration(90 * time.Second),
|
|
|
+ ResponseHeaderTimeout: time.Duration(2 * time.Minute),
|
|
|
+ TLSHandshakeTimeout: time.Duration(10 * time.Second),
|
|
|
+ ExpectContinueTimeout: time.Duration(1 * time.Second),
|
|
|
+ MaxIdleConns: 100,
|
|
|
+ MaxIdleConnsPerHost: 100,
|
|
|
+ MaxConnsPerHost: 0,
|
|
|
+ },
|
|
|
+ PartSize: 1024 * 1024 * 64, // 64Ms3.
|
|
|
+}
|
|
|
+
|
|
|
+// Config stores the configuration for s3 bucket.
|
|
|
+type S3Config struct {
|
|
|
+ Bucket string `yaml:"bucket"`
|
|
|
+ Endpoint string `yaml:"endpoint"`
|
|
|
+ Region string `yaml:"region"`
|
|
|
+ AccessKey string `yaml:"access_key"`
|
|
|
+ Insecure bool `yaml:"insecure"`
|
|
|
+ SignatureV2 bool `yaml:"signature_version2"`
|
|
|
+ SecretKey string `yaml:"secret_key"`
|
|
|
+ PutUserMetadata map[string]string `yaml:"put_user_metadata"`
|
|
|
+ HTTPConfig HTTPConfig `yaml:"http_config"`
|
|
|
+ TraceConfig TraceConfig `yaml:"trace"`
|
|
|
+ ListObjectsVersion string `yaml:"list_objects_version"`
|
|
|
+ // PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
|
|
|
+ // NOTE we need to make sure this number does not produce more parts than 10 000.
|
|
|
+ PartSize uint64 `yaml:"part_size"`
|
|
|
+ SSEConfig SSEConfig `yaml:"sse_config"`
|
|
|
+}
|
|
|
+
|
|
|
+// SSEConfig deals with the configuration of SSE for Minio. The following options are valid:
|
|
|
+// kmsencryptioncontext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context
|
|
|
+type SSEConfig struct {
|
|
|
+ Type string `yaml:"type"`
|
|
|
+ KMSKeyID string `yaml:"kms_key_id"`
|
|
|
+ KMSEncryptionContext map[string]string `yaml:"kms_encryption_context"`
|
|
|
+ EncryptionKey string `yaml:"encryption_key"`
|
|
|
+}
|
|
|
+
|
|
|
+type TraceConfig struct {
|
|
|
+ Enable bool `yaml:"enable"`
|
|
|
+}
|
|
|
+
|
|
|
+// HTTPConfig stores the http.Transport configuration for the s3 minio client.
|
|
|
+type HTTPConfig struct {
|
|
|
+ IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
|
|
|
+ ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
|
|
|
+ InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
|
|
|
+
|
|
|
+ TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout"`
|
|
|
+ ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout"`
|
|
|
+ MaxIdleConns int `yaml:"max_idle_conns"`
|
|
|
+ MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"`
|
|
|
+ MaxConnsPerHost int `yaml:"max_conns_per_host"`
|
|
|
+
|
|
|
+ // Allow upstream callers to inject a round tripper
|
|
|
+ Transport http.RoundTripper `yaml:"-"`
|
|
|
+}
|
|
|
+
|
|
|
+// DefaultTransport - this default transport is based on the Minio
|
|
|
+// DefaultTransport up until the following commit:
|
|
|
+// https://githus3.com/minio/minio-go/commit/008c7aa71fc17e11bf980c209a4f8c4d687fc884
|
|
|
+// The values have since diverged.
|
|
|
+func DefaultTransport(config S3Config) *http.Transport {
|
|
|
+ return &http.Transport{
|
|
|
+ Proxy: http.ProxyFromEnvironment,
|
|
|
+ DialContext: (&net.Dialer{
|
|
|
+ Timeout: 30 * time.Second,
|
|
|
+ KeepAlive: 30 * time.Second,
|
|
|
+ DualStack: true,
|
|
|
+ }).DialContext,
|
|
|
+
|
|
|
+ MaxIdleConns: config.HTTPConfig.MaxIdleConns,
|
|
|
+ MaxIdleConnsPerHost: config.HTTPConfig.MaxIdleConnsPerHost,
|
|
|
+ IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
|
|
|
+ MaxConnsPerHost: config.HTTPConfig.MaxConnsPerHost,
|
|
|
+ TLSHandshakeTimeout: time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
|
|
|
+ ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
|
|
|
+ // A custom ResponseHeaderTimeout was introduced
|
|
|
+ // to cover cases where the tcp connection works but
|
|
|
+ // the server never answers. Defaults to 2 minutes.
|
|
|
+ ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
|
|
|
+ // Set this value so that the underlying transport round-tripper
|
|
|
+ // doesn't try to auto decode the body of objects with
|
|
|
+ // content-encoding set to `gzip`.
|
|
|
+ //
|
|
|
+ // Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
|
|
|
+ DisableCompression: true,
|
|
|
+ // #nosec It's up to the user to decide on TLS configs
|
|
|
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.HTTPConfig.InsecureSkipVerify},
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// S3Storage provides storage via S3
|
|
|
+type S3Storage struct {
|
|
|
+ name string
|
|
|
+ client *minio.Client
|
|
|
+ defaultSSE encrypt.ServerSide
|
|
|
+ putUserMetadata map[string]string
|
|
|
+ partSize uint64
|
|
|
+ listObjectsV1 bool
|
|
|
+}
|
|
|
+
|
|
|
+// parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
|
|
|
+func parseConfig(conf []byte) (S3Config, error) {
|
|
|
+ config := DefaultConfig
|
|
|
+ if err := yaml.UnmarshalStrict(conf, &config); err != nil {
|
|
|
+ return S3Config{}, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return config, nil
|
|
|
+}
|
|
|
+
|
|
|
+// NewBucket returns a new Bucket using the provided s3 config values.
|
|
|
+func NewS3Storage(conf []byte) (*S3Storage, error) {
|
|
|
+ log.Infof("Creating new S3 Storage...")
|
|
|
+
|
|
|
+ config, err := parseConfig(conf)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return NewS3StorageWith(config)
|
|
|
+}
|
|
|
+
|
|
|
+// NewBucketWithConfig returns a new Bucket using the provided s3 config values.
|
|
|
+func NewS3StorageWith(config S3Config) (*S3Storage, error) {
|
|
|
+ var chain []credentials.Provider
|
|
|
+
|
|
|
+ log.Infof("New S3 Storage With Config: %+v", config)
|
|
|
+
|
|
|
+ wrapCredentialsProvider := func(p credentials.Provider) credentials.Provider { return p }
|
|
|
+ if config.SignatureV2 {
|
|
|
+ wrapCredentialsProvider = func(p credentials.Provider) credentials.Provider {
|
|
|
+ return &overrideSignerType{Provider: p, signerType: credentials.SignatureV2}
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := validate(config); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if config.AccessKey != "" {
|
|
|
+ chain = []credentials.Provider{wrapCredentialsProvider(&credentials.Static{
|
|
|
+ Value: credentials.Value{
|
|
|
+ AccessKeyID: config.AccessKey,
|
|
|
+ SecretAccessKey: config.SecretKey,
|
|
|
+ SignerType: credentials.SignatureV4,
|
|
|
+ },
|
|
|
+ })}
|
|
|
+ } else {
|
|
|
+ chain = []credentials.Provider{
|
|
|
+ wrapCredentialsProvider(&credentials.EnvAWS{}),
|
|
|
+ wrapCredentialsProvider(&credentials.FileAWSCredentials{}),
|
|
|
+ wrapCredentialsProvider(&credentials.IAM{
|
|
|
+ Client: &http.Client{
|
|
|
+ Transport: http.DefaultTransport,
|
|
|
+ },
|
|
|
+ }),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check if a roundtripper has been set in the config
|
|
|
+ // otherwise build the default transport.
|
|
|
+ var rt http.RoundTripper
|
|
|
+ if config.HTTPConfig.Transport != nil {
|
|
|
+ rt = config.HTTPConfig.Transport
|
|
|
+ } else {
|
|
|
+ rt = DefaultTransport(config)
|
|
|
+ }
|
|
|
+
|
|
|
+ client, err := minio.New(config.Endpoint, &minio.Options{
|
|
|
+ Creds: credentials.NewChainCredentials(chain),
|
|
|
+ Secure: !config.Insecure,
|
|
|
+ Region: config.Region,
|
|
|
+ Transport: rt,
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrap(err, "initialize s3 client")
|
|
|
+ }
|
|
|
+
|
|
|
+ var sse encrypt.ServerSide
|
|
|
+ if config.SSEConfig.Type != "" {
|
|
|
+ switch config.SSEConfig.Type {
|
|
|
+ case SSEKMS:
|
|
|
+ sse, err = encrypt.NewSSEKMS(config.SSEConfig.KMSKeyID, config.SSEConfig.KMSEncryptionContext)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrap(err, "initialize s3 client SSE-KMS")
|
|
|
+ }
|
|
|
+
|
|
|
+ case SSEC:
|
|
|
+ key, err := ioutil.ReadFile(config.SSEConfig.EncryptionKey)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ sse, err = encrypt.NewSSEC(key)
|
|
|
+ if err != nil {
|
|
|
+ return nil, errors.Wrap(err, "initialize s3 client SSE-C")
|
|
|
+ }
|
|
|
+
|
|
|
+ case SSES3:
|
|
|
+ sse = encrypt.NewSSE()
|
|
|
+
|
|
|
+ default:
|
|
|
+ sseErrMsg := errors.Errorf("Unsupported type %q was provided. Supported types are SSE-S3, SSE-KMS, SSE-C", config.SSEConfig.Type)
|
|
|
+ return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.ListObjectsVersion != "" && config.ListObjectsVersion != "v1" && config.ListObjectsVersion != "v2" {
|
|
|
+ return nil, errors.Errorf("Initialize s3 client list objects version: Unsupported version %q was provided. Supported values are v1, v2", config.ListObjectsVersion)
|
|
|
+ }
|
|
|
+
|
|
|
+ bkt := &S3Storage{
|
|
|
+ name: config.Bucket,
|
|
|
+ client: client,
|
|
|
+ defaultSSE: sse,
|
|
|
+ putUserMetadata: config.PutUserMetadata,
|
|
|
+ partSize: config.PartSize,
|
|
|
+ listObjectsV1: config.ListObjectsVersion == "v1",
|
|
|
+ }
|
|
|
+ return bkt, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Name returns the bucket name for s3.
|
|
|
+func (s3 *S3Storage) Name() string {
|
|
|
+ return s3.name
|
|
|
+}
|
|
|
+
|
|
|
+// validate checks to see the config options are set.
|
|
|
+func validate(conf S3Config) error {
|
|
|
+ if conf.Endpoint == "" {
|
|
|
+ return errors.New("no s3 endpoint in config file")
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.AccessKey == "" && conf.SecretKey != "" {
|
|
|
+ return errors.New("no s3 acccess_key specified while secret_key is present in config file; either both should be present in config or envvars/IAM should be used.")
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.AccessKey != "" && conf.SecretKey == "" {
|
|
|
+ return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used.")
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.SSEConfig.Type == SSEC && conf.SSEConfig.EncryptionKey == "" {
|
|
|
+ return errors.New("encryption_key must be set if sse_config.type is set to 'SSE-C'")
|
|
|
+ }
|
|
|
+
|
|
|
+ if conf.SSEConfig.Type == SSEKMS && conf.SSEConfig.KMSKeyID == "" {
|
|
|
+ return errors.New("kms_key_id must be set if sse_config.type is set to 'SSE-KMS'")
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// Get returns a reader for the given object name.
|
|
|
+func (s3 *S3Storage) Read(name string) ([]byte, error) {
|
|
|
+ log.Infof("S3Storage::Read(%s)", name)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ return s3.getRange(ctx, name, 0, -1)
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// Exists checks if the given object exists.
|
|
|
+func (s3 *S3Storage) Exists(name string) (bool, error) {
|
|
|
+ log.Infof("S3Storage::Exists(%s)", name)
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ _, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
|
|
|
+ if err != nil {
|
|
|
+ if s3.isDoesNotExist(err) {
|
|
|
+ return false, nil
|
|
|
+ }
|
|
|
+ return false, errors.Wrap(err, "stat s3 object")
|
|
|
+ }
|
|
|
+
|
|
|
+ return true, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Upload the contents of the reader as an object into the bucket.
|
|
|
+func (s3 *S3Storage) Write(name string, data []byte) error {
|
|
|
+ log.Infof("S3Storage::Write(%s)", name)
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+ sse, err := s3.getServerSideEncryption(ctx)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ var size int64 = int64(len(data))
|
|
|
+ var partSize uint64 = 0
|
|
|
+
|
|
|
+ r := bytes.NewReader(data)
|
|
|
+ _, err = s3.client.PutObject(ctx, s3.name, name, r, int64(size), minio.PutObjectOptions{
|
|
|
+ PartSize: partSize,
|
|
|
+ ServerSideEncryption: sse,
|
|
|
+ UserMetadata: s3.putUserMetadata,
|
|
|
+ })
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return errors.Wrap(err, "upload s3 object")
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// Attributes returns information about the specified object.
|
|
|
+func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
|
|
|
+ log.Infof("S3Storage::Stat(%s)", name)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ objInfo, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return &StorageInfo{
|
|
|
+ Name: s3.trimName(name),
|
|
|
+ Size: objInfo.Size,
|
|
|
+ ModTime: objInfo.LastModified,
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
+// Delete removes the object with the given name.
|
|
|
+func (s3 *S3Storage) Remove(name string) error {
|
|
|
+ log.Infof("S3Storage::Remove(%s)", name)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ return s3.client.RemoveObject(ctx, s3.name, name, minio.RemoveObjectOptions{})
|
|
|
+}
|
|
|
+
|
|
|
+func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
|
|
|
+ log.Infof("S3Storage::List(%s)", path)
|
|
|
+ ctx := context.Background()
|
|
|
+
|
|
|
+ // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
|
|
|
+ // object itself as one prefix item.
|
|
|
+ if path != "" {
|
|
|
+ path = strings.TrimSuffix(path, DirDelim) + DirDelim
|
|
|
+ }
|
|
|
+
|
|
|
+ opts := minio.ListObjectsOptions{
|
|
|
+ Prefix: path,
|
|
|
+ Recursive: false,
|
|
|
+ UseV1: s3.listObjectsV1,
|
|
|
+ }
|
|
|
+
|
|
|
+ var stats []*StorageInfo
|
|
|
+ for object := range s3.client.ListObjects(ctx, s3.name, opts) {
|
|
|
+ // Catch the error when failed to list objects.
|
|
|
+ if object.Err != nil {
|
|
|
+ return nil, object.Err
|
|
|
+ }
|
|
|
+ // This sometimes happens with empty buckets.
|
|
|
+ if object.Key == "" {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // The s3 client can also return the directory itself in the ListObjects call above.
|
|
|
+ if object.Key == path {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ stats = append(stats, &StorageInfo{
|
|
|
+ Name: s3.trimName(object.Key),
|
|
|
+ Size: object.Size,
|
|
|
+ ModTime: object.LastModified,
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ return stats, nil
|
|
|
+}
|
|
|
+
|
|
|
+// trimName removes the leading directory prefix
|
|
|
+func (s3 *S3Storage) trimName(file string) string {
|
|
|
+ slashIndex := strings.LastIndex(file, "/")
|
|
|
+ if slashIndex < 0 {
|
|
|
+ return file
|
|
|
+ }
|
|
|
+
|
|
|
+ name := file[slashIndex+1:]
|
|
|
+ return name
|
|
|
+}
|
|
|
+
|
|
|
+// getServerSideEncryption returns the SSE to use.
|
|
|
+func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
|
|
|
+ if value := ctx.Value(sseConfigKey); value != nil {
|
|
|
+ if sse, ok := value.(encrypt.ServerSide); ok {
|
|
|
+ return sse, nil
|
|
|
+ }
|
|
|
+ return nil, errors.New("invalid SSE config override provided in the context")
|
|
|
+ }
|
|
|
+
|
|
|
+ return s3.defaultSSE, nil
|
|
|
+}
|
|
|
+
|
|
|
+// isDoesNotExist returns true if error means that object key is not found.
|
|
|
+func (s3 *S3Storage) isDoesNotExist(err error) bool {
|
|
|
+ return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey"
|
|
|
+}
|
|
|
+
|
|
|
+// isObjNotFound returns true if the error means that the object was not found
|
|
|
+func (s3 *S3Storage) isObjNotFound(err error) bool {
|
|
|
+ return minio.ToErrorResponse(errors.Cause(err)).Code == "NotFoundObject"
|
|
|
+}
|
|
|
+
|
|
|
+func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int64) ([]byte, error) {
|
|
|
+ sse, err := s3.getServerSideEncryption(ctx)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
|
|
|
+ if length != -1 {
|
|
|
+ if err := opts.SetRange(off, off+length-1); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ } else if off > 0 {
|
|
|
+ if err := opts.SetRange(off, 0); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
|
|
|
+ if err != nil {
|
|
|
+ if s3.isObjNotFound(err) {
|
|
|
+ return nil, DoesNotExistError
|
|
|
+ }
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ // NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
|
|
|
+ // for convenience.
|
|
|
+ if _, err := r.Read(nil); err != nil {
|
|
|
+ r.Close()
|
|
|
+ if s3.isObjNotFound(err) {
|
|
|
+ return nil, DoesNotExistError
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil, errors.Wrap(err, "Read from S3 failed")
|
|
|
+ }
|
|
|
+
|
|
|
+ return ioutil.ReadAll(r)
|
|
|
+}
|
|
|
+
|
|
|
+type overrideSignerType struct {
|
|
|
+ credentials.Provider
|
|
|
+ signerType credentials.SignatureType
|
|
|
+}
|
|
|
+
|
|
|
+func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
|
|
|
+ v, err := s.Provider.Retrieve()
|
|
|
+ if err != nil {
|
|
|
+ return v, err
|
|
|
+ }
|
|
|
+ if !v.SignerType.IsAnonymous() {
|
|
|
+ v.SignerType = s.signerType
|
|
|
+ }
|
|
|
+ return v, nil
|
|
|
+}
|