gcsstorage.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package storage
  2. // Fork from Thanos GCS Bucket support to reuse configuration options
  3. // Licensed under the Apache License 2.0.
  4. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/gcs/gcs.go
  5. import (
  6. "context"
  7. "io"
  8. "strings"
  9. gcs "cloud.google.com/go/storage"
  10. "github.com/opencost/opencost/pkg/log"
  11. "github.com/pkg/errors"
  12. "golang.org/x/oauth2/google"
  13. "google.golang.org/api/iterator"
  14. "google.golang.org/api/option"
  15. "gopkg.in/yaml.v2"
  16. )
  // GCSConfig stores the configuration for a Google Cloud Storage bucket.
  type GCSConfig struct {
  	// Bucket is the name of the GCS bucket to use. NewGCSStorageWith rejects
  	// an empty value.
  	Bucket string `yaml:"bucket"`
  	// ServiceAccount is the JSON content of a service account key (not a path
  	// to a key file). When empty, the GCS client falls back to Google's default
  	// credential lookup.
  	ServiceAccount string `yaml:"service_account"`
  }
  22. // GCSStorage is a storage.Storage implementation for Google Cloud Storage.
  23. type GCSStorage struct {
  24. name string
  25. bucket *gcs.BucketHandle
  26. client *gcs.Client
  27. }
  28. // NewGCSStorage creates a new GCSStorage instance using the provided GCS configuration.
  29. func NewGCSStorage(conf []byte) (*GCSStorage, error) {
  30. var gc GCSConfig
  31. if err := yaml.Unmarshal(conf, &gc); err != nil {
  32. return nil, err
  33. }
  34. return NewGCSStorageWith(gc)
  35. }
  // NewGCSStorageWith builds a GCSStorage from an already-decoded GCSConfig.
  //
  // An empty Bucket is rejected. When ServiceAccount is set, it is parsed as a
  // JSON key and used with full-control scope; otherwise the GCS client uses
  // Google's default credential discovery. Errors from parsing the key or from
  // creating the client are returned to the caller.
  func NewGCSStorageWith(gc GCSConfig) (*GCSStorage, error) {
  	if len(gc.Bucket) == 0 {
  		return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
  	}

  	ctx := context.Background()

  	var clientOpts []option.ClientOption
  	if sa := gc.ServiceAccount; sa != "" {
  		creds, credErr := google.CredentialsFromJSON(ctx, []byte(sa), gcs.ScopeFullControl)
  		if credErr != nil {
  			return nil, errors.Wrap(credErr, "failed to create credentials from JSON")
  		}
  		clientOpts = append(clientOpts, option.WithCredentials(creds))
  	}

  	client, err := gcs.NewClient(ctx, clientOpts...)
  	if err != nil {
  		return nil, err
  	}

  	s := &GCSStorage{name: gc.Bucket, client: client}
  	s.bucket = client.Bucket(gc.Bucket)
  	return s, nil
  }
  61. // Name returns the bucket name for gcs.
  62. func (gs *GCSStorage) Name() string {
  63. return gs.name
  64. }
  65. // StorageType returns a string identifier for the type of storage used by the implementation.
  66. func (gs *GCSStorage) StorageType() StorageType {
  67. return StorageTypeBucketGCS
  68. }
  69. // FullPath returns the storage working path combined with the path provided
  70. func (gs *GCSStorage) FullPath(name string) string {
  71. name = trimLeading(name)
  72. return name
  73. }
  // Stat looks up the object at name and reports its name, size and last update
  // time. A missing bucket or object yields DoesNotExistError; any other error
  // from the attribute lookup is returned unchanged.
  func (gs *GCSStorage) Stat(name string) (*StorageInfo, error) {
  	objectName := trimLeading(name)

  	attrs, err := gs.bucket.Object(objectName).Attrs(context.Background())
  	switch {
  	case err == nil:
  		info := &StorageInfo{
  			Name:    trimName(attrs.Name),
  			Size:    attrs.Size,
  			ModTime: attrs.Updated,
  		}
  		return info, nil
  	case gs.isDoesNotExist(err):
  		return nil, DoesNotExistError
  	default:
  		return nil, err
  	}
  }
  // isDoesNotExist reports whether err means the bucket or the object does not
  // exist.
  //
  // errors.Is is used instead of comparing error strings so the check still
  // matches when the GCS sentinel errors have been wrapped. A nil error is never
  // a "does not exist" error; the previous implementation would have panicked
  // calling Error() on it.
  func (gs *GCSStorage) isDoesNotExist(err error) bool {
  	if err == nil {
  		return false
  	}
  	return errors.Is(err, gcs.ErrBucketNotExist) || errors.Is(err, gcs.ErrObjectNotExist)
  }
  // Read returns the full contents of the object at name. The path is passed
  // through trimLeading first. Errors from opening or reading the object are
  // returned unchanged.
  func (gs *GCSStorage) Read(name string) ([]byte, error) {
  	name = trimLeading(name)
  	log.Debugf("GCSStorage::Read(%s)", name)

  	ctx := context.Background()
  	reader, err := gs.bucket.Object(name).NewReader(ctx)
  	if err != nil {
  		return nil, err
  	}
  	// The reader must be closed on every path, or the underlying response body
  	// and its connection are leaked. The Close error is deliberately ignored:
  	// by then the data has either been fully read or the read already failed.
  	defer reader.Close()

  	data, err := io.ReadAll(reader)
  	if err != nil {
  		return nil, err
  	}
  	return data, nil
  }
  // Write stores data as the object at name, creating it or replacing any
  // existing object. Failures from either writing or finalizing the upload are
  // wrapped with "upload gcs object".
  func (gs *GCSStorage) Write(name string, data []byte) error {
  	objectName := trimLeading(name)
  	log.Debugf("GCSStorage::Write(%s)", objectName)

  	w := gs.bucket.Object(objectName).NewWriter(context.Background())
  	// A ChunkSize of 0 sends the object in a single request instead of buffering
  	// it into multiple chunked parts. This avoids the extra memory for part
  	// buffers and matches the fast-upload/fast-fail behaviour of the other
  	// storage implementations.
  	w.ChunkSize = 0

  	_, err := w.Write(data)
  	if err == nil {
  		// Upload failures may only surface when the writer is finalized, so the
  		// result of Close is what decides success.
  		err = w.Close()
  	}
  	if err != nil {
  		return errors.Wrap(err, "upload gcs object")
  	}
  	return nil
  }
  // Remove permanently deletes the object at name. The error from the GCS delete
  // call, if any, is returned unchanged.
  func (gs *GCSStorage) Remove(name string) error {
  	objectName := trimLeading(name)
  	log.Debugf("GCSStorage::Remove(%s)", objectName)

  	obj := gs.bucket.Object(objectName)
  	return obj.Delete(context.Background())
  }
  // Exists reports whether an object is present at name.
  //
  // A missing bucket or object yields (false, nil). Any other failure of the
  // attribute lookup is returned wrapped with "stat gcs object".
  func (gs *GCSStorage) Exists(name string) (bool, error) {
  	objectName := trimLeading(name)

  	_, err := gs.bucket.Object(objectName).Attrs(context.Background())
  	switch {
  	case err == nil:
  		return true, nil
  	case gs.isDoesNotExist(err):
  		return false, nil
  	}
  	return false, errors.Wrap(err, "stat gcs object")
  }
  // List returns storage information for the objects directly under path
  // (non-recursive: DirDelim is used as the listing delimiter). Sub-directories
  // and the placeholder object for path itself are not included. Iteration
  // errors are wrapped with "list gcs objects".
  func (gs *GCSStorage) List(path string) ([]*StorageInfo, error) {
  	path = trimLeading(path)
  	log.Debugf("GCSStorage::List(%s)", path)
  	ctx := context.Background()

  	// Ensure the prefix ends with the delimiter. Otherwise an object named
  	// exactly `path` would come back as a single prefix item instead of the
  	// directory's contents being listed.
  	if path != "" {
  		path = strings.TrimSuffix(path, DirDelim) + DirDelim
  	}

  	it := gs.bucket.Objects(ctx, &gcs.Query{
  		Prefix:    path,
  		Delimiter: DirDelim,
  	})

  	var stats []*StorageInfo
  	for {
  		attrs, err := it.Next()
  		if err == iterator.Done {
  			break
  		}
  		if err != nil {
  			return nil, errors.Wrap(err, "list gcs objects")
  		}

  		// Because a Delimiter is set, sub-directories are returned as synthetic
  		// entries with only Prefix populated (Name is empty). They are not
  		// objects, so skip them. At the bucket root they would also match the
  		// Name == path check below (both empty), but below the root they would
  		// otherwise leak through as nameless entries.
  		if attrs.Prefix != "" {
  			continue
  		}
  		// Skip the placeholder object for the directory itself (e.g. "dir/").
  		if attrs.Name == path {
  			continue
  		}

  		stats = append(stats, &StorageInfo{
  			Name:    trimName(attrs.Name),
  			Size:    attrs.Size,
  			ModTime: attrs.Updated,
  		})
  	}
  	return stats, nil
  }