| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- package storage
- // Fork from Thanos GCS Bucket support to reuse configuration options
- // Licensed under the Apache License 2.0.
- // https://github.com/thanos-io/thanos/blob/main/pkg/objstore/gcs/gcs.go
- import (
- "context"
- "io"
- "strings"
- gcs "cloud.google.com/go/storage"
- "github.com/opencost/opencost/core/pkg/log"
- "github.com/pkg/errors"
- "golang.org/x/oauth2/google"
- "google.golang.org/api/iterator"
- "google.golang.org/api/option"
- "gopkg.in/yaml.v2"
- )
// GCSConfig holds the settings needed to open a Google Cloud Storage bucket.
// It is decoded from YAML by NewGCSStorage.
type GCSConfig struct {
	// Bucket is the name of the GCS bucket. NewGCSStorageWith rejects an
	// empty value.
	Bucket string `yaml:"bucket"`

	// ServiceAccount is a JSON credentials document. When it is empty,
	// NewGCSStorageWith passes no explicit credentials to the GCS client.
	ServiceAccount string `yaml:"service_account"`
}
- // GCSStorage is a storage.Storage implementation for Google Cloud Storage.
- type GCSStorage struct {
- name string
- bucket *gcs.BucketHandle
- client *gcs.Client
- }
- // NewGCSStorage creates a new GCSStorage instance using the provided GCS configuration.
- func NewGCSStorage(conf []byte) (*GCSStorage, error) {
- var gc GCSConfig
- if err := yaml.Unmarshal(conf, &gc); err != nil {
- return nil, err
- }
- return NewGCSStorageWith(gc)
- }
- // NewGCSStorageWith creates a new GCSStorage instance using the provided GCS configuration.
- func NewGCSStorageWith(gc GCSConfig) (*GCSStorage, error) {
- if gc.Bucket == "" {
- return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
- }
- ctx := context.Background()
- var opts []option.ClientOption
- // If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
- if gc.ServiceAccount != "" {
- credentials, err := google.CredentialsFromJSON(ctx, []byte(gc.ServiceAccount), gcs.ScopeFullControl)
- if err != nil {
- return nil, errors.Wrap(err, "failed to create credentials from JSON")
- }
- opts = append(opts, option.WithCredentials(credentials))
- }
- gcsClient, err := gcs.NewClient(ctx, opts...)
- if err != nil {
- return nil, err
- }
- return &GCSStorage{
- name: gc.Bucket,
- bucket: gcsClient.Bucket(gc.Bucket),
- client: gcsClient,
- }, nil
- }
- // Name returns the bucket name for gcs.
- func (gs *GCSStorage) Name() string {
- return gs.name
- }
- // StorageType returns a string identifier for the type of storage used by the implementation.
- func (gs *GCSStorage) StorageType() StorageType {
- return StorageTypeBucketGCS
- }
- // FullPath returns the storage working path combined with the path provided
- func (gs *GCSStorage) FullPath(name string) string {
- name = trimLeading(name)
- return name
- }
- // Stat returns the StorageStats for the specific path.
- func (gs *GCSStorage) Stat(name string) (*StorageInfo, error) {
- name = trimLeading(name)
- //log.Debugf("GCSStorage::Stat(%s)", name)]
- ctx := context.Background()
- attrs, err := gs.bucket.Object(name).Attrs(ctx)
- if err != nil {
- if gs.isDoesNotExist(err) {
- return nil, DoesNotExistError
- }
- return nil, err
- }
- return &StorageInfo{
- Name: trimName(attrs.Name),
- Size: attrs.Size,
- ModTime: attrs.Updated,
- }, nil
- }
- // isDoesNotExist returns true if the error matches resource not exists errors.
- func (gs *GCSStorage) isDoesNotExist(err error) bool {
- msg := err.Error()
- return msg == gcs.ErrBucketNotExist.Error() || msg == gcs.ErrObjectNotExist.Error()
- }
- // Read uses the relative path of the storage combined with the provided path to
- // read the contents.
- func (gs *GCSStorage) Read(name string) ([]byte, error) {
- name = trimLeading(name)
- log.Debugf("GCSStorage::Read(%s)", name)
- ctx := context.Background()
- reader, err := gs.bucket.Object(name).NewReader(ctx)
- if err != nil {
- return nil, err
- }
- data, err := io.ReadAll(reader)
- if err != nil {
- return nil, err
- }
- return data, nil
- }
- // Write uses the relative path of the storage combined with the provided path
- // to write a new file or overwrite an existing file.
- func (gs *GCSStorage) Write(name string, data []byte) error {
- name = trimLeading(name)
- log.Debugf("GCSStorage::Write(%s)", name)
- ctx := context.Background()
- writer := gs.bucket.Object(name).NewWriter(ctx)
- // Set chunksize to 0 to write files in one go. This prevents chunking of
- // upload into multiple parts, which requires additional memory for buffering
- // the sub-parts. To remain consistent with other storage implementations,
- // we would rather attempt to lower cost fast upload and fast-fail.
- writer.ChunkSize = 0
- // Write the data to GCS object
- if _, err := writer.Write(data); err != nil {
- return errors.Wrap(err, "upload gcs object")
- }
- // NOTE: Sometimes errors don't arrive during Write(), so we must also check
- // NOTE: the error returned by Close().
- if err := writer.Close(); err != nil {
- return errors.Wrap(err, "upload gcs object")
- }
- return nil
- }
- // Remove uses the relative path of the storage combined with the provided path to
- // remove a file from storage permanently.
- func (gs *GCSStorage) Remove(name string) error {
- name = trimLeading(name)
- log.Debugf("GCSStorage::Remove(%s)", name)
- ctx := context.Background()
- return gs.bucket.Object(name).Delete(ctx)
- }
- // Exists uses the relative path of the storage combined with the provided path to
- // determine if the file exists.
- func (gs *GCSStorage) Exists(name string) (bool, error) {
- name = trimLeading(name)
- //log.Debugf("GCSStorage::Exists(%s)", name)
- ctx := context.Background()
- _, err := gs.bucket.Object(name).Attrs(ctx)
- if err != nil {
- if gs.isDoesNotExist(err) {
- return false, nil
- }
- return false, errors.Wrap(err, "stat gcs object")
- }
- return true, nil
- }
- // List uses the relative path of the storage combined with the provided path to return
- // storage information for the files.
- func (gs *GCSStorage) List(path string) ([]*StorageInfo, error) {
- path = trimLeading(path)
- log.Debugf("GCSStorage::List(%s)", path)
- ctx := context.Background()
- // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
- // object itself as one prefix item.
- if path != "" {
- path = strings.TrimSuffix(path, DirDelim) + DirDelim
- }
- it := gs.bucket.Objects(ctx, &gcs.Query{
- Prefix: path,
- Delimiter: DirDelim,
- })
- // iterate over the objects at the path, collect storage info
- var stats []*StorageInfo
- for {
- attrs, err := it.Next()
- if err == iterator.Done {
- break
- }
- if err != nil {
- return nil, errors.Wrap(err, "list gcs objects")
- }
- // ignore the root path directory
- if attrs.Name == path {
- continue
- }
- stats = append(stats, &StorageInfo{
- Name: trimName(attrs.Name),
- Size: attrs.Size,
- ModTime: attrs.Updated,
- })
- }
- return stats, nil
- }
- func (gs *GCSStorage) ListDirectories(path string) ([]*StorageInfo, error) {
- path = trimLeading(path)
- log.Debugf("GCSStorage::List(%s)", path)
- ctx := context.Background()
- // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
- // object itself as one prefix item.
- if path != "" {
- path = strings.TrimSuffix(path, DirDelim) + DirDelim
- }
- it := gs.bucket.Objects(ctx, &gcs.Query{
- Prefix: path,
- Delimiter: DirDelim,
- })
- // iterate over the objects at the path, collect storage info
- var stats []*StorageInfo
- for {
- attrs, err := it.Next()
- if err == iterator.Done {
- break
- }
- if err != nil {
- return nil, errors.Wrap(err, "list gcs objects")
- }
- // ignore the root path directory
- if attrs.Name == path {
- continue
- }
- // We filter directories using DirDelim, so a nameless entry is a dir
- // See gcs.ObjectAttrs Prefix property
- if attrs.Name == "" {
- stats = append(stats, &StorageInfo{
- Name: attrs.Prefix,
- Size: attrs.Size,
- ModTime: attrs.Updated,
- })
- }
- }
- return stats, nil
- }
|