Jelajahi Sumber

Add cross-provider multipart upload support

Introduce a MultipartUpload/UploadPart abstraction in the interface and
base layers, implemented by all four providers (AWS S3, Azure Blob, GCP
Storage, OpenStack Swift) and therefore the moto mock.

The explicit lifecycle is initiate -> upload_part(n) -> complete/abort,
exposed via BucketObject.create_multipart_upload(). The high-level
upload()/upload_from_file() methods now route inputs above a configurable
threshold (CB_MULTIPART_THRESHOLD/PART_SIZE) through the same mechanism,
streaming one part at a time so large payloads are never fully buffered.
Existing method signatures and return values are preserved.

Per-provider mapping: native S3 multipart; Azure block blobs
(stage_block/commit_block_list, with a documented no-op abort); GCS
compose over temporary part objects with >32-source chaining and cleanup;
Swift Static Large Objects with a manifest PUT. Azure's whole-file in-memory
upload is replaced with a streaming single-shot path, removing the unused
create_blob_from_text/create_blob_from_file helpers.

Adds object-store tests (roundtrip, out-of-order parts, abort, transparent
multipart, single-shot threshold, part-size validation) that run on the mock
provider in CI and on the cloud providers when selected.
Nuwan Goonasekera 1 hari lalu
induk
melakukan
8975b50c53

+ 176 - 0
cloudbridge/base/resources.py

@@ -2,6 +2,7 @@
 Base implementation for data objects exposed through a provider or service
 """
 import inspect
+import io
 import itertools
 import logging
 import os
@@ -14,6 +15,7 @@ from cloudbridge.interfaces.exceptions import \
     InvalidConfigurationException
 from cloudbridge.interfaces.exceptions import InvalidLabelException
 from cloudbridge.interfaces.exceptions import InvalidNameException
+from cloudbridge.interfaces.exceptions import InvalidValueException
 from cloudbridge.interfaces.exceptions import WaitStateException
 from cloudbridge.interfaces.resources import AttachmentInfo
 from cloudbridge.interfaces.resources import Bucket
@@ -31,6 +33,7 @@ from cloudbridge.interfaces.resources import KeyPair
 from cloudbridge.interfaces.resources import LaunchConfig
 from cloudbridge.interfaces.resources import MachineImage
 from cloudbridge.interfaces.resources import MachineImageState
+from cloudbridge.interfaces.resources import MultipartUpload
 from cloudbridge.interfaces.resources import Network
 from cloudbridge.interfaces.resources import NetworkState
 from cloudbridge.interfaces.resources import ObjectLifeCycleMixin
@@ -43,6 +46,7 @@ from cloudbridge.interfaces.resources import Snapshot
 from cloudbridge.interfaces.resources import SnapshotState
 from cloudbridge.interfaces.resources import Subnet
 from cloudbridge.interfaces.resources import SubnetState
+from cloudbridge.interfaces.resources import UploadPart
 from cloudbridge.interfaces.resources import VMFirewall
 from cloudbridge.interfaces.resources import VMFirewallRule
 from cloudbridge.interfaces.resources import VMType
@@ -680,6 +684,76 @@ class BaseRegion(BaseCloudResource, Region):
         return next(iter(self.zones))
 
 
+class BaseUploadPart(UploadPart):
+    """
+    A simple, serializable handle for a single uploaded part. Concrete
+    providers return these from ``upload_part`` and consume them in
+    ``complete_multipart_upload``.
+    """
+
+    def __init__(self, part_number, etag):
+        self._part_number = part_number
+        self._etag = etag
+
+    @property
+    def part_number(self):
+        return self._part_number
+
+    @property
+    def etag(self):
+        return self._etag
+
+    def __repr__(self):
+        return "<CB-{0}: {1} ({2})>".format(
+            self.__class__.__name__, self._part_number, self._etag)
+
+
+class BaseMultipartUpload(BaseCloudResource, MultipartUpload):
+    """
+    Base implementation of an in-progress multipart upload. It is a thin
+    handle that delegates the actual work to the provider's bucket-object
+    service, mirroring how other base resources delegate to their service
+    (e.g. ``BaseBucket.delete``).
+    """
+
+    def __init__(self, provider, bucket, object_name, upload_id):
+        super(BaseMultipartUpload, self).__init__(provider)
+        self._bucket = bucket
+        self._object_name = object_name
+        self._upload_id = upload_id
+
+    @property
+    def id(self):
+        return self._upload_id
+
+    @property
+    def name(self):
+        return self._object_name
+
+    @property
+    def bucket(self):
+        return self._bucket
+
+    @property
+    def object_name(self):
+        return self._object_name
+
+    def upload_part(self, part_number, data):
+        # pylint:disable=protected-access
+        return self._provider.storage._bucket_objects.upload_part(
+            self._bucket, self, part_number, data)
+
+    def complete(self, parts):
+        # pylint:disable=protected-access
+        return self._provider.storage._bucket_objects\
+            .complete_multipart_upload(self._bucket, self, parts)
+
+    def abort(self):
+        # pylint:disable=protected-access
+        return self._provider.storage._bucket_objects\
+            .abort_multipart_upload(self._bucket, self)
+
+
 class BaseBucketObject(BaseCloudResource, BucketObject):
 
     # Regular expression for valid bucket keys.
@@ -690,6 +764,16 @@ class BaseBucketObject(BaseCloudResource, BucketObject):
     # s/537772/what-is-the-most-correct-regular-expression-for-a-unix-file-path
     CB_NAME_PATTERN = re.compile(r"[^\0]+")
 
+    # Uploads larger than this many bytes are split into parts.
+    CB_MULTIPART_THRESHOLD = int(os.environ.get(
+        'CB_MULTIPART_THRESHOLD', 100 * 1024 * 1024))   # 100 MiB
+    # The size of each part for multipart uploads.
+    CB_MULTIPART_PART_SIZE = int(os.environ.get(
+        'CB_MULTIPART_PART_SIZE', 50 * 1024 * 1024))    # 50 MiB
+    # Portable floor: S3 and Swift reject non-final parts smaller than 5 MiB,
+    # so part sizes below this are rejected up-front.
+    CB_MULTIPART_MIN_PART_SIZE = 5 * 1024 * 1024
+
     def __init__(self, provider):
         super(BaseBucketObject, self).__init__(provider)
 
@@ -711,6 +795,98 @@ class BaseBucketObject(BaseCloudResource, BucketObject):
     def save_content(self, target_stream):
         shutil.copyfileobj(self.iter_content(), target_stream)
 
+    @property
+    def _multipart_threshold(self):
+        # pylint:disable=protected-access
+        return int(self._provider._get_config_value(
+            'multipart_threshold', self.CB_MULTIPART_THRESHOLD))
+
+    @property
+    def _multipart_part_size(self):
+        # pylint:disable=protected-access
+        return int(self._provider._get_config_value(
+            'multipart_part_size', self.CB_MULTIPART_PART_SIZE))
+
+    @staticmethod
+    def _data_size(data):
+        """
+        Best-effort size of an upload payload, or ``None`` if it cannot be
+        determined without consuming the data (e.g. a non-seekable stream).
+        """
+        if isinstance(data, str):
+            return len(data.encode('utf-8'))
+        if isinstance(data, (bytes, bytearray)):
+            return len(data)
+        if hasattr(data, 'seek') and hasattr(data, 'tell'):
+            try:
+                pos = data.tell()
+                data.seek(0, os.SEEK_END)
+                size = data.tell()
+                data.seek(pos)
+                return size
+            except (OSError, ValueError):
+                return None
+        return None
+
+    @staticmethod
+    def _as_stream(data):
+        if isinstance(data, str):
+            data = data.encode('utf-8')
+        if isinstance(data, (bytes, bytearray)):
+            return io.BytesIO(data)
+        return data
+
+    def upload(self, data):
+        size = self._data_size(data)
+        if size is not None and size > self._multipart_threshold:
+            return self._upload_multipart(self._as_stream(data))
+        return self._upload_single_shot(data)
+
+    def upload_from_file(self, path):
+        if os.path.getsize(path) > self._multipart_threshold:
+            with open(path, 'rb') as f:
+                return self._upload_multipart(f)
+        return self._upload_from_file_single_shot(path)
+
+    def _upload_multipart(self, stream):
+        """
+        Drive the explicit multipart lifecycle over a stream, reading it one
+        part at a time so the whole payload is never held in memory. Any
+        failure aborts the upload to avoid leaking staged parts.
+        """
+        part_size = self._multipart_part_size
+        if part_size < self.CB_MULTIPART_MIN_PART_SIZE:
+            raise InvalidValueException('multipart_part_size', part_size)
+
+        upload = self.create_multipart_upload()
+        parts = []
+        try:
+            part_number = 1
+            while True:
+                chunk = stream.read(part_size)
+                if not chunk:
+                    break
+                parts.append(upload.upload_part(part_number, chunk))
+                part_number += 1
+            return upload.complete(parts)
+        except Exception:
+            upload.abort()
+            raise
+
+    def _upload_from_file_single_shot(self, path):
+        """
+        Default small-file upload: read the file and hand it to the provider's
+        single-shot upload. Providers with a more efficient native file upload
+        (e.g. AWS ``upload_file``) override :meth:`upload_from_file` directly.
+        """
+        with open(path, 'rb') as f:
+            return self._upload_single_shot(f)
+
+    def create_multipart_upload(self):
+        # pylint:disable=protected-access
+        return self._provider.storage._bucket_objects.create_multipart_upload(
+            self.bucket, self.name)
+
     def __eq__(self, other):
         return (isinstance(other, BucketObject) and
                 # pylint:disable=protected-access

+ 136 - 0
cloudbridge/interfaces/resources.py

@@ -2125,6 +2125,118 @@ class VMFirewallRule(CloudResource):
         pass
 
 
+class UploadPart(object):
+    """
+    A handle for a single part uploaded as part of a multipart upload.
+
+    Returned by :meth:`.MultipartUpload.upload_part`. Callers must retain the
+    parts they receive and pass them to :meth:`.MultipartUpload.complete`.
+    The handle is a simple, serializable value object so that parts may be
+    collected across threads or processes when uploading in parallel.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractproperty
+    def part_number(self):
+        """
+        The 1-based index of this part within the upload.
+
+        :rtype: ``int``
+        :return: The part number supplied to ``upload_part``.
+        """
+        pass
+
+    @abstractproperty
+    def etag(self):
+        """
+        Opaque provider handle identifying the stored part.
+
+        Its concrete form varies by provider (e.g. an S3 ETag, an Azure block
+        id, a GCS temp-object name, or a Swift segment descriptor) and should
+        not be interpreted by clients.
+
+        :rtype: ``object``
+        :return: The provider-specific part handle.
+        """
+        pass
+
+
+class MultipartUpload(CloudResource):
+    """
+    Represents an in-progress, multi-part upload to a :class:`.BucketObject`.
+
+    Created via :meth:`.BucketObject.create_multipart_upload`. Parts are
+    uploaded with :meth:`upload_part` (in any order, optionally in parallel)
+    and the upload is finalized with :meth:`complete` or cancelled with
+    :meth:`abort`.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractproperty
+    def bucket(self):
+        """
+        The bucket this upload targets.
+
+        :rtype: :class:`.Bucket`
+        :return: The target Bucket.
+        """
+        pass
+
+    @abstractproperty
+    def object_name(self):
+        """
+        The key of the object being uploaded.
+
+        :rtype: ``str``
+        :return: The target object name.
+        """
+        pass
+
+    @abstractmethod
+    def upload_part(self, part_number, data):
+        """
+        Upload a single part of this multipart upload.
+
+        :type part_number: ``int``
+        :param part_number: 1-based index that determines the part's position
+            in the final object. Part numbers must be unique within an upload.
+
+        :type data: ``bytes`` or file-like object
+        :param data: The part payload. Every part except the last must be at
+            least the provider minimum part size (5 MiB on S3).
+
+        :rtype: :class:`.UploadPart`
+        :return: A part handle that MUST be retained and passed to
+            :meth:`complete`.
+        """
+        pass
+
+    @abstractmethod
+    def complete(self, parts):
+        """
+        Finalize the upload, assembling parts in ascending ``part_number``.
+
+        :type parts: ``list`` of :class:`.UploadPart`
+        :param parts: The handles returned by :meth:`upload_part`. They may be
+            supplied in any order.
+
+        :rtype: :class:`.BucketObject`
+        :return: The completed object.
+        """
+        pass
+
+    @abstractmethod
+    def abort(self):
+        """
+        Cancel the upload, releasing any staged parts or temporary objects.
+
+        ``abort`` is idempotent and safe to call after a partial upload. On
+        Azure it is best-effort: there is no server-side cancel, so uncommitted
+        blocks are left to expire (after ~7 days) rather than being deleted.
+        """
+        pass
+
+
 class BucketObject(CloudResource):
     """
     Represents an object stored within a bucket.
@@ -2199,11 +2311,35 @@ class BucketObject(CloudResource):
         """
         Store the contents of the file pointed by the "path" variable.
 
+        Files larger than the configured multipart threshold are streamed to
+        the provider in parts, so the whole file is never held in memory.
+
         :type path: ``str``
         :param path: Absolute path to the file to be uploaded to S3.
         """
         pass
 
+    @abstractmethod
+    def create_multipart_upload(self):
+        """
+        Begin an explicit, multi-part upload to this object.
+
+        Returns a :class:`.MultipartUpload` handle whose lifecycle is::
+
+            upload = obj.create_multipart_upload()
+            parts = [upload.upload_part(n, chunk) for n, chunk in ...]
+            upload.complete(parts)   # or upload.abort()
+
+        Parts may be uploaded in any order and in parallel; every part except
+        the last must be at least the provider minimum part size (5 MiB on
+        S3). :meth:`.MultipartUpload.complete` assembles them in ascending
+        ``part_number`` order.
+
+        :rtype: :class:`.MultipartUpload`
+        :return: An in-progress multipart upload handle for this object.
+        """
+        pass
+
     @abstractmethod
     def delete(self):
         """

+ 71 - 0
cloudbridge/interfaces/services.py

@@ -1225,6 +1225,77 @@ class BucketObjectService(CloudService):
         """
         pass
 
+    @abstractmethod
+    def create_multipart_upload(self, bucket, object_name):
+        """
+        Begin an explicit, multi-part upload of an object to a bucket.
+
+        :type bucket: :class:`.Bucket`
+        :param bucket: The bucket to upload into.
+
+        :type object_name: ``str``
+        :param object_name: The key of the object being uploaded.
+
+        :rtype: :class:`.MultipartUpload`
+        :return: An in-progress multipart upload handle.
+        """
+        pass
+
+    @abstractmethod
+    def upload_part(self, bucket, upload, part_number, data):
+        """
+        Upload a single part of an in-progress multipart upload.
+
+        :type bucket: :class:`.Bucket`
+        :param bucket: The bucket being uploaded into.
+
+        :type upload: :class:`.MultipartUpload`
+        :param upload: The in-progress upload, as returned by
+            :meth:`create_multipart_upload`.
+
+        :type part_number: ``int``
+        :param part_number: 1-based index of the part.
+
+        :type data: ``bytes`` or file-like object
+        :param data: The part payload.
+
+        :rtype: :class:`.UploadPart`
+        :return: A handle for the uploaded part.
+        """
+        pass
+
+    @abstractmethod
+    def complete_multipart_upload(self, bucket, upload, parts):
+        """
+        Finalize a multipart upload, assembling parts in part-number order.
+
+        :type bucket: :class:`.Bucket`
+        :param bucket: The bucket being uploaded into.
+
+        :type upload: :class:`.MultipartUpload`
+        :param upload: The in-progress upload to finalize.
+
+        :type parts: ``list`` of :class:`.UploadPart`
+        :param parts: The part handles returned by :meth:`upload_part`.
+
+        :rtype: :class:`.BucketObject`
+        :return: The completed object.
+        """
+        pass
+
+    @abstractmethod
+    def abort_multipart_upload(self, bucket, upload):
+        """
+        Cancel a multipart upload and release any staged parts.
+
+        :type bucket: :class:`.Bucket`
+        :param bucket: The bucket being uploaded into.
+
+        :type upload: :class:`.MultipartUpload`
+        :param upload: The in-progress upload to cancel.
+        """
+        pass
+
 
 class SecurityService(CloudService):
 

+ 9 - 1
cloudbridge/providers/aws/resources.py

@@ -866,13 +866,21 @@ class AWSBucketObject(BaseBucketObject):
     def last_modified(self):
         return self._obj.last_modified.strftime("%Y-%m-%dT%H:%M:%S.%f")
 
+    @property
+    def bucket(self):
+        return AWSBucket(self._provider,
+                         self._provider.s3_conn.Bucket(self._obj.bucket_name))
+
     def iter_content(self):
         return self.BucketObjIterator(self._obj.get().get('Body'))
 
-    def upload(self, data):
+    def _upload_single_shot(self, data):
         self._obj.put(Body=data)
 
     def upload_from_file(self, path):
+        # boto3's upload_file already streams large files in parts via its
+        # TransferManager, so it is used directly rather than CloudBridge's
+        # multipart driver.
         self._obj.upload_file(path)
 
     def delete(self):

+ 36 - 0
cloudbridge/providers/aws/services.py

@@ -8,6 +8,8 @@ from botocore.exceptions import ClientError
 
 import cloudbridge.base.helpers as cb_helpers
 from cloudbridge.base.middleware import dispatch
+from cloudbridge.base.resources import BaseMultipartUpload
+from cloudbridge.base.resources import BaseUploadPart
 from cloudbridge.base.resources import ClientPagedResultList
 from cloudbridge.base.resources import ServerPagedResultList
 from cloudbridge.base.services import BaseBucketObjectService
@@ -613,6 +615,40 @@ class AWSBucketObjectService(BaseBucketObjectService):
         obj = bucket._bucket.Object(name)
         return AWSBucketObject(self.provider, obj)
 
+    @dispatch(event="provider.storage._bucket_objects.create_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def create_multipart_upload(self, bucket, object_name):
+        response = self.provider.s3_conn.meta.client.create_multipart_upload(
+            Bucket=bucket.name, Key=object_name)
+        return BaseMultipartUpload(self.provider, bucket, object_name,
+                                   response['UploadId'])
+
+    @dispatch(event="provider.storage._bucket_objects.upload_part",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def upload_part(self, bucket, upload, part_number, data):
+        response = self.provider.s3_conn.meta.client.upload_part(
+            Bucket=bucket.name, Key=upload.object_name,
+            UploadId=upload.id, PartNumber=part_number, Body=data)
+        return BaseUploadPart(part_number, response['ETag'])
+
+    @dispatch(event="provider.storage._bucket_objects."
+                    "complete_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def complete_multipart_upload(self, bucket, upload, parts):
+        ordered = sorted(parts, key=lambda p: p.part_number)
+        self.provider.s3_conn.meta.client.complete_multipart_upload(
+            Bucket=bucket.name, Key=upload.object_name, UploadId=upload.id,
+            MultipartUpload={'Parts': [
+                {'PartNumber': p.part_number, 'ETag': p.etag}
+                for p in ordered]})
+        return self.get(bucket, upload.object_name)
+
+    @dispatch(event="provider.storage._bucket_objects.abort_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def abort_multipart_upload(self, bucket, upload):
+        self.provider.s3_conn.meta.client.abort_multipart_upload(
+            Bucket=bucket.name, Key=upload.object_name, UploadId=upload.id)
+
 
 class AWSComputeService(BaseComputeService):
 

+ 11 - 14
cloudbridge/providers/azure/azure_client.py

@@ -30,8 +30,8 @@ from azure.mgmt.resource.resources.models import ResourceGroup
 from azure.mgmt.storage import StorageManagementClient
 from azure.mgmt.storage.models import Sku, StorageAccountCreateParameters
 from azure.mgmt.subscription import SubscriptionClient
-from azure.storage.blob import (BlobSasPermissions, BlobServiceClient,
-                                generate_blob_sas)
+from azure.storage.blob import (BlobBlock, BlobSasPermissions,
+                                BlobServiceClient, generate_blob_sas)
 
 from . import helpers as azure_helpers
 
@@ -473,21 +473,18 @@ class AzureClient(object):
         blob_client = self.blob_client(container_name, blob_name)
         blob_client.upload_blob(data=data, length=length, overwrite=True)
 
-    def get_blob(self, container_name, blob_name):
+    def stage_block(self, container_name, blob_name, block_id, data):
         blob_client = self.blob_client(container_name, blob_name)
-        return blob_client.get_blob_properties(container_name, blob_name)
+        blob_client.stage_block(block_id, data)
 
-    def create_blob_from_text(self, container_name, blob_name, text):
-        if isinstance(text, bytes):
-            length = len(text)
-        else:
-            length = len(text.encode())
-        self.upload_blob(container_name, blob_name, text, length)
+    def commit_block_list(self, container_name, blob_name, block_ids):
+        blob_client = self.blob_client(container_name, blob_name)
+        block_list = [BlobBlock(block_id=block_id) for block_id in block_ids]
+        blob_client.commit_block_list(block_list)
 
-    def create_blob_from_file(self, container_name, blob_name, file_path, length=None):
-        with open(file_path, 'rb') as data:
-            data = data.read()
-            self.upload_blob(container_name, blob_name, data, len(data))
+    def get_blob(self, container_name, blob_name):
+        blob_client = self.blob_client(container_name, blob_name)
+        return blob_client.get_blob_properties(container_name, blob_name)
 
     def delete_blob(self, container_name, blob_name, delete_snapshots="include"):
         blob_client = self.blob_client(container_name, blob_name)

+ 10 - 16
cloudbridge/providers/azure/resources.py

@@ -228,26 +228,20 @@ class AzureBucketObject(BaseBucketObject):
 
         return iterable_to_stream(blob_iterator())
 
-    def upload(self, data):
-        """
-        Set the contents of this object to the data read from the source
-        string.
-        """
-        try:
-            self._provider.azure_client.create_blob_from_text(
-                self._container.id, self.id, data)
-            return True
-        except AzureException as azureEx:
-            log.exception(azureEx)
-            return False
+    @property
+    def bucket(self):
+        return self._container
 
-    def upload_from_file(self, path):
+    def _upload_single_shot(self, data):
         """
-        Store the contents of the file pointed by the "path" variable.
+        Upload the object in a single request. ``data`` may be text, bytes or
+        a file-like object; the Azure SDK streams file-like data rather than
+        buffering it all in memory. Larger uploads are handled transparently
+        by the base class via the multipart path.
         """
         try:
-            self._provider.azure_client.create_blob_from_file(
-                self._container.name, self.name, path)
+            self._provider.azure_client.upload_blob(
+                self._container.id, self.id, data)
             return True
         except AzureException as azureEx:
             log.exception(azureEx)

+ 44 - 1
cloudbridge/providers/azure/services.py

@@ -4,7 +4,8 @@ import uuid
 
 import cloudbridge.base.helpers as cb_helpers
 from cloudbridge.base.middleware import dispatch
-from cloudbridge.base.resources import (ClientPagedResultList,
+from cloudbridge.base.resources import (BaseMultipartUpload, BaseUploadPart,
+                                        ClientPagedResultList,
                                         ServerPagedResultList)
 from cloudbridge.base.services import (BaseBucketObjectService,
                                        BaseBucketService, BaseComputeService,
@@ -593,6 +594,48 @@ class AzureBucketObjectService(BaseBucketObjectService):
         blob_client.upload_blob('')
         return AzureBucketObject(self.provider, bucket, blob_client.get_blob_properties())
 
+    @staticmethod
+    def _block_id(upload_id, part_number):
+        # Azure requires every block id for a blob to be the same length and
+        # base64-encoded. The upload id is a fixed-length uuid hex, so the
+        # encoded ids are always equal length.
+        raw = "{0}-{1:08d}".format(upload_id, part_number)
+        return base64.b64encode(raw.encode()).decode()
+
+    @dispatch(event="provider.storage._bucket_objects.create_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def create_multipart_upload(self, bucket, object_name):
+        # Azure block blobs have no server-side "initiate" step; the upload id
+        # only namespaces this upload's block ids.
+        return BaseMultipartUpload(self.provider, bucket, object_name,
+                                   uuid.uuid4().hex)
+
+    @dispatch(event="provider.storage._bucket_objects.upload_part",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def upload_part(self, bucket, upload, part_number, data):
+        block_id = self._block_id(upload.id, part_number)
+        self.provider.azure_client.stage_block(
+            bucket.name, upload.object_name, block_id, data)
+        return BaseUploadPart(part_number, block_id)
+
+    @dispatch(event="provider.storage._bucket_objects."
+                    "complete_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def complete_multipart_upload(self, bucket, upload, parts):
+        ordered = sorted(parts, key=lambda p: p.part_number)
+        self.provider.azure_client.commit_block_list(
+            bucket.name, upload.object_name, [p.etag for p in ordered])
+        return self.get(bucket, upload.object_name)
+
+    @dispatch(event="provider.storage._bucket_objects.abort_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def abort_multipart_upload(self, bucket, upload):
+        # Azure has no server-side abort: uncommitted blocks are garbage
+        # collected automatically (after ~7 days), so there is nothing to do.
+        log.debug("Azure has no multipart abort; uncommitted blocks for "
+                  "%s/%s will expire automatically.",
+                  bucket.name, upload.object_name)
+
 
 class AzureComputeService(BaseComputeService):
     def __init__(self, provider):

+ 12 - 4
cloudbridge/providers/gcp/resources.py

@@ -1949,14 +1949,22 @@ class GCPBucketObject(BaseBucketObject):
                                          object=self.name)
                               .execute())
 
-    def upload(self, data):
+    @property
+    def bucket(self):
+        return self._bucket
+
+    def _upload_single_shot(self, data):
         """
-        Set the contents of this object to the given text.
+        Set the contents of this object in a single request. ``data`` may be
+        text, bytes or a seekable file-like object. Larger uploads are handled
+        transparently by the base class via the multipart (compose) path.
         """
-        if type(data) is str:
+        if isinstance(data, str):
             data = data.encode()
+        if isinstance(data, (bytes, bytearray)):
+            data = io.BytesIO(data)
         media_body = googleapiclient.http.MediaIoBaseUpload(
-                io.BytesIO(data), mimetype='plain/text')
+                data, mimetype='application/octet-stream')
         # pylint:disable=protected-access
         response = (self._provider
                         .storage._bucket_objects

+ 118 - 0
cloudbridge/providers/gcp/services.py

@@ -9,6 +9,8 @@ import googleapiclient
 
 from cloudbridge.base import helpers as cb_helpers
 from cloudbridge.base.middleware import dispatch
+from cloudbridge.base.resources import BaseMultipartUpload
+from cloudbridge.base.resources import BaseUploadPart
 from cloudbridge.base.resources import ClientPagedResultList
 from cloudbridge.base.resources import ServerPagedResultList
 from cloudbridge.base.services import BaseBucketObjectService
@@ -1497,6 +1499,122 @@ class GCPBucketObjectService(BaseBucketObjectService):
                                bucket,
                                response) if response else None
 
+    # GCS has no independent "upload part" API, so multipart is emulated by
+    # uploading each part as a temporary object and assembling them with the
+    # compose API. ``compose`` accepts at most this many source objects per
+    # call, so larger uploads are composed in chained batches.
+    _MAX_COMPOSE_SOURCES = 32
+
+    @staticmethod
+    def _temp_prefix(upload):
+        return ".cb-mpu/{0}/{1}/".format(upload.object_name, upload.id)
+
+    @classmethod
+    def _temp_part_name(cls, upload, part_number):
+        return "{0}part-{1:05d}".format(cls._temp_prefix(upload), part_number)
+
+    @dispatch(event="provider.storage._bucket_objects.create_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def create_multipart_upload(self, bucket, object_name):
+        # No server-side initiation; the upload id namespaces this upload's
+        # temporary part objects.
+        return BaseMultipartUpload(self.provider, bucket, object_name,
+                                   uuid.uuid4().hex)
+
+    @dispatch(event="provider.storage._bucket_objects.upload_part",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def upload_part(self, bucket, upload, part_number, data):
+        if isinstance(data, str):
+            data = data.encode()
+        if isinstance(data, (bytes, bytearray)):
+            data = io.BytesIO(data)
+        media_body = googleapiclient.http.MediaIoBaseUpload(
+            data, mimetype='application/octet-stream')
+        temp_name = self._temp_part_name(upload, part_number)
+        self._create_object_with_media_body(bucket, temp_name, media_body)
+        return BaseUploadPart(part_number, temp_name)
+
+    @dispatch(event="provider.storage._bucket_objects."
+                    "complete_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def complete_multipart_upload(self, bucket, upload, parts):
+        ordered = sorted(parts, key=lambda p: p.part_number)
+        sources = [p.etag for p in ordered]
+        intermediates = self._compose(bucket, upload, upload.object_name,
+                                      sources)
+        # The temporary part objects and any compose intermediates are real,
+        # billable objects, so remove them once assembled.
+        self._delete_objects(bucket, sources + intermediates,
+                             ignore_missing=True)
+        return self.get(bucket, upload.object_name)
+
+    @dispatch(event="provider.storage._bucket_objects.abort_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def abort_multipart_upload(self, bucket, upload):
+        self._delete_objects(bucket, self._list_temp_objects(bucket, upload),
+                             ignore_missing=True)
+
+    def _compose(self, bucket, upload, destination, sources):
+        """
+        Compose ``sources`` into ``destination``, chaining through
+        intermediate objects when there are more than ``_MAX_COMPOSE_SOURCES``.
+        Returns the list of intermediate object names created (to be cleaned
+        up by the caller).
+        """
+        intermediates = []
+        level = sources
+        batch = 0
+        while len(level) > self._MAX_COMPOSE_SOURCES:
+            next_level = []
+            for i in range(0, len(level), self._MAX_COMPOSE_SOURCES):
+                group = level[i:i + self._MAX_COMPOSE_SOURCES]
+                name = "{0}compose-{1:05d}".format(
+                    self._temp_prefix(upload), batch)
+                self._compose_once(bucket, name, group)
+                intermediates.append(name)
+                next_level.append(name)
+                batch += 1
+            level = next_level
+        self._compose_once(bucket, destination, level)
+        return intermediates
+
+    def _compose_once(self, bucket, destination, sources):
+        (self.provider
+             .gcp_storage
+             .objects()
+             .compose(destinationBucket=bucket.name,
+                      destinationObject=destination,
+                      body={'sourceObjects': [{'name': s} for s in sources]})
+             .execute())
+
+    def _list_temp_objects(self, bucket, upload):
+        prefix = self._temp_prefix(upload)
+        names = []
+        page_token = None
+        while True:
+            response = (self.provider
+                            .gcp_storage
+                            .objects()
+                            .list(bucket=bucket.name, prefix=prefix,
+                                  pageToken=page_token)
+                            .execute())
+            names.extend(o['name'] for o in response.get('items', []))
+            page_token = response.get('nextPageToken')
+            if not page_token:
+                return names
+
+    def _delete_objects(self, bucket, names, ignore_missing=False):
+        for name in names:
+            try:
+                (self.provider
+                     .gcp_storage
+                     .objects()
+                     .delete(bucket=bucket.name, object=name)
+                     .execute())
+            except googleapiclient.errors.HttpError:
+                if not ignore_missing:
+                    raise
+
 
 class GCPGatewayService(BaseGatewayService):
     _DEFAULT_GATEWAY_NAME = 'default-internet-gateway'

+ 9 - 4
cloudbridge/providers/openstack/resources.py

@@ -1314,12 +1314,17 @@ class OpenStackBucketObject(BaseBucketObject):
             self.cbcontainer.name, self.name, resp_chunk_size=65536)
         return content
 
-    def upload(self, data):
+    @property
+    def bucket(self):
+        return self.cbcontainer
+
+    def _upload_single_shot(self, data):
         """
-        Set the contents of this object to the data read from the source
-        string.
+        Set the contents of this object in a single request.
 
-        .. warning:: Will fail if the data is larger than 5 Gig.
+        Inputs larger than the multipart threshold are handled transparently
+        by the base class via the Static Large Object (SLO) multipart path, so
+        this single-request path only runs for smaller payloads.
         """
         self._provider.swift.put_object(self.cbcontainer.name, self.name,
                                         data)

+ 59 - 0
cloudbridge/providers/openstack/services.py

@@ -1,7 +1,9 @@
 """
 Services implemented by the OpenStack provider.
 """
+import json
 import logging
+import uuid
 
 from neutronclient.common.exceptions import NeutronClientException
 from neutronclient.common.exceptions import PortNotFoundClient
@@ -18,6 +20,8 @@ from swiftclient import ClientException as SwiftClientException
 import cloudbridge.base.helpers as cb_helpers
 from cloudbridge.base.middleware import dispatch
 from cloudbridge.base.resources import BaseLaunchConfig
+from cloudbridge.base.resources import BaseMultipartUpload
+from cloudbridge.base.resources import BaseUploadPart
 from cloudbridge.base.resources import ClientPagedResultList
 from cloudbridge.base.services import BaseBucketObjectService
 from cloudbridge.base.services import BaseBucketService
@@ -653,6 +657,61 @@ class OpenStackBucketObjectService(BaseBucketObjectService):
         self.provider.swift.put_object(bucket.name, object_name, None)
         return self.get(bucket, object_name)
 
+    @staticmethod
+    def _segment_prefix(upload):
+        return "{0}/slo/{1}/".format(upload.object_name, upload.id)
+
+    @classmethod
+    def _segment_name(cls, upload, part_number):
+        return "{0}{1:08d}".format(cls._segment_prefix(upload), part_number)
+
+    @dispatch(event="provider.storage._bucket_objects.create_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def create_multipart_upload(self, bucket, object_name):
+        # Swift has no server-side initiation; the upload id namespaces this
+        # upload's Static Large Object segments.
+        return BaseMultipartUpload(self.provider, bucket, object_name,
+                                   uuid.uuid4().hex)
+
+    @dispatch(event="provider.storage._bucket_objects.upload_part",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def upload_part(self, bucket, upload, part_number, data):
+        if isinstance(data, str):
+            data = data.encode()
+        if not isinstance(data, (bytes, bytearray)):
+            data = data.read()
+        segment_name = self._segment_name(upload, part_number)
+        etag = self.provider.swift.put_object(
+            bucket.name, segment_name, data)
+        # Retain the manifest entry needed to assemble the SLO on complete.
+        return BaseUploadPart(part_number, {
+            'path': "/{0}/{1}".format(bucket.name, segment_name),
+            'etag': etag,
+            'size_bytes': len(data)})
+
+    @dispatch(event="provider.storage._bucket_objects."
+                    "complete_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def complete_multipart_upload(self, bucket, upload, parts):
+        ordered = sorted(parts, key=lambda p: p.part_number)
+        manifest = [p.etag for p in ordered]
+        self.provider.swift.put_object(
+            bucket.name, upload.object_name, json.dumps(manifest),
+            query_string='multipart-manifest=put')
+        return self.get(bucket, upload.object_name)
+
+    @dispatch(event="provider.storage._bucket_objects.abort_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def abort_multipart_upload(self, bucket, upload):
+        prefix = self._segment_prefix(upload)
+        _, object_list = self.provider.swift.get_container(
+            bucket.name, prefix=prefix)
+        for obj in object_list:
+            try:
+                self.provider.swift.delete_object(bucket.name, obj.get('name'))
+            except SwiftClientException:
+                pass  # idempotent: ignore already-deleted segments
+
 
 class OpenStackComputeService(BaseComputeService):
 

+ 157 - 0
tests/test_object_store_service.py

@@ -3,12 +3,15 @@ import os
 import tempfile
 from datetime import datetime
 from io import BytesIO
+from unittest import mock
 from unittest import skip
 
 import requests
 
 from cloudbridge.base import helpers as cb_helpers
+from cloudbridge.base.resources import BaseBucketObject
 from cloudbridge.interfaces.exceptions import DuplicateResourceException
+from cloudbridge.interfaces.exceptions import InvalidValueException
 from cloudbridge.interfaces.provider import TestMockHelperMixin
 from cloudbridge.interfaces.resources import Bucket
 from cloudbridge.interfaces.resources import BucketObject
@@ -17,6 +20,10 @@ from tests import helpers
 from tests.helpers import ProviderTestBase
 from tests.helpers import standard_interface_tests as sit
 
+# S3 (and Swift) require every part except the last to be >= 5 MiB. Tests use
+# this size so they remain valid against real cloud providers, not just moto.
+MIN_PART_SIZE = 5 * 1024 * 1024
+
 
 class CloudObjectStoreServiceTestCase(ProviderTestBase):
 
@@ -240,6 +247,156 @@ class CloudObjectStoreServiceTestCase(ProviderTestBase):
                 with open(test_file, 'rb') as f:
                     self.assertEqual(target_stream.getvalue(), f.read())
 
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_explicit_multipart_upload_roundtrip(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj_name = "mpu-roundtrip.bin"
+            obj = test_bucket.objects.create(obj_name)
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                part1 = b"a" * MIN_PART_SIZE
+                part2 = b"b" * MIN_PART_SIZE
+                part3 = b"c" * 1024  # final part may be smaller than the min
+                expected = part1 + part2 + part3
+
+                upload = obj.create_multipart_upload()
+                parts = [upload.upload_part(1, part1),
+                         upload.upload_part(2, part2),
+                         upload.upload_part(3, part3)]
+                upload.complete(parts)
+
+                stored = test_bucket.objects.get(obj_name)
+                self.assertIsNotNone(
+                    stored, "Object should exist after multipart completion")
+                self.assertEqual(stored.size, len(expected))
+                target_stream = BytesIO()
+                stored.save_content(target_stream)
+                self.assertEqual(target_stream.getvalue(), expected)
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_multipart_upload_out_of_order_parts(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj_name = "mpu-ooo.bin"
+            obj = test_bucket.objects.create(obj_name)
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                part1 = b"1" * MIN_PART_SIZE
+                part2 = b"2" * MIN_PART_SIZE
+                part3 = b"3" * 1024
+                expected = part1 + part2 + part3
+
+                upload = obj.create_multipart_upload()
+                # Upload and collect parts out of order; complete must
+                # assemble them in ascending part-number order regardless.
+                p3 = upload.upload_part(3, part3)
+                p1 = upload.upload_part(1, part1)
+                p2 = upload.upload_part(2, part2)
+                upload.complete([p3, p1, p2])
+
+                stored = test_bucket.objects.get(obj_name)
+                target_stream = BytesIO()
+                stored.save_content(target_stream)
+                self.assertEqual(target_stream.getvalue(), expected)
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_multipart_upload_abort(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj_name = "mpu-abort.bin"
+            obj = test_bucket.objects.create(obj_name)
+
+            upload = obj.create_multipart_upload()
+            upload.upload_part(1, b"a" * MIN_PART_SIZE)
+            upload.abort()
+
+            # Aborting must not materialise the target object.
+            self.assertIsNone(
+                test_bucket.objects.get(obj_name),
+                "Object should not exist after a multipart upload is aborted")
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_transparent_upload_large_stream_uses_multipart(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj_name = "transparent.bin"
+            obj = test_bucket.objects.create(obj_name)
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                content = b"x" * (MIN_PART_SIZE * 2 + 1024)
+
+                # Lower the threshold/part size so a modest stream triggers
+                # the multipart path, and assert it is actually taken.
+                svc = self.provider.storage._bucket_objects
+                with mock.patch.object(
+                        BaseBucketObject, 'CB_MULTIPART_THRESHOLD',
+                        MIN_PART_SIZE), \
+                    mock.patch.object(
+                        BaseBucketObject, 'CB_MULTIPART_PART_SIZE',
+                        MIN_PART_SIZE), \
+                    mock.patch.object(
+                        svc, 'create_multipart_upload',
+                        wraps=svc.create_multipart_upload) as spy:
+                    obj.upload(BytesIO(content))
+
+                spy.assert_called_once()
+                stored = test_bucket.objects.get(obj_name)
+                self.assertEqual(stored.size, len(content))
+                target_stream = BytesIO()
+                stored.save_content(target_stream)
+                self.assertEqual(target_stream.getvalue(), content)
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_small_upload_stays_single_shot(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj = test_bucket.objects.create("small.txt")
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                content = b"a small payload below the multipart threshold"
+
+                # A payload below the threshold must not trigger multipart.
+                svc = self.provider.storage._bucket_objects
+                with mock.patch.object(
+                        svc, 'create_multipart_upload',
+                        wraps=svc.create_multipart_upload) as spy:
+                    obj.upload(content)
+
+                spy.assert_not_called()
+                target_stream = BytesIO()
+                obj.save_content(target_stream)
+                self.assertEqual(target_stream.getvalue(), content)
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_multipart_part_size_below_minimum_raises(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj = test_bucket.objects.create("badpartsize.bin")
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                content = b"x" * 4096
+
+                # A part size below the 5 MiB portable minimum is invalid.
+                with mock.patch.object(
+                        BaseBucketObject, 'CB_MULTIPART_THRESHOLD', 1024), \
+                    mock.patch.object(
+                        BaseBucketObject, 'CB_MULTIPART_PART_SIZE', 1024):
+                    with self.assertRaises(InvalidValueException):
+                        obj.upload(BytesIO(content))
+
     @skip("Skip unless you want to test objects bigger than 5GB")
     @helpers.skipIfNoService(['storage.buckets'])
     def test_upload_download_bucket_content_with_large_file(self):