filemanager.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package filemanager
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/url"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "cloud.google.com/go/storage"
  14. "github.com/Azure/azure-sdk-for-go/sdk/azcore"
  15. "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
  16. "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
  17. "github.com/aws/aws-sdk-go-v2/aws"
  18. "github.com/aws/aws-sdk-go-v2/config"
  19. "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
  20. "github.com/aws/aws-sdk-go-v2/service/s3"
  21. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  22. )
// ErrNotFound is the sentinel error the Download implementations in this file
// return when the requested file cannot be found. Compare with errors.Is.
var ErrNotFound = errors.New("not found")
// FileManager is a unified interface for downloading and uploading files from various storage providers.
//
// Each implementation is bound to a single remote (or local) location when it
// is constructed; the *os.File argument is always the local side of the transfer.
type FileManager interface {
	// Download writes the contents of the managed location into f.
	// The implementations in this file return ErrNotFound when the source is missing.
	Download(ctx context.Context, f *os.File) error
	// Upload reads f and stores its contents at the managed location.
	Upload(ctx context.Context, f *os.File) error
}
  29. // Examples of valid path:
  30. // - s3://bucket-name/path/to/file.csv
  31. // - gs://bucket-name/path/to/file.csv
  32. // - https://azblobaccount.blob.core.windows.net/containerName/path/to/file.csv
  33. // - alts3://fqdn:port/bucket-name/path/to/file.csv
  34. // - local/file/path.csv
  35. func NewFileManager(filePath string) (FileManager, error) {
  36. switch {
  37. case strings.HasPrefix(filePath, "s3://"):
  38. return NewS3File(filePath)
  39. case strings.HasPrefix(filePath, "gs://"):
  40. return NewGCSStorageFile(filePath)
  41. case strings.Contains(filePath, "blob.core.windows.net"):
  42. return NewAzureBlobFile(filePath)
  43. case strings.HasPrefix(filePath, "alts3://"):
  44. return NewAltS3File(filePath)
  45. case filePath == "":
  46. return nil, errors.New("empty path")
  47. default:
  48. return NewSystemFile(filePath), nil
  49. }
  50. }
  51. type AzureBlobFile struct {
  52. client *blockblob.Client
  53. }
  54. func NewAzureBlobFile(blobURL string) (*AzureBlobFile, error) {
  55. credential, err := azidentity.NewDefaultAzureCredential(nil)
  56. if err != nil {
  57. return nil, err
  58. }
  59. client, err := blockblob.NewClient(blobURL, credential, nil)
  60. return &AzureBlobFile{client: client}, err
  61. }
  62. func (a *AzureBlobFile) Download(ctx context.Context, f *os.File) error {
  63. _, err := a.client.DownloadFile(ctx, f, nil)
  64. // Convert Azure error into our own error.
  65. var storageErr *azcore.ResponseError
  66. if errors.As(err, &storageErr) && storageErr.ErrorCode == "BlobNotFound" {
  67. return ErrNotFound
  68. }
  69. return err
  70. }
  71. func (a *AzureBlobFile) Upload(ctx context.Context, f *os.File) error {
  72. _, err := a.client.UploadFile(ctx, f, nil)
  73. return err
  74. }
  75. type S3File struct {
  76. s3Client *s3.Client
  77. bucket string
  78. key string
  79. }
  80. func NewS3File(filePath string) (*S3File, error) {
  81. u, err := url.Parse(filePath)
  82. if err != nil {
  83. return nil, err
  84. }
  85. bucket := u.Host
  86. key := strings.TrimPrefix(u.Path, "/")
  87. if bucket == "" || key == "" {
  88. return nil, fmt.Errorf("invalid s3 path: %s", filePath)
  89. }
  90. cfg, err := config.LoadDefaultConfig(context.Background())
  91. if err != nil {
  92. return nil, err
  93. }
  94. return &S3File{
  95. s3Client: s3.NewFromConfig(cfg),
  96. bucket: bucket,
  97. key: key,
  98. }, nil
  99. }
  100. func NewAltS3File(filePath string) (*S3File, error) {
  101. u, err := url.Parse(filePath)
  102. if err != nil {
  103. return nil, err
  104. }
  105. clPath := path.Clean(u.Path)
  106. if len(strings.Split(clPath, "/")) < 3 {
  107. return nil, fmt.Errorf("invalid s3 path: %s", filePath)
  108. }
  109. // Extract bucket and path from url
  110. bucket, key, _ := strings.Cut(strings.TrimLeft(clPath, "/"), "/")
  111. if bucket == "" || key == "" {
  112. return nil, fmt.Errorf("invalid s3 path: %s", filePath)
  113. }
  114. cfg, err := config.LoadDefaultConfig(context.Background())
  115. if err != nil {
  116. return nil, err
  117. }
  118. return &S3File{
  119. s3Client: s3.NewFromConfig(cfg, func(o *s3.Options) {
  120. // Always use https for the endpoint when using an alternative s3 url.
  121. // NOTE: From service/s3 v1.38.0 and onwards use EndpointResolverV2 as described in the AWS SDK docs.
  122. o.EndpointResolver = s3.EndpointResolverFromURL(fmt.Sprintf("https://%v", u.Host), func(e *aws.Endpoint) {
  123. e.HostnameImmutable = true
  124. })
  125. }),
  126. bucket: bucket, // bucket
  127. key: key, // path/to/file.csv
  128. }, nil
  129. }
  130. func (c *S3File) Download(ctx context.Context, f *os.File) error {
  131. _, err := manager.NewDownloader(c.s3Client).Download(ctx, f, &s3.GetObjectInput{
  132. Bucket: aws.String(c.bucket),
  133. Key: aws.String(c.key),
  134. })
  135. // Convert AWS error into our own error type.
  136. var notFound *types.NoSuchKey
  137. if errors.As(err, &notFound) {
  138. return ErrNotFound
  139. }
  140. return err
  141. }
  142. func (c *S3File) Upload(ctx context.Context, f *os.File) error {
  143. _, err := manager.NewUploader(c.s3Client).Upload(ctx, &s3.PutObjectInput{
  144. Bucket: aws.String(c.bucket),
  145. Key: aws.String(c.key),
  146. Body: f,
  147. })
  148. return err
  149. }
  150. type GCSStorageFile struct {
  151. bucket string
  152. key string
  153. client *storage.Client
  154. }
  155. func NewGCSStorageFile(filePath string) (*GCSStorageFile, error) {
  156. filePath = strings.TrimPrefix(filePath, "gs://")
  157. parts := strings.SplitN(filePath, "/", 2)
  158. if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
  159. return nil, errors.New("invalid GCS path")
  160. }
  161. client, err := storage.NewClient(context.TODO())
  162. if err != nil {
  163. return nil, err
  164. }
  165. return &GCSStorageFile{
  166. client: client,
  167. bucket: parts[0],
  168. key: parts[1],
  169. }, nil
  170. }
  171. func (g *GCSStorageFile) Download(ctx context.Context, f *os.File) error {
  172. r, err := g.client.Bucket(g.bucket).Object(g.key).NewReader(ctx)
  173. if err != nil {
  174. if errors.Is(err, storage.ErrObjectNotExist) {
  175. return ErrNotFound
  176. }
  177. return err
  178. }
  179. defer r.Close()
  180. _, err = io.Copy(f, r)
  181. return err
  182. }
  183. func (g *GCSStorageFile) Upload(ctx context.Context, f *os.File) error {
  184. client, err := storage.NewClient(ctx)
  185. if err != nil {
  186. return err
  187. }
  188. w := client.Bucket(g.bucket).Object(g.key).NewWriter(ctx)
  189. if _, err := io.Copy(w, f); err != nil {
  190. return err
  191. }
  192. return w.Close()
  193. }
  194. func NewSystemFile(filePath string) *SystemFile {
  195. return &SystemFile{filePath: filePath}
  196. }
  197. type SystemFile struct {
  198. filePath string
  199. }
  200. func (s *SystemFile) Download(ctx context.Context, f *os.File) error {
  201. sFile, err := os.Open(s.filePath)
  202. if err != nil {
  203. if os.IsNotExist(err) {
  204. return ErrNotFound
  205. }
  206. return err
  207. }
  208. defer sFile.Close()
  209. _, err = io.Copy(f, sFile)
  210. return err
  211. }
  212. func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
  213. // we want to avoid truncating the file if the upload fails
  214. // so want to write to a temp file and then rename it
  215. // to the final destination
  216. // temp file should be in the same directory as the final destination
  217. // to avoid "invalid cross-device link" errors when attempting to rename the file
  218. _, err := f.Seek(0, io.SeekStart)
  219. if err != nil {
  220. return err
  221. }
  222. tmpFilePath := filepath.Join(filepath.Dir(s.filePath), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
  223. tmpF, err := os.Create(tmpFilePath)
  224. if err != nil {
  225. return err
  226. }
  227. defer os.Remove(tmpF.Name())
  228. defer tmpF.Close()
  229. _, err = io.Copy(tmpF, f)
  230. if err != nil {
  231. return err
  232. }
  233. err = os.Rename(tmpF.Name(), s.filePath)
  234. if err != nil {
  235. return err
  236. }
  237. return nil
  238. }
  239. type InMemoryFile struct {
  240. Data []byte
  241. }
  242. func (c *InMemoryFile) Download(ctx context.Context, f *os.File) error {
  243. if len(c.Data) == 0 {
  244. return ErrNotFound
  245. }
  246. _, err := f.Write(c.Data)
  247. return err
  248. }
  249. func (c *InMemoryFile) Upload(ctx context.Context, f *os.File) error {
  250. var err error
  251. c.Data, err = io.ReadAll(f)
  252. return err
  253. }