gcsstorage.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package storage
  2. // Fork from Thanos GCS Bucket support to reuse configuration options
  3. // Licensed under the Apache License 2.0.
  4. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/gcs/gcs.go
  5. import (
  6. "context"
  7. "io"
  8. "strings"
  9. gcs "cloud.google.com/go/storage"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/pkg/errors"
  12. "golang.org/x/oauth2/google"
  13. "google.golang.org/api/iterator"
  14. "google.golang.org/api/option"
  15. "gopkg.in/yaml.v2"
  16. )
  17. // Config stores the configuration for gcs bucket.
  18. type GCSConfig struct {
  19. Bucket string `yaml:"bucket"`
  20. ServiceAccount string `yaml:"service_account"`
  21. }
  22. // GCSStorage is a storage.Storage implementation for Google Cloud Storage.
  23. type GCSStorage struct {
  24. name string
  25. bucket *gcs.BucketHandle
  26. client *gcs.Client
  27. }
  28. // NewGCSStorage creates a new GCSStorage instance using the provided GCS configuration.
  29. func NewGCSStorage(conf []byte) (*GCSStorage, error) {
  30. var gc GCSConfig
  31. if err := yaml.Unmarshal(conf, &gc); err != nil {
  32. return nil, err
  33. }
  34. return NewGCSStorageWith(gc)
  35. }
  36. // NewGCSStorageWith creates a new GCSStorage instance using the provided GCS configuration.
  37. func NewGCSStorageWith(gc GCSConfig) (*GCSStorage, error) {
  38. if gc.Bucket == "" {
  39. return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
  40. }
  41. ctx := context.Background()
  42. var opts []option.ClientOption
  43. // If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
  44. if gc.ServiceAccount != "" {
  45. credentials, err := google.CredentialsFromJSON(ctx, []byte(gc.ServiceAccount), gcs.ScopeFullControl)
  46. if err != nil {
  47. return nil, errors.Wrap(err, "failed to create credentials from JSON")
  48. }
  49. opts = append(opts, option.WithCredentials(credentials))
  50. }
  51. // HTTPS Protocol Configuration: Google Cloud Storage client uses HTTPS by default.
  52. // The GCS client library (cloud.google.com/go/storage) automatically uses HTTPS
  53. // for all API calls. All GCS operations (read, write, delete, list) use HTTPS protocol.
  54. gcsClient, err := gcs.NewClient(ctx, opts...)
  55. if err != nil {
  56. return nil, err
  57. }
  58. log.Debugf("GCSStorage: New GCS client initialized with 'https://storage.googleapis.com/%s'", gc.Bucket)
  59. return &GCSStorage{
  60. name: gc.Bucket,
  61. bucket: gcsClient.Bucket(gc.Bucket),
  62. client: gcsClient,
  63. }, nil
  64. }
  65. // String returns the bucket name for gcs.
  66. func (gs *GCSStorage) String() string {
  67. return gs.name
  68. }
  69. // StorageType returns a string identifier for the type of storage used by the implementation.
  70. func (gs *GCSStorage) StorageType() StorageType {
  71. return StorageTypeBucketGCS
  72. }
  73. // FullPath returns the storage working path combined with the path provided
  74. func (gs *GCSStorage) FullPath(name string) string {
  75. name = trimLeading(name)
  76. return name
  77. }
  78. // Stat returns the StorageStats for the specific path.
  79. func (gs *GCSStorage) Stat(name string) (*StorageInfo, error) {
  80. name = trimLeading(name)
  81. //log.Debugf("GCSStorage::Stat(%s)", name)]
  82. ctx := context.Background()
  83. attrs, err := gs.bucket.Object(name).Attrs(ctx)
  84. if err != nil {
  85. if gs.isDoesNotExist(err) {
  86. return nil, DoesNotExistError
  87. }
  88. return nil, err
  89. }
  90. return &StorageInfo{
  91. Name: trimName(attrs.Name),
  92. Size: attrs.Size,
  93. ModTime: attrs.Updated,
  94. }, nil
  95. }
  96. // isDoesNotExist returns true if the error matches resource not exists errors.
  97. func (gs *GCSStorage) isDoesNotExist(err error) bool {
  98. return errors.Is(err, gcs.ErrObjectNotExist)
  99. }
  100. // Read uses the relative path of the storage combined with the provided path to
  101. // read the contents.
  102. func (gs *GCSStorage) Read(name string) ([]byte, error) {
  103. name = trimLeading(name)
  104. log.Debugf("GCSStorage::Read::HTTPS(%s)", name)
  105. ctx := context.Background()
  106. reader, err := gs.bucket.Object(name).NewReader(ctx)
  107. if err != nil {
  108. return nil, err
  109. }
  110. data, err := io.ReadAll(reader)
  111. if err != nil {
  112. return nil, err
  113. }
  114. return data, nil
  115. }
  116. // Write uses the relative path of the storage combined with the provided path
  117. // to write a new file or overwrite an existing file.
  118. func (gs *GCSStorage) Write(name string, data []byte) error {
  119. name = trimLeading(name)
  120. log.Debugf("GCSStorage::Write::HTTPS(%s)", name)
  121. ctx := context.Background()
  122. writer := gs.bucket.Object(name).NewWriter(ctx)
  123. // Set chunksize to 0 to write files in one go. This prevents chunking of
  124. // upload into multiple parts, which requires additional memory for buffering
  125. // the sub-parts. To remain consistent with other storage implementations,
  126. // we would rather attempt to lower cost fast upload and fast-fail.
  127. writer.ChunkSize = 0
  128. // Write the data to GCS object
  129. if _, err := writer.Write(data); err != nil {
  130. return errors.Wrap(err, "upload gcs object")
  131. }
  132. // NOTE: Sometimes errors don't arrive during Write(), so we must also check
  133. // NOTE: the error returned by Close().
  134. if err := writer.Close(); err != nil {
  135. return errors.Wrap(err, "upload gcs object")
  136. }
  137. return nil
  138. }
  139. // Remove uses the relative path of the storage combined with the provided path to
  140. // remove a file from storage permanently.
  141. func (gs *GCSStorage) Remove(name string) error {
  142. name = trimLeading(name)
  143. log.Debugf("GCSStorage::Remove::HTTPS(%s)", name)
  144. ctx := context.Background()
  145. return gs.bucket.Object(name).Delete(ctx)
  146. }
  147. // Exists uses the relative path of the storage combined with the provided path to
  148. // determine if the file exists.
  149. func (gs *GCSStorage) Exists(name string) (bool, error) {
  150. name = trimLeading(name)
  151. //log.Debugf("GCSStorage::Exists(%s)", name)
  152. ctx := context.Background()
  153. _, err := gs.bucket.Object(name).Attrs(ctx)
  154. if err != nil {
  155. if gs.isDoesNotExist(err) {
  156. return false, nil
  157. }
  158. return false, errors.Wrap(err, "stat gcs object")
  159. }
  160. return true, nil
  161. }
  162. // List uses the relative path of the storage combined with the provided path to return
  163. // storage information for the files.
  164. func (gs *GCSStorage) List(path string) ([]*StorageInfo, error) {
  165. path = trimLeading(path)
  166. log.Debugf("GCSStorage::List::HTTPS(%s)", path)
  167. ctx := context.Background()
  168. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  169. // object itself as one prefix item.
  170. if path != "" {
  171. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  172. }
  173. it := gs.bucket.Objects(ctx, &gcs.Query{
  174. Prefix: path,
  175. Delimiter: DirDelim,
  176. })
  177. // iterate over the objects at the path, collect storage info
  178. var stats []*StorageInfo
  179. for {
  180. attrs, err := it.Next()
  181. if err == iterator.Done {
  182. break
  183. }
  184. if err != nil {
  185. return nil, errors.Wrap(err, "list gcs objects")
  186. }
  187. // ignore the root path directory
  188. if attrs.Name == path {
  189. continue
  190. }
  191. stats = append(stats, &StorageInfo{
  192. Name: trimName(attrs.Name),
  193. Size: attrs.Size,
  194. ModTime: attrs.Updated,
  195. })
  196. }
  197. return stats, nil
  198. }
  199. func (gs *GCSStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  200. path = trimLeading(path)
  201. log.Debugf("GCSStorage::ListDirectories::HTTPS(%s)", path)
  202. ctx := context.Background()
  203. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  204. // object itself as one prefix item.
  205. if path != "" {
  206. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  207. }
  208. it := gs.bucket.Objects(ctx, &gcs.Query{
  209. Prefix: path,
  210. Delimiter: DirDelim,
  211. })
  212. // iterate over the objects at the path, collect storage info
  213. var stats []*StorageInfo
  214. for {
  215. attrs, err := it.Next()
  216. if err == iterator.Done {
  217. break
  218. }
  219. if err != nil {
  220. return nil, errors.Wrap(err, "list gcs prefixes")
  221. }
  222. // We filter directories using DirDelim, so a non empty prefix entry is a prefix(directory)
  223. // See gcs.ObjectAttrs Prefix property
  224. if attrs.Prefix != "" {
  225. stats = append(stats, &StorageInfo{
  226. Name: attrs.Prefix,
  227. Size: attrs.Size,
  228. ModTime: attrs.Updated,
  229. })
  230. }
  231. }
  232. return stats, nil
  233. }