Ver Fonte

Add GCS Storage implementation

Matt Bolt há 4 anos
pai
commit
f6ae344e39
5 ficheiros alterados com 269 adições e 8 exclusões
  1. 2 1
      go.mod
  2. 4 4
      pkg/storage/bucketstorage.go
  3. 255 0
      pkg/storage/gcsstorage.go
  4. 5 3
      pkg/storage/s3storage.go
  5. 3 0
      pkg/storage/storage.go

+ 2 - 1
go.mod

@@ -5,6 +5,7 @@ replace github.com/golang/lint => golang.org/x/lint v0.0.0-20180702182130-06c868
 require (
 require (
 	cloud.google.com/go v0.81.0
 	cloud.google.com/go v0.81.0
 	cloud.google.com/go/bigquery v1.8.0
 	cloud.google.com/go/bigquery v1.8.0
+	cloud.google.com/go/storage v1.10.0
 	github.com/Azure/azure-sdk-for-go v61.6.0+incompatible
 	github.com/Azure/azure-sdk-for-go v61.6.0+incompatible
 	github.com/Azure/go-autorest/autorest v0.11.27
 	github.com/Azure/go-autorest/autorest v0.11.27
 	github.com/Azure/go-autorest/autorest/azure/auth v0.5.11
 	github.com/Azure/go-autorest/autorest/azure/auth v0.5.11
@@ -24,6 +25,7 @@ require (
 	github.com/json-iterator/go v1.1.12
 	github.com/json-iterator/go v1.1.12
 	github.com/jszwec/csvutil v1.2.1
 	github.com/jszwec/csvutil v1.2.1
 	github.com/julienschmidt/httprouter v1.3.0
 	github.com/julienschmidt/httprouter v1.3.0
+	github.com/kubecost/events v0.0.3
 	github.com/lib/pq v1.2.0
 	github.com/lib/pq v1.2.0
 	github.com/microcosm-cc/bluemonday v1.0.16
 	github.com/microcosm-cc/bluemonday v1.0.16
 	github.com/minio/minio-go/v7 v7.0.15
 	github.com/minio/minio-go/v7 v7.0.15
@@ -89,7 +91,6 @@ require (
 	github.com/jstemmer/go-junit-report v0.9.1 // indirect
 	github.com/jstemmer/go-junit-report v0.9.1 // indirect
 	github.com/klauspost/compress v1.13.5 // indirect
 	github.com/klauspost/compress v1.13.5 // indirect
 	github.com/klauspost/cpuid v1.3.1 // indirect
 	github.com/klauspost/cpuid v1.3.1 // indirect
-	github.com/kubecost/events v0.0.3 // indirect
 	github.com/magiconair/properties v1.8.5 // indirect
 	github.com/magiconair/properties v1.8.5 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/minio/md5-simd v1.1.0 // indirect
 	github.com/minio/md5-simd v1.1.0 // indirect

+ 4 - 4
pkg/storage/bucketstorage.go

@@ -12,9 +12,9 @@ import (
 type StorageProvider string
 type StorageProvider string
 
 
 const (
 const (
-	S3 StorageProvider = "S3"
+	S3  StorageProvider = "S3"
+	GCS StorageProvider = "GCS"
 	// AZURE StorageProvider = "AZURE"
 	// AZURE StorageProvider = "AZURE"
-	// GCS   StorageProvider = "GCS"
 )
 )
 
 
 // StorageConfig is the configuration type used as the "parent" configuration. It contains a type, which will
 // StorageConfig is the configuration type used as the "parent" configuration. It contains a type, which will
@@ -43,8 +43,8 @@ func NewBucketStorage(config []byte) (Storage, error) {
 	switch strings.ToUpper(string(storageConfig.Type)) {
 	switch strings.ToUpper(string(storageConfig.Type)) {
 	case string(S3):
 	case string(S3):
 		storage, err = NewS3Storage(config)
 		storage, err = NewS3Storage(config)
-	//case string(GCS):
-	//	storage, err = NewGCSStorage(config)
+	case string(GCS):
+		storage, err = NewGCSStorage(config)
 	//case string(AZURE):
 	//case string(AZURE):
 	//	storage, err = NewAzureStorage(config)
 	//	storage, err = NewAzureStorage(config)
 	default:
 	default:

+ 255 - 0
pkg/storage/gcsstorage.go

@@ -0,0 +1,255 @@
+// Fork from Thanos GCS Bucket support to reuse configuration options
+// Licensed under the Apache License 2.0.
+// https://github.com/thanos-io/thanos/blob/main/pkg/objstore/gcs/gcs.go
+package storage
+
+import (
+	"context"
+	"io/ioutil"
+	"strings"
+
+	gcs "cloud.google.com/go/storage"
+	"github.com/kubecost/cost-model/pkg/log"
+	"github.com/pkg/errors"
+	"golang.org/x/oauth2/google"
+	"google.golang.org/api/iterator"
+	"google.golang.org/api/option"
+	"gopkg.in/yaml.v2"
+)
+
+// GCSConfig stores the YAML configuration for a Google Cloud Storage bucket.
+type GCSConfig struct {
+	Bucket         string `yaml:"bucket"`          // name of the GCS bucket to use (required)
+	ServiceAccount string `yaml:"service_account"` // optional service-account credentials JSON; Google default credentials are used when empty
+}
+
+// GCSStorage is a storage.Storage implementation for Google Cloud Storage.
+type GCSStorage struct {
+	name   string            // bucket name, returned by Name()
+	bucket *gcs.BucketHandle // handle used for all object operations
+	client *gcs.Client       // underlying GCS client that owns the bucket handle
+}
+
+// NewGCSStorage creates a new GCSStorage instance using the provided GCS configuration.
+func NewGCSStorage(conf []byte) (*GCSStorage, error) {
+	var gc GCSConfig
+	if err := yaml.Unmarshal(conf, &gc); err != nil {
+		return nil, err
+	}
+
+	return NewGCSStorageWith(gc)
+}
+
+// NewGCSStorageWith creates a new GCSStorage instance using the provided GCS configuration.
+func NewGCSStorageWith(gc GCSConfig) (*GCSStorage, error) {
+	if gc.Bucket == "" {
+		return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
+	}
+
+	ctx := context.Background()
+	var opts []option.ClientOption
+
+	// If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
+	if gc.ServiceAccount != "" {
+		credentials, err := google.CredentialsFromJSON(ctx, []byte(gc.ServiceAccount), gcs.ScopeFullControl)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to create credentials from JSON")
+		}
+		opts = append(opts, option.WithCredentials(credentials))
+	}
+
+	gcsClient, err := gcs.NewClient(ctx, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	return &GCSStorage{
+		name:   gc.Bucket,
+		bucket: gcsClient.Bucket(gc.Bucket),
+		client: gcsClient,
+	}, nil
+}
+
+// Name returns the configured GCS bucket name backing this storage.
+func (gs *GCSStorage) Name() string {
+	return gs.name
+}
+
+// FullPath returns the storage working path combined with the path provided
+func (gs *GCSStorage) FullPath(name string) string {
+	name = gs.trimLeading(name)
+
+	return name
+}
+
+// Stat returns the StorageInfo (name, size, modification time) for the object
+// at the given path, or DoesNotExistError if no such object exists.
+func (gs *GCSStorage) Stat(name string) (*StorageInfo, error) {
+	name = gs.trimLeading(name)
+	//log.Infof("GCSStorage::Stat(%s)", name)
+
+	ctx := context.Background()
+	attrs, err := gs.bucket.Object(name).Attrs(ctx)
+	if err != nil {
+		// Map bucket/object not-found errors onto the generic sentinel so
+		// callers can use os.IsNotExist-style checks.
+		if gs.isDoesNotExist(err) {
+			return nil, DoesNotExistError
+		}
+		return nil, err
+	}
+
+	return &StorageInfo{
+		Name:    gs.trimName(attrs.Name), // report only the final path component
+		Size:    attrs.Size,
+		ModTime: attrs.Updated,
+	}, nil
+}
+
+// isDoesNotExist returns true if the error matches resource not exists errors.
+func (gs *GCSStorage) isDoesNotExist(err error) bool {
+	msg := err.Error()
+	return msg == gcs.ErrBucketNotExist.Error() || msg == gcs.ErrObjectNotExist.Error()
+}
+
+// Read uses the relative path of the storage combined with the provided path to
+// read the contents.
+func (gs *GCSStorage) Read(name string) ([]byte, error) {
+	name = gs.trimLeading(name)
+	log.Infof("GCSStorage::Read(%s)", name)
+
+	ctx := context.Background()
+	reader, err := gs.bucket.Object(name).NewReader(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	data, err := ioutil.ReadAll(reader)
+	if err != nil {
+		return nil, err
+	}
+
+	return data, nil
+}
+
+// Write uses the relative path of the storage combined with the provided path
+// to write a new file or overwrite an existing file.
+func (gs *GCSStorage) Write(name string, data []byte) error {
+	name = gs.trimLeading(name)
+	log.Infof("GCSStorage::Write(%s)", name)
+
+	ctx := context.Background()
+
+	writer := gs.bucket.Object(name).NewWriter(ctx)
+	// Set chunksize to 0 to write files in one go. This prevents chunking of
+	// upload into multiple parts, which requires additional memory for buffering
+	// the sub-parts. To remain consistent with other storage implementations,
+	// we would rather attempt to lower cost fast upload and fast-fail.
+	writer.ChunkSize = 0
+
+	// Write the data to GCS object
+	if _, err := writer.Write(data); err != nil {
+		return errors.Wrap(err, "upload gcs object")
+	}
+
+	// NOTE: Sometimes errors don't arrive during Write(), so we must also check
+	// NOTE: the error returned by Close().
+	if err := writer.Close(); err != nil {
+		return errors.Wrap(err, "upload gcs object")
+	}
+	return nil
+}
+
+// Remove uses the relative path of the storage combined with the provided path to
+// remove a file from storage permanently.
+func (gs *GCSStorage) Remove(name string) error {
+	name = gs.trimLeading(name)
+
+	log.Infof("GCSStorage::Remove(%s)", name)
+	ctx := context.Background()
+
+	return gs.bucket.Object(name).Delete(ctx)
+}
+
+// Exists uses the relative path of the storage combined with the provided path to
+// determine if the file exists.
+func (gs *GCSStorage) Exists(name string) (bool, error) {
+	name = gs.trimLeading(name)
+	//log.Infof("GCSStorage::Exists(%s)", name)
+
+	ctx := context.Background()
+	_, err := gs.bucket.Object(name).Attrs(ctx)
+	if err != nil {
+		if gs.isDoesNotExist(err) {
+			return false, nil
+		}
+		return false, errors.Wrap(err, "stat gcs object")
+	}
+
+	return true, nil
+}
+
+// List uses the relative path of the storage combined with the provided path to return
+// storage information for the files.
+func (gs *GCSStorage) List(path string) ([]*StorageInfo, error) {
+	path = gs.trimLeading(path)
+
+	log.Infof("GCSStorage::List(%s)", path)
+	ctx := context.Background()
+
+	// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
+	// object itself as one prefix item.
+	if path != "" {
+		path = strings.TrimSuffix(path, DirDelim) + DirDelim
+	}
+
+	it := gs.bucket.Objects(ctx, &gcs.Query{
+		Prefix:    path,
+		Delimiter: DirDelim,
+	})
+
+	// iterate over the objects at the path, collect storage info
+	var stats []*StorageInfo
+	for {
+		attrs, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return nil, errors.Wrap(err, "list gcs objects")
+		}
+
+		// ignore the root path directory
+		if attrs.Name == path {
+			continue
+		}
+
+		stats = append(stats, &StorageInfo{
+			Name:    gs.trimName(attrs.Name),
+			Size:    attrs.Size,
+			ModTime: attrs.Updated,
+		})
+	}
+
+	return stats, nil
+}
+
+// trimLeading removes a leading / from the file name
+func (gs *GCSStorage) trimLeading(file string) string {
+	if len(file) == 0 {
+		return file
+	}
+
+	if file[0] == '/' {
+		return file[1:]
+	}
+	return file
+}
+
+// trimName removes the leading directory prefix
+func (gs *GCSStorage) trimName(file string) string {
+	slashIndex := strings.LastIndex(file, "/")
+	if slashIndex < 0 {
+		return file
+	}
+
+	name := file[slashIndex+1:]
+	return name
+}

+ 5 - 3
pkg/storage/s3storage.go

@@ -29,9 +29,6 @@ import (
 type ctxKey int
 type ctxKey int
 
 
 const (
 const (
-	// DirDelim is the delimiter used to model a directory structure in an object store bucket.
-	DirDelim = "/"
-
 	// SSEKMS is the name of the SSE-KMS method for objectstore encryption.
 	// SSEKMS is the name of the SSE-KMS method for objectstore encryption.
 	SSEKMS = "SSE-KMS"
 	SSEKMS = "SSE-KMS"
 
 
@@ -371,6 +368,11 @@ func (s3 *S3Storage) Write(name string, data []byte) error {
 	}
 	}
 
 
 	var size int64 = int64(len(data))
 	var size int64 = int64(len(data))
+
+	// Set partSize to 0 to write files in one go. This prevents chunking of
+	// the upload into multiple parts, which requires additional memory for
+	// buffering the sub-parts. To remain consistent with other storage
+	// implementations, we prefer a lower-cost single-shot upload that fails fast.
 	var partSize uint64 = 0
 	var partSize uint64 = 0
 
 
 	r := bytes.NewReader(data)
 	r := bytes.NewReader(data)

+ 3 - 0
pkg/storage/storage.go

@@ -5,6 +5,9 @@ import (
 	"time"
 	"time"
 )
 )
 
 
+// DirDelim is the delimiter used to model a directory structure in an object store bucket.
+const DirDelim = "/"
+
 // DoesNotExistError is used as a generic error to return when a target path does not
 // DoesNotExistError is used as a generic error to return when a target path does not
 // exist in storage. Equivalent to os.ErrorNotExist such that it will work with os.IsNotExist(err)
 // exist in storage. Equivalent to os.ErrorNotExist such that it will work with os.IsNotExist(err)
 var DoesNotExistError = os.ErrNotExist
 var DoesNotExistError = os.ErrNotExist