|
|
@@ -7,6 +7,7 @@ import (
|
|
|
"io"
|
|
|
"net/url"
|
|
|
"os"
|
|
|
+ gp "path"
|
|
|
"path/filepath"
|
|
|
"strings"
|
|
|
"time"
|
|
|
@@ -34,6 +35,7 @@ type FileManager interface {
|
|
|
// - s3://bucket-name/path/to/file.csv
|
|
|
// - gs://bucket-name/path/to/file.csv
|
|
|
// - https://azblobaccount.blob.core.windows.net/containerName/path/to/file.csv
|
|
|
+// - https://fqdn:port/bucket-name/path/to/file.csv
|
|
|
// - local/file/path.csv
|
|
|
|
|
|
func NewFileManager(path string) (FileManager, error) {
|
|
|
@@ -44,6 +46,8 @@ func NewFileManager(path string) (FileManager, error) {
|
|
|
return NewGCSStorageFile(path)
|
|
|
case strings.Contains(path, "blob.core.windows.net"):
|
|
|
return NewAzureBlobFile(path)
|
|
|
+ case strings.HasPrefix(path, "http://"), strings.HasPrefix(path, "https://"):
|
|
|
+ return NewCustomS3File(path)
|
|
|
case path == "":
|
|
|
return nil, errors.New("empty path")
|
|
|
default:
|
|
|
@@ -51,6 +55,82 @@ func NewFileManager(path string) (FileManager, error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+type CustomS3File struct {
|
|
|
+ s3Client *s3.Client
|
|
|
+ bucket string
|
|
|
+ key string
|
|
|
+}
|
|
|
+
|
|
|
+func NewCustomS3File(path string) (*CustomS3File, error) {
|
|
|
+ u, err := url.Parse(path)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ clPath := gp.Clean(u.Path)
|
|
|
+
|
|
|
+ if len(strings.Split(clPath, "/")) < 3 {
|
|
|
+ return nil, fmt.Errorf("invalid s3 path: %s", path)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Extract bucket and path from url
|
|
|
+ bucket, key, _ := strings.Cut(strings.TrimLeft(clPath, "/"), "/")
|
|
|
+
|
|
|
+ if bucket == "" || key == "" {
|
|
|
+ return nil, fmt.Errorf("invalid s3 path: %s", path)
|
|
|
+ }
|
|
|
+
|
|
|
+ const defaultRegion = "us-east-1"
|
|
|
+
|
|
|
+ resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...any) (aws.Endpoint, error) {
|
|
|
+ return aws.Endpoint{
|
|
|
+ PartitionID: "aws",
|
|
|
+ SigningRegion: defaultRegion,
|
|
|
+ URL: fmt.Sprintf("%v://%v", u.Scheme, u.Host),
|
|
|
+ HostnameImmutable: true,
|
|
|
+ }, nil
|
|
|
+ })
|
|
|
+
|
|
|
+ cfg, err := config.LoadDefaultConfig(
|
|
|
+ context.Background(),
|
|
|
+ config.WithDefaultRegion(defaultRegion),
|
|
|
+ config.WithEndpointResolverWithOptions(resolver))
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return &CustomS3File{
|
|
|
+ s3Client: s3.NewFromConfig(cfg),
|
|
|
+ bucket: bucket, // bucket
|
|
|
+ key: key, // path/to/file.csv
|
|
|
+ }, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (c *CustomS3File) Download(ctx context.Context, f *os.File) error {
|
|
|
+ _, err := manager.NewDownloader(c.s3Client).Download(ctx, f, &s3.GetObjectInput{
|
|
|
+ Bucket: aws.String(c.bucket),
|
|
|
+ Key: aws.String(c.key),
|
|
|
+ })
|
|
|
+
|
|
|
+ // Convert AWS error into our own error type.
|
|
|
+ var notFound *types.NoSuchKey
|
|
|
+ if errors.As(err, ¬Found) {
|
|
|
+ return ErrNotFound
|
|
|
+ }
|
|
|
+
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func (c *CustomS3File) Upload(ctx context.Context, f *os.File) error {
|
|
|
+ _, err := manager.NewUploader(c.s3Client).Upload(ctx, &s3.PutObjectInput{
|
|
|
+ Bucket: aws.String(c.bucket),
|
|
|
+ Key: aws.String(c.key),
|
|
|
+ Body: f,
|
|
|
+ })
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
type AzureBlobFile struct {
|
|
|
client *blockblob.Client
|
|
|
}
|