gcsstorage.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. package storage
  2. // Fork from Thanos GCS Bucket support to reuse configuration options
  3. // Licensed under the Apache License 2.0.
  4. // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/gcs/gcs.go
  5. import (
  6. "context"
  7. "io"
  8. "net/http"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. gcs "cloud.google.com/go/storage"
  13. "github.com/opencost/opencost/core/pkg/log"
  14. "github.com/pkg/errors"
  15. "golang.org/x/oauth2/google"
  16. "google.golang.org/api/googleapi"
  17. "google.golang.org/api/iterator"
  18. "google.golang.org/api/option"
  19. "gopkg.in/yaml.v2"
  20. )
  21. // Config stores the configuration for gcs bucket.
  22. type GCSConfig struct {
  23. Bucket string `yaml:"bucket"`
  24. ServiceAccount string `yaml:"service_account"`
  25. }
  26. // GCSStorage is a storage.Storage implementation for Google Cloud Storage.
  27. type GCSStorage struct {
  28. name string
  29. bucket *gcs.BucketHandle
  30. client *gcs.Client
  31. }
  32. // NewGCSStorage creates a new GCSStorage instance using the provided GCS configuration.
  33. func NewGCSStorage(conf []byte) (*GCSStorage, error) {
  34. var gc GCSConfig
  35. if err := yaml.Unmarshal(conf, &gc); err != nil {
  36. return nil, err
  37. }
  38. return NewGCSStorageWith(gc)
  39. }
  40. // NewGCSStorageWith creates a new GCSStorage instance using the provided GCS configuration.
  41. func NewGCSStorageWith(gc GCSConfig) (*GCSStorage, error) {
  42. if gc.Bucket == "" {
  43. return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
  44. }
  45. ctx := context.Background()
  46. var opts []option.ClientOption
  47. // If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
  48. if gc.ServiceAccount != "" {
  49. credentials, err := google.CredentialsFromJSON(ctx, []byte(gc.ServiceAccount), gcs.ScopeFullControl)
  50. if err != nil {
  51. return nil, errors.Wrap(err, "failed to create credentials from JSON")
  52. }
  53. opts = append(opts, option.WithCredentials(credentials))
  54. }
  55. // HTTPS Protocol Configuration: Google Cloud Storage client uses HTTPS by default.
  56. // The GCS client library (cloud.google.com/go/storage) automatically uses HTTPS
  57. // for all API calls. All GCS operations (read, write, delete, list) use HTTPS protocol.
  58. gcsClient, err := gcs.NewClient(ctx, opts...)
  59. if err != nil {
  60. return nil, err
  61. }
  62. log.Debugf("GCSStorage: New GCS client initialized with 'https://storage.googleapis.com/%s'", gc.Bucket)
  63. return &GCSStorage{
  64. name: gc.Bucket,
  65. bucket: gcsClient.Bucket(gc.Bucket),
  66. client: gcsClient,
  67. }, nil
  68. }
  69. // String returns the bucket name for gcs.
  70. func (gs *GCSStorage) String() string {
  71. return gs.name
  72. }
  73. // StorageType returns a string identifier for the type of storage used by the implementation.
  74. func (gs *GCSStorage) StorageType() StorageType {
  75. return StorageTypeBucketGCS
  76. }
  77. // FullPath returns the storage working path combined with the path provided
  78. func (gs *GCSStorage) FullPath(name string) string {
  79. name = trimLeading(name)
  80. return name
  81. }
  82. // Stat returns the StorageStats for the specific path.
  83. func (gs *GCSStorage) Stat(name string) (*StorageInfo, error) {
  84. name = trimLeading(name)
  85. //log.Debugf("GCSStorage::Stat(%s)", name)]
  86. ctx := context.Background()
  87. attrs, err := gs.bucket.Object(name).Attrs(ctx)
  88. if err != nil {
  89. if gs.isDoesNotExist(err) {
  90. return nil, DoesNotExistError
  91. }
  92. return nil, err
  93. }
  94. return &StorageInfo{
  95. Name: trimName(attrs.Name),
  96. Size: attrs.Size,
  97. ModTime: attrs.Updated,
  98. }, nil
  99. }
  100. // isDoesNotExist returns true if the error matches resource not exists errors.
  101. func (gs *GCSStorage) isDoesNotExist(err error) bool {
  102. return errors.Is(err, gcs.ErrObjectNotExist)
  103. }
  104. // Read uses the relative path of the storage combined with the provided path to
  105. // read the contents.
  106. func (gs *GCSStorage) Read(name string) ([]byte, error) {
  107. name = trimLeading(name)
  108. log.Debugf("GCSStorage::Read::HTTPS(%s)", name)
  109. ctx := context.Background()
  110. reader, err := gs.bucket.Object(name).NewReader(ctx)
  111. if err != nil {
  112. // Normalize GCS "object not found" errors to DoesNotExistError for consistency
  113. if err == gcs.ErrObjectNotExist {
  114. return nil, DoesNotExistError
  115. }
  116. if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
  117. return nil, DoesNotExistError
  118. }
  119. return nil, err
  120. }
  121. defer reader.Close()
  122. data, err := io.ReadAll(reader)
  123. if err != nil {
  124. return nil, err
  125. }
  126. return data, nil
  127. }
  128. // ReadStream returns a streaming reader for the specified object path.
  129. func (gs *GCSStorage) ReadStream(path string) (io.ReadCloser, error) {
  130. path = trimLeading(path)
  131. log.Debugf("GCSStorage::ReadStream::HTTPS(%s)", path)
  132. ctx := context.Background()
  133. reader, err := gs.bucket.Object(path).NewReader(ctx)
  134. if err != nil {
  135. if err == gcs.ErrObjectNotExist {
  136. return nil, DoesNotExistError
  137. }
  138. if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
  139. return nil, DoesNotExistError
  140. }
  141. return nil, err
  142. }
  143. return reader, nil
  144. }
  145. // ReadToLocalFile streams the specified object at path to destPath on the local file system.
  146. func (gs *GCSStorage) ReadToLocalFile(path, destPath string) error {
  147. path = trimLeading(path)
  148. log.Debugf("GCSStorage::ReadToLocalFile::HTTPS(%s) -> %s", path, destPath)
  149. ctx := context.Background()
  150. reader, err := gs.bucket.Object(path).NewReader(ctx)
  151. if err != nil {
  152. // Normalize GCS "object not found" errors to DoesNotExistError for consistency
  153. if err == gcs.ErrObjectNotExist {
  154. return DoesNotExistError
  155. }
  156. if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
  157. return DoesNotExistError
  158. }
  159. return err
  160. }
  161. defer reader.Close()
  162. if err := os.MkdirAll(filepath.Dir(destPath), os.ModePerm); err != nil {
  163. return errors.Wrap(err, "creating destination directory")
  164. }
  165. f, err := os.Create(destPath)
  166. if err != nil {
  167. return errors.Wrapf(err, "creating destination file %s", destPath)
  168. }
  169. defer f.Close()
  170. // Use 1 MB buffer for streaming operations
  171. buf := make([]byte, 1024*1024)
  172. if _, err := io.CopyBuffer(f, reader, buf); err != nil {
  173. return errors.Wrapf(err, "streaming %s to %s", path, destPath)
  174. }
  175. return nil
  176. }
  177. // Write uses the relative path of the storage combined with the provided path
  178. // to write a new file or overwrite an existing file.
  179. func (gs *GCSStorage) Write(name string, data []byte) error {
  180. name = trimLeading(name)
  181. log.Debugf("GCSStorage::Write::HTTPS(%s)", name)
  182. ctx := context.Background()
  183. writer := gs.bucket.Object(name).NewWriter(ctx)
  184. // Set chunksize to 0 to write files in one go. This prevents chunking of
  185. // upload into multiple parts, which requires additional memory for buffering
  186. // the sub-parts. To remain consistent with other storage implementations,
  187. // we would rather attempt to lower cost fast upload and fast-fail.
  188. writer.ChunkSize = 0
  189. // Write the data to GCS object
  190. if _, err := writer.Write(data); err != nil {
  191. return errors.Wrap(err, "upload gcs object")
  192. }
  193. // NOTE: Sometimes errors don't arrive during Write(), so we must also check
  194. // NOTE: the error returned by Close().
  195. if err := writer.Close(); err != nil {
  196. return errors.Wrap(err, "upload gcs object")
  197. }
  198. return nil
  199. }
  200. // Remove uses the relative path of the storage combined with the provided path to
  201. // remove a file from storage permanently.
  202. func (gs *GCSStorage) Remove(name string) error {
  203. name = trimLeading(name)
  204. log.Debugf("GCSStorage::Remove::HTTPS(%s)", name)
  205. ctx := context.Background()
  206. return gs.bucket.Object(name).Delete(ctx)
  207. }
  208. // Exists uses the relative path of the storage combined with the provided path to
  209. // determine if the file exists.
  210. func (gs *GCSStorage) Exists(name string) (bool, error) {
  211. name = trimLeading(name)
  212. //log.Debugf("GCSStorage::Exists(%s)", name)
  213. ctx := context.Background()
  214. _, err := gs.bucket.Object(name).Attrs(ctx)
  215. if err != nil {
  216. if gs.isDoesNotExist(err) {
  217. return false, nil
  218. }
  219. return false, errors.Wrap(err, "stat gcs object")
  220. }
  221. return true, nil
  222. }
  223. // List uses the relative path of the storage combined with the provided path to return
  224. // storage information for the files.
  225. func (gs *GCSStorage) List(path string) ([]*StorageInfo, error) {
  226. path = trimLeading(path)
  227. log.Debugf("GCSStorage::List::HTTPS(%s)", path)
  228. ctx := context.Background()
  229. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  230. // object itself as one prefix item.
  231. if path != "" {
  232. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  233. }
  234. it := gs.bucket.Objects(ctx, &gcs.Query{
  235. Prefix: path,
  236. Delimiter: DirDelim,
  237. })
  238. // iterate over the objects at the path, collect storage info
  239. var stats []*StorageInfo
  240. for {
  241. attrs, err := it.Next()
  242. if err == iterator.Done {
  243. break
  244. }
  245. if err != nil {
  246. return nil, errors.Wrap(err, "list gcs objects")
  247. }
  248. // ignore the root path directory
  249. if attrs.Name == path {
  250. continue
  251. }
  252. stats = append(stats, &StorageInfo{
  253. Name: trimName(attrs.Name),
  254. Size: attrs.Size,
  255. ModTime: attrs.Updated,
  256. })
  257. }
  258. return stats, nil
  259. }
  260. func (gs *GCSStorage) ListDirectories(path string) ([]*StorageInfo, error) {
  261. path = trimLeading(path)
  262. log.Debugf("GCSStorage::ListDirectories::HTTPS(%s)", path)
  263. ctx := context.Background()
  264. // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
  265. // object itself as one prefix item.
  266. if path != "" {
  267. path = strings.TrimSuffix(path, DirDelim) + DirDelim
  268. }
  269. it := gs.bucket.Objects(ctx, &gcs.Query{
  270. Prefix: path,
  271. Delimiter: DirDelim,
  272. })
  273. // iterate over the objects at the path, collect storage info
  274. var stats []*StorageInfo
  275. for {
  276. attrs, err := it.Next()
  277. if err == iterator.Done {
  278. break
  279. }
  280. if err != nil {
  281. return nil, errors.Wrap(err, "list gcs prefixes")
  282. }
  283. // We filter directories using DirDelim, so a non empty prefix entry is a prefix(directory)
  284. // See gcs.ObjectAttrs Prefix property
  285. if attrs.Prefix != "" {
  286. stats = append(stats, &StorageInfo{
  287. Name: attrs.Prefix,
  288. Size: attrs.Size,
  289. ModTime: attrs.Updated,
  290. })
  291. }
  292. }
  293. return stats, nil
  294. }