s3storage.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. package storage
// Forked from the Thanos S3 bucket implementation to reuse its configuration options.
  3. // Licensed under the Apache License 2.0
  4. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
  5. import (
  6. "bytes"
  7. "context"
  8. "io/ioutil"
  9. "net"
  10. "net/http"
  11. "strings"
  12. "time"
  13. "github.com/opencost/opencost/pkg/log"
  14. aws "github.com/aws/aws-sdk-go-v2/aws"
  15. awsconfig "github.com/aws/aws-sdk-go-v2/config"
  16. "github.com/minio/minio-go/v7"
  17. "github.com/minio/minio-go/v7/pkg/credentials"
  18. "github.com/minio/minio-go/v7/pkg/encrypt"
  19. "github.com/pkg/errors"
  20. "gopkg.in/yaml.v2"
  21. )
  22. type ctxKey int
  23. const (
  24. // SSEKMS is the name of the SSE-KMS method for objectstore encryption.
  25. SSEKMS = "SSE-KMS"
  26. // SSEC is the name of the SSE-C method for objstore encryption.
  27. SSEC = "SSE-C"
  28. // SSES3 is the name of the SSE-S3 method for objstore encryption.
  29. SSES3 = "SSE-S3"
  30. // sseConfigKey is the context key to override SSE config. This feature is used by downstream
  31. // projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or
  32. // refactoring can introduce breaking changes as far as the functionality is preserved.
  33. // NOTE: we're using a context value only because it's a very specific S3 option. If SSE will
  34. // be available to wider set of backends we should probably add a variadic option to Get() and Upload().
  35. sseConfigKey = ctxKey(0)
  36. )
  37. var defaultS3Config = S3Config{
  38. PutUserMetadata: map[string]string{},
  39. HTTPConfig: S3HTTPConfig{
  40. IdleConnTimeout: time.Duration(90 * time.Second),
  41. ResponseHeaderTimeout: time.Duration(2 * time.Minute),
  42. TLSHandshakeTimeout: time.Duration(10 * time.Second),
  43. ExpectContinueTimeout: time.Duration(1 * time.Second),
  44. MaxIdleConns: 100,
  45. MaxIdleConnsPerHost: 100,
  46. MaxConnsPerHost: 0,
  47. },
  48. PartSize: 1024 * 1024 * 64, // 64MB.
  49. }
// S3Config stores the configuration for an S3 bucket. parseS3Config fills it
// from YAML (see the yaml tags) on top of defaultS3Config.
type S3Config struct {
	// Bucket is the bucket name used for every object operation.
	Bucket string `yaml:"bucket"`
	// Endpoint is the S3 host handed to minio.New; validate rejects an empty value.
	Endpoint string `yaml:"endpoint"`
	// Region is passed to the minio client and, with AWSSDKAuth, to the AWS SDK config.
	Region string `yaml:"region"`
	// AWSSDKAuth obtains credentials through the AWS SDK default config chain.
	// It is mutually exclusive with AccessKey.
	AWSSDKAuth bool `yaml:"aws_sdk_auth"`
	// AccessKey is the static access key; it must be set together with SecretKey.
	AccessKey string `yaml:"access_key"`
	// Insecure disables TLS for the client connection (minio's Secure is !Insecure).
	Insecure bool `yaml:"insecure"`
	// SignatureV2 makes every credentials provider sign requests with Signature V2.
	SignatureV2 bool `yaml:"signature_version2"`
	// SecretKey is the static secret key; it must be set together with AccessKey.
	SecretKey string `yaml:"secret_key"`
	// PutUserMetadata is attached as user metadata to every object written.
	PutUserMetadata map[string]string `yaml:"put_user_metadata"`
	// HTTPConfig configures the HTTP transport used by the minio client.
	HTTPConfig S3HTTPConfig `yaml:"http_config"`
	// TraceConfig is the `trace` section. NOTE(review): not read in this file.
	TraceConfig TraceConfig `yaml:"trace"`
	// ListObjectsVersion selects the ListObjects API: "v1" enables the V1 API,
	// "" or "v2" keep V2. Any other value is rejected by NewS3StorageWith.
	ListObjectsVersion string `yaml:"list_objects_version"`
	// PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
	// NOTE we need to make sure this number does not produce more parts than 10 000.
	// NOTE(review): S3Storage.Write currently passes PartSize 0 to PutObject, so this value is stored but not applied there.
	PartSize uint64 `yaml:"part_size"`
	// SSEConfig selects the default server-side encryption.
	SSEConfig SSEConfig `yaml:"sse_config"`
	// STSEndpoint is used as the Endpoint of the IAM credentials provider in
	// the fallback chain (when neither AWSSDKAuth nor AccessKey is set).
	STSEndpoint string `yaml:"sts_endpoint"`
}
// SSEConfig configures the default server-side encryption applied through the
// minio client. Type selects the method; an empty Type disables SSE.
type SSEConfig struct {
	// Type is one of "SSE-KMS", "SSE-C" or "SSE-S3"; any other non-empty value is rejected.
	Type string `yaml:"type"`
	// KMSKeyID is the KMS key ID; required when Type is "SSE-KMS".
	KMSKeyID string `yaml:"kms_key_id"`
	// KMSEncryptionContext is the optional SSE-KMS encryption context, see
	// https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context
	KMSEncryptionContext map[string]string `yaml:"kms_encryption_context"`
	// EncryptionKey is the path of a file holding the SSE-C key; required when Type is "SSE-C".
	EncryptionKey string `yaml:"encryption_key"`
}
// TraceConfig is the `trace` section of S3Config.
// NOTE(review): Enable is not read anywhere in this file; confirm it is used elsewhere.
type TraceConfig struct {
	Enable bool `yaml:"enable"`
}
// S3HTTPConfig stores the http.Transport configuration for the s3 minio client.
// DefaultS3Transport consumes these fields unless Transport is set.
type S3HTTPConfig struct {
	// IdleConnTimeout maps to http.Transport.IdleConnTimeout.
	IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
	// ResponseHeaderTimeout maps to http.Transport.ResponseHeaderTimeout.
	ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
	// InsecureSkipVerify disables TLS server certificate verification when true.
	InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
	// TLSHandshakeTimeout maps to http.Transport.TLSHandshakeTimeout.
	TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout"`
	// ExpectContinueTimeout maps to http.Transport.ExpectContinueTimeout.
	ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout"`
	// MaxIdleConns maps to http.Transport.MaxIdleConns.
	MaxIdleConns int `yaml:"max_idle_conns"`
	// MaxIdleConnsPerHost maps to http.Transport.MaxIdleConnsPerHost.
	MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"`
	// MaxConnsPerHost maps to http.Transport.MaxConnsPerHost.
	MaxConnsPerHost int `yaml:"max_conns_per_host"`
	// Allow upstream callers to inject a round tripper. When non-nil it is used
	// as-is and none of the other fields in this struct are applied.
	Transport http.RoundTripper `yaml:"-"`
	// TLSConfig is converted with NewTLSConfig when the default transport is built.
	TLSConfig TLSConfig `yaml:"tls_config"`
}
  95. // DefaultTransport - this default transport is based on the Minio
  96. // DefaultTransport up until the following commit:
  97. // https://githus3.com/minio/minio-go/commit/008c7aa71fc17e11bf980c209a4f8c4d687fc884
  98. // The values have since diverged.
  99. func DefaultS3Transport(config S3Config) (*http.Transport, error) {
  100. tlsConfig, err := NewTLSConfig(&config.HTTPConfig.TLSConfig)
  101. if err != nil {
  102. return nil, err
  103. }
  104. if config.HTTPConfig.InsecureSkipVerify {
  105. tlsConfig.InsecureSkipVerify = true
  106. }
  107. return &http.Transport{
  108. Proxy: http.ProxyFromEnvironment,
  109. DialContext: (&net.Dialer{
  110. Timeout: 30 * time.Second,
  111. KeepAlive: 30 * time.Second,
  112. DualStack: true,
  113. }).DialContext,
  114. MaxIdleConns: config.HTTPConfig.MaxIdleConns,
  115. MaxIdleConnsPerHost: config.HTTPConfig.MaxIdleConnsPerHost,
  116. IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
  117. MaxConnsPerHost: config.HTTPConfig.MaxConnsPerHost,
  118. TLSHandshakeTimeout: time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
  119. ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
  120. // A custom ResponseHeaderTimeout was introduced
  121. // to cover cases where the tcp connection works but
  122. // the server never answers. Defaults to 2 minutes.
  123. ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
  124. // Set this value so that the underlying transport round-tripper
  125. // doesn't try to auto decode the body of objects with
  126. // content-encoding set to `gzip`.
  127. //
  128. // Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
  129. DisableCompression: true,
  130. // #nosec It's up to the user to decide on TLS configs
  131. TLSClientConfig: tlsConfig,
  132. }, nil
  133. }
// S3Storage provides storage via S3: a minio client bound to a single bucket.
// Instances are built by NewS3Storage / NewS3StorageWith.
type S3Storage struct {
	// name is the bucket name passed to every client call.
	name string
	// client is the configured minio client.
	client *minio.Client
	// defaultSSE is used when the request context carries no SSE override;
	// nil when SSE is disabled.
	defaultSSE encrypt.ServerSide
	// putUserMetadata is attached as user metadata to every Write.
	putUserMetadata map[string]string
	// partSize is the configured multipart size.
	// NOTE(review): Write currently passes PartSize 0 and does not read this field.
	partSize uint64
	// listObjectsV1 is true when list_objects_version is "v1".
	listObjectsV1 bool
}
  143. // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
  144. func parseS3Config(conf []byte) (S3Config, error) {
  145. config := defaultS3Config
  146. if err := yaml.UnmarshalStrict(conf, &config); err != nil {
  147. return S3Config{}, err
  148. }
  149. return config, nil
  150. }
  151. // NewBucket returns a new Bucket using the provided s3 config values.
  152. func NewS3Storage(conf []byte) (*S3Storage, error) {
  153. config, err := parseS3Config(conf)
  154. if err != nil {
  155. return nil, err
  156. }
  157. return NewS3StorageWith(config)
  158. }
  159. // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
  160. func NewS3StorageWith(config S3Config) (*S3Storage, error) {
  161. var chain []credentials.Provider
  162. wrapCredentialsProvider := func(p credentials.Provider) credentials.Provider { return p }
  163. if config.SignatureV2 {
  164. wrapCredentialsProvider = func(p credentials.Provider) credentials.Provider {
  165. return &overrideSignerType{Provider: p, signerType: credentials.SignatureV2}
  166. }
  167. }
  168. if err := validate(config); err != nil {
  169. return nil, err
  170. }
  171. if config.AWSSDKAuth {
  172. chain = []credentials.Provider{
  173. wrapCredentialsProvider(&awsAuth{Region: config.Region}),
  174. }
  175. } else if config.AccessKey != "" {
  176. chain = []credentials.Provider{wrapCredentialsProvider(&credentials.Static{
  177. Value: credentials.Value{
  178. AccessKeyID: config.AccessKey,
  179. SecretAccessKey: config.SecretKey,
  180. SignerType: credentials.SignatureV4,
  181. },
  182. })}
  183. } else {
  184. chain = []credentials.Provider{
  185. wrapCredentialsProvider(&credentials.EnvAWS{}),
  186. wrapCredentialsProvider(&credentials.FileAWSCredentials{}),
  187. wrapCredentialsProvider(&credentials.IAM{
  188. Client: &http.Client{
  189. Transport: http.DefaultTransport,
  190. },
  191. Endpoint: config.STSEndpoint,
  192. }),
  193. }
  194. }
  195. // Check if a roundtripper has been set in the config
  196. // otherwise build the default transport.
  197. var rt http.RoundTripper
  198. if config.HTTPConfig.Transport != nil {
  199. rt = config.HTTPConfig.Transport
  200. } else {
  201. var err error
  202. rt, err = DefaultS3Transport(config)
  203. if err != nil {
  204. return nil, err
  205. }
  206. }
  207. client, err := minio.New(config.Endpoint, &minio.Options{
  208. Creds: credentials.NewChainCredentials(chain),
  209. Secure: !config.Insecure,
  210. Region: config.Region,
  211. Transport: rt,
  212. })
  213. if err != nil {
  214. return nil, errors.Wrap(err, "initialize s3 client")
  215. }
  216. var sse encrypt.ServerSide
  217. if config.SSEConfig.Type != "" {
  218. switch config.SSEConfig.Type {
  219. case SSEKMS:
  220. // If the KMSEncryptionContext is a nil map the header that is
  221. // constructed by the encrypt.ServerSide object will be base64
  222. // encoded "nil" which is not accepted by AWS.
  223. if config.SSEConfig.KMSEncryptionContext == nil {
  224. config.SSEConfig.KMSEncryptionContext = make(map[string]string)
  225. }
  226. sse, err = encrypt.NewSSEKMS(config.SSEConfig.KMSKeyID, config.SSEConfig.KMSEncryptionContext)
  227. if err != nil {
  228. return nil, errors.Wrap(err, "initialize s3 client SSE-KMS")
  229. }
  230. case SSEC:
  231. key, err := ioutil.ReadFile(config.SSEConfig.EncryptionKey)
  232. if err != nil {
  233. return nil, err
  234. }
  235. sse, err = encrypt.NewSSEC(key)
  236. if err != nil {
  237. return nil, errors.Wrap(err, "initialize s3 client SSE-C")
  238. }
  239. case SSES3:
  240. sse = encrypt.NewSSE()
  241. default:
  242. sseErrMsg := errors.Errorf("Unsupported type %q was provided. Supported types are SSE-S3, SSE-KMS, SSE-C", config.SSEConfig.Type)
  243. return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
  244. }
  245. }
  246. if config.ListObjectsVersion != "" && config.ListObjectsVersion != "v1" && config.ListObjectsVersion != "v2" {
  247. return nil, errors.Errorf("Initialize s3 client list objects version: Unsupported version %q was provided. Supported values are v1, v2", config.ListObjectsVersion)
  248. }
  249. bkt := &S3Storage{
  250. name: config.Bucket,
  251. client: client,
  252. defaultSSE: sse,
  253. putUserMetadata: config.PutUserMetadata,
  254. partSize: config.PartSize,
  255. listObjectsV1: config.ListObjectsVersion == "v1",
  256. }
  257. return bkt, nil
  258. }
  259. // Name returns the bucket name for s3.
  260. func (s3 *S3Storage) Name() string {
  261. return s3.name
  262. }
  263. // StorageType returns a string identifier for the type of storage used by the implementation.
  264. func (s3 *S3Storage) StorageType() StorageType {
  265. return StorageTypeBucketS3
  266. }
  267. // validate checks to see the config options are set.
  268. func validate(conf S3Config) error {
  269. if conf.Endpoint == "" {
  270. return errors.New("no s3 endpoint in config file")
  271. }
  272. if conf.AWSSDKAuth && conf.AccessKey != "" {
  273. return errors.New("aws_sdk_auth and access_key are mutually exclusive configurations")
  274. }
  275. if conf.AccessKey == "" && conf.SecretKey != "" {
  276. return errors.New("no s3 acccess_key specified while secret_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  277. }
  278. if conf.AccessKey != "" && conf.SecretKey == "" {
  279. return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  280. }
  281. if conf.SSEConfig.Type == SSEC && conf.SSEConfig.EncryptionKey == "" {
  282. return errors.New("encryption_key must be set if sse_config.type is set to 'SSE-C'")
  283. }
  284. if conf.SSEConfig.Type == SSEKMS && conf.SSEConfig.KMSKeyID == "" {
  285. return errors.New("kms_key_id must be set if sse_config.type is set to 'SSE-KMS'")
  286. }
  287. return nil
  288. }
  289. // FullPath returns the storage working path combined with the path provided
  290. func (s3 *S3Storage) FullPath(name string) string {
  291. name = trimLeading(name)
  292. return name
  293. }
  294. // Get returns a reader for the given object name.
  295. func (s3 *S3Storage) Read(name string) ([]byte, error) {
  296. name = trimLeading(name)
  297. log.Debugf("S3Storage::Read(%s)", name)
  298. ctx := context.Background()
  299. return s3.getRange(ctx, name, 0, -1)
  300. }
  301. // Exists checks if the given object exists.
  302. func (s3 *S3Storage) Exists(name string) (bool, error) {
  303. name = trimLeading(name)
  304. //log.Debugf("S3Storage::Exists(%s)", name)
  305. ctx := context.Background()
  306. _, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  307. if err != nil {
  308. if s3.isDoesNotExist(err) {
  309. return false, nil
  310. }
  311. return false, errors.Wrap(err, "stat s3 object")
  312. }
  313. return true, nil
  314. }
  315. // Upload the contents of the reader as an object into the bucket.
  316. func (s3 *S3Storage) Write(name string, data []byte) error {
  317. name = trimLeading(name)
  318. log.Debugf("S3Storage::Write(%s)", name)
  319. ctx := context.Background()
  320. sse, err := s3.getServerSideEncryption(ctx)
  321. if err != nil {
  322. return err
  323. }
  324. var size int64 = int64(len(data))
  325. // Set partSize to 0 to write files in one go. This prevents chunking of
  326. // upload into multiple parts, which requires additional memory for buffering
  327. // the sub-parts. To remain consistent with other storage implementations,
  328. // we would rather attempt to lower cost fast upload and fast-fail.
  329. var partSize uint64 = 0
  330. r := bytes.NewReader(data)
  331. _, err = s3.client.PutObject(ctx, s3.name, name, r, int64(size), minio.PutObjectOptions{
  332. PartSize: partSize,
  333. ServerSideEncryption: sse,
  334. UserMetadata: s3.putUserMetadata,
  335. })
  336. if err != nil {
  337. return errors.Wrap(err, "upload s3 object")
  338. }
  339. return nil
  340. }
  341. // Attributes returns information about the specified object.
  342. func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
  343. name = trimLeading(name)
  344. //log.Debugf("S3Storage::Stat(%s)", name)
  345. ctx := context.Background()
  346. objInfo, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  347. if err != nil {
  348. if s3.isDoesNotExist(err) {
  349. return nil, DoesNotExistError
  350. }
  351. return nil, err
  352. }
  353. return &StorageInfo{
  354. Name: trimName(name),
  355. Size: objInfo.Size,
  356. ModTime: objInfo.LastModified,
  357. }, nil
  358. }
  359. // Delete removes the object with the given name.
  360. func (s3 *S3Storage) Remove(name string) error {
  361. name = trimLeading(name)
  362. log.Debugf("S3Storage::Remove(%s)", name)
  363. ctx := context.Background()
  364. return s3.client.RemoveObject(ctx, s3.name, name, minio.RemoveObjectOptions{})
  365. }
  366. func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
  367. path = trimLeading(path)
  368. log.Debugf("S3Storage::List(%s)", path)
  369. ctx := context.Background()
  370. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  371. // object itself as one prefix item.
  372. if path != "" {
  373. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  374. }
  375. opts := minio.ListObjectsOptions{
  376. Prefix: path,
  377. Recursive: false,
  378. UseV1: s3.listObjectsV1,
  379. }
  380. var stats []*StorageInfo
  381. for object := range s3.client.ListObjects(ctx, s3.name, opts) {
  382. // Catch the error when failed to list objects.
  383. if object.Err != nil {
  384. return nil, object.Err
  385. }
  386. // This sometimes happens with empty buckets.
  387. if object.Key == "" {
  388. continue
  389. }
  390. // The s3 client can also return the directory itself in the ListObjects call above.
  391. if object.Key == path {
  392. continue
  393. }
  394. stats = append(stats, &StorageInfo{
  395. Name: trimName(object.Key),
  396. Size: object.Size,
  397. ModTime: object.LastModified,
  398. })
  399. }
  400. return stats, nil
  401. }
  402. // getServerSideEncryption returns the SSE to use.
  403. func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
  404. if value := ctx.Value(sseConfigKey); value != nil {
  405. if sse, ok := value.(encrypt.ServerSide); ok {
  406. return sse, nil
  407. }
  408. return nil, errors.New("invalid SSE config override provided in the context")
  409. }
  410. return s3.defaultSSE, nil
  411. }
  412. // isDoesNotExist returns true if error means that object key is not found.
  413. func (s3 *S3Storage) isDoesNotExist(err error) bool {
  414. return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey"
  415. }
  416. // isObjNotFound returns true if the error means that the object was not found
  417. func (s3 *S3Storage) isObjNotFound(err error) bool {
  418. return minio.ToErrorResponse(errors.Cause(err)).Code == "NotFoundObject"
  419. }
  420. func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int64) ([]byte, error) {
  421. sse, err := s3.getServerSideEncryption(ctx)
  422. if err != nil {
  423. return nil, err
  424. }
  425. opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
  426. if length != -1 {
  427. if err := opts.SetRange(off, off+length-1); err != nil {
  428. return nil, err
  429. }
  430. } else if off > 0 {
  431. if err := opts.SetRange(off, 0); err != nil {
  432. return nil, err
  433. }
  434. }
  435. r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
  436. if err != nil {
  437. if s3.isObjNotFound(err) {
  438. return nil, DoesNotExistError
  439. }
  440. return nil, err
  441. }
  442. // NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
  443. // for convenience.
  444. if _, err := r.Read(nil); err != nil {
  445. r.Close()
  446. if s3.isObjNotFound(err) {
  447. return nil, DoesNotExistError
  448. }
  449. return nil, errors.Wrap(err, "Read from S3 failed")
  450. }
  451. return ioutil.ReadAll(r)
  452. }
// awsAuth is a minio credentials.Provider that retrieves credentials through
// the aws-sdk-go-v2 default configuration (awsconfig.LoadDefaultConfig).
type awsAuth struct {
	// Region is passed to awsconfig.WithRegion when loading the SDK config.
	Region string
	// creds caches the credentials from the last successful Retrieve;
	// IsExpired checks them.
	creds aws.Credentials
}
  458. // Retrieve retrieves the keys from the environment.
  459. func (a *awsAuth) Retrieve() (credentials.Value, error) {
  460. cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(a.Region))
  461. if err != nil {
  462. return credentials.Value{}, errors.Wrap(err, "load AWS SDK config")
  463. }
  464. creds, err := cfg.Credentials.Retrieve(context.TODO())
  465. if err != nil {
  466. return credentials.Value{}, errors.Wrap(err, "retrieve AWS SDK credentials")
  467. }
  468. a.creds = creds
  469. return credentials.Value{
  470. AccessKeyID: creds.AccessKeyID,
  471. SecretAccessKey: creds.SecretAccessKey,
  472. SessionToken: creds.SessionToken,
  473. SignerType: credentials.SignatureV4,
  474. }, nil
  475. }
  476. // IsExpired returns if the credentials have been retrieved.
  477. func (a *awsAuth) IsExpired() bool {
  478. return a.creds.Expired()
  479. }
// overrideSignerType wraps a credentials.Provider and replaces the signer type
// of the non-anonymous credentials it returns with signerType.
// NewS3StorageWith uses it to force Signature V2.
type overrideSignerType struct {
	// Provider is the wrapped provider; its other methods are promoted unchanged.
	credentials.Provider
	// signerType is applied to every non-anonymous credentials.Value.
	signerType credentials.SignatureType
}
  484. func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
  485. v, err := s.Provider.Retrieve()
  486. if err != nil {
  487. return v, err
  488. }
  489. if !v.SignerType.IsAnonymous() {
  490. v.SignerType = s.signerType
  491. }
  492. return v, nil
  493. }