s3storage.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. // Fork from Thanos S3 Bucket support to reuse configuration options
  2. // Licensed under the Apache License 2.0
  3. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/s3/s3.go
  4. package storage
  5. import (
  6. "bytes"
  7. "context"
  8. "crypto/tls"
  9. "io/ioutil"
  10. "net"
  11. "net/http"
  12. "strings"
  13. "time"
  14. "github.com/kubecost/cost-model/pkg/log"
  15. "github.com/minio/minio-go/v7"
  16. "github.com/minio/minio-go/v7/pkg/credentials"
  17. "github.com/minio/minio-go/v7/pkg/encrypt"
  18. "github.com/pkg/errors"
  19. "gopkg.in/yaml.v2"
  20. )
  21. type ctxKey int
  22. const (
  23. // DirDelim is the delimiter used to model a directory structure in an object store bucket.
  24. DirDelim = "/"
  25. // SSEKMS is the name of the SSE-KMS method for objectstore encryption.
  26. SSEKMS = "SSE-KMS"
  27. // SSEC is the name of the SSE-C method for objstore encryption.
  28. SSEC = "SSE-C"
  29. // SSES3 is the name of the SSE-S3 method for objstore encryption.
  30. SSES3 = "SSE-S3"
  31. // sseConfigKey is the context key to override SSE config. This feature is used by downstream
  32. // projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or
  33. // refactoring can introduce breaking changes as far as the functionality is preserved.
  34. // NOTE: we're using a context value only because it's a very specific S3 option. If SSE will
  35. // be available to wider set of backends we should probably add a variadic option to Get() and Upload().
  36. sseConfigKey = ctxKey(0)
  37. )
  38. var DefaultConfig = S3Config{
  39. PutUserMetadata: map[string]string{},
  40. HTTPConfig: HTTPConfig{
  41. IdleConnTimeout: time.Duration(90 * time.Second),
  42. ResponseHeaderTimeout: time.Duration(2 * time.Minute),
  43. TLSHandshakeTimeout: time.Duration(10 * time.Second),
  44. ExpectContinueTimeout: time.Duration(1 * time.Second),
  45. MaxIdleConns: 100,
  46. MaxIdleConnsPerHost: 100,
  47. MaxConnsPerHost: 0,
  48. },
  49. PartSize: 1024 * 1024 * 64, // 64Ms3.
  50. }
  51. // Config stores the configuration for s3 bucket.
  52. type S3Config struct {
  53. Bucket string `yaml:"bucket"`
  54. Endpoint string `yaml:"endpoint"`
  55. Region string `yaml:"region"`
  56. AccessKey string `yaml:"access_key"`
  57. Insecure bool `yaml:"insecure"`
  58. SignatureV2 bool `yaml:"signature_version2"`
  59. SecretKey string `yaml:"secret_key"`
  60. PutUserMetadata map[string]string `yaml:"put_user_metadata"`
  61. HTTPConfig HTTPConfig `yaml:"http_config"`
  62. TraceConfig TraceConfig `yaml:"trace"`
  63. ListObjectsVersion string `yaml:"list_objects_version"`
  64. // PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
  65. // NOTE we need to make sure this number does not produce more parts than 10 000.
  66. PartSize uint64 `yaml:"part_size"`
  67. SSEConfig SSEConfig `yaml:"sse_config"`
  68. }
  69. // SSEConfig deals with the configuration of SSE for Minio. The following options are valid:
  70. // kmsencryptioncontext == https://docs.aws.amazon.com/kms/latest/developerguide/services-s3.html#s3-encryption-context
  71. type SSEConfig struct {
  72. Type string `yaml:"type"`
  73. KMSKeyID string `yaml:"kms_key_id"`
  74. KMSEncryptionContext map[string]string `yaml:"kms_encryption_context"`
  75. EncryptionKey string `yaml:"encryption_key"`
  76. }
  77. type TraceConfig struct {
  78. Enable bool `yaml:"enable"`
  79. }
  80. // HTTPConfig stores the http.Transport configuration for the s3 minio client.
  81. type HTTPConfig struct {
  82. IdleConnTimeout time.Duration `yaml:"idle_conn_timeout"`
  83. ResponseHeaderTimeout time.Duration `yaml:"response_header_timeout"`
  84. InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
  85. TLSHandshakeTimeout time.Duration `yaml:"tls_handshake_timeout"`
  86. ExpectContinueTimeout time.Duration `yaml:"expect_continue_timeout"`
  87. MaxIdleConns int `yaml:"max_idle_conns"`
  88. MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host"`
  89. MaxConnsPerHost int `yaml:"max_conns_per_host"`
  90. // Allow upstream callers to inject a round tripper
  91. Transport http.RoundTripper `yaml:"-"`
  92. }
  93. // DefaultTransport - this default transport is based on the Minio
  94. // DefaultTransport up until the following commit:
  95. // https://githus3.com/minio/minio-go/commit/008c7aa71fc17e11bf980c209a4f8c4d687fc884
  96. // The values have since diverged.
  97. func DefaultTransport(config S3Config) *http.Transport {
  98. return &http.Transport{
  99. Proxy: http.ProxyFromEnvironment,
  100. DialContext: (&net.Dialer{
  101. Timeout: 30 * time.Second,
  102. KeepAlive: 30 * time.Second,
  103. DualStack: true,
  104. }).DialContext,
  105. MaxIdleConns: config.HTTPConfig.MaxIdleConns,
  106. MaxIdleConnsPerHost: config.HTTPConfig.MaxIdleConnsPerHost,
  107. IdleConnTimeout: time.Duration(config.HTTPConfig.IdleConnTimeout),
  108. MaxConnsPerHost: config.HTTPConfig.MaxConnsPerHost,
  109. TLSHandshakeTimeout: time.Duration(config.HTTPConfig.TLSHandshakeTimeout),
  110. ExpectContinueTimeout: time.Duration(config.HTTPConfig.ExpectContinueTimeout),
  111. // A custom ResponseHeaderTimeout was introduced
  112. // to cover cases where the tcp connection works but
  113. // the server never answers. Defaults to 2 minutes.
  114. ResponseHeaderTimeout: time.Duration(config.HTTPConfig.ResponseHeaderTimeout),
  115. // Set this value so that the underlying transport round-tripper
  116. // doesn't try to auto decode the body of objects with
  117. // content-encoding set to `gzip`.
  118. //
  119. // Refer: https://golang.org/src/net/http/transport.go?h=roundTrip#L1843.
  120. DisableCompression: true,
  121. // #nosec It's up to the user to decide on TLS configs
  122. TLSClientConfig: &tls.Config{InsecureSkipVerify: config.HTTPConfig.InsecureSkipVerify},
  123. }
  124. }
  125. // S3Storage provides storage via S3
  126. type S3Storage struct {
  127. name string
  128. client *minio.Client
  129. defaultSSE encrypt.ServerSide
  130. putUserMetadata map[string]string
  131. partSize uint64
  132. listObjectsV1 bool
  133. }
  134. // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
  135. func parseConfig(conf []byte) (S3Config, error) {
  136. config := DefaultConfig
  137. if err := yaml.UnmarshalStrict(conf, &config); err != nil {
  138. return S3Config{}, err
  139. }
  140. return config, nil
  141. }
  142. // NewBucket returns a new Bucket using the provided s3 config values.
  143. func NewS3Storage(conf []byte) (*S3Storage, error) {
  144. log.Infof("Creating new S3 Storage...")
  145. config, err := parseConfig(conf)
  146. if err != nil {
  147. return nil, err
  148. }
  149. return NewS3StorageWith(config)
  150. }
  151. // NewBucketWithConfig returns a new Bucket using the provided s3 config values.
  152. func NewS3StorageWith(config S3Config) (*S3Storage, error) {
  153. var chain []credentials.Provider
  154. log.Infof("New S3 Storage With Config: %+v", config)
  155. wrapCredentialsProvider := func(p credentials.Provider) credentials.Provider { return p }
  156. if config.SignatureV2 {
  157. wrapCredentialsProvider = func(p credentials.Provider) credentials.Provider {
  158. return &overrideSignerType{Provider: p, signerType: credentials.SignatureV2}
  159. }
  160. }
  161. if err := validate(config); err != nil {
  162. return nil, err
  163. }
  164. if config.AccessKey != "" {
  165. chain = []credentials.Provider{wrapCredentialsProvider(&credentials.Static{
  166. Value: credentials.Value{
  167. AccessKeyID: config.AccessKey,
  168. SecretAccessKey: config.SecretKey,
  169. SignerType: credentials.SignatureV4,
  170. },
  171. })}
  172. } else {
  173. chain = []credentials.Provider{
  174. wrapCredentialsProvider(&credentials.EnvAWS{}),
  175. wrapCredentialsProvider(&credentials.FileAWSCredentials{}),
  176. wrapCredentialsProvider(&credentials.IAM{
  177. Client: &http.Client{
  178. Transport: http.DefaultTransport,
  179. },
  180. }),
  181. }
  182. }
  183. // Check if a roundtripper has been set in the config
  184. // otherwise build the default transport.
  185. var rt http.RoundTripper
  186. if config.HTTPConfig.Transport != nil {
  187. rt = config.HTTPConfig.Transport
  188. } else {
  189. rt = DefaultTransport(config)
  190. }
  191. client, err := minio.New(config.Endpoint, &minio.Options{
  192. Creds: credentials.NewChainCredentials(chain),
  193. Secure: !config.Insecure,
  194. Region: config.Region,
  195. Transport: rt,
  196. })
  197. if err != nil {
  198. return nil, errors.Wrap(err, "initialize s3 client")
  199. }
  200. var sse encrypt.ServerSide
  201. if config.SSEConfig.Type != "" {
  202. switch config.SSEConfig.Type {
  203. case SSEKMS:
  204. sse, err = encrypt.NewSSEKMS(config.SSEConfig.KMSKeyID, config.SSEConfig.KMSEncryptionContext)
  205. if err != nil {
  206. return nil, errors.Wrap(err, "initialize s3 client SSE-KMS")
  207. }
  208. case SSEC:
  209. key, err := ioutil.ReadFile(config.SSEConfig.EncryptionKey)
  210. if err != nil {
  211. return nil, err
  212. }
  213. sse, err = encrypt.NewSSEC(key)
  214. if err != nil {
  215. return nil, errors.Wrap(err, "initialize s3 client SSE-C")
  216. }
  217. case SSES3:
  218. sse = encrypt.NewSSE()
  219. default:
  220. sseErrMsg := errors.Errorf("Unsupported type %q was provided. Supported types are SSE-S3, SSE-KMS, SSE-C", config.SSEConfig.Type)
  221. return nil, errors.Wrap(sseErrMsg, "Initialize s3 client SSE Config")
  222. }
  223. }
  224. if config.ListObjectsVersion != "" && config.ListObjectsVersion != "v1" && config.ListObjectsVersion != "v2" {
  225. return nil, errors.Errorf("Initialize s3 client list objects version: Unsupported version %q was provided. Supported values are v1, v2", config.ListObjectsVersion)
  226. }
  227. bkt := &S3Storage{
  228. name: config.Bucket,
  229. client: client,
  230. defaultSSE: sse,
  231. putUserMetadata: config.PutUserMetadata,
  232. partSize: config.PartSize,
  233. listObjectsV1: config.ListObjectsVersion == "v1",
  234. }
  235. return bkt, nil
  236. }
  237. // Name returns the bucket name for s3.
  238. func (s3 *S3Storage) Name() string {
  239. return s3.name
  240. }
  241. // validate checks to see the config options are set.
  242. func validate(conf S3Config) error {
  243. if conf.Endpoint == "" {
  244. return errors.New("no s3 endpoint in config file")
  245. }
  246. if conf.AccessKey == "" && conf.SecretKey != "" {
  247. return errors.New("no s3 acccess_key specified while secret_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  248. }
  249. if conf.AccessKey != "" && conf.SecretKey == "" {
  250. return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used.")
  251. }
  252. if conf.SSEConfig.Type == SSEC && conf.SSEConfig.EncryptionKey == "" {
  253. return errors.New("encryption_key must be set if sse_config.type is set to 'SSE-C'")
  254. }
  255. if conf.SSEConfig.Type == SSEKMS && conf.SSEConfig.KMSKeyID == "" {
  256. return errors.New("kms_key_id must be set if sse_config.type is set to 'SSE-KMS'")
  257. }
  258. return nil
  259. }
  260. // FullPath returns the storage working path combined with the path provided
  261. func (s3 *S3Storage) FullPath(name string) string {
  262. name = s3.trimLeading(name)
  263. return name
  264. }
  265. // Get returns a reader for the given object name.
  266. func (s3 *S3Storage) Read(name string) ([]byte, error) {
  267. name = s3.trimLeading(name)
  268. log.Infof("S3Storage::Read(%s)", name)
  269. ctx := context.Background()
  270. return s3.getRange(ctx, name, 0, -1)
  271. }
  272. // Exists checks if the given object exists.
  273. func (s3 *S3Storage) Exists(name string) (bool, error) {
  274. name = s3.trimLeading(name)
  275. //log.Infof("S3Storage::Exists(%s)", name)
  276. ctx := context.Background()
  277. _, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  278. if err != nil {
  279. if s3.isDoesNotExist(err) {
  280. return false, nil
  281. }
  282. return false, errors.Wrap(err, "stat s3 object")
  283. }
  284. return true, nil
  285. }
  286. // Upload the contents of the reader as an object into the bucket.
  287. func (s3 *S3Storage) Write(name string, data []byte) error {
  288. name = s3.trimLeading(name)
  289. log.Infof("S3Storage::Write(%s)", name)
  290. ctx := context.Background()
  291. sse, err := s3.getServerSideEncryption(ctx)
  292. if err != nil {
  293. return err
  294. }
  295. var size int64 = int64(len(data))
  296. var partSize uint64 = 0
  297. r := bytes.NewReader(data)
  298. _, err = s3.client.PutObject(ctx, s3.name, name, r, int64(size), minio.PutObjectOptions{
  299. PartSize: partSize,
  300. ServerSideEncryption: sse,
  301. UserMetadata: s3.putUserMetadata,
  302. })
  303. if err != nil {
  304. return errors.Wrap(err, "upload s3 object")
  305. }
  306. return nil
  307. }
  308. // Attributes returns information about the specified object.
  309. func (s3 *S3Storage) Stat(name string) (*StorageInfo, error) {
  310. name = s3.trimLeading(name)
  311. //log.Infof("S3Storage::Stat(%s)", name)
  312. ctx := context.Background()
  313. objInfo, err := s3.client.StatObject(ctx, s3.name, name, minio.StatObjectOptions{})
  314. if err != nil {
  315. if s3.isDoesNotExist(err) {
  316. return nil, DoesNotExistError
  317. }
  318. return nil, err
  319. }
  320. return &StorageInfo{
  321. Name: s3.trimName(name),
  322. Size: objInfo.Size,
  323. ModTime: objInfo.LastModified,
  324. }, nil
  325. }
  326. // Delete removes the object with the given name.
  327. func (s3 *S3Storage) Remove(name string) error {
  328. name = s3.trimLeading(name)
  329. log.Infof("S3Storage::Remove(%s)", name)
  330. ctx := context.Background()
  331. return s3.client.RemoveObject(ctx, s3.name, name, minio.RemoveObjectOptions{})
  332. }
  333. func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
  334. path = s3.trimLeading(path)
  335. log.Infof("S3Storage::List(%s)", path)
  336. ctx := context.Background()
  337. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  338. // object itself as one prefix item.
  339. if path != "" {
  340. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  341. }
  342. opts := minio.ListObjectsOptions{
  343. Prefix: path,
  344. Recursive: false,
  345. UseV1: s3.listObjectsV1,
  346. }
  347. var stats []*StorageInfo
  348. for object := range s3.client.ListObjects(ctx, s3.name, opts) {
  349. // Catch the error when failed to list objects.
  350. if object.Err != nil {
  351. return nil, object.Err
  352. }
  353. // This sometimes happens with empty buckets.
  354. if object.Key == "" {
  355. continue
  356. }
  357. // The s3 client can also return the directory itself in the ListObjects call above.
  358. if object.Key == path {
  359. continue
  360. }
  361. stats = append(stats, &StorageInfo{
  362. Name: s3.trimName(object.Key),
  363. Size: object.Size,
  364. ModTime: object.LastModified,
  365. })
  366. }
  367. return stats, nil
  368. }
  369. // trimLeading removes a leading / from the file name
  370. func (s3 *S3Storage) trimLeading(file string) string {
  371. if len(file) == 0 {
  372. return file
  373. }
  374. if file[0] == '/' {
  375. return file[1:]
  376. }
  377. return file
  378. }
  379. // trimName removes the leading directory prefix
  380. func (s3 *S3Storage) trimName(file string) string {
  381. slashIndex := strings.LastIndex(file, "/")
  382. if slashIndex < 0 {
  383. return file
  384. }
  385. name := file[slashIndex+1:]
  386. return name
  387. }
  388. // getServerSideEncryption returns the SSE to use.
  389. func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
  390. if value := ctx.Value(sseConfigKey); value != nil {
  391. if sse, ok := value.(encrypt.ServerSide); ok {
  392. return sse, nil
  393. }
  394. return nil, errors.New("invalid SSE config override provided in the context")
  395. }
  396. return s3.defaultSSE, nil
  397. }
  398. // isDoesNotExist returns true if error means that object key is not found.
  399. func (s3 *S3Storage) isDoesNotExist(err error) bool {
  400. return minio.ToErrorResponse(errors.Cause(err)).Code == "NoSuchKey"
  401. }
  402. // isObjNotFound returns true if the error means that the object was not found
  403. func (s3 *S3Storage) isObjNotFound(err error) bool {
  404. return minio.ToErrorResponse(errors.Cause(err)).Code == "NotFoundObject"
  405. }
  406. func (s3 *S3Storage) getRange(ctx context.Context, name string, off, length int64) ([]byte, error) {
  407. sse, err := s3.getServerSideEncryption(ctx)
  408. if err != nil {
  409. return nil, err
  410. }
  411. opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
  412. if length != -1 {
  413. if err := opts.SetRange(off, off+length-1); err != nil {
  414. return nil, err
  415. }
  416. } else if off > 0 {
  417. if err := opts.SetRange(off, 0); err != nil {
  418. return nil, err
  419. }
  420. }
  421. r, err := s3.client.GetObject(ctx, s3.name, name, *opts)
  422. if err != nil {
  423. if s3.isObjNotFound(err) {
  424. return nil, DoesNotExistError
  425. }
  426. return nil, err
  427. }
  428. // NotFoundObject error is revealed only after first Read. This does the initial GetRequest. Prefetch this here
  429. // for convenience.
  430. if _, err := r.Read(nil); err != nil {
  431. r.Close()
  432. if s3.isObjNotFound(err) {
  433. return nil, DoesNotExistError
  434. }
  435. return nil, errors.Wrap(err, "Read from S3 failed")
  436. }
  437. return ioutil.ReadAll(r)
  438. }
  439. type overrideSignerType struct {
  440. credentials.Provider
  441. signerType credentials.SignatureType
  442. }
  443. func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
  444. v, err := s.Provider.Retrieve()
  445. if err != nil {
  446. return v, err
  447. }
  448. if !v.SignerType.IsAnonymous() {
  449. v.SignerType = s.signerType
  450. }
  451. return v, nil
  452. }