s3storage.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. package storage
// Forked from the Thanos S3 bucket implementation to reuse its configuration options.
  3. // Licensed under the Apache License 2.0
  4. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
  5. import (
  6. "bytes"
  7. "context"
  8. "io"
  9. "net/http"
  10. "os"
  11. "strings"
  12. "time"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. "github.com/aws/aws-sdk-go-v2/aws"
  15. awsconfig "github.com/aws/aws-sdk-go-v2/config"
  16. "github.com/minio/minio-go/v7"
  17. "github.com/minio/minio-go/v7/pkg/credentials"
  18. "github.com/minio/minio-go/v7/pkg/encrypt"
  19. "github.com/pkg/errors"
  20. "gopkg.in/yaml.v2"
  21. )
  22. type ctxKey int
  23. const (
  24. // SSEKMS is the name of the SSE-KMS method for objectstore encryption.
  25. SSEKMS = "SSE-KMS"
  26. // SSEC is the name of the SSE-C method for objstore encryption.
  27. SSEC = "SSE-C"
  28. // SSES3 is the name of the SSE-S3 method for objstore encryption.
  29. SSES3 = "SSE-S3"
  30. // sseConfigKey is the context key to override SSE config. This feature is used by downstream
  31. // projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or
  32. // refactoring can introduce breaking changes as far as the functionality is preserved.
  33. // NOTE: we're using a context value only because it's a very specific S3 option. If SSE will
  34. // be available to wider set of backends we should probably add a variadic option to Get() and Upload().
  35. sseConfigKey = ctxKey(0)
  36. )
  37. var defaultS3Config = S3Config{
  38. PutUserMetadata: map[string]string{},
  39. HTTPConfig: HTTPConfig{
  40. IdleConnTimeout: 90 * time.Second,
  41. ResponseHeaderTimeout: 2 * time.Minute,
  42. TLSHandshakeTimeout: 10 * time.Second,
  43. ExpectContinueTimeout: 1 * time.Second,
  44. MaxIdleConns: 100,
  45. MaxIdleConnsPerHost: 100,
  46. MaxConnsPerHost: 0,
  47. DisableCompression: true,
  48. },
  49. PartSize: 1024 * 1024 * 64, // 64MB.
  50. }
// S3Config stores the configuration for an S3 bucket. It is decoded from YAML
// by parseS3Config (starting from defaultS3Config) and consumed by
// NewS3StorageWith.
type S3Config struct {
	// Bucket is the bucket every object operation targets.
	Bucket string `yaml:"bucket"`
	// Endpoint is the host passed to minio.New; validate rejects an empty value.
	Endpoint string `yaml:"endpoint"`
	// Region is passed to the minio client and, with AWSSDKAuth, to the AWS SDK config loader.
	Region string `yaml:"region"`
	// AWSSDKAuth sources credentials from the AWS SDK v2 default configuration
	// (see awsAuth). Mutually exclusive with AccessKey.
	AWSSDKAuth bool `yaml:"aws_sdk_auth"`
	// AccessKey and SecretKey are static credentials and must be set together.
	// When neither they nor AWSSDKAuth are set, credentials are looked up from
	// the environment, the AWS credentials file, then IAM.
	AccessKey string `yaml:"access_key"`
	// Insecure makes the client use plain HTTP (minio Secure=false).
	Insecure bool `yaml:"insecure"`
	// SignatureV2 forces AWS Signature Version 2 for all non-anonymous credentials.
	SignatureV2 bool `yaml:"signature_version2"`
	// SecretKey pairs with AccessKey.
	SecretKey string `yaml:"secret_key"`
	// PutUserMetadata is sent as user metadata with every object uploaded by Write.
	PutUserMetadata map[string]string `yaml:"put_user_metadata"`
	// HTTPConfig produces the HTTP transport used by the minio client.
	HTTPConfig HTTPConfig `yaml:"http_config"`
	// TraceConfig is decoded but not referenced in this file.
	TraceConfig TraceConfig `yaml:"trace"`
	// ListObjectsVersion selects the list API: "v1" uses ListObjectsV1, while
	// "" and "v2" leave UseV1 unset. Other values are rejected by NewS3StorageWith.
	ListObjectsVersion string `yaml:"list_objects_version"`
	// PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
	// NOTE we need to make sure this number does not produce more parts than 10 000.
	// NOTE(review): Write passes PartSize 0 to PutObject, so this value is
	// stored on S3Storage but not used by uploads in this file.
	PartSize uint64 `yaml:"part_size"`
	// SSEConfig selects the default server-side encryption for reads and writes.
	SSEConfig SSEConfig `yaml:"sse_config"`
	// STSEndpoint is the endpoint given to the IAM provider in the fallback credential chain.
	STSEndpoint string `yaml:"sts_endpoint"`
}
// SSEConfig configures server-side encryption for objects written and read
// through the minio client.
//
// Type is one of SSEKMS, SSEC or SSES3; an empty Type disables SSE and any
// other value is rejected by NewS3StorageWith.
//   - SSE-KMS: KMSKeyID is required; KMSEncryptionContext is optional, see
//     https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context
//   - SSE-C: EncryptionKey is required and is the path of a file whose
//     contents are used as the key.
type SSEConfig struct {
	Type                 string            `yaml:"type"`
	KMSKeyID             string            `yaml:"kms_key_id"`
	KMSEncryptionContext map[string]string `yaml:"kms_encryption_context"`
	EncryptionKey        string            `yaml:"encryption_key"`
}
// TraceConfig holds request-tracing options decoded from the "trace" YAML key.
// NOTE(review): not referenced anywhere in this file; confirm whether tracing is wired up elsewhere.
type TraceConfig struct {
	Enable bool `yaml:"enable"`
}
// S3Storage provides storage via S3, implemented on top of a minio client.
// Instances are built by NewS3Storage / NewS3StorageWith.
type S3Storage struct {
	name            string             // bucket name
	client          *minio.Client      // client all object operations go through
	defaultSSE      encrypt.ServerSide // SSE used unless the context carries an override (getServerSideEncryption)
	putUserMetadata map[string]string  // user metadata sent with every Write
	partSize        uint64             // from S3Config.PartSize; not used by Write in this file
	listObjectsV1   bool               // true selects ListObjectsV1 in List/ListDirectories
	insecure        bool               // true when the client uses plain HTTP; drives protocol()
}
  92. // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
  93. func parseS3Config(conf []byte) (S3Config, error) {
  94. config := defaultS3Config
  95. if err := yaml.Unmarshal(conf, &config); err != nil {
  96. return S3Config{}, err
  97. }
  98. return config, nil
  99. }
  100. // NewBucket returns a new Bucket using the provided s3 config values.
  101. func NewS3Storage(conf []byte) (*S3Storage, error) {
  102. config, err := parseS3Config(conf)
  103. if err != nil {
  104. return nil, err
  105. }
  106. return NewS3StorageWith(config)
  107. }
  108. // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
  109. func NewS3StorageWith(config S3Config) (*S3Storage, error) {
  110. var chain []credentials.Provider
  111. wrapCredentialsProvider := func(p credentials.Provider) credentials.Provider { return p }
  112. if config.SignatureV2 {
  113. wrapCredentialsProvider = func(p credentials.Provider) credentials.Provider {
  114. return &overrideSignerType{Provider: p, signerType: credentials.SignatureV2}
  115. }
  116. }
  117. if err := validate(config); err != nil {
  118. return nil, err
  119. }
  120. if config.AWSSDKAuth {
  121. chain = []credentials.Provider{
  122. wrapCredentialsProvider(&awsAuth{Region: config.Region}),
  123. }
  124. } else if config.AccessKey != "" {
  125. chain = []credentials.Provider{wrapCredentialsProvider(&credentials.Static{
  126. Value: credentials.Value{
  127. AccessKeyID: config.AccessKey,
  128. SecretAccessKey: config.SecretKey,
  129. SignerType: credentials.SignatureV4,
  130. },
  131. })}
  132. } else {
  133. chain = []credentials.Provider{
  134. wrapCredentialsProvider(&credentials.EnvAWS{}),
  135. wrapCredentialsProvider(&credentials.FileAWSCredentials{}),
  136. wrapCredentialsProvider(&credentials.IAM{
  137. Client: &http.Client{
  138. Transport: http.DefaultTransport,
  139. },
  140. Endpoint: config.STSEndpoint,
  141. }),
  142. }
  143. }
  144. rt, err := config.HTTPConfig.GetHTTPTransport()
  145. if err != nil {
  146. return nil, err
  147. }
  148. // HTTPS Protocol Configuration: The 'Secure' option controls whether HTTPS is used.
  149. // By default, config.Insecure is false if not set, so Secure=true.
  150. client, err := minio.New(config.Endpoint, &minio.Options{
  151. Creds: credentials.NewChainCredentials(chain),
  152. Secure: !config.Insecure,
  153. Region: config.Region,
  154. Transport: rt,
  155. })
  156. if err != nil {
  157. return nil, errors.Wrap(err, "initialize s3 client")
  158. }
  159. var sse encrypt.ServerSide
  160. if config.SSEConfig.Type != "" {
  161. switch config.SSEConfig.Type {
  162. case SSEKMS:
  163. // If the KMSEncryptionContext is a nil map the header that is
  164. // constructed by the encrypt.ServerSide object will be base64
  165. // encoded "nil" which is not accepted by AWS.
  166. if config.SSEConfig.KMSEncryptionContext == nil {
  167. config.SSEConfig.KMSEncryptionContext = make(map[string]string)
  168. }
  169. sse, err = encrypt.NewSSEKMS(config.SSEConfig.KMSKeyID, config.SSEConfig.KMSEncryptionContext)
  170. if err != nil {
  171. return nil, errors.Wrap(err, "initialize s3 client SSE-KMS")
  172. }
  173. case SSEC:
  174. key, err := os.ReadFile(config.SSEConfig.EncryptionKey)
  175. if err != nil {
  176. return nil, err
  177. }
  178. sse, err = encrypt.NewSSEC(key)
  179. if err != nil {
  180. return nil, errors.Wrap(err, "initialize s3 client SSE-C")
  181. }
  182. case SSES3:
  183. sse = encrypt.NewSSE()
  184. default:
  185. sseErrMsg := errors.Errorf("Unsupported type %q was provided. Supported types are SSE-S3, SSE-KMS, SSE-C", config.SSEConfig.Type)
  186. return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
  187. }
  188. }
  189. if config.ListObjectsVersion != "" && config.ListObjectsVersion != "v1" && config.ListObjectsVersion != "v2" {
  190. return nil, errors.Errorf("Initialize s3 client list objects version: Unsupported version %q was provided. Supported values are v1, v2", config.ListObjectsVersion)
  191. }
  192. bkt := &S3Storage{
  193. name: config.Bucket,
  194. client: client,
  195. defaultSSE: sse,
  196. putUserMetadata: config.PutUserMetadata,
  197. partSize: config.PartSize,
  198. listObjectsV1: config.ListObjectsVersion == "v1",
  199. insecure: config.Insecure,
  200. }
  201. log.Debugf("S3Storage: New S3 client initialized with '%s://%s/%s'", bkt.protocol(), config.Endpoint, config.Bucket)
  202. return bkt, nil
  203. }
  204. // String returns the bucket name for s3.
  205. func (s3 *S3Storage) String() string {
  206. return s3.name
  207. }
  208. // StorageType returns a string identifier for the type of storage used by the implementation.
  209. func (s3 *S3Storage) StorageType() StorageType {
  210. return StorageTypeBucketS3
  211. }
  212. // protocol returns the protocol string (HTTP or HTTPS) based on configuration
  213. func (s3 *S3Storage) protocol() string {
  214. if s3.insecure {
  215. return "HTTP"
  216. }
  217. return "HTTPS"
  218. }
  219. // validate checks to see the config options are set.
  220. func validate(conf S3Config) error {
  221. if conf.Endpoint == "" {
  222. return errors.New("no s3 endpoint in config file")
  223. }
  224. if conf.AWSSDKAuth && conf.AccessKey != "" {
  225. return errors.New("aws_sdk_auth and access_key are mutually exclusive configurations")
  226. }
  227. if conf.AccessKey == "" && conf.SecretKey != "" {
  228. return errors.New("no s3 acccess_key specified while secret_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  229. }
  230. if conf.AccessKey != "" && conf.SecretKey == "" {
  231. return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  232. }
  233. if conf.SSEConfig.Type == SSEC && conf.SSEConfig.EncryptionKey == "" {
  234. return errors.New("encryption_key must be set if sse_config.type is set to 'SSE-C'")
  235. }
  236. if conf.SSEConfig.Type == SSEKMS && conf.SSEConfig.KMSKeyID == "" {
  237. return errors.New("kms_key_id must be set if sse_config.type is set to 'SSE-KMS'")
  238. }
  239. return nil
  240. }
  241. // FullPath returns the storage working path combined with the path provided
  242. func (s3 *S3Storage) FullPath(name string) string {
  243. name = trimLeading(name)
  244. return name
  245. }
  246. // Get returns a reader for the given object name.
  247. func (s3 *S3Storage) Read(name string) ([]byte, error) {
  248. name = trimLeading(name)
  249. log.Debugf("S3Storage::Read::%s(%s)", s3.protocol(), name)
  250. ctx := context.Background()
  251. return s3.getRange(ctx, name, 0, -1)
  252. }
  253. // Exists checks if the given object exists.
  254. func (s3 *S3Storage) Exists(name string) (bool, error) {
  255. name = trimLeading(name)
  256. log.Debugf("S3Storage::Exists::%s(%s)", s3.protocol(), name)
  257. ctx := context.Background()
  258. _, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  259. if err != nil {
  260. if s3.isDoesNotExist(err) {
  261. return false, nil
  262. }
  263. return false, errors.Wrap(err, "stat s3 object")
  264. }
  265. return true, nil
  266. }
  267. // Upload the contents of the reader as an object into the bucket.
  268. func (s3 *S3Storage) Write(name string, data []byte) error {
  269. name = trimLeading(name)
  270. log.Debugf("S3Storage::Write::%s(%s)", s3.protocol(), name)
  271. ctx := context.Background()
  272. sse, err := s3.getServerSideEncryption(ctx)
  273. if err != nil {
  274. return err
  275. }
  276. var size int64 = int64(len(data))
  277. // Set partSize to 0 to write files in one go. This prevents chunking of
  278. // upload into multiple parts, which requires additional memory for buffering
  279. // the sub-parts. To remain consistent with other storage implementations,
  280. // we would rather attempt to lower cost fast upload and fast-fail.
  281. var partSize uint64 = 0
  282. r := bytes.NewReader(data)
  283. _, err = s3.client.PutObject(ctx, s3.name, name, r, int64(size), minio.PutObjectOptions{
  284. PartSize: partSize,
  285. ServerSideEncryption: sse,
  286. UserMetadata: s3.putUserMetadata,
  287. })
  288. if err != nil {
  289. return errors.Wrap(err, "upload s3 object")
  290. }
  291. return nil
  292. }
  293. // Attributes returns information about the specified object.
  294. func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
  295. name = trimLeading(name)
  296. log.Debugf("S3Storage::Stat::%s(%s)", s3.protocol(), name)
  297. ctx := context.Background()
  298. objInfo, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  299. if err != nil {
  300. if s3.isDoesNotExist(err) {
  301. return nil, DoesNotExistError
  302. }
  303. return nil, err
  304. }
  305. return &StorageInfo{
  306. Name: trimName(name),
  307. Size: objInfo.Size,
  308. ModTime: objInfo.LastModified,
  309. }, nil
  310. }
  311. // Delete removes the object with the given name.
  312. func (s3 *S3Storage) Remove(name string) error {
  313. name = trimLeading(name)
  314. log.Debugf("S3Storage::Remove::%s(%s)", s3.protocol(), name)
  315. ctx := context.Background()
  316. return s3.client.RemoveObject(ctx, s3.name, name, minio.RemoveObjectOptions{})
  317. }
  318. func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
  319. path = trimLeading(path)
  320. log.Debugf("S3Storage::List::%s(%s)", s3.protocol(), path)
  321. ctx := context.Background()
  322. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  323. // object itself as one prefix item.
  324. if path != "" {
  325. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  326. }
  327. opts := minio.ListObjectsOptions{
  328. Prefix: path,
  329. Recursive: false,
  330. UseV1: s3.listObjectsV1,
  331. }
  332. var stats []*StorageInfo
  333. for object := range s3.client.ListObjects(ctx, s3.name, opts) {
  334. // Catch the error when failed to list objects.
  335. if object.Err != nil {
  336. return nil, object.Err
  337. }
  338. // The s3 client can also return the directory itself in the ListObjects call above.
  339. if object.Key == path {
  340. continue
  341. }
  342. name := trimName(object.Key)
  343. // This sometimes happens with empty buckets.
  344. if name == "" {
  345. continue
  346. }
  347. stats = append(stats, &StorageInfo{
  348. Name: name,
  349. Size: object.Size,
  350. ModTime: object.LastModified,
  351. })
  352. }
  353. return stats, nil
  354. }
  355. func (s3 *S3Storage) ListDirectories(path string) ([]*StorageInfo, error) {
  356. path = trimLeading(path)
  357. log.Debugf("S3Storage::ListDirectories::%s(%s)", s3.protocol(), path)
  358. ctx := context.Background()
  359. if path != "" {
  360. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  361. }
  362. opts := minio.ListObjectsOptions{
  363. Prefix: path,
  364. Recursive: false,
  365. UseV1: s3.listObjectsV1,
  366. }
  367. var stats []*StorageInfo
  368. for object := range s3.client.ListObjects(ctx, s3.name, opts) {
  369. if object.Err != nil {
  370. return nil, object.Err
  371. }
  372. if object.Key == "" {
  373. continue
  374. }
  375. if object.Key == path {
  376. continue
  377. }
  378. // If trim removes the entire name, it's a directory, ergo we list it
  379. if trimName(object.Key) == "" {
  380. stats = append(stats, &StorageInfo{
  381. Name: object.Key,
  382. Size: object.Size,
  383. ModTime: object.LastModified,
  384. })
  385. }
  386. }
  387. return stats, nil
  388. }
  389. // getServerSideEncryption returns the SSE to use.
  390. func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
  391. if value := ctx.Value(sseConfigKey); value != nil {
  392. if sse, ok := value.(encrypt.ServerSide); ok {
  393. return sse, nil
  394. }
  395. return nil, errors.New("invalid SSE config override provided in the context")
  396. }
  397. return s3.defaultSSE, nil
  398. }
  399. // isDoesNotExist returns true if error means that object key is not found.
  400. func (s3 *S3Storage) isDoesNotExist(err error) bool {
  401. return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey"
  402. }
  403. // isObjNotFound returns true if the error means that the object was not found
  404. func (s3 *S3Storage) isObjNotFound(err error) bool {
  405. return minio.ToErrorResponse(errors.Cause(err)).Code == "NotFoundObject"
  406. }
  407. func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int64) ([]byte, error) {
  408. sse, err := s3.getServerSideEncryption(ctx)
  409. if err != nil {
  410. return nil, err
  411. }
  412. opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
  413. if length != -1 {
  414. if err := opts.SetRange(off, off+length-1); err != nil {
  415. return nil, err
  416. }
  417. } else if off > 0 {
  418. if err := opts.SetRange(off, 0); err != nil {
  419. return nil, err
  420. }
  421. }
  422. r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
  423. if err != nil {
  424. if s3.isObjNotFound(err) {
  425. return nil, DoesNotExistError
  426. }
  427. return nil, err
  428. }
  429. defer r.Close()
  430. // NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
  431. // for convenience.
  432. if _, err := r.Read(nil); err != nil {
  433. if s3.isObjNotFound(err) {
  434. return nil, DoesNotExistError
  435. }
  436. return nil, errors.Wrap(err, "Read from S3 failed")
  437. }
  438. return io.ReadAll(r)
  439. }
  440. // awsAuth retrieves credentials from the aws-sdk-go.
  441. type awsAuth struct {
  442. Region string
  443. creds aws.Credentials
  444. }
  445. // Retrieve retrieves the keys from the environment.
  446. func (a *awsAuth) Retrieve() (credentials.Value, error) {
  447. cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(a.Region))
  448. if err != nil {
  449. return credentials.Value{}, errors.Wrap(err, "load AWS SDK config")
  450. }
  451. creds, err := cfg.Credentials.Retrieve(context.TODO())
  452. if err != nil {
  453. return credentials.Value{}, errors.Wrap(err, "retrieve AWS SDK credentials")
  454. }
  455. a.creds = creds
  456. return credentials.Value{
  457. AccessKeyID: creds.AccessKeyID,
  458. SecretAccessKey: creds.SecretAccessKey,
  459. SessionToken: creds.SessionToken,
  460. SignerType: credentials.SignatureV4,
  461. }, nil
  462. }
  463. func (a *awsAuth) RetrieveWithCredContext(ctx *credentials.CredContext) (credentials.Value, error) {
  464. return a.Retrieve()
  465. }
  466. // IsExpired returns if the credentials have been retrieved.
  467. func (a *awsAuth) IsExpired() bool {
  468. return a.creds.Expired()
  469. }
  470. type overrideSignerType struct {
  471. credentials.Provider
  472. signerType credentials.SignatureType
  473. }
  474. func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
  475. v, err := s.Provider.Retrieve()
  476. if err != nil {
  477. return v, err
  478. }
  479. if !v.SignerType.IsAnonymous() {
  480. v.SignerType = s.signerType
  481. }
  482. return v, nil
  483. }
  484. func (s *overrideSignerType) RetrieveWithCredContext(ctx *credentials.CredContext) (credentials.Value, error) {
  485. v, err := s.Provider.RetrieveWithCredContext(ctx)
  486. if err != nil {
  487. return v, err
  488. }
  489. if !v.SignerType.IsAnonymous() {
  490. v.SignerType = s.signerType
  491. }
  492. return v, nil
  493. }