package storage // Fork from Thanos S3 Bucket support to reuse configuration options // Licensed under the Apache License 2.0 // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go import ( "bytes" "context" "io" "net/http" "os" "path/filepath" "strings" "time" "github.com/opencost/opencost/core/pkg/log" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/encrypt" "github.com/pkg/errors" "gopkg.in/yaml.v2" ) type ctxKey int const ( // SSEKMS is the name of the SSE-KMS method for objectstore encryption. SSEKMS = "SSE-KMS" // SSEC is the name of the SSE-C method for objstore encryption. SSEC = "SSE-C" // SSES3 is the name of the SSE-S3 method for objstore encryption. SSES3 = "SSE-S3" // sseConfigKey is the context key to override SSE config. This feature is used by downstream // projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or // refactoring can introduce breaking changes as far as the functionality is preserved. // NOTE: we're using a context value only because it's a very specific S3 option. If SSE will // be available to wider set of backends we should probably add a variadic option to Get() and Upload(). sseConfigKey = ctxKey(0) ) var defaultS3Config = S3Config{ PutUserMetadata: map[string]string{}, HTTPConfig: HTTPConfig{ IdleConnTimeout: 90 * time.Second, ResponseHeaderTimeout: 2 * time.Minute, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, MaxIdleConns: 100, MaxIdleConnsPerHost: 100, MaxConnsPerHost: 0, DisableCompression: true, }, PartSize: 1024 * 1024 * 64, // 64MB. } // Config stores the configuration for s3 bucket. 
type S3Config struct {
	Bucket    string `yaml:"bucket"`
	Endpoint  string `yaml:"endpoint"`
	Region    string `yaml:"region"`
	// AWSSDKAuth delegates credential resolution to the aws-sdk-go-v2 default
	// chain; mutually exclusive with AccessKey (enforced by validate()).
	AWSSDKAuth bool   `yaml:"aws_sdk_auth"`
	AccessKey  string `yaml:"access_key"`
	// Insecure selects plain HTTP instead of HTTPS when true.
	Insecure        bool              `yaml:"insecure"`
	SignatureV2     bool              `yaml:"signature_version2"`
	SecretKey       string            `yaml:"secret_key"`
	PutUserMetadata map[string]string `yaml:"put_user_metadata"`
	HTTPConfig      HTTPConfig        `yaml:"http_config"`
	TraceConfig     TraceConfig       `yaml:"trace"`
	// ListObjectsVersion must be "", "v1" or "v2" (enforced in NewS3StorageWith).
	ListObjectsVersion string `yaml:"list_objects_version"`
	// PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
	// NOTE we need to make sure this number does not produce more parts than 10 000.
	PartSize    uint64    `yaml:"part_size"`
	SSEConfig   SSEConfig `yaml:"sse_config"`
	STSEndpoint string    `yaml:"sts_endpoint"`
}

// SSEConfig deals with the configuration of SSE for Minio. The following options are valid:
// kmsencryptioncontext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context
type SSEConfig struct {
	// Type is one of SSE-KMS, SSE-C or SSE-S3 (see the package constants).
	Type                 string            `yaml:"type"`
	KMSKeyID             string            `yaml:"kms_key_id"`
	KMSEncryptionContext map[string]string `yaml:"kms_encryption_context"`
	// EncryptionKey is a path to a key file read at client construction (SSE-C only).
	EncryptionKey string `yaml:"encryption_key"`
}

// TraceConfig toggles request tracing for the S3 client.
type TraceConfig struct {
	Enable bool `yaml:"enable"`
}

// S3Storage provides storage via S3.
type S3Storage struct {
	name            string             // bucket name
	client          *minio.Client      // underlying minio S3 client
	defaultSSE      encrypt.ServerSide // SSE applied unless overridden via context (see getServerSideEncryption)
	putUserMetadata map[string]string  // metadata attached to every uploaded object
	partSize        uint64             // multipart upload part size from config (not used by Write, which forces 0)
	listObjectsV1   bool               // use the v1 ListObjects API when true
	insecure        bool               // plain-HTTP mode; also drives protocol() log labels
}

// parseS3Config unmarshals a buffer into an S3Config, starting from
// defaultS3Config so unspecified fields keep their default values.
func parseS3Config(conf []byte) (S3Config, error) {
	config := defaultS3Config
	if err := yaml.Unmarshal(conf, &config); err != nil {
		return S3Config{}, err
	}
	return config, nil
}

// NewS3Storage returns a new S3Storage using the provided serialized YAML s3 config values.
func NewS3Storage(conf []byte) (*S3Storage, error) {
	config, err := parseS3Config(conf)
	if err != nil {
		return nil, err
	}
	return NewS3StorageWith(config)
}

// NewS3StorageWith returns a new S3Storage using the provided s3 config values.
// Credential resolution precedence: AWS SDK auth, then static access/secret
// keys, then the env/file/IAM chain.
func NewS3StorageWith(config S3Config) (*S3Storage, error) {
	var chain []credentials.Provider

	// When signature_version2 is requested, every provider in the chain is
	// wrapped so its resolved credentials are re-stamped with SignatureV2.
	wrapCredentialsProvider := func(p credentials.Provider) credentials.Provider { return p }
	if config.SignatureV2 {
		wrapCredentialsProvider = func(p credentials.Provider) credentials.Provider {
			return &overrideSignerType{Provider: p, signerType: credentials.SignatureV2}
		}
	}

	if err := validate(config); err != nil {
		return nil, err
	}

	if config.AWSSDKAuth {
		// Resolve credentials through the aws-sdk-go-v2 default chain.
		chain = []credentials.Provider{
			wrapCredentialsProvider(&awsAuth{Region: config.Region}),
		}
	} else if config.AccessKey != "" {
		// Static keys from config (validate() guarantees SecretKey is also set).
		chain = []credentials.Provider{wrapCredentialsProvider(&credentials.Static{
			Value: credentials.Value{
				AccessKeyID:     config.AccessKey,
				SecretAccessKey: config.SecretKey,
				SignerType:      credentials.SignatureV4,
			},
		})}
	} else {
		// Fall back to environment variables, shared credentials file, then IAM.
		chain = []credentials.Provider{
			wrapCredentialsProvider(&credentials.EnvAWS{}),
			wrapCredentialsProvider(&credentials.FileAWSCredentials{}),
			wrapCredentialsProvider(&credentials.IAM{
				Client: &http.Client{
					Transport: http.DefaultTransport,
				},
				Endpoint: config.STSEndpoint,
			}),
		}
	}

	rt, err := config.HTTPConfig.GetHTTPTransport()
	if err != nil {
		return nil, err
	}

	// HTTPS Protocol Configuration: The 'Secure' option controls whether HTTPS is used.
	// By default, config.Insecure is false if not set, so Secure=true.
	client, err := minio.New(config.Endpoint, &minio.Options{
		Creds:     credentials.NewChainCredentials(chain),
		Secure:    !config.Insecure,
		Region:    config.Region,
		Transport: rt,
	})
	if err != nil {
		return nil, errors.Wrap(err, "initialize s3 client")
	}

	// Build the default server-side-encryption setting, if any was configured.
	var sse encrypt.ServerSide
	if config.SSEConfig.Type != "" {
		switch config.SSEConfig.Type {
		case SSEKMS:
			// If the KMSEncryptionContext is a nil map the header that is
			// constructed by the encrypt.ServerSide object will be base64
			// encoded "nil" which is not accepted by AWS.
			if config.SSEConfig.KMSEncryptionContext == nil {
				config.SSEConfig.KMSEncryptionContext = make(map[string]string)
			}
			sse, err = encrypt.NewSSEKMS(config.SSEConfig.KMSKeyID, config.SSEConfig.KMSEncryptionContext)
			if err != nil {
				return nil, errors.Wrap(err, "initialize s3 client SSE-KMS")
			}

		case SSEC:
			// EncryptionKey is a file path; the key material is read once here.
			key, err := os.ReadFile(config.SSEConfig.EncryptionKey)
			if err != nil {
				return nil, err
			}

			sse, err = encrypt.NewSSEC(key)
			if err != nil {
				return nil, errors.Wrap(err, "initialize s3 client SSE-C")
			}

		case SSES3:
			sse = encrypt.NewSSE()

		default:
			sseErrMsg := errors.Errorf("Unsupported type %q was provided. Supported types are SSE-S3, SSE-KMS, SSE-C", config.SSEConfig.Type)
			return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
		}
	}

	if config.ListObjectsVersion != "" && config.ListObjectsVersion != "v1" && config.ListObjectsVersion != "v2" {
		return nil, errors.Errorf("Initialize s3 client list objects version: Unsupported version %q was provided. Supported values are v1, v2", config.ListObjectsVersion)
	}

	bkt := &S3Storage{
		name:            config.Bucket,
		client:          client,
		defaultSSE:      sse,
		putUserMetadata: config.PutUserMetadata,
		partSize:        config.PartSize,
		listObjectsV1:   config.ListObjectsVersion == "v1",
		insecure:        config.Insecure,
	}

	log.Debugf("S3Storage: New S3 client initialized with '%s://%s/%s'", bkt.protocol(), config.Endpoint, config.Bucket)
	return bkt, nil
}

// String returns the bucket name for s3.
func (s3 *S3Storage) String() string {
	return s3.name
}

// StorageType returns a string identifier for the type of storage used by the implementation.
func (s3 *S3Storage) StorageType() StorageType {
	return StorageTypeBucketS3
}

// protocol returns the protocol string (HTTP or HTTPS) based on the insecure
// configuration flag; used only for log labels.
func (s3 *S3Storage) protocol() string {
	if s3.insecure {
		return "HTTP"
	}
	return "HTTPS"
}

// validate checks that the config options are set and mutually consistent.
// Returns a descriptive error for the first violated constraint.
func validate(conf S3Config) error {
	if conf.Endpoint == "" {
		return errors.New("no s3 endpoint in config file")
	}

	if conf.AWSSDKAuth && conf.AccessKey != "" {
		return errors.New("aws_sdk_auth and access_key are mutually exclusive configurations")
	}

	// Access key and secret key must be provided together, or neither
	// (in which case env vars / shared credentials / IAM are used).
	if conf.AccessKey == "" && conf.SecretKey != "" {
		return errors.New("no s3 access_key specified while secret_key is present in config file; either both should be present in config or envvars/IAM should be used")
	}

	if conf.AccessKey != "" && conf.SecretKey == "" {
		return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used")
	}

	if conf.SSEConfig.Type == SSEC && conf.SSEConfig.EncryptionKey == "" {
		return errors.New("encryption_key must be set if sse_config.type is set to 'SSE-C'")
	}

	if conf.SSEConfig.Type == SSEKMS && conf.SSEConfig.KMSKeyID == "" {
		return errors.New("kms_key_id must be set if sse_config.type is set to 'SSE-KMS'")
	}

	return nil
}

// FullPath returns the storage working path combined with the path provided.
func (s3 *S3Storage) FullPath(name string) string {
	name = trimLeading(name)
	return name
}

// Read returns the full contents of the named object as a byte slice.
func (s3 *S3Storage) Read(name string) ([]byte, error) {
	name = trimLeading(name)
	log.Debugf("S3Storage::Read::%s(%s)", s3.protocol(), name)
	ctx := context.Background()

	// Offset 0 with length -1 reads the entire object.
	return s3.getRange(ctx, name, 0, -1)
}

// ReadStream returns an io.ReadCloser that streams an object from S3.
func (s3 *S3Storage) ReadStream(path string) (io.ReadCloser, error) {
	path = trimLeading(path)
	log.Debugf("S3Storage::ReadStream::%s(%s)", s3.protocol(), path)
	ctx := context.Background()

	sse, err := s3.getServerSideEncryption(ctx)
	if err != nil {
		return nil, err
	}

	opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
	r, err := s3.client.GetObject(ctx, s3.name, path, *opts)
	if err != nil {
		if s3.isObjNotFound(err) {
			return nil, DoesNotExistError
		}
		return nil, err
	}

	// Force a metadata call and surface "not found" errors early,
	// matching behavior in getRange(). The reader is closed before
	// returning on any error so the connection is not leaked.
	if _, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse}); err != nil {
		if s3.isObjNotFound(err) || s3.isDoesNotExist(err) {
			_ = r.Close()
			return nil, DoesNotExistError
		}
		_ = r.Close()
		return nil, errors.Wrap(err, "StatObject from S3 failed")
	}

	// Caller owns r and must Close it when done.
	return r, nil
}

// ReadToLocalFile streams the specified object at path to destPath on the
// local file system. The download goes through a temporary file in the
// destination directory, renamed into place only after a successful write,
// so destPath is never left partially written.
func (s3 *S3Storage) ReadToLocalFile(path, destPath string) error {
	path = trimLeading(path)
	log.Debugf("S3Storage::ReadToLocalFile::%s(%s) -> %s", s3.protocol(), path, destPath)
	ctx := context.Background()

	sse, err := s3.getServerSideEncryption(ctx)
	if err != nil {
		return err
	}

	opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
	r, err := s3.client.GetObject(ctx, s3.name, path, *opts)
	if err != nil {
		if s3.isObjNotFound(err) {
			return DoesNotExistError
		}
		return err
	}
	defer r.Close()

	// Force a metadata call and surface "not found" errors early,
	// matching behavior in getRange().
	if _, err := s3.client.StatObject(ctx, s3.name, path, minio.StatObjectOptions{ServerSideEncryption: sse}); err != nil {
		if s3.isObjNotFound(err) {
			return DoesNotExistError
		}
		return errors.Wrap(err, "StatObject from S3 failed")
	}

	dir := filepath.Dir(destPath)
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		return errors.Wrap(err, "creating destination directory")
	}

	// Write to a temporary file in the same directory to avoid leaving a
	// partially-written file at destPath on error. Rename atomically on success.
	// Same-directory placement also guarantees the rename stays on one filesystem.
	tmpFile, err := os.CreateTemp(dir, ".s3-read-*")
	if err != nil {
		return errors.Wrapf(err, "creating temporary file in %s", dir)
	}
	tmpPath := tmpFile.Name()

	// Ensure temporary file is cleaned up on error.
	success := false
	defer func() {
		if !success {
			_ = tmpFile.Close()
			_ = os.Remove(tmpPath)
		}
	}()

	// Use 1 MB buffer for streaming operations
	buf := make([]byte, 1024*1024)
	if _, err := io.CopyBuffer(tmpFile, r, buf); err != nil {
		return errors.Wrapf(err, "streaming %s to %s", path, destPath)
	}

	// Ensure data is flushed to disk before renaming.
	if err := tmpFile.Sync(); err != nil {
		return errors.Wrapf(err, "syncing temporary file for %s", destPath)
	}

	if err := tmpFile.Close(); err != nil {
		return errors.Wrapf(err, "closing temporary file for %s", destPath)
	}

	// Atomically move the fully written temp file into place.
	if err := os.Rename(tmpPath, destPath); err != nil {
		return errors.Wrapf(err, "renaming temporary file to %s", destPath)
	}

	success = true
	return nil
}

// Exists checks if the given object exists in the bucket.
func (s3 *S3Storage) Exists(name string) (bool, error) {
	name = trimLeading(name)
	log.Debugf("S3Storage::Exists::%s(%s)", s3.protocol(), name)
	ctx := context.Background()

	_, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
	if err != nil {
		// A missing key is not an error for Exists; any other failure is.
		if s3.isDoesNotExist(err) {
			return false, nil
		}
		return false, errors.Wrap(err, "stat s3 object")
	}

	return true, nil
}

// Write uploads the given bytes as an object into the bucket.
func (s3 *S3Storage) Write(name string, data []byte) error { name = trimLeading(name) log.Debugf("S3Storage::Write::%s(%s)", s3.protocol(), name) ctx := context.Background() sse, err := s3.getServerSideEncryption(ctx) if err != nil { return err } var size int64 = int64(len(data)) // Set partSize to 0 to write files in one go. This prevents chunking of // upload into multiple parts, which requires additional memory for buffering // the sub-parts. To remain consistent with other storage implementations, // we would rather attempt to lower cost fast upload and fast-fail. var partSize uint64 = 0 r := bytes.NewReader(data) _, err = s3.client.PutObject(ctx, s3.name, name, r, int64(size), minio.PutObjectOptions{ PartSize: partSize, ServerSideEncryption: sse, UserMetadata: s3.putUserMetadata, }) if err != nil { return errors.Wrap(err, "upload s3 object") } return nil } // Attributes returns information about the specified object. func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) { name = trimLeading(name) log.Debugf("S3Storage::Stat::%s(%s)", s3.protocol(), name) ctx := context.Background() objInfo, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{}) if err != nil { if s3.isDoesNotExist(err) { return nil, DoesNotExistError } return nil, err } return &StorageInfo{ Name: trimName(name), Size: objInfo.Size, ModTime: objInfo.LastModified, }, nil } // Delete removes the object with the given name. func (s3 *S3Storage) Remove(name string) error { name = trimLeading(name) log.Debugf("S3Storage::Remove::%s(%s)", s3.protocol(), name) ctx := context.Background() return s3.client.RemoveObject(ctx, s3.name, name, minio.RemoveObjectOptions{}) } func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) { path = trimLeading(path) log.Debugf("S3Storage::List::%s(%s)", s3.protocol(), path) ctx := context.Background() // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the // object itself as one prefix item. 
if path != "" { path = strings.TrimSuffix(path, DirDelim) + DirDelim } opts := minio.ListObjectsOptions{ Prefix: path, Recursive: false, UseV1: s3.listObjectsV1, } var stats []*StorageInfo for object := range s3.client.ListObjects(ctx, s3.name, opts) { // Catch the error when failed to list objects. if object.Err != nil { return nil, object.Err } // The s3 client can also return the directory itself in the ListObjects call above. if object.Key == path { continue } name := trimName(object.Key) // This sometimes happens with empty buckets. if name == "" { continue } stats = append(stats, &StorageInfo{ Name: name, Size: object.Size, ModTime: object.LastModified, }) } return stats, nil } func (s3 *S3Storage) ListDirectories(path string) ([]*StorageInfo, error) { path = trimLeading(path) log.Debugf("S3Storage::ListDirectories::%s(%s)", s3.protocol(), path) ctx := context.Background() if path != "" { path = strings.TrimSuffix(path, DirDelim) + DirDelim } opts := minio.ListObjectsOptions{ Prefix: path, Recursive: false, UseV1: s3.listObjectsV1, } var stats []*StorageInfo for object := range s3.client.ListObjects(ctx, s3.name, opts) { if object.Err != nil { return nil, object.Err } if object.Key == "" { continue } if object.Key == path { continue } // If trim removes the entire name, it's a directory, ergo we list it if trimName(object.Key) == "" { stats = append(stats, &StorageInfo{ Name: object.Key, Size: object.Size, ModTime: object.LastModified, }) } } return stats, nil } // getServerSideEncryption returns the SSE to use. func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) { if value := ctx.Value(sseConfigKey); value != nil { if sse, ok := value.(encrypt.ServerSide); ok { return sse, nil } return nil, errors.New("invalid SSE config override provided in the context") } return s3.defaultSSE, nil } // isDoesNotExist returns true if error means that object key is not found. 
func (s3 *S3Storage) isDoesNotExist(err error) bool {
	// "NoSuchKey" is the standard S3 error code for a missing object key.
	return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey"
}

// isObjNotFound returns true if the error means that the object was not found.
func (s3 *S3Storage) isObjNotFound(err error) bool {
	// NOTE(review): minio/S3 normally reports missing keys as "NoSuchKey"
	// (see isDoesNotExist above); confirm "NotFoundObject" is an intentional
	// code used by this fork's callers and not a dead check.
	return minio.ToErrorResponse(errors.Cause(err)).Code == "NotFoundObject"
}

// getRange reads length bytes of the named object starting at off and returns
// them as a byte slice. length == -1 means "to the end of the object".
func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int64) ([]byte, error) {
	sse, err := s3.getServerSideEncryption(ctx)
	if err != nil {
		return nil, err
	}

	opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
	if err := setGetObjectRange(opts, off, length); err != nil {
		return nil, err
	}

	r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
	if err != nil {
		if s3.isObjNotFound(err) {
			return nil, DoesNotExistError
		}
		return nil, err
	}

	// NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
	// for convenience.
	if _, err := r.Read(nil); err != nil {
		if s3.isObjNotFound(err) {
			_ = r.Close()
			return nil, DoesNotExistError
		}
		_ = r.Close()
		return nil, errors.Wrap(err, "Read from S3 failed")
	}
	defer r.Close()

	return io.ReadAll(r)
}

// setGetObjectRange translates (off, length) into an HTTP Range header on opts.
// off must be >= 0; length must be -1 (read to end) or > 0.
func setGetObjectRange(opts *minio.GetObjectOptions, off, length int64) error {
	if off < 0 {
		return errors.New("range offset must be >= 0")
	}
	if length < -1 || length == 0 {
		return errors.New("range length must be -1 or > 0")
	}
	if length > 0 {
		// Explicit byte range [off, off+length-1].
		return opts.SetRange(off, off+length-1)
	}
	if off > 0 {
		// SetRange(off, 0) requests from off to the end of the object.
		return opts.SetRange(off, 0)
	}
	// off == 0 && length == -1: whole object, no Range header needed.
	return nil
}

// awsAuth is a minio credentials.Provider that resolves credentials through
// the aws-sdk-go-v2 default configuration chain.
type awsAuth struct {
	Region string

	creds aws.Credentials // last credentials retrieved; used by IsExpired
}

// Retrieve resolves credentials via the aws-sdk-go-v2 default chain.
func (a *awsAuth) Retrieve() (credentials.Value, error) {
	// LoadDefaultConfig applies the AWS SDK's standard resolution chain
	// (env vars, shared config, IMDS, etc.) scoped to the configured region.
	cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(a.Region))
	if err != nil {
		return credentials.Value{}, errors.Wrap(err, "load AWS SDK config")
	}

	creds, err := cfg.Credentials.Retrieve(context.TODO())
	if err != nil {
		return credentials.Value{}, errors.Wrap(err, "retrieve AWS SDK credentials")
	}

	// Cache the SDK credentials so IsExpired can report their status.
	a.creds = creds

	return credentials.Value{
		AccessKeyID:     creds.AccessKeyID,
		SecretAccessKey: creds.SecretAccessKey,
		SessionToken:    creds.SessionToken,
		SignerType:      credentials.SignatureV4,
	}, nil
}

// RetrieveWithCredContext satisfies the minio credentials.Provider interface;
// the context is ignored and resolution is delegated to Retrieve.
func (a *awsAuth) RetrieveWithCredContext(ctx *credentials.CredContext) (credentials.Value, error) {
	return a.Retrieve()
}

// IsExpired reports whether the most recently retrieved credentials have expired.
func (a *awsAuth) IsExpired() bool {
	return a.creds.Expired()
}

// overrideSignerType wraps a credentials.Provider and re-stamps any
// non-anonymous credentials it yields with a fixed signature type
// (used to force SignatureV2 when signature_version2 is configured).
type overrideSignerType struct {
	credentials.Provider
	signerType credentials.SignatureType
}

// Retrieve delegates to the wrapped provider, then overrides the signer type
// unless the resolved credentials are anonymous.
func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
	v, err := s.Provider.Retrieve()
	if err != nil {
		return v, err
	}
	if !v.SignerType.IsAnonymous() {
		v.SignerType = s.signerType
	}
	return v, nil
}

// RetrieveWithCredContext mirrors Retrieve but forwards the credential context
// to the wrapped provider.
func (s *overrideSignerType) RetrieveWithCredContext(ctx *credentials.CredContext) (credentials.Value, error) {
	v, err := s.Provider.RetrieveWithCredContext(ctx)
	if err != nil {
		return v, err
	}
	if !v.SignerType.IsAnonymous() {
		v.SignerType = s.signerType
	}
	return v, nil
}