bucketstorage.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package storage
  2. import (
  3. "fmt"
  4. "io"
  5. "strings"
  6. "github.com/pkg/errors"
  7. "gopkg.in/yaml.v2"
  8. )
  9. // StorageProvider is the type of provider used for storage if not leveraging a file implementation.
  10. type StorageProvider string
  11. const (
  12. S3 StorageProvider = "S3"
  13. GCS StorageProvider = "GCS"
  14. AZURE StorageProvider = "AZURE"
  15. CLUSTER StorageProvider = "CLUSTER"
  16. )
  17. // StorageConfig is the configuration type used as the "parent" configuration. It contains a type, which will
  18. // specify the bucket storage implementation, and a configuration object specific to that storage implementation.
  19. type StorageConfig struct {
  20. Type StorageProvider `yaml:"type"`
  21. Config interface{} `yaml:"config"`
  22. Prefix string `yaml:"prefix"`
  23. }
  24. // NewBucketStorage initializes and returns new Storage implementation leveraging the storage provider
  25. // configuration. This configuration type uses the layout provided in thanos: https://thanos.io/tip/thanos/storage.md/
  26. func NewBucketStorage(config []byte) (Storage, error) {
  27. storageConfig := &StorageConfig{}
  28. if err := yaml.UnmarshalStrict(config, storageConfig); err != nil {
  29. return nil, errors.Wrap(err, "parsing config YAML file")
  30. }
  31. // Because the Config property is specific to the storage implementation, we'll marshal back into yaml, and allow
  32. // the specific implementation to unmarshal back into a concrete configuration type.
  33. config, err := yaml.Marshal(storageConfig.Config)
  34. if err != nil {
  35. return nil, errors.Wrap(err, "marshal content of storage configuration")
  36. }
  37. var storage Storage
  38. switch strings.ToUpper(string(storageConfig.Type)) {
  39. case string(S3):
  40. storage, err = NewS3Storage(config)
  41. case string(GCS):
  42. storage, err = NewGCSStorage(config)
  43. case string(AZURE):
  44. storage, err = NewAzureStorage(config)
  45. case string(CLUSTER):
  46. storage, err = NewClusterStorage(config)
  47. default:
  48. return nil, errors.Errorf("storage with type %s is not supported", storageConfig.Type)
  49. }
  50. if err != nil {
  51. return nil, errors.Wrap(err, fmt.Sprintf("create %s client", storageConfig.Type))
  52. }
  53. if storageConfig.Prefix != "" {
  54. return NewPrefixedBucketStorage(storage, storageConfig.Prefix)
  55. }
  56. return storage, nil
  57. }
  58. // asyncPipeWriter wraps *io.PipeWriter so that Close() blocks until the
  59. // background upload goroutine finishes and surfaces any upload error to the caller.
  60. type asyncPipeWriter struct {
  61. *io.PipeWriter
  62. done <-chan error
  63. }
  64. // newAsyncPipeWriter creates a new async pipe writer that implements io.WriteCloser that
  65. // handles asynchronous closing of an io.PipeWriter
  66. func newAsyncPipeWriter(writer *io.PipeWriter, done <-chan error) *asyncPipeWriter {
  67. return &asyncPipeWriter{
  68. PipeWriter: writer,
  69. done: done,
  70. }
  71. }
  72. // Close propagates any errors that were received on the pipewriter or reader.
  73. func (apw *asyncPipeWriter) Close() error {
  74. if err := apw.PipeWriter.Close(); err != nil {
  75. return err
  76. }
  77. return <-apw.done
  78. }
  79. // trimLeading removes a leading / from the file name
  80. func trimLeading(file string) string {
  81. if len(file) == 0 {
  82. return file
  83. }
  84. if file[0] == '/' {
  85. return file[1:]
  86. }
  87. return file
  88. }
  89. // trimName removes the leading directory prefix
  90. func trimName(file string) string {
  91. slashIndex := strings.LastIndex(file, "/")
  92. if slashIndex < 0 {
  93. return file
  94. }
  95. name := file[slashIndex+1:]
  96. return name
  97. }