| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- package filemanager
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "net/url"
- "os"
- "path"
- "path/filepath"
- "strings"
- "time"
- "cloud.google.com/go/storage"
- "github.com/Azure/azure-sdk-for-go/sdk/azcore"
- "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
- "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
- "github.com/aws/aws-sdk-go-v2/aws"
- "github.com/aws/aws-sdk-go-v2/config"
- "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
- "github.com/aws/aws-sdk-go-v2/service/s3"
- "github.com/aws/aws-sdk-go-v2/service/s3/types"
- )
var (
	// ErrNotFound is the sentinel every FileManager.Download implementation in
	// this package returns when the source object or file does not exist.
	ErrNotFound = errors.New("not found")
)

// FileManager is a unified interface for downloading and uploading files from
// various storage providers.
type FileManager interface {
	// Download copies the stored object into file.
	Download(ctx context.Context, file *os.File) error
	// Upload writes the contents read from file to the storage location.
	Upload(ctx context.Context, file *os.File) error
}
- // Examples of valid path:
- // - s3://bucket-name/path/to/file.csv
- // - gs://bucket-name/path/to/file.csv
- // - https://azblobaccount.blob.core.windows.net/containerName/path/to/file.csv
- // - alts3://fqdn:port/bucket-name/path/to/file.csv
- // - local/file/path.csv
- func NewFileManager(filePath string) (FileManager, error) {
- switch {
- case strings.HasPrefix(filePath, "s3://"):
- return NewS3File(filePath)
- case strings.HasPrefix(filePath, "gs://"):
- return NewGCSStorageFile(filePath)
- case strings.Contains(filePath, "blob.core.windows.net"):
- return NewAzureBlobFile(filePath)
- case strings.HasPrefix(filePath, "alts3://"):
- return NewAltS3File(filePath)
- case filePath == "":
- return nil, errors.New("empty path")
- default:
- return NewSystemFile(filePath), nil
- }
- }
- type AzureBlobFile struct {
- client *blockblob.Client
- }
- func NewAzureBlobFile(blobURL string) (*AzureBlobFile, error) {
- credential, err := azidentity.NewDefaultAzureCredential(nil)
- if err != nil {
- return nil, err
- }
- client, err := blockblob.NewClient(blobURL, credential, nil)
- return &AzureBlobFile{client: client}, err
- }
- func (a *AzureBlobFile) Download(ctx context.Context, f *os.File) error {
- _, err := a.client.DownloadFile(ctx, f, nil)
- // Convert Azure error into our own error.
- var storageErr *azcore.ResponseError
- if errors.As(err, &storageErr) && storageErr.ErrorCode == "BlobNotFound" {
- return ErrNotFound
- }
- return err
- }
- func (a *AzureBlobFile) Upload(ctx context.Context, f *os.File) error {
- _, err := a.client.UploadFile(ctx, f, nil)
- return err
- }
- type S3File struct {
- s3Client *s3.Client
- bucket string
- key string
- }
- func NewS3File(filePath string) (*S3File, error) {
- u, err := url.Parse(filePath)
- if err != nil {
- return nil, err
- }
- bucket := u.Host
- key := strings.TrimPrefix(u.Path, "/")
- if bucket == "" || key == "" {
- return nil, fmt.Errorf("invalid s3 path: %s", filePath)
- }
- cfg, err := config.LoadDefaultConfig(context.Background())
- if err != nil {
- return nil, err
- }
- return &S3File{
- s3Client: s3.NewFromConfig(cfg),
- bucket: bucket,
- key: key,
- }, nil
- }
- func NewAltS3File(filePath string) (*S3File, error) {
- u, err := url.Parse(filePath)
- if err != nil {
- return nil, err
- }
- clPath := path.Clean(u.Path)
- if len(strings.Split(clPath, "/")) < 3 {
- return nil, fmt.Errorf("invalid s3 path: %s", filePath)
- }
- // Extract bucket and path from url
- bucket, key, _ := strings.Cut(strings.TrimLeft(clPath, "/"), "/")
- if bucket == "" || key == "" {
- return nil, fmt.Errorf("invalid s3 path: %s", filePath)
- }
- cfg, err := config.LoadDefaultConfig(context.Background())
- if err != nil {
- return nil, err
- }
- return &S3File{
- s3Client: s3.NewFromConfig(cfg, func(o *s3.Options) {
- // Always use https for the endpoint when using an alternative s3 url.
- // NOTE: From service/s3 v1.38.0 and onwards use EndpointResolverV2 as described in the AWS SDK docs.
- o.EndpointResolver = s3.EndpointResolverFromURL(fmt.Sprintf("https://%v", u.Host), func(e *aws.Endpoint) {
- e.HostnameImmutable = true
- })
- }),
- bucket: bucket, // bucket
- key: key, // path/to/file.csv
- }, nil
- }
- func (c *S3File) Download(ctx context.Context, f *os.File) error {
- _, err := manager.NewDownloader(c.s3Client).Download(ctx, f, &s3.GetObjectInput{
- Bucket: aws.String(c.bucket),
- Key: aws.String(c.key),
- })
- // Convert AWS error into our own error type.
- var notFound *types.NoSuchKey
- if errors.As(err, ¬Found) {
- return ErrNotFound
- }
- return err
- }
- func (c *S3File) Upload(ctx context.Context, f *os.File) error {
- _, err := manager.NewUploader(c.s3Client).Upload(ctx, &s3.PutObjectInput{
- Bucket: aws.String(c.bucket),
- Key: aws.String(c.key),
- Body: f,
- })
- return err
- }
- type GCSStorageFile struct {
- bucket string
- key string
- client *storage.Client
- }
- func NewGCSStorageFile(filePath string) (*GCSStorageFile, error) {
- filePath = strings.TrimPrefix(filePath, "gs://")
- parts := strings.SplitN(filePath, "/", 2)
- if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
- return nil, errors.New("invalid GCS path")
- }
- client, err := storage.NewClient(context.TODO())
- if err != nil {
- return nil, err
- }
- return &GCSStorageFile{
- client: client,
- bucket: parts[0],
- key: parts[1],
- }, nil
- }
- func (g *GCSStorageFile) Download(ctx context.Context, f *os.File) error {
- r, err := g.client.Bucket(g.bucket).Object(g.key).NewReader(ctx)
- if err != nil {
- if errors.Is(err, storage.ErrObjectNotExist) {
- return ErrNotFound
- }
- return err
- }
- defer r.Close()
- _, err = io.Copy(f, r)
- return err
- }
- func (g *GCSStorageFile) Upload(ctx context.Context, f *os.File) error {
- client, err := storage.NewClient(ctx)
- if err != nil {
- return err
- }
- w := client.Bucket(g.bucket).Object(g.key).NewWriter(ctx)
- if _, err := io.Copy(w, f); err != nil {
- return err
- }
- return w.Close()
- }
// SystemFile is a FileManager backed by a path on the local filesystem.
type SystemFile struct {
	filePath string
}

// NewSystemFile returns a SystemFile for filePath. The path is stored as given
// and is not validated here.
func NewSystemFile(filePath string) *SystemFile {
	sf := new(SystemFile)
	sf.filePath = filePath
	return sf
}
- func (s *SystemFile) Download(ctx context.Context, f *os.File) error {
- sFile, err := os.Open(s.filePath)
- if err != nil {
- if os.IsNotExist(err) {
- return ErrNotFound
- }
- return err
- }
- defer sFile.Close()
- _, err = io.Copy(f, sFile)
- return err
- }
- func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
- // we want to avoid truncating the file if the upload fails
- // so want to write to a temp file and then rename it
- // to the final destination
- // temp file should be in the same directory as the final destination
- // to avoid "invalid cross-device link" errors when attempting to rename the file
- _, err := f.Seek(0, io.SeekStart)
- if err != nil {
- return err
- }
- tmpFilePath := filepath.Join(filepath.Dir(s.filePath), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
- tmpF, err := os.Create(tmpFilePath)
- if err != nil {
- return err
- }
- defer os.Remove(tmpF.Name())
- defer tmpF.Close()
- _, err = io.Copy(tmpF, f)
- if err != nil {
- return err
- }
- err = os.Rename(tmpF.Name(), s.filePath)
- if err != nil {
- return err
- }
- return nil
- }
- type InMemoryFile struct {
- Data []byte
- }
- func (c *InMemoryFile) Download(ctx context.Context, f *os.File) error {
- if len(c.Data) == 0 {
- return ErrNotFound
- }
- _, err := f.Write(c.Data)
- return err
- }
- func (c *InMemoryFile) Upload(ctx context.Context, f *os.File) error {
- var err error
- c.Data, err = io.ReadAll(f)
- return err
- }
|