Parcourir la source

feat: changed custom s3 endpoint naming and scheme to alts3, used aws sdk based s3 client endpoint construction

Signed-off-by: Klaas Jan Dijksterhuis <klaasjand@users.noreply.github.com>
Klaas Jan Dijksterhuis il y a 2 ans
Parent
commit
e339a9afd1
1 fichier modifié avec 40 ajouts et 79 suppressions
  1. 40 79
      pkg/filemanager/filemanager.go

+ 40 - 79
pkg/filemanager/filemanager.go

@@ -35,7 +35,7 @@ type FileManager interface {
 // - s3://bucket-name/path/to/file.csv
 // - gs://bucket-name/path/to/file.csv
 // - https://azblobaccount.blob.core.windows.net/containerName/path/to/file.csv
-// - https://fqdn:port/bucket-name/path/to/file.csv
+// - alts3://fqdn:port/bucket-name/path/to/file.csv
 // - local/file/path.csv
 
 func NewFileManager(path string) (FileManager, error) {
@@ -46,8 +46,8 @@ func NewFileManager(path string) (FileManager, error) {
 		return NewGCSStorageFile(path)
 	case strings.Contains(path, "blob.core.windows.net"):
 		return NewAzureBlobFile(path)
-	case strings.HasPrefix(path, "http://"), strings.HasPrefix(path, "https://"):
-		return NewCustomS3File(path)
+	case strings.HasPrefix(path, "alts3://"):
+		return NewAltS3File(path)
 	case path == "":
 		return nil, errors.New("empty path")
 	default:
@@ -55,82 +55,6 @@ func NewFileManager(path string) (FileManager, error) {
 	}
 }
 
-type CustomS3File struct {
-	s3Client *s3.Client
-	bucket   string
-	key      string
-}
-
-func NewCustomS3File(path string) (*CustomS3File, error) {
-	u, err := url.Parse(path)
-	if err != nil {
-		return nil, err
-	}
-
-	clPath := gp.Clean(u.Path)
-
-	if len(strings.Split(clPath, "/")) < 3 {
-		return nil, fmt.Errorf("invalid s3 path: %s", path)
-	}
-
-	// Extract bucket and path from url
-	bucket, key, _ := strings.Cut(strings.TrimLeft(clPath, "/"), "/")
-
-	if bucket == "" || key == "" {
-		return nil, fmt.Errorf("invalid s3 path: %s", path)
-	}
-
-	const defaultRegion = "us-east-1"
-
-	resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...any) (aws.Endpoint, error) {
-		return aws.Endpoint{
-			PartitionID:       "aws",
-			SigningRegion:     defaultRegion,
-			URL:               fmt.Sprintf("%v://%v", u.Scheme, u.Host),
-			HostnameImmutable: true,
-		}, nil
-	})
-
-	cfg, err := config.LoadDefaultConfig(
-		context.Background(),
-		config.WithDefaultRegion(defaultRegion),
-		config.WithEndpointResolverWithOptions(resolver))
-
-	if err != nil {
-		return nil, err
-	}
-
-	return &CustomS3File{
-		s3Client: s3.NewFromConfig(cfg),
-		bucket:   bucket, // bucket
-		key:      key,    // path/to/file.csv
-	}, nil
-}
-
-func (c *CustomS3File) Download(ctx context.Context, f *os.File) error {
-	_, err := manager.NewDownloader(c.s3Client).Download(ctx, f, &s3.GetObjectInput{
-		Bucket: aws.String(c.bucket),
-		Key:    aws.String(c.key),
-	})
-
-	// Convert AWS error into our own error type.
-	var notFound *types.NoSuchKey
-	if errors.As(err, &notFound) {
-		return ErrNotFound
-	}
-
-	return err
-}
-
-func (c *CustomS3File) Upload(ctx context.Context, f *os.File) error {
-	_, err := manager.NewUploader(c.s3Client).Upload(ctx, &s3.PutObjectInput{
-		Bucket: aws.String(c.bucket),
-		Key:    aws.String(c.key),
-		Body:   f,
-	})
-	return err
-}
-
 type AzureBlobFile struct {
 	client *blockblob.Client
 }
@@ -190,6 +114,43 @@ func NewS3File(path string) (*S3File, error) {
 	}, nil
 }
 
+func NewAltS3File(path string) (*S3File, error) {
+	u, err := url.Parse(path)
+	if err != nil {
+		return nil, err
+	}
+
+	clPath := gp.Clean(u.Path)
+
+	if len(strings.Split(clPath, "/")) < 3 {
+		return nil, fmt.Errorf("invalid s3 path: %s", path)
+	}
+
+	// Extract bucket and path from url
+	bucket, key, _ := strings.Cut(strings.TrimLeft(clPath, "/"), "/")
+
+	if bucket == "" || key == "" {
+		return nil, fmt.Errorf("invalid s3 path: %s", path)
+	}
+
+	cfg, err := config.LoadDefaultConfig(context.Background())
+	if err != nil {
+		return nil, err
+	}
+
+	return &S3File{
+		s3Client: s3.NewFromConfig(cfg, func(o *s3.Options) {
+			// Always use https for the endpoint when using an alternative s3 url.
+			// NOTE: From service/s3 v1.38.0 and onwards use EndpointResolverV2 as described in the AWS SDK docs.
+			o.EndpointResolver = s3.EndpointResolverFromURL(fmt.Sprintf("https://%v", u.Host), func(e *aws.Endpoint) {
+				e.HostnameImmutable = true
+			})
+		}),
+		bucket: bucket, // bucket
+		key:    key,    // path/to/file.csv
+	}, nil
+}
+
 func (c *S3File) Download(ctx context.Context, f *os.File) error {
 	_, err := manager.NewDownloader(c.s3Client).Download(ctx, f, &s3.GetObjectInput{
 		Bucket: aws.String(c.bucket),