|
|
@@ -7,7 +7,7 @@ import (
|
|
|
"io"
|
|
|
"net/url"
|
|
|
"os"
|
|
|
- gp "path"
|
|
|
+ "path"
|
|
|
"path/filepath"
|
|
|
"strings"
|
|
|
"time"
|
|
|
@@ -38,20 +38,20 @@ type FileManager interface {
|
|
|
// - alts3://fqdn:port/bucket-name/path/to/file.csv
|
|
|
// - local/file/path.csv
|
|
|
|
|
|
-func NewFileManager(path string) (FileManager, error) {
|
|
|
+func NewFileManager(filePath string) (FileManager, error) {
|
|
|
switch {
|
|
|
- case strings.HasPrefix(path, "s3://"):
|
|
|
- return NewS3File(path)
|
|
|
- case strings.HasPrefix(path, "gs://"):
|
|
|
- return NewGCSStorageFile(path)
|
|
|
- case strings.Contains(path, "blob.core.windows.net"):
|
|
|
- return NewAzureBlobFile(path)
|
|
|
- case strings.HasPrefix(path, "alts3://"):
|
|
|
- return NewAltS3File(path)
|
|
|
- case path == "":
|
|
|
+ case strings.HasPrefix(filePath, "s3://"):
|
|
|
+ return NewS3File(filePath)
|
|
|
+ case strings.HasPrefix(filePath, "gs://"):
|
|
|
+ return NewGCSStorageFile(filePath)
|
|
|
+ case strings.Contains(filePath, "blob.core.windows.net"):
|
|
|
+ return NewAzureBlobFile(filePath)
|
|
|
+ case strings.HasPrefix(filePath, "alts3://"):
|
|
|
+ return NewAltS3File(filePath)
|
|
|
+ case filePath == "":
|
|
|
return nil, errors.New("empty path")
|
|
|
default:
|
|
|
- return NewSystemFile(path), nil
|
|
|
+ return NewSystemFile(filePath), nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -89,8 +89,8 @@ type S3File struct {
|
|
|
key string
|
|
|
}
|
|
|
|
|
|
-func NewS3File(path string) (*S3File, error) {
|
|
|
- u, err := url.Parse(path)
|
|
|
+func NewS3File(filePath string) (*S3File, error) {
|
|
|
+ u, err := url.Parse(filePath)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -99,7 +99,7 @@ func NewS3File(path string) (*S3File, error) {
|
|
|
key := strings.TrimPrefix(u.Path, "/")
|
|
|
|
|
|
if bucket == "" || key == "" {
|
|
|
- return nil, fmt.Errorf("invalid s3 path: %s", path)
|
|
|
+ return nil, fmt.Errorf("invalid s3 path: %s", filePath)
|
|
|
}
|
|
|
|
|
|
cfg, err := config.LoadDefaultConfig(context.Background())
|
|
|
@@ -114,23 +114,23 @@ func NewS3File(path string) (*S3File, error) {
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
-func NewAltS3File(path string) (*S3File, error) {
|
|
|
- u, err := url.Parse(path)
|
|
|
+func NewAltS3File(filePath string) (*S3File, error) {
|
|
|
+ u, err := url.Parse(filePath)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- clPath := gp.Clean(u.Path)
|
|
|
+ clPath := path.Clean(u.Path)
|
|
|
|
|
|
if len(strings.Split(clPath, "/")) < 3 {
|
|
|
- return nil, fmt.Errorf("invalid s3 path: %s", path)
|
|
|
+ return nil, fmt.Errorf("invalid s3 path: %s", filePath)
|
|
|
}
|
|
|
|
|
|
// Extract bucket and path from url
|
|
|
bucket, key, _ := strings.Cut(strings.TrimLeft(clPath, "/"), "/")
|
|
|
|
|
|
if bucket == "" || key == "" {
|
|
|
- return nil, fmt.Errorf("invalid s3 path: %s", path)
|
|
|
+ return nil, fmt.Errorf("invalid s3 path: %s", filePath)
|
|
|
}
|
|
|
|
|
|
cfg, err := config.LoadDefaultConfig(context.Background())
|
|
|
@@ -181,9 +181,9 @@ type GCSStorageFile struct {
|
|
|
client *storage.Client
|
|
|
}
|
|
|
|
|
|
-func NewGCSStorageFile(path string) (*GCSStorageFile, error) {
|
|
|
- path = strings.TrimPrefix(path, "gs://")
|
|
|
- parts := strings.SplitN(path, "/", 2)
|
|
|
+func NewGCSStorageFile(filePath string) (*GCSStorageFile, error) {
|
|
|
+ filePath = strings.TrimPrefix(filePath, "gs://")
|
|
|
+ parts := strings.SplitN(filePath, "/", 2)
|
|
|
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
|
|
|
return nil, errors.New("invalid GCS path")
|
|
|
}
|
|
|
@@ -225,16 +225,16 @@ func (g *GCSStorageFile) Upload(ctx context.Context, f *os.File) error {
|
|
|
return w.Close()
|
|
|
}
|
|
|
|
|
|
-func NewSystemFile(path string) *SystemFile {
|
|
|
- return &SystemFile{path: path}
|
|
|
+func NewSystemFile(filePath string) *SystemFile {
|
|
|
+ return &SystemFile{filePath: filePath}
|
|
|
}
|
|
|
|
|
|
type SystemFile struct {
|
|
|
- path string
|
|
|
+ filePath string
|
|
|
}
|
|
|
|
|
|
func (s *SystemFile) Download(ctx context.Context, f *os.File) error {
|
|
|
- sFile, err := os.Open(s.path)
|
|
|
+ sFile, err := os.Open(s.filePath)
|
|
|
if err != nil {
|
|
|
if os.IsNotExist(err) {
|
|
|
return ErrNotFound
|
|
|
@@ -256,7 +256,7 @@ func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- tmpFilePath := filepath.Join(filepath.Dir(s.path), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
|
|
|
+ tmpFilePath := filepath.Join(filepath.Dir(s.filePath), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
|
|
|
tmpF, err := os.Create(tmpFilePath)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
@@ -267,7 +267,7 @@ func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- err = os.Rename(tmpF.Name(), s.path)
|
|
|
+ err = os.Rename(tmpF.Name(), s.filePath)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|