gcsstorage.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package storage
// Forked from the Thanos GCS bucket implementation to reuse its configuration options.
// Licensed under the Apache License 2.0.
// https://github.com/thanos-io/thanos/blob/main/pkg/objstore/gcs/gcs.go
  5. import (
  6. "context"
  7. "io"
  8. "strings"
  9. gcs "cloud.google.com/go/storage"
  10. "github.com/opencost/opencost/core/pkg/log"
  11. "github.com/pkg/errors"
  12. "golang.org/x/oauth2/google"
  13. "google.golang.org/api/iterator"
  14. "google.golang.org/api/option"
  15. "gopkg.in/yaml.v2"
  16. )
  17. // Config stores the configuration for gcs bucket.
  18. type GCSConfig struct {
  19. Bucket string `yaml:"bucket"`
  20. ServiceAccount string `yaml:"service_account"`
  21. }
  22. // GCSStorage is a storage.Storage implementation for Google Cloud Storage.
  23. type GCSStorage struct {
  24. name string
  25. bucket *gcs.BucketHandle
  26. client *gcs.Client
  27. }
  28. // NewGCSStorage creates a new GCSStorage instance using the provided GCS configuration.
  29. func NewGCSStorage(conf []byte) (*GCSStorage, error) {
  30. var gc GCSConfig
  31. if err := yaml.Unmarshal(conf, &gc); err != nil {
  32. return nil, err
  33. }
  34. return NewGCSStorageWith(gc)
  35. }
  36. // NewGCSStorageWith creates a new GCSStorage instance using the provided GCS configuration.
  37. func NewGCSStorageWith(gc GCSConfig) (*GCSStorage, error) {
  38. if gc.Bucket == "" {
  39. return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
  40. }
  41. ctx := context.Background()
  42. var opts []option.ClientOption
  43. // If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
  44. if gc.ServiceAccount != "" {
  45. credentials, err := google.CredentialsFromJSON(ctx, []byte(gc.ServiceAccount), gcs.ScopeFullControl)
  46. if err != nil {
  47. return nil, errors.Wrap(err, "failed to create credentials from JSON")
  48. }
  49. opts = append(opts, option.WithCredentials(credentials))
  50. }
  51. gcsClient, err := gcs.NewClient(ctx, opts...)
  52. if err != nil {
  53. return nil, err
  54. }
  55. return &GCSStorage{
  56. name: gc.Bucket,
  57. bucket: gcsClient.Bucket(gc.Bucket),
  58. client: gcsClient,
  59. }, nil
  60. }
  61. // Name returns the bucket name for gcs.
  62. func (gs *GCSStorage) Name() string {
  63. return gs.name
  64. }
  65. // StorageType returns a string identifier for the type of storage used by the implementation.
  66. func (gs *GCSStorage) StorageType() StorageType {
  67. return StorageTypeBucketGCS
  68. }
  69. // FullPath returns the storage working path combined with the path provided
  70. func (gs *GCSStorage) FullPath(name string) string {
  71. name = trimLeading(name)
  72. return name
  73. }
  74. // Stat returns the StorageStats for the specific path.
  75. func (gs *GCSStorage) Stat(name string) (*StorageInfo, error) {
  76. name = trimLeading(name)
  77. //log.Debugf("GCSStorage::Stat(%s)", name)]
  78. ctx := context.Background()
  79. attrs, err := gs.bucket.Object(name).Attrs(ctx)
  80. if err != nil {
  81. if gs.isDoesNotExist(err) {
  82. return nil, DoesNotExistError
  83. }
  84. return nil, err
  85. }
  86. return &StorageInfo{
  87. Name: trimName(attrs.Name),
  88. Size: attrs.Size,
  89. ModTime: attrs.Updated,
  90. }, nil
  91. }
  92. // isDoesNotExist returns true if the error matches resource not exists errors.
  93. func (gs *GCSStorage) isDoesNotExist(err error) bool {
  94. return errors.Is(err, gcs.ErrObjectNotExist)
  95. }
  96. // Read uses the relative path of the storage combined with the provided path to
  97. // read the contents.
  98. func (gs *GCSStorage) Read(name string) ([]byte, error) {
  99. name = trimLeading(name)
  100. log.Debugf("GCSStorage::Read(%s)", name)
  101. ctx := context.Background()
  102. reader, err := gs.bucket.Object(name).NewReader(ctx)
  103. if err != nil {
  104. return nil, err
  105. }
  106. data, err := io.ReadAll(reader)
  107. if err != nil {
  108. return nil, err
  109. }
  110. return data, nil
  111. }
  112. // Write uses the relative path of the storage combined with the provided path
  113. // to write a new file or overwrite an existing file.
  114. func (gs *GCSStorage) Write(name string, data []byte) error {
  115. name = trimLeading(name)
  116. log.Debugf("GCSStorage::Write(%s)", name)
  117. ctx := context.Background()
  118. writer := gs.bucket.Object(name).NewWriter(ctx)
  119. // Set chunksize to 0 to write files in one go. This prevents chunking of
  120. // upload into multiple parts, which requires additional memory for buffering
  121. // the sub-parts. To remain consistent with other storage implementations,
  122. // we would rather attempt to lower cost fast upload and fast-fail.
  123. writer.ChunkSize = 0
  124. // Write the data to GCS object
  125. if _, err := writer.Write(data); err != nil {
  126. return errors.Wrap(err, "upload gcs object")
  127. }
  128. // NOTE: Sometimes errors don't arrive during Write(), so we must also check
  129. // NOTE: the error returned by Close().
  130. if err := writer.Close(); err != nil {
  131. return errors.Wrap(err, "upload gcs object")
  132. }
  133. return nil
  134. }
  135. // Remove uses the relative path of the storage combined with the provided path to
  136. // remove a file from storage permanently.
  137. func (gs *GCSStorage) Remove(name string) error {
  138. name = trimLeading(name)
  139. log.Debugf("GCSStorage::Remove(%s)", name)
  140. ctx := context.Background()
  141. return gs.bucket.Object(name).Delete(ctx)
  142. }
  143. // Exists uses the relative path of the storage combined with the provided path to
  144. // determine if the file exists.
  145. func (gs *GCSStorage) Exists(name string) (bool, error) {
  146. name = trimLeading(name)
  147. //log.Debugf("GCSStorage::Exists(%s)", name)
  148. ctx := context.Background()
  149. _, err := gs.bucket.Object(name).Attrs(ctx)
  150. if err != nil {
  151. if gs.isDoesNotExist(err) {
  152. return false, nil
  153. }
  154. return false, errors.Wrap(err, "stat gcs object")
  155. }
  156. return true, nil
  157. }
  158. // List uses the relative path of the storage combined with the provided path to return
  159. // storage information for the files.
  160. func (gs *GCSStorage) List(path string) ([]*StorageInfo, error) {
  161. path = trimLeading(path)
  162. log.Debugf("GCSStorage::List(%s)", path)
  163. ctx := context.Background()
  164. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  165. // object itself as one prefix item.
  166. if path != "" {
  167. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  168. }
  169. it := gs.bucket.Objects(ctx, &gcs.Query{
  170. Prefix: path,
  171. Delimiter: DirDelim,
  172. })
  173. // iterate over the objects at the path, collect storage info
  174. var stats []*StorageInfo
  175. for {
  176. attrs, err := it.Next()
  177. if err == iterator.Done {
  178. break
  179. }
  180. if err != nil {
  181. return nil, errors.Wrap(err, "list gcs objects")
  182. }
  183. // ignore the root path directory
  184. if attrs.Name == path {
  185. continue
  186. }
  187. stats = append(stats, &StorageInfo{
  188. Name: trimName(attrs.Name),
  189. Size: attrs.Size,
  190. ModTime: attrs.Updated,
  191. })
  192. }
  193. return stats, nil
  194. }
  195. func (gs *GCSStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  196. path = trimLeading(path)
  197. log.Debugf("GCSStorage::List(%s)", path)
  198. ctx := context.Background()
  199. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  200. // object itself as one prefix item.
  201. if path != "" {
  202. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  203. }
  204. it := gs.bucket.Objects(ctx, &gcs.Query{
  205. Prefix: path,
  206. Delimiter: DirDelim,
  207. })
  208. // iterate over the objects at the path, collect storage info
  209. var stats []*StorageInfo
  210. for {
  211. attrs, err := it.Next()
  212. if err == iterator.Done {
  213. break
  214. }
  215. if err != nil {
  216. return nil, errors.Wrap(err, "list gcs objects")
  217. }
  218. // ignore the root path directory
  219. if attrs.Name == path {
  220. continue
  221. }
  222. // We filter directories using DirDelim, so a nameless entry is a dir
  223. // See gcs.ObjectAttrs Prefix property
  224. if attrs.Name == "" {
  225. stats = append(stats, &StorageInfo{
  226. Name: attrs.Prefix,
  227. Size: attrs.Size,
  228. ModTime: attrs.Updated,
  229. })
  230. }
  231. }
  232. return stats, nil
  233. }