| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590 |
- package storage
- // Fork from Thanos S3 Bucket support to reuse configuration options
- // Licensed under the Apache License 2.0
- // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
- import (
- "bytes"
- "context"
- "io/ioutil"
- "net"
- "net/http"
- "strings"
- "time"
- "github.com/opencost/opencost/pkg/log"
- aws "github.com/aws/aws-sdk-go-v2/aws"
- awsconfig "github.com/aws/aws-sdk-go-v2/config"
- "github.com/minio/minio-go/v7"
- "github.com/minio/minio-go/v7/pkg/credentials"
- "github.com/minio/minio-go/v7/pkg/encrypt"
- "github.com/pkg/errors"
- "gopkg.in/yaml.v2"
- )
- type ctxKey int
- const (
- // SSEKMS is the name of the SSE-KMS method for objectstore encryption.
- SSEKMS = "SSE-KMS"
- // SSEC is the name of the SSE-C method for objstore encryption.
- SSEC = "SSE-C"
- // SSES3 is the name of the SSE-S3 method for objstore encryption.
- SSES3 = "SSE-S3"
- // sseConfigKey is the context key to override SSE config. This feature is used by downstream
- // projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or
- // refactoring can introduce breaking changes as far as the functionality is preserved.
- // NOTE: we're using a context value only because it's a very specific S3 option. If SSE will
- // be available to wider set of backends we should probably add a variadic option to Get() and Upload().
- sseConfigKey = ctxKey(0)
- )
- var defaultS3Config = S3Config{
- PutUserMetadata: map[string]string{},
- HTTPConfig: S3HTTPConfig{
- IdleConnTimeout: time.Duration(90 * time.Second),
- ResponseHeaderTimeout: time.Duration(2 * time.Minute),
- TLSHandshakeTimeout: time.Duration(10 * time.Second),
- ExpectContinueTimeout: time.Duration(1 * time.Second),
- MaxIdleConns: 100,
- MaxIdleConnsPerHost: 100,
- MaxConnsPerHost: 0,
- },
- PartSize: 1024 * 1024 * 64, // 64MB.
- }
- // Config stores the configuration for s3 bucket.
- type S3Config struct {
- Bucket string `yaml:"bucket"`
- Endpoint string `yaml:"endpoint"`
- Region string `yaml:"region"`
- AWSSDKAuth bool `yaml:"aws_sdk_auth"`
- AccessKey string `yaml:"access_key"`
- Insecure bool `yaml:"insecure"`
- SignatureV2 bool `yaml:"signature_version2"`
- SecretKey string `yaml:"secret_key"`
- PutUserMetadata map[string]string `yaml:"put_user_metadata"`
- HTTPConfig S3HTTPConfig `yaml:"http_config"`
- TraceConfig TraceConfig `yaml:"trace"`
- ListObjectsVersion string `yaml:"list_objects_version"`
- // PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
- // NOTE we need to make sure this number does not produce more parts than 10 000.
- PartSize uint64 `yaml:"part_size"`
- SSEConfig SSEConfig `yaml:"sse_config"`
- STSEndpoint string `yaml:"sts_endpoint"`
- }
- // SSEConfig deals with the configuration of SSE for Minio. The following options are valid:
- // kmsencryptioncontext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context
- type SSEConfig struct {
- Type string `yaml:"type"`
- KMSKeyID string `yaml:"kms_key_id"`
- KMSEncryptionContext map[string]string `yaml:"kms_encryption_context"`
- EncryptionKey string `yaml:"encryption_key"`
- }
- type TraceConfig struct {
- Enable bool `yaml:"enable"`
- }
- // HTTPConfig stores the http.Transport configuration for the s3 minio client.
- type S3HTTPConfig struct {
- IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
- ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
- InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
- TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout"`
- ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout"`
- MaxIdleConns int `yaml:"max_idle_conns"`
- MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"`
- MaxConnsPerHost int `yaml:"max_conns_per_host"`
- // Allow upstream callers to inject a round tripper
- Transport http.RoundTripper `yaml:"-"`
- TLSConfig TLSConfig `yaml:"tls_config"`
- }
- // DefaultTransport - this default transport is based on the Minio
- // DefaultTransport up until the following commit:
- // https://githus3.com/minio/minio-go/commit/008c7aa71fc17e11bf980c209a4f8c4d687fc884
- // The values have since diverged.
- func DefaultS3Transport(config S3Config) (*http.Transport, error) {
- tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
- if err != nil {
- return nil, err
- }
- if config.HTTPConfig.InsecureSkipVerify {
- tlsConfig.InsecureSkipVerify = true
- }
- return &http.Transport{
- Proxy: http.ProxyFromEnvironment,
- DialContext: (&net.Dialer{
- Timeout: 30 * time.Second,
- KeepAlive: 30 * time.Second,
- DualStack: true,
- }).DialContext,
- MaxIdleConns: config.HTTPConfig.MaxIdleConns,
- MaxIdleConnsPerHost: config.HTTPConfig.MaxIdleConnsPerHost,
- IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
- MaxConnsPerHost: config.HTTPConfig.MaxConnsPerHost,
- TLSHandshakeTimeout: time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
- ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
- // A custom ResponseHeaderTimeout was introduced
- // to cover cases where the tcp connection works but
- // the server never answers. Defaults to 2 minutes.
- ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
- // Set this value so that the underlying transport round-tripper
- // doesn't try to auto decode the body of objects with
- // content-encoding set to `gzip`.
- //
- // Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
- DisableCompression: true,
- // #nosec It's up to the user to decide on TLS configs
- TLSClientConfig: tlsConfig,
- }, nil
- }
- // S3Storage provides storage via S3
- type S3Storage struct {
- name string
- client *minio.Client
- defaultSSE encrypt.ServerSide
- putUserMetadata map[string]string
- partSize uint64
- listObjectsV1 bool
- }
- // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
- func parseS3Config(conf []byte) (S3Config, error) {
- config := defaultS3Config
- if err := yaml.UnmarshalStrict(conf, &config); err != nil {
- return S3Config{}, err
- }
- return config, nil
- }
- // NewBucket returns a new Bucket using the provided s3 config values.
- func NewS3Storage(conf []byte) (*S3Storage, error) {
- config, err := parseS3Config(conf)
- if err != nil {
- return nil, err
- }
- return NewS3StorageWith(config)
- }
- // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
- func NewS3StorageWith(config S3Config) (*S3Storage, error) {
- var chain []credentials.Provider
- wrapCredentialsProvider := func(p credentials.Provider) credentials.Provider { return p }
- if config.SignatureV2 {
- wrapCredentialsProvider = func(p credentials.Provider) credentials.Provider {
- return &overrideSignerType{Provider: p, signerType: credentials.SignatureV2}
- }
- }
- if err := validate(config); err != nil {
- return nil, err
- }
- if config.AWSSDKAuth {
- chain = []credentials.Provider{
- wrapCredentialsProvider(&awsAuth{Region: config.Region}),
- }
- } else if config.AccessKey != "" {
- chain = []credentials.Provider{wrapCredentialsProvider(&credentials.Static{
- Value: credentials.Value{
- AccessKeyID: config.AccessKey,
- SecretAccessKey: config.SecretKey,
- SignerType: credentials.SignatureV4,
- },
- })}
- } else {
- chain = []credentials.Provider{
- wrapCredentialsProvider(&credentials.EnvAWS{}),
- wrapCredentialsProvider(&credentials.FileAWSCredentials{}),
- wrapCredentialsProvider(&credentials.IAM{
- Client: &http.Client{
- Transport: http.DefaultTransport,
- },
- Endpoint: config.STSEndpoint,
- }),
- }
- }
- // Check if a roundtripper has been set in the config
- // otherwise build the default transport.
- var rt http.RoundTripper
- if config.HTTPConfig.Transport != nil {
- rt = config.HTTPConfig.Transport
- } else {
- var err error
- rt, err = DefaultS3Transport(config)
- if err != nil {
- return nil, err
- }
- }
- client, err := minio.New(config.Endpoint, &minio.Options{
- Creds: credentials.NewChainCredentials(chain),
- Secure: !config.Insecure,
- Region: config.Region,
- Transport: rt,
- })
- if err != nil {
- return nil, errors.Wrap(err, "initialize s3 client")
- }
- var sse encrypt.ServerSide
- if config.SSEConfig.Type != "" {
- switch config.SSEConfig.Type {
- case SSEKMS:
- // If the KMSEncryptionContext is a nil map the header that is
- // constructed by the encrypt.ServerSide object will be base64
- // encoded "nil" which is not accepted by AWS.
- if config.SSEConfig.KMSEncryptionContext == nil {
- config.SSEConfig.KMSEncryptionContext = make(map[string]string)
- }
- sse, err = encrypt.NewSSEKMS(config.SSEConfig.KMSKeyID, config.SSEConfig.KMSEncryptionContext)
- if err != nil {
- return nil, errors.Wrap(err, "initialize s3 client SSE-KMS")
- }
- case SSEC:
- key, err := ioutil.ReadFile(config.SSEConfig.EncryptionKey)
- if err != nil {
- return nil, err
- }
- sse, err = encrypt.NewSSEC(key)
- if err != nil {
- return nil, errors.Wrap(err, "initialize s3 client SSE-C")
- }
- case SSES3:
- sse = encrypt.NewSSE()
- default:
- sseErrMsg := errors.Errorf("Unsupported type %q was provided. Supported types are SSE-S3, SSE-KMS, SSE-C", config.SSEConfig.Type)
- return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
- }
- }
- if config.ListObjectsVersion != "" && config.ListObjectsVersion != "v1" && config.ListObjectsVersion != "v2" {
- return nil, errors.Errorf("Initialize s3 client list objects version: Unsupported version %q was provided. Supported values are v1, v2", config.ListObjectsVersion)
- }
- bkt := &S3Storage{
- name: config.Bucket,
- client: client,
- defaultSSE: sse,
- putUserMetadata: config.PutUserMetadata,
- partSize: config.PartSize,
- listObjectsV1: config.ListObjectsVersion == "v1",
- }
- return bkt, nil
- }
- // Name returns the bucket name for s3.
- func (s3 *S3Storage) Name() string {
- return s3.name
- }
- // StorageType returns a string identifier for the type of storage used by the implementation.
- func (s3 *S3Storage) StorageType() StorageType {
- return StorageTypeBucketS3
- }
- // validate checks to see the config options are set.
- func validate(conf S3Config) error {
- if conf.Endpoint == "" {
- return errors.New("no s3 endpoint in config file")
- }
- if conf.AWSSDKAuth && conf.AccessKey != "" {
- return errors.New("aws_sdk_auth and access_key are mutually exclusive configurations")
- }
- if conf.AccessKey == "" && conf.SecretKey != "" {
- return errors.New("no s3 acccess_key specified while secret_key is present in config file; either both should be present in config or envvars/IAM should be used.")
- }
- if conf.AccessKey != "" && conf.SecretKey == "" {
- return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used.")
- }
- if conf.SSEConfig.Type == SSEC && conf.SSEConfig.EncryptionKey == "" {
- return errors.New("encryption_key must be set if sse_config.type is set to 'SSE-C'")
- }
- if conf.SSEConfig.Type == SSEKMS && conf.SSEConfig.KMSKeyID == "" {
- return errors.New("kms_key_id must be set if sse_config.type is set to 'SSE-KMS'")
- }
- return nil
- }
- // FullPath returns the storage working path combined with the path provided
- func (s3 *S3Storage) FullPath(name string) string {
- name = trimLeading(name)
- return name
- }
- // Get returns a reader for the given object name.
- func (s3 *S3Storage) Read(name string) ([]byte, error) {
- name = trimLeading(name)
- log.Debugf("S3Storage::Read(%s)", name)
- ctx := context.Background()
- return s3.getRange(ctx, name, 0, -1)
- }
- // Exists checks if the given object exists.
- func (s3 *S3Storage) Exists(name string) (bool, error) {
- name = trimLeading(name)
- //log.Debugf("S3Storage::Exists(%s)", name)
- ctx := context.Background()
- _, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
- if err != nil {
- if s3.isDoesNotExist(err) {
- return false, nil
- }
- return false, errors.Wrap(err, "stat s3 object")
- }
- return true, nil
- }
- // Upload the contents of the reader as an object into the bucket.
- func (s3 *S3Storage) Write(name string, data []byte) error {
- name = trimLeading(name)
- log.Debugf("S3Storage::Write(%s)", name)
- ctx := context.Background()
- sse, err := s3.getServerSideEncryption(ctx)
- if err != nil {
- return err
- }
- var size int64 = int64(len(data))
- // Set partSize to 0 to write files in one go. This prevents chunking of
- // upload into multiple parts, which requires additional memory for buffering
- // the sub-parts. To remain consistent with other storage implementations,
- // we would rather attempt to lower cost fast upload and fast-fail.
- var partSize uint64 = 0
- r := bytes.NewReader(data)
- _, err = s3.client.PutObject(ctx, s3.name, name, r, int64(size), minio.PutObjectOptions{
- PartSize: partSize,
- ServerSideEncryption: sse,
- UserMetadata: s3.putUserMetadata,
- })
- if err != nil {
- return errors.Wrap(err, "upload s3 object")
- }
- return nil
- }
- // Attributes returns information about the specified object.
- func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
- name = trimLeading(name)
- //log.Debugf("S3Storage::Stat(%s)", name)
- ctx := context.Background()
- objInfo, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
- if err != nil {
- if s3.isDoesNotExist(err) {
- return nil, DoesNotExistError
- }
- return nil, err
- }
- return &StorageInfo{
- Name: trimName(name),
- Size: objInfo.Size,
- ModTime: objInfo.LastModified,
- }, nil
- }
- // Delete removes the object with the given name.
- func (s3 *S3Storage) Remove(name string) error {
- name = trimLeading(name)
- log.Debugf("S3Storage::Remove(%s)", name)
- ctx := context.Background()
- return s3.client.RemoveObject(ctx, s3.name, name, minio.RemoveObjectOptions{})
- }
- func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
- path = trimLeading(path)
- log.Debugf("S3Storage::List(%s)", path)
- ctx := context.Background()
- // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
- // object itself as one prefix item.
- if path != "" {
- path = strings.TrimSuffix(path, DirDelim) + DirDelim
- }
- opts := minio.ListObjectsOptions{
- Prefix: path,
- Recursive: false,
- UseV1: s3.listObjectsV1,
- }
- var stats []*StorageInfo
- for object := range s3.client.ListObjects(ctx, s3.name, opts) {
- // Catch the error when failed to list objects.
- if object.Err != nil {
- return nil, object.Err
- }
- // This sometimes happens with empty buckets.
- if object.Key == "" {
- continue
- }
- // The s3 client can also return the directory itself in the ListObjects call above.
- if object.Key == path {
- continue
- }
- stats = append(stats, &StorageInfo{
- Name: trimName(object.Key),
- Size: object.Size,
- ModTime: object.LastModified,
- })
- }
- return stats, nil
- }
- // getServerSideEncryption returns the SSE to use.
- func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
- if value := ctx.Value(sseConfigKey); value != nil {
- if sse, ok := value.(encrypt.ServerSide); ok {
- return sse, nil
- }
- return nil, errors.New("invalid SSE config override provided in the context")
- }
- return s3.defaultSSE, nil
- }
- // isDoesNotExist returns true if error means that object key is not found.
- func (s3 *S3Storage) isDoesNotExist(err error) bool {
- return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey"
- }
- // isObjNotFound returns true if the error means that the object was not found
- func (s3 *S3Storage) isObjNotFound(err error) bool {
- return minio.ToErrorResponse(errors.Cause(err)).Code == "NotFoundObject"
- }
- func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int64) ([]byte, error) {
- sse, err := s3.getServerSideEncryption(ctx)
- if err != nil {
- return nil, err
- }
- opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
- if length != -1 {
- if err := opts.SetRange(off, off+length-1); err != nil {
- return nil, err
- }
- } else if off > 0 {
- if err := opts.SetRange(off, 0); err != nil {
- return nil, err
- }
- }
- r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
- if err != nil {
- if s3.isObjNotFound(err) {
- return nil, DoesNotExistError
- }
- return nil, err
- }
- // NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
- // for convenience.
- if _, err := r.Read(nil); err != nil {
- r.Close()
- if s3.isObjNotFound(err) {
- return nil, DoesNotExistError
- }
- return nil, errors.Wrap(err, "Read from S3 failed")
- }
- return ioutil.ReadAll(r)
- }
- // awsAuth retrieves credentials from the aws-sdk-go.
- type awsAuth struct {
- Region string
- creds aws.Credentials
- }
- // Retrieve retrieves the keys from the environment.
- func (a *awsAuth) Retrieve() (credentials.Value, error) {
- cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(a.Region))
- if err != nil {
- return credentials.Value{}, errors.Wrap(err, "load AWS SDK config")
- }
- creds, err := cfg.Credentials.Retrieve(context.TODO())
- if err != nil {
- return credentials.Value{}, errors.Wrap(err, "retrieve AWS SDK credentials")
- }
- a.creds = creds
- return credentials.Value{
- AccessKeyID: creds.AccessKeyID,
- SecretAccessKey: creds.SecretAccessKey,
- SessionToken: creds.SessionToken,
- SignerType: credentials.SignatureV4,
- }, nil
- }
- // IsExpired returns if the credentials have been retrieved.
- func (a *awsAuth) IsExpired() bool {
- return a.creds.Expired()
- }
- type overrideSignerType struct {
- credentials.Provider
- signerType credentials.SignatureType
- }
- func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
- v, err := s.Provider.Retrieve()
- if err != nil {
- return v, err
- }
- if !v.SignerType.IsAnonymous() {
- v.SignerType = s.signerType
- }
- return v, nil
- }
|