Sfoglia il codice sorgente

Merge pull request #1991 from klaasjand/custom-s3

Add custom S3 CSV billing export endpoint
Matt Ray 2 anni fa
parent
commit
326cf1be04
1 ha cambiato i file con 62 aggiunte e 21 eliminazioni
  1. 62 21
      pkg/filemanager/filemanager.go

+ 62 - 21
pkg/filemanager/filemanager.go

@@ -7,6 +7,7 @@ import (
 	"io"
 	"net/url"
 	"os"
+	"path"
 	"path/filepath"
 	"strings"
 	"time"
@@ -34,20 +35,23 @@ type FileManager interface {
 // - s3://bucket-name/path/to/file.csv
 // - gs://bucket-name/path/to/file.csv
 // - https://azblobaccount.blob.core.windows.net/containerName/path/to/file.csv
+// - alts3://fqdn:port/bucket-name/path/to/file.csv
 // - local/file/path.csv
 
-func NewFileManager(path string) (FileManager, error) {
+func NewFileManager(filePath string) (FileManager, error) {
 	switch {
-	case strings.HasPrefix(path, "s3://"):
-		return NewS3File(path)
-	case strings.HasPrefix(path, "gs://"):
-		return NewGCSStorageFile(path)
-	case strings.Contains(path, "blob.core.windows.net"):
-		return NewAzureBlobFile(path)
-	case path == "":
+	case strings.HasPrefix(filePath, "s3://"):
+		return NewS3File(filePath)
+	case strings.HasPrefix(filePath, "gs://"):
+		return NewGCSStorageFile(filePath)
+	case strings.Contains(filePath, "blob.core.windows.net"):
+		return NewAzureBlobFile(filePath)
+	case strings.HasPrefix(filePath, "alts3://"):
+		return NewAltS3File(filePath)
+	case filePath == "":
 		return nil, errors.New("empty path")
 	default:
-		return NewSystemFile(path), nil
+		return NewSystemFile(filePath), nil
 	}
 }
 
@@ -85,8 +89,8 @@ type S3File struct {
 	key      string
 }
 
-func NewS3File(path string) (*S3File, error) {
-	u, err := url.Parse(path)
+func NewS3File(filePath string) (*S3File, error) {
+	u, err := url.Parse(filePath)
 	if err != nil {
 		return nil, err
 	}
@@ -95,7 +99,7 @@ func NewS3File(path string) (*S3File, error) {
 	key := strings.TrimPrefix(u.Path, "/")
 
 	if bucket == "" || key == "" {
-		return nil, fmt.Errorf("invalid s3 path: %s", path)
+		return nil, fmt.Errorf("invalid s3 path: %s", filePath)
 	}
 
 	cfg, err := config.LoadDefaultConfig(context.Background())
@@ -110,6 +114,43 @@ func NewS3File(path string) (*S3File, error) {
 	}, nil
 }
 
+func NewAltS3File(filePath string) (*S3File, error) {
+	u, err := url.Parse(filePath)
+	if err != nil {
+		return nil, err
+	}
+
+	clPath := path.Clean(u.Path)
+
+	if len(strings.Split(clPath, "/")) < 3 {
+		return nil, fmt.Errorf("invalid s3 path: %s", filePath)
+	}
+
+	// Extract bucket and path from url
+	bucket, key, _ := strings.Cut(strings.TrimLeft(clPath, "/"), "/")
+
+	if bucket == "" || key == "" {
+		return nil, fmt.Errorf("invalid s3 path: %s", filePath)
+	}
+
+	cfg, err := config.LoadDefaultConfig(context.Background())
+	if err != nil {
+		return nil, err
+	}
+
+	return &S3File{
+		s3Client: s3.NewFromConfig(cfg, func(o *s3.Options) {
+			// Always use https for the endpoint when using an alternative s3 url.
+			// NOTE: From service/s3 v1.38.0 and onwards use EndpointResolverV2 as described in the AWS SDK docs.
+			o.EndpointResolver = s3.EndpointResolverFromURL(fmt.Sprintf("https://%v", u.Host), func(e *aws.Endpoint) {
+				e.HostnameImmutable = true
+			})
+		}),
+		bucket: bucket, // bucket
+		key:    key,    // path/to/file.csv
+	}, nil
+}
+
 func (c *S3File) Download(ctx context.Context, f *os.File) error {
 	_, err := manager.NewDownloader(c.s3Client).Download(ctx, f, &s3.GetObjectInput{
 		Bucket: aws.String(c.bucket),
@@ -140,9 +181,9 @@ type GCSStorageFile struct {
 	client *storage.Client
 }
 
-func NewGCSStorageFile(path string) (*GCSStorageFile, error) {
-	path = strings.TrimPrefix(path, "gs://")
-	parts := strings.SplitN(path, "/", 2)
+func NewGCSStorageFile(filePath string) (*GCSStorageFile, error) {
+	filePath = strings.TrimPrefix(filePath, "gs://")
+	parts := strings.SplitN(filePath, "/", 2)
 	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
 		return nil, errors.New("invalid GCS path")
 	}
@@ -184,16 +225,16 @@ func (g *GCSStorageFile) Upload(ctx context.Context, f *os.File) error {
 	return w.Close()
 }
 
-func NewSystemFile(path string) *SystemFile {
-	return &SystemFile{path: path}
+func NewSystemFile(filePath string) *SystemFile {
+	return &SystemFile{filePath: filePath}
 }
 
 type SystemFile struct {
-	path string
+	filePath string
 }
 
 func (s *SystemFile) Download(ctx context.Context, f *os.File) error {
-	sFile, err := os.Open(s.path)
+	sFile, err := os.Open(s.filePath)
 	if err != nil {
 		if os.IsNotExist(err) {
 			return ErrNotFound
@@ -215,7 +256,7 @@ func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
 	if err != nil {
 		return err
 	}
-	tmpFilePath := filepath.Join(filepath.Dir(s.path), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
+	tmpFilePath := filepath.Join(filepath.Dir(s.filePath), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
 	tmpF, err := os.Create(tmpFilePath)
 	if err != nil {
 		return err
@@ -226,7 +267,7 @@ func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
 	if err != nil {
 		return err
 	}
-	err = os.Rename(tmpF.Name(), s.path)
+	err = os.Rename(tmpF.Name(), s.filePath)
 	if err != nil {
 		return err
 	}