|
|
@@ -7,6 +7,7 @@ import (
|
|
|
"io"
|
|
|
"net/url"
|
|
|
"os"
|
|
|
+ "path"
|
|
|
"path/filepath"
|
|
|
"strings"
|
|
|
"time"
|
|
|
@@ -34,20 +35,23 @@ type FileManager interface {
|
|
|
// - s3://bucket-name/path/to/file.csv
|
|
|
// - gs://bucket-name/path/to/file.csv
|
|
|
// - https://azblobaccount.blob.core.windows.net/containerName/path/to/file.csv
|
|
|
+// - alts3://fqdn:port/bucket-name/path/to/file.csv
|
|
|
// - local/file/path.csv
|
|
|
|
|
|
-func NewFileManager(path string) (FileManager, error) {
|
|
|
+func NewFileManager(filePath string) (FileManager, error) {
|
|
|
switch {
|
|
|
- case strings.HasPrefix(path, "s3://"):
|
|
|
- return NewS3File(path)
|
|
|
- case strings.HasPrefix(path, "gs://"):
|
|
|
- return NewGCSStorageFile(path)
|
|
|
- case strings.Contains(path, "blob.core.windows.net"):
|
|
|
- return NewAzureBlobFile(path)
|
|
|
- case path == "":
|
|
|
+ case strings.HasPrefix(filePath, "s3://"):
|
|
|
+ return NewS3File(filePath)
|
|
|
+ case strings.HasPrefix(filePath, "gs://"):
|
|
|
+ return NewGCSStorageFile(filePath)
|
|
|
+ case strings.Contains(filePath, "blob.core.windows.net"):
|
|
|
+ return NewAzureBlobFile(filePath)
|
|
|
+ case strings.HasPrefix(filePath, "alts3://"):
|
|
|
+ return NewAltS3File(filePath)
|
|
|
+ case filePath == "":
|
|
|
return nil, errors.New("empty path")
|
|
|
default:
|
|
|
- return NewSystemFile(path), nil
|
|
|
+ return NewSystemFile(filePath), nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -85,8 +89,8 @@ type S3File struct {
|
|
|
key string
|
|
|
}
|
|
|
|
|
|
-func NewS3File(path string) (*S3File, error) {
|
|
|
- u, err := url.Parse(path)
|
|
|
+func NewS3File(filePath string) (*S3File, error) {
|
|
|
+ u, err := url.Parse(filePath)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -95,7 +99,7 @@ func NewS3File(path string) (*S3File, error) {
|
|
|
key := strings.TrimPrefix(u.Path, "/")
|
|
|
|
|
|
if bucket == "" || key == "" {
|
|
|
- return nil, fmt.Errorf("invalid s3 path: %s", path)
|
|
|
+ return nil, fmt.Errorf("invalid s3 path: %s", filePath)
|
|
|
}
|
|
|
|
|
|
cfg, err := config.LoadDefaultConfig(context.Background())
|
|
|
@@ -110,6 +114,43 @@ func NewS3File(path string) (*S3File, error) {
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
+func NewAltS3File(filePath string) (*S3File, error) {
|
|
|
+ u, err := url.Parse(filePath)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ clPath := path.Clean(u.Path)
|
|
|
+
|
|
|
+ if len(strings.Split(clPath, "/")) < 3 {
|
|
|
+ return nil, fmt.Errorf("invalid s3 path: %s", filePath)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Extract bucket and path from url
|
|
|
+ bucket, key, _ := strings.Cut(strings.TrimLeft(clPath, "/"), "/")
|
|
|
+
|
|
|
+ if bucket == "" || key == "" {
|
|
|
+ return nil, fmt.Errorf("invalid s3 path: %s", filePath)
|
|
|
+ }
|
|
|
+
|
|
|
+ cfg, err := config.LoadDefaultConfig(context.Background())
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return &S3File{
|
|
|
+ s3Client: s3.NewFromConfig(cfg, func(o *s3.Options) {
|
|
|
+ // Always use https for the endpoint when using an alternative s3 url.
|
|
|
+ // NOTE: From service/s3 v1.38.0 and onwards use EndpointResolverV2 as described in the AWS SDK docs.
|
|
|
+ o.EndpointResolver = s3.EndpointResolverFromURL(fmt.Sprintf("https://%v", u.Host), func(e *aws.Endpoint) {
|
|
|
+ e.HostnameImmutable = true
|
|
|
+ })
|
|
|
+ }),
|
|
|
+ bucket: bucket, // bucket
|
|
|
+ key: key, // path/to/file.csv
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
func (c *S3File) Download(ctx context.Context, f *os.File) error {
|
|
|
_, err := manager.NewDownloader(c.s3Client).Download(ctx, f, &s3.GetObjectInput{
|
|
|
Bucket: aws.String(c.bucket),
|
|
|
@@ -140,9 +181,9 @@ type GCSStorageFile struct {
|
|
|
client *storage.Client
|
|
|
}
|
|
|
|
|
|
-func NewGCSStorageFile(path string) (*GCSStorageFile, error) {
|
|
|
- path = strings.TrimPrefix(path, "gs://")
|
|
|
- parts := strings.SplitN(path, "/", 2)
|
|
|
+func NewGCSStorageFile(filePath string) (*GCSStorageFile, error) {
|
|
|
+ filePath = strings.TrimPrefix(filePath, "gs://")
|
|
|
+ parts := strings.SplitN(filePath, "/", 2)
|
|
|
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
|
|
|
return nil, errors.New("invalid GCS path")
|
|
|
}
|
|
|
@@ -184,16 +225,16 @@ func (g *GCSStorageFile) Upload(ctx context.Context, f *os.File) error {
|
|
|
return w.Close()
|
|
|
}
|
|
|
|
|
|
-func NewSystemFile(path string) *SystemFile {
|
|
|
- return &SystemFile{path: path}
|
|
|
+func NewSystemFile(filePath string) *SystemFile {
|
|
|
+ return &SystemFile{filePath: filePath}
|
|
|
}
|
|
|
|
|
|
type SystemFile struct {
|
|
|
- path string
|
|
|
+ filePath string
|
|
|
}
|
|
|
|
|
|
func (s *SystemFile) Download(ctx context.Context, f *os.File) error {
|
|
|
- sFile, err := os.Open(s.path)
|
|
|
+ sFile, err := os.Open(s.filePath)
|
|
|
if err != nil {
|
|
|
if os.IsNotExist(err) {
|
|
|
return ErrNotFound
|
|
|
@@ -215,7 +256,7 @@ func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- tmpFilePath := filepath.Join(filepath.Dir(s.path), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
|
|
|
+ tmpFilePath := filepath.Join(filepath.Dir(s.filePath), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
|
|
|
tmpF, err := os.Create(tmpFilePath)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
@@ -226,7 +267,7 @@ func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- err = os.Rename(tmpF.Name(), s.path)
|
|
|
+ err = os.Rename(tmpF.Name(), s.filePath)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|