Explorar o código

Add GetAssetKey and ListDirectories for Federator

Signed-off-by: Kaelan Patel <kaelanspatel@gmail.com>
Kaelan Patel hai 3 anos
pai
achega
205458f3da

+ 4 - 0
pkg/kubecost/asset.go

@@ -320,6 +320,10 @@ func key(a Asset, aggregateBy []string) (string, error) {
 	return buffer.String(), nil
 }
 
+func GetAssetKey(a Asset, aggregateBy []string) (string, error) {
+	return key(a, aggregateBy)
+}
+
 func toString(a Asset) string {
 	return fmt.Sprintf("%s{%s}%s=%.2f", a.Type().String(), a.GetProperties(), a.GetWindow(), a.TotalCost())
 }

+ 87 - 0
pkg/storage/azurestorage.go

@@ -271,6 +271,28 @@ func (b *AzureStorage) Stat(name string) (*StorageInfo, error) {
 	}, nil
 }
 
+func (b *AzureStorage) StatDirectories(name string) (*StorageInfo, error) {
+	name = trimLeading(name)
+	ctx := context.Background()
+
+	blobURL := getBlobURL(name, b.containerURL)
+	props, err := blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
+	if err != nil {
+		return nil, err
+	}
+
+	if trimName(name) == "" {
+		return &StorageInfo{
+			Name:    trimName(name),
+			Size:    props.ContentLength(),
+			ModTime: props.LastModified(),
+		}, nil
+	} else {
+		return nil, fmt.Errorf("non-directory in dir")
+	}
+
+}
+
 // Read uses the relative path of the storage combined with the provided path to
 // read the contents.
 func (b *AzureStorage) Read(name string) ([]byte, error) {
@@ -412,6 +434,71 @@ func (b *AzureStorage) List(path string) ([]*StorageInfo, error) {
 	return stats, nil
 }
 
+func (b *AzureStorage) ListDirectories(path string) ([]*StorageInfo, error) {
+	path = trimLeading(path)
+
+	log.Debugf("AzureStorage::List(%s)", path)
+	ctx := context.Background()
+
+	// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
+	// object itself as one prefix item.
+	if path != "" {
+		path = strings.TrimSuffix(path, DirDelim) + DirDelim
+	}
+
+	marker := blob.Marker{}
+	listOptions := blob.ListBlobsSegmentOptions{Prefix: path}
+
+	var names []string
+	for i := 1; ; i++ {
+		var blobItems []blob.BlobItemInternal
+
+		list, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, DirDelim, listOptions)
+		if err != nil {
+			return nil, errors.Wrapf(err, "cannot list hierarchy blobs with prefix %s (iteration #%d)", path, i)
+		}
+
+		marker = list.NextMarker
+		blobItems = list.Segment.BlobItems
+
+		for _, blob := range blobItems {
+			names = append(names, blob.Name)
+		}
+
+		// Continue iterating if we are not done.
+		if !marker.NotDone() {
+			break
+		}
+
+		log.Debugf("Requesting next iteration of listing blobs. Entries: %d, iteration: %d", len(names), i)
+	}
+
+	// get the storage information for each blob (really unfortunate we have to do this)
+	var lock sync.Mutex
+	var stats []*StorageInfo
+	var wg sync.WaitGroup
+	wg.Add(len(names))
+
+	for i := 0; i < len(names); i++ {
+		go func(n string) {
+			defer wg.Done()
+
+			stat, err := b.StatDirectories(n)
+			if err != nil {
+				log.Errorf("Error statting blob %s: %s", n, err)
+			} else {
+				lock.Lock()
+				stats = append(stats, stat)
+				lock.Unlock()
+			}
+		}(names[i])
+	}
+
+	wg.Wait()
+
+	return stats, nil
+}
+
 // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
 func (b *AzureStorage) isObjNotFoundErr(err error) bool {
 	if err == nil {

+ 5 - 0
pkg/storage/filestorage.go

@@ -1,6 +1,7 @@
 package storage
 
 import (
+	"fmt"
 	gofs "io/fs"
 	"io/ioutil"
 	"os"
@@ -60,6 +61,10 @@ func (fs *FileStorage) List(path string) ([]*StorageInfo, error) {
 	return FilesToStorageInfo(files), nil
 }
 
+func (fs *FileStorage) ListDirectories(path string) ([]*StorageInfo, error) {
+	return []*StorageInfo{}, fmt.Errorf("ListDirectories not supported for filestorage")
+}
+
 // Read uses the relative path of the storage combined with the provided path to
 // read the contents.
 func (fs *FileStorage) Read(path string) ([]byte, error) {

+ 47 - 0
pkg/storage/gcsstorage.go

@@ -236,3 +236,50 @@ func (gs *GCSStorage) List(path string) ([]*StorageInfo, error) {
 
 	return stats, nil
 }
+
+func (gs *GCSStorage) ListDirectories(path string) ([]*StorageInfo, error) {
+	path = trimLeading(path)
+
+	log.Debugf("GCSStorage::List(%s)", path)
+	ctx := context.Background()
+
+	// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
+	// object itself as one prefix item.
+	if path != "" {
+		path = strings.TrimSuffix(path, DirDelim) + DirDelim
+	}
+
+	it := gs.bucket.Objects(ctx, &gcs.Query{
+		Prefix:    path,
+		Delimiter: DirDelim,
+	})
+
+	// iterate over the objects at the path, collect storage info
+	var stats []*StorageInfo
+	for {
+		attrs, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return nil, errors.Wrap(err, "list gcs objects")
+		}
+
+		// ignore the root path directory
+		if attrs.Name == path {
+			continue
+		}
+
+		// If trim removes the entire name, it's a directory, ergo we list it
+		if trimName(attrs.Name) == "" {
+			stats = append(stats, &StorageInfo{
+				Name:    attrs.Name,
+				Size:    attrs.Size,
+				ModTime: attrs.Updated,
+			})
+		}
+
+	}
+
+	return stats, nil
+}

+ 46 - 1
pkg/storage/s3storage.go

@@ -168,7 +168,7 @@ type S3Storage struct {
 // parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
 func parseS3Config(conf []byte) (S3Config, error) {
 	config := defaultS3Config
-	if err := yaml.UnmarshalStrict(conf, &config); err != nil {
+	if err := yaml.Unmarshal(conf, &config); err != nil {
 		return S3Config{}, err
 	}
 
@@ -480,6 +480,51 @@ func (s3 *S3Storage) List(path string) ([]*StorageInfo, error) {
 	return stats, nil
 }
 
+func (s3 *S3Storage) ListDirectories(path string) ([]*StorageInfo, error) {
+	path = trimLeading(path)
+
+	log.Debugf("S3Storage::List(%s)", path)
+	ctx := context.Background()
+
+	if path != "" {
+		path = strings.TrimSuffix(path, DirDelim) + DirDelim
+	}
+
+	opts := minio.ListObjectsOptions{
+		Prefix:    path,
+		Recursive: false,
+		UseV1:     s3.listObjectsV1,
+	}
+
+	var stats []*StorageInfo
+	for object := range s3.client.ListObjects(ctx, s3.name, opts) {
+
+		if object.Err != nil {
+			return nil, object.Err
+		}
+
+		if object.Key == "" {
+			continue
+		}
+
+		if object.Key == path {
+			continue
+		}
+
+		// If trim removes the entire name, it's a directory, ergo we list it
+		if trimName(object.Key) == "" {
+			stats = append(stats, &StorageInfo{
+				Name:    object.Key,
+				Size:    object.Size,
+				ModTime: object.LastModified,
+			})
+		}
+
+	}
+
+	return stats, nil
+}
+
 // getServerSideEncryption returns the SSE to use.
 func (s3 *S3Storage) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
 	if value := ctx.Value(sseConfigKey); value != nil {

+ 2 - 0
pkg/storage/storage.go

@@ -51,6 +51,8 @@ type Storage interface {
 	// List uses the relative path of the storage combined with the provided path to return
 	// storage information for the files.
 	List(path string) ([]*StorageInfo, error)
+
+	ListDirectories(path string) ([]*StorageInfo, error)
 }
 
 // Validate uses the provided storage implementation to write a test file to the store, followed by a removal.