فهرست منبع

Update S3 Storage implementation with latest thanos configuration spec.

Matt Bolt 4 سال پیش
والد
کامیت
5827155143
1فایلهای تغییر یافته به همراه57 افزوده شده و 5 حذف شده
  1. 57 5
      pkg/storage/s3storage.go

+ 57 - 5
pkg/storage/s3storage.go

@@ -15,6 +15,9 @@ import (
 
 	"github.com/kubecost/cost-model/pkg/log"
 
+	aws "github.com/aws/aws-sdk-go-v2/aws"
+	awsconfig "github.com/aws/aws-sdk-go-v2/config"
+
 	"github.com/minio/minio-go/v7"
 	"github.com/minio/minio-go/v7/pkg/credentials"
 	"github.com/minio/minio-go/v7/pkg/encrypt"
@@ -57,7 +60,7 @@ var DefaultConfig = S3Config{
 		MaxIdleConnsPerHost:   100,
 		MaxConnsPerHost:       0,
 	},
-	PartSize: 1024 * 1024 * 64, // 64Ms3.
+	PartSize: 1024 * 1024 * 64, // 64MB.
 }
 
 // Config stores the configuration for s3 bucket.
@@ -65,6 +68,7 @@ type S3Config struct {
 	Bucket             string            `yaml:"bucket"`
 	Endpoint           string            `yaml:"endpoint"`
 	Region             string            `yaml:"region"`
+	AWSSDKAuth         bool              `yaml:"aws_sdk_auth"`
 	AccessKey          string            `yaml:"access_key"`
 	Insecure           bool              `yaml:"insecure"`
 	SignatureV2        bool              `yaml:"signature_version2"`
@@ -75,8 +79,9 @@ type S3Config struct {
 	ListObjectsVersion string            `yaml:"list_objects_version"`
 	// PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
 	// NOTE we need to make sure this number does not produce more parts than 10 000.
-	PartSize  uint64    `yaml:"part_size"`
-	SSEConfig SSEConfig `yaml:"sse_config"`
+	PartSize    uint64    `yaml:"part_size"`
+	SSEConfig   SSEConfig `yaml:"sse_config"`
+	STSEndpoint string    `yaml:"sts_endpoint"`
 }
 
 // SSEConfig deals with the configuration of SSE for Minio. The following options are valid:
@@ -190,7 +195,12 @@ func NewS3StorageWith(config S3Config) (*S3Storage, error) {
 	if err := validate(config); err != nil {
 		return nil, err
 	}
-	if config.AccessKey != "" {
+
+	if config.AWSSDKAuth {
+		chain = []credentials.Provider{
+			wrapCredentialsProvider(&awsAuth{Region: config.Region}),
+		}
+	} else if config.AccessKey != "" {
 		chain = []credentials.Provider{wrapCredentialsProvider(&credentials.Static{
 			Value: credentials.Value{
 				AccessKeyID:     config.AccessKey,
@@ -206,6 +216,7 @@ func NewS3StorageWith(config S3Config) (*S3Storage, error) {
 				Client: &http.Client{
 					Transport: http.DefaultTransport,
 				},
+				Endpoint: config.STSEndpoint,
 			}),
 		}
 	}
@@ -233,6 +244,12 @@ func NewS3StorageWith(config S3Config) (*S3Storage, error) {
 	if config.SSEConfig.Type != "" {
 		switch config.SSEConfig.Type {
 		case SSEKMS:
+			// If the KMSEncryptionContext is a nil map the header that is
+			// constructed by the encrypt.ServerSide object will be base64
+			// encoded "nil" which is not accepted by AWS.
+			if config.SSEConfig.KMSEncryptionContext == nil {
+				config.SSEConfig.KMSEncryptionContext = make(map[string]string)
+			}
 			sse, err = encrypt.NewSSEKMS(config.SSEConfig.KMSKeyID, config.SSEConfig.KMSEncryptionContext)
 			if err != nil {
 				return nil, errors.Wrap(err, "initialize s3 client SSE-KMS")
@@ -283,7 +300,9 @@ func validate(conf S3Config) error {
 	if conf.Endpoint == "" {
 		return errors.New("no s3 endpoint in config file")
 	}
-
+	if conf.AWSSDKAuth && conf.AccessKey != "" {
+		return errors.New("aws_sdk_auth and access_key are mutually exclusive configurations")
+	}
 	if conf.AccessKey == "" && conf.SecretKey != "" {
 		return errors.New("no s3 acccess_key specified while secret_key is present in config file; either both should be present in config or envvars/IAM should be used.")
 	}
@@ -526,6 +545,39 @@ func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int6
 	return ioutil.ReadAll(r)
 }
 
+// awsAuth retrieves credentials from the aws-sdk-go.
+type awsAuth struct {
+	Region string
+	creds  aws.Credentials
+}
+
+// Retrieve retrieves the keys from the environment.
+func (a *awsAuth) Retrieve() (credentials.Value, error) {
+	cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(a.Region))
+	if err != nil {
+		return credentials.Value{}, errors.Wrap(err, "load AWS SDK config")
+	}
+
+	creds, err := cfg.Credentials.Retrieve(context.TODO())
+	if err != nil {
+		return credentials.Value{}, errors.Wrap(err, "retrieve AWS SDK credentials")
+	}
+
+	a.creds = creds
+
+	return credentials.Value{
+		AccessKeyID:     creds.AccessKeyID,
+		SecretAccessKey: creds.SecretAccessKey,
+		SessionToken:    creds.SessionToken,
+		SignerType:      credentials.SignatureV4,
+	}, nil
+}
+
+// IsExpired returns if the credentials have been retrieved.
+func (a *awsAuth) IsExpired() bool {
+	return a.creds.Expired()
+}
+
 type overrideSignerType struct {
 	credentials.Provider
 	signerType credentials.SignatureType