Преглед изворни кода

Merge pull request #333 from CloudVE/add-multipart-upload

Add cross-provider multipart upload support
Nuwan Goonasekera пре 21 часа
родитељ
комит
e5fb48527a

+ 268 - 0
cloudbridge/base/resources.py

@@ -2,18 +2,24 @@
 Base implementation for data objects exposed through a provider or service
 """
 import inspect
+import io
 import itertools
 import logging
 import os
+import queue
 import re
 import shutil
 import time
 import uuid
+from concurrent.futures import FIRST_COMPLETED
+from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import wait
 
 from cloudbridge.interfaces.exceptions import \
     InvalidConfigurationException
 from cloudbridge.interfaces.exceptions import InvalidLabelException
 from cloudbridge.interfaces.exceptions import InvalidNameException
+from cloudbridge.interfaces.exceptions import InvalidValueException
 from cloudbridge.interfaces.exceptions import WaitStateException
 from cloudbridge.interfaces.resources import AttachmentInfo
 from cloudbridge.interfaces.resources import Bucket
@@ -31,6 +37,7 @@ from cloudbridge.interfaces.resources import KeyPair
 from cloudbridge.interfaces.resources import LaunchConfig
 from cloudbridge.interfaces.resources import MachineImage
 from cloudbridge.interfaces.resources import MachineImageState
+from cloudbridge.interfaces.resources import MultipartUpload
 from cloudbridge.interfaces.resources import Network
 from cloudbridge.interfaces.resources import NetworkState
 from cloudbridge.interfaces.resources import ObjectLifeCycleMixin
@@ -43,6 +50,7 @@ from cloudbridge.interfaces.resources import Snapshot
 from cloudbridge.interfaces.resources import SnapshotState
 from cloudbridge.interfaces.resources import Subnet
 from cloudbridge.interfaces.resources import SubnetState
+from cloudbridge.interfaces.resources import UploadPart
 from cloudbridge.interfaces.resources import VMFirewall
 from cloudbridge.interfaces.resources import VMFirewallRule
 from cloudbridge.interfaces.resources import VMType
@@ -680,6 +688,76 @@ class BaseRegion(BaseCloudResource, Region):
         return next(iter(self.zones))
 
 
+class BaseUploadPart(UploadPart):
+    """
+    A simple, serializable handle for a single uploaded part. Concrete
+    providers return these from ``upload_part`` and consume them in
+    ``complete_multipart_upload``.
+    """
+
+    def __init__(self, part_number, etag):
+        self._part_number = part_number
+        self._etag = etag
+
+    @property
+    def part_number(self):
+        return self._part_number
+
+    @property
+    def etag(self):
+        return self._etag
+
+    def __repr__(self):
+        return "<CB-{0}: {1} ({2})>".format(
+            self.__class__.__name__, self._part_number, self._etag)
+
+
+class BaseMultipartUpload(BaseCloudResource, MultipartUpload):
+    """
+    Base implementation of an in-progress multipart upload. It is a thin
+    handle that delegates the actual work to the provider's bucket-object
+    service, mirroring how other base resources delegate to their service
+    (e.g. ``BaseBucket.delete``).
+    """
+
+    def __init__(self, provider, bucket, object_name, upload_id):
+        super(BaseMultipartUpload, self).__init__(provider)
+        self._bucket = bucket
+        self._object_name = object_name
+        self._upload_id = upload_id
+
+    @property
+    def id(self):
+        return self._upload_id
+
+    @property
+    def name(self):
+        return self._object_name
+
+    @property
+    def bucket(self):
+        return self._bucket
+
+    @property
+    def object_name(self):
+        return self._object_name
+
+    def upload_part(self, part_number, data):
+        # pylint:disable=protected-access
+        return self._provider.storage._bucket_objects.upload_part(
+            self._bucket, self, part_number, data)
+
+    def complete(self, parts):
+        # pylint:disable=protected-access
+        return self._provider.storage._bucket_objects\
+            .complete_multipart_upload(self._bucket, self, parts)
+
+    def abort(self):
+        # pylint:disable=protected-access
+        return self._provider.storage._bucket_objects\
+            .abort_multipart_upload(self._bucket, self)
+
+
 class BaseBucketObject(BaseCloudResource, BucketObject):
 
     # Regular expression for valid bucket keys.
@@ -690,6 +768,19 @@ class BaseBucketObject(BaseCloudResource, BucketObject):
     # s/537772/what-is-the-most-correct-regular-expression-for-a-unix-file-path
     CB_NAME_PATTERN = re.compile(r"[^\0]+")
 
+    # Uploads larger than this many bytes are split into parts.
+    CB_MULTIPART_THRESHOLD = int(os.environ.get(
+        'CB_MULTIPART_THRESHOLD', 100 * 1024 * 1024))   # 100 MiB
+    # The size of each part for multipart uploads.
+    CB_MULTIPART_PART_SIZE = int(os.environ.get(
+        'CB_MULTIPART_PART_SIZE', 50 * 1024 * 1024))    # 50 MiB
+    # Portable floor: S3 and Swift reject non-final parts smaller than 5 MiB,
+    # so part sizes below this are rejected up-front.
+    CB_MULTIPART_MIN_PART_SIZE = 5 * 1024 * 1024
+    # Number of parts uploaded in parallel by the transparent multipart path.
+    CB_MULTIPART_MAX_CONCURRENCY = int(os.environ.get(
+        'CB_MULTIPART_MAX_CONCURRENCY', 5))
+
     def __init__(self, provider):
         super(BaseBucketObject, self).__init__(provider)
 
@@ -711,6 +802,183 @@ class BaseBucketObject(BaseCloudResource, BucketObject):
     def save_content(self, target_stream):
         shutil.copyfileobj(self.iter_content(), target_stream)
 
+    # The three resolvers below pick, in order of precedence: an explicit
+    # per-call UploadConfig field, the provider/global config, then the class
+    # default constant.
+    def _multipart_threshold(self, config=None):
+        if config is not None and config.threshold is not None:
+            return int(config.threshold)
+        # pylint:disable=protected-access
+        return int(self._provider._get_config_value(
+            'multipart_threshold', self.CB_MULTIPART_THRESHOLD))
+
+    def _multipart_part_size(self, config=None):
+        if config is not None and config.part_size is not None:
+            return int(config.part_size)
+        # pylint:disable=protected-access
+        return int(self._provider._get_config_value(
+            'multipart_part_size', self.CB_MULTIPART_PART_SIZE))
+
+    def _multipart_max_concurrency(self, config=None):
+        if config is not None and config.max_concurrency is not None:
+            return int(config.max_concurrency)
+        # pylint:disable=protected-access
+        return int(self._provider._get_config_value(
+            'multipart_max_concurrency', self.CB_MULTIPART_MAX_CONCURRENCY))
+
+    @staticmethod
+    def _data_size(data):
+        """
+        Best-effort size of an upload payload, or ``None`` if it cannot be
+        determined without consuming the data (e.g. a non-seekable stream).
+        """
+        if isinstance(data, str):
+            return len(data.encode('utf-8'))
+        if isinstance(data, (bytes, bytearray)):
+            return len(data)
+        if hasattr(data, 'seek') and hasattr(data, 'tell'):
+            try:
+                pos = data.tell()
+                data.seek(0, os.SEEK_END)
+                size = data.tell()
+                data.seek(pos)
+                return size
+            except (OSError, ValueError):
+                return None
+        return None
+
+    @staticmethod
+    def _as_stream(data):
+        if isinstance(data, str):
+            data = data.encode('utf-8')
+        if isinstance(data, (bytes, bytearray)):
+            return io.BytesIO(data)
+        return data
+
+    def upload(self, data, config=None):
+        size = self._data_size(data)
+        if size is not None and size > self._multipart_threshold(config):
+            return self._upload_multipart(self._as_stream(data), config)
+        return self._upload_single_shot(data)
+
+    def upload_from_file(self, path, config=None):
+        if os.path.getsize(path) > self._multipart_threshold(config):
+            with open(path, 'rb') as f:
+                return self._upload_multipart(f, config)
+        return self._upload_from_file_single_shot(path)
+
+    def _upload_multipart(self, stream, config=None):
+        """
+        Drive the explicit multipart lifecycle over a stream, reading it one
+        part at a time so the whole payload is never held in memory.
+
+        Parts are uploaded across a bounded thread pool. To stay safe even on
+        providers whose SDK client/connection is not thread-safe, each worker
+        uploads through its own cloned provider (see :meth:`.CloudProvider.
+        clone`), so no provider state is shared between threads. Any failure
+        aborts the upload to avoid leaking staged parts.
+
+        Providers with an efficient, thread-safe native uploader (e.g. AWS via
+        boto3's ``upload_fileobj``) override this method to use it directly.
+        """
+        part_size = self._multipart_part_size(config)
+        if part_size < self.CB_MULTIPART_MIN_PART_SIZE:
+            raise InvalidValueException('multipart_part_size', part_size)
+
+        concurrency = max(1, self._multipart_max_concurrency(config))
+        upload = self.create_multipart_upload()
+        try:
+            if concurrency == 1:
+                parts = self._upload_parts_serially(upload, stream, part_size)
+            else:
+                parts = self._upload_parts_concurrently(
+                    upload, stream, part_size, concurrency)
+            return upload.complete(parts)
+        except Exception:
+            upload.abort()
+            raise
+
+    def _upload_parts_serially(self, upload, stream, part_size):
+        parts = []
+        part_number = 1
+        while True:
+            chunk = self._read_part(stream, part_size)
+            if not chunk:
+                break
+            parts.append(upload.upload_part(part_number, chunk))
+            part_number += 1
+        return parts
+
+    def _upload_parts_concurrently(self, upload, stream, part_size,
+                                   concurrency):
+        # A pool of cloned bucket-object services, one per worker, so each
+        # thread touches an isolated provider/connection.
+        clones = queue.Queue()
+        for _ in range(concurrency):
+            # pylint:disable=protected-access
+            clones.put(self._provider.clone().storage._bucket_objects)
+
+        def upload_one(part_number, chunk):
+            service = clones.get()
+            try:
+                return service.upload_part(
+                    upload.bucket, upload, part_number, chunk)
+            finally:
+                clones.put(service)
+
+        parts = []
+        in_flight = set()
+        part_number = 1
+        depleted = False
+        with ThreadPoolExecutor(max_workers=concurrency) as executor:
+            while not depleted or in_flight:
+                # Keep the pool fed but never read more than ``concurrency``
+                # parts ahead, bounding memory to ~concurrency * part_size.
+                while not depleted and len(in_flight) < concurrency:
+                    chunk = self._read_part(stream, part_size)
+                    if not chunk:
+                        depleted = True
+                        break
+                    in_flight.add(
+                        executor.submit(upload_one, part_number, chunk))
+                    part_number += 1
+                if not in_flight:
+                    break
+                done, in_flight = wait(
+                    in_flight, return_when=FIRST_COMPLETED)
+                for future in done:
+                    parts.append(future.result())
+        return parts
+
+    @staticmethod
+    def _read_part(stream, part_size):
+        """
+        Read exactly ``part_size`` bytes from ``stream`` (fewer only at EOF),
+        coalescing short reads so non-final parts always meet the provider
+        minimum part size.
+        """
+        buffer = bytearray()
+        while len(buffer) < part_size:
+            chunk = stream.read(part_size - len(buffer))
+            if not chunk:
+                break
+            buffer.extend(chunk)
+        return bytes(buffer)
+
+    def _upload_from_file_single_shot(self, path):
+        """
+        Default small-file upload: read the file and hand it to the provider's
+        single-shot upload. Providers with a more efficient native file upload
+        (e.g. AWS ``upload_file``) override :meth:`upload_from_file` directly.
+        """
+        with open(path, 'rb') as f:
+            return self._upload_single_shot(f)
+
+    def create_multipart_upload(self):
+        # pylint:disable=protected-access
+        return self._provider.storage._bucket_objects.create_multipart_upload(
+            self.bucket, self.name)
+
     def __eq__(self, other):
         return (isinstance(other, BucketObject) and
                 # pylint:disable=protected-access

+ 181 - 2
cloudbridge/interfaces/resources.py

@@ -2125,6 +2125,151 @@ class VMFirewallRule(CloudResource):
         pass
 
 
+class UploadConfig(object):
+    """
+    Provider-agnostic, per-call tuning for an object upload.
+
+    Passed optionally to :meth:`.BucketObject.upload` and
+    :meth:`.BucketObject.upload_from_file`. Any field left as ``None`` falls
+    back to the provider/global configuration (the ``CB_MULTIPART_*`` settings).
+    Each provider maps these fields onto its native transfer mechanism.
+    """
+
+    def __init__(self, threshold=None, part_size=None, max_concurrency=None):
+        """
+        :type threshold: ``int``
+        :param threshold: Size in bytes above which the upload is split into
+            multiple parts.
+
+        :type part_size: ``int``
+        :param part_size: Size in bytes of each part. Must be at least the
+            provider minimum (5 MiB on S3) for all but the final part.
+
+        :type max_concurrency: ``int``
+        :param max_concurrency: Maximum number of parts to upload in parallel.
+        """
+        self.threshold = threshold
+        self.part_size = part_size
+        self.max_concurrency = max_concurrency
+
+    def __repr__(self):
+        return ("<CB-UploadConfig: threshold={0}, part_size={1}, "
+                "max_concurrency={2}>".format(
+                    self.threshold, self.part_size, self.max_concurrency))
+
+
+class UploadPart(object):
+    """
+    A handle for a single part uploaded as part of a multipart upload.
+
+    Returned by :meth:`.MultipartUpload.upload_part`. Callers must retain the
+    parts they receive and pass them to :meth:`.MultipartUpload.complete`.
+    The handle is a simple, serializable value object so that parts may be
+    collected across threads or processes when uploading in parallel.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractproperty
+    def part_number(self):
+        """
+        The 1-based index of this part within the upload.
+
+        :rtype: ``int``
+        :return: The part number supplied to ``upload_part``.
+        """
+        pass
+
+    @abstractproperty
+    def etag(self):
+        """
+        Opaque provider handle identifying the stored part.
+
+        Its concrete form varies by provider (e.g. an S3 ETag, an Azure block
+        id, a GCS temp-object name, or a Swift segment descriptor) and should
+        not be interpreted by clients.
+
+        :rtype: ``object``
+        :return: The provider-specific part handle.
+        """
+        pass
+
+
+class MultipartUpload(CloudResource):
+    """
+    Represents an in-progress, multi-part upload to a :class:`.BucketObject`.
+
+    Created via :meth:`.BucketObject.create_multipart_upload`. Parts are
+    uploaded with :meth:`upload_part` (in any order, optionally in parallel)
+    and the upload is finalized with :meth:`complete` or cancelled with
+    :meth:`abort`.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractproperty
+    def bucket(self):
+        """
+        The bucket this upload targets.
+
+        :rtype: :class:`.Bucket`
+        :return: The target Bucket.
+        """
+        pass
+
+    @abstractproperty
+    def object_name(self):
+        """
+        The key of the object being uploaded.
+
+        :rtype: ``str``
+        :return: The target object name.
+        """
+        pass
+
+    @abstractmethod
+    def upload_part(self, part_number, data):
+        """
+        Upload a single part of this multipart upload.
+
+        :type part_number: ``int``
+        :param part_number: 1-based index that determines the part's position
+            in the final object. Part numbers must be unique within an upload.
+
+        :type data: ``bytes`` or file-like object
+        :param data: The part payload. Every part except the last must be at
+            least the provider minimum part size (5 MiB on S3).
+
+        :rtype: :class:`.UploadPart`
+        :return: A part handle that MUST be retained and passed to
+            :meth:`complete`.
+        """
+        pass
+
+    @abstractmethod
+    def complete(self, parts):
+        """
+        Finalize the upload, assembling parts in ascending ``part_number``.
+
+        :type parts: ``list`` of :class:`.UploadPart`
+        :param parts: The handles returned by :meth:`upload_part`. They may be
+            supplied in any order.
+
+        :rtype: :class:`.BucketObject`
+        :return: The completed object.
+        """
+        pass
+
+    @abstractmethod
+    def abort(self):
+        """
+        Cancel the upload, releasing any staged parts or temporary objects.
+
+        ``abort`` is idempotent and safe to call after a partial upload. On
+        Azure it is best-effort: there is no server-side cancel, so uncommitted
+        blocks are left to expire (after ~7 days) rather than being deleted.
+        """
+        pass
+
+
 class BucketObject(CloudResource):
     """
     Represents an object stored within a bucket.
@@ -2185,22 +2330,56 @@ class BucketObject(CloudResource):
         pass
 
     @abstractmethod
-    def upload(self, source_stream):
+    def upload(self, source_stream, config=None):
         """
         Set the contents of the object to the data read from the source stream.
 
+        :type config: :class:`.UploadConfig`
+        :param config: Optional per-call upload tuning (multipart threshold,
+            part size, concurrency). Any field left unset falls back to the
+            provider/global configuration.
+
         :rtype: ``bool``
         :return: ``True`` if successful.
         """
         pass
 
     @abstractmethod
-    def upload_from_file(self, path):
+    def upload_from_file(self, path, config=None):
         """
         Store the contents of the file pointed by the "path" variable.
 
+        Files larger than the configured multipart threshold are streamed to
+        the provider in parts, so the whole file is never held in memory.
+
         :type path: ``str``
         :param path: Absolute path to the file to be uploaded to S3.
+
+        :type config: :class:`.UploadConfig`
+        :param config: Optional per-call upload tuning (multipart threshold,
+            part size, concurrency). Any field left unset falls back to the
+            provider/global configuration.
+        """
+        pass
+
+    @abstractmethod
+    def create_multipart_upload(self):
+        """
+        Begin an explicit, multi-part upload to this object.
+
+        Returns a :class:`.MultipartUpload` handle whose lifecycle is::
+
+            upload = obj.create_multipart_upload()
+            parts = [upload.upload_part(n, chunk) for n, chunk in ...]
+            upload.complete(parts)   # or upload.abort()
+
+        Parts may be uploaded in any order and in parallel; every part except
+        the last must be at least the provider minimum part size (5 MiB on
+        S3). :meth:`.MultipartUpload.complete` assembles them in ascending
+        ``part_number`` order.
+
+        :rtype: :class:`.MultipartUpload`
+        :return: An in-progress multipart upload handle for this object.
         """
         pass
 

+ 71 - 0
cloudbridge/interfaces/services.py

@@ -1225,6 +1225,77 @@ class BucketObjectService(CloudService):
         """
         pass
 
+    @abstractmethod
+    def create_multipart_upload(self, bucket, object_name):
+        """
+        Begin an explicit, multi-part upload of an object to a bucket.
+
+        :type bucket: :class:`.Bucket`
+        :param bucket: The bucket to upload into.
+
+        :type object_name: ``str``
+        :param object_name: The key of the object being uploaded.
+
+        :rtype: :class:`.MultipartUpload`
+        :return: An in-progress multipart upload handle.
+        """
+        pass
+
+    @abstractmethod
+    def upload_part(self, bucket, upload, part_number, data):
+        """
+        Upload a single part of an in-progress multipart upload.
+
+        :type bucket: :class:`.Bucket`
+        :param bucket: The bucket being uploaded into.
+
+        :type upload: :class:`.MultipartUpload`
+        :param upload: The in-progress upload, as returned by
+            :meth:`create_multipart_upload`.
+
+        :type part_number: ``int``
+        :param part_number: 1-based index of the part.
+
+        :type data: ``bytes`` or file-like object
+        :param data: The part payload.
+
+        :rtype: :class:`.UploadPart`
+        :return: A handle for the uploaded part.
+        """
+        pass
+
+    @abstractmethod
+    def complete_multipart_upload(self, bucket, upload, parts):
+        """
+        Finalize a multipart upload, assembling parts in part-number order.
+
+        :type bucket: :class:`.Bucket`
+        :param bucket: The bucket being uploaded into.
+
+        :type upload: :class:`.MultipartUpload`
+        :param upload: The in-progress upload to finalize.
+
+        :type parts: ``list`` of :class:`.UploadPart`
+        :param parts: The part handles returned by :meth:`upload_part`.
+
+        :rtype: :class:`.BucketObject`
+        :return: The completed object.
+        """
+        pass
+
+    @abstractmethod
+    def abort_multipart_upload(self, bucket, upload):
+        """
+        Cancel a multipart upload and release any staged parts.
+
+        :type bucket: :class:`.Bucket`
+        :param bucket: The bucket being uploaded into.
+
+        :type upload: :class:`.MultipartUpload`
+        :param upload: The in-progress upload to cancel.
+        """
+        pass
+
 
 class SecurityService(CloudService):
 

+ 28 - 3
cloudbridge/providers/aws/resources.py

@@ -5,6 +5,8 @@ import hashlib
 import inspect
 import logging
 
+from boto3.s3.transfer import TransferConfig
+
 from botocore.exceptions import ClientError
 
 import tenacity
@@ -866,14 +868,37 @@ class AWSBucketObject(BaseBucketObject):
     def last_modified(self):
         return self._obj.last_modified.strftime("%Y-%m-%dT%H:%M:%S.%f")
 
+    @property
+    def bucket(self):
+        return AWSBucket(self._provider,
+                         self._provider.s3_conn.Bucket(self._obj.bucket_name))
+
     def iter_content(self):
         return self.BucketObjIterator(self._obj.get().get('Body'))
 
-    def upload(self, data):
+    def _upload_single_shot(self, data):
         self._obj.put(Body=data)
 
-    def upload_from_file(self, path):
-        self._obj.upload_file(path)
+    def _upload_multipart(self, stream, config=None):
+        # boto3's TransferManager uploads parts concurrently with a thread-safe
+        # client, so the transparent multipart path delegates to it rather than
+        # CloudBridge's generic clone-pool driver.
+        transfer_config = TransferConfig(
+            multipart_threshold=self._multipart_part_size(config),
+            multipart_chunksize=self._multipart_part_size(config),
+            max_concurrency=self._multipart_max_concurrency(config))
+        self._obj.upload_fileobj(stream, Config=transfer_config)
+
+    def upload_from_file(self, path, config=None):
+        # boto3's upload_file streams large files in parts via its
+        # TransferManager. Drive it with CloudBridge's multipart knobs so that
+        # upload_from_file and upload() honour the same configuration rather
+        # than boto3's own defaults.
+        transfer_config = TransferConfig(
+            multipart_threshold=self._multipart_threshold(config),
+            multipart_chunksize=self._multipart_part_size(config),
+            max_concurrency=self._multipart_max_concurrency(config))
+        self._obj.upload_file(path, Config=transfer_config)
 
     def delete(self):
         self._obj.delete()

+ 36 - 0
cloudbridge/providers/aws/services.py

@@ -8,6 +8,8 @@ from botocore.exceptions import ClientError
 
 import cloudbridge.base.helpers as cb_helpers
 from cloudbridge.base.middleware import dispatch
+from cloudbridge.base.resources import BaseMultipartUpload
+from cloudbridge.base.resources import BaseUploadPart
 from cloudbridge.base.resources import ClientPagedResultList
 from cloudbridge.base.resources import ServerPagedResultList
 from cloudbridge.base.services import BaseBucketObjectService
@@ -613,6 +615,40 @@ class AWSBucketObjectService(BaseBucketObjectService):
         obj = bucket._bucket.Object(name)
         return AWSBucketObject(self.provider, obj)
 
+    @dispatch(event="provider.storage._bucket_objects.create_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def create_multipart_upload(self, bucket, object_name):
+        response = self.provider.s3_conn.meta.client.create_multipart_upload(
+            Bucket=bucket.name, Key=object_name)
+        return BaseMultipartUpload(self.provider, bucket, object_name,
+                                   response['UploadId'])
+
+    @dispatch(event="provider.storage._bucket_objects.upload_part",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def upload_part(self, bucket, upload, part_number, data):
+        response = self.provider.s3_conn.meta.client.upload_part(
+            Bucket=bucket.name, Key=upload.object_name,
+            UploadId=upload.id, PartNumber=part_number, Body=data)
+        return BaseUploadPart(part_number, response['ETag'])
+
+    @dispatch(event="provider.storage._bucket_objects."
+                    "complete_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def complete_multipart_upload(self, bucket, upload, parts):
+        ordered = sorted(parts, key=lambda p: p.part_number)
+        self.provider.s3_conn.meta.client.complete_multipart_upload(
+            Bucket=bucket.name, Key=upload.object_name, UploadId=upload.id,
+            MultipartUpload={'Parts': [
+                {'PartNumber': p.part_number, 'ETag': p.etag}
+                for p in ordered]})
+        return self.get(bucket, upload.object_name)
+
+    @dispatch(event="provider.storage._bucket_objects.abort_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def abort_multipart_upload(self, bucket, upload):
+        self.provider.s3_conn.meta.client.abort_multipart_upload(
+            Bucket=bucket.name, Key=upload.object_name, UploadId=upload.id)
+
 
 class AWSComputeService(BaseComputeService):
 

+ 15 - 16
cloudbridge/providers/azure/azure_client.py

@@ -30,8 +30,8 @@ from azure.mgmt.resource.resources.models import ResourceGroup
 from azure.mgmt.storage import StorageManagementClient
 from azure.mgmt.storage.models import Sku, StorageAccountCreateParameters
 from azure.mgmt.subscription import SubscriptionClient
-from azure.storage.blob import (BlobSasPermissions, BlobServiceClient,
-                                generate_blob_sas)
+from azure.storage.blob import (BlobBlock, BlobSasPermissions,
+                                BlobServiceClient, generate_blob_sas)
 
 from . import helpers as azure_helpers
 
@@ -469,25 +469,24 @@ class AzureClient(object):
         container_client = self.get_container(container_name)
         return container_client.list_blobs(name_starts_with=prefix, include=include)
 
-    def upload_blob(self, container_name, blob_name, data, length=None):
+    def upload_blob(self, container_name, blob_name, data, length=None,
+                    max_concurrency=1):
         blob_client = self.blob_client(container_name, blob_name)
-        blob_client.upload_blob(data=data, length=length, overwrite=True)
+        blob_client.upload_blob(data=data, length=length, overwrite=True,
+                                max_concurrency=max_concurrency)
 
-    def get_blob(self, container_name, blob_name):
+    def stage_block(self, container_name, blob_name, block_id, data):
         blob_client = self.blob_client(container_name, blob_name)
-        return blob_client.get_blob_properties(container_name, blob_name)
+        blob_client.stage_block(block_id, data)
 
-    def create_blob_from_text(self, container_name, blob_name, text):
-        if isinstance(text, bytes):
-            length = len(text)
-        else:
-            length = len(text.encode())
-        self.upload_blob(container_name, blob_name, text, length)
+    def commit_block_list(self, container_name, blob_name, block_ids):
+        blob_client = self.blob_client(container_name, blob_name)
+        block_list = [BlobBlock(block_id=block_id) for block_id in block_ids]
+        blob_client.commit_block_list(block_list)
 
-    def create_blob_from_file(self, container_name, blob_name, file_path, length=None):
-        with open(file_path, 'rb') as data:
-            data = data.read()
-            self.upload_blob(container_name, blob_name, data, len(data))
+    def get_blob(self, container_name, blob_name):
+        blob_client = self.blob_client(container_name, blob_name)
+        return blob_client.get_blob_properties(container_name, blob_name)
 
     def delete_blob(self, container_name, blob_name, delete_snapshots="include"):
         blob_client = self.blob_client(container_name, blob_name)

+ 17 - 10
cloudbridge/providers/azure/resources.py

@@ -228,26 +228,33 @@ class AzureBucketObject(BaseBucketObject):
 
         return iterable_to_stream(blob_iterator())
 
-    def upload(self, data):
+    @property
+    def bucket(self):
+        return self._container
+
+    def _upload_single_shot(self, data):
         """
-        Set the contents of this object to the data read from the source
-        string.
+        Upload the object in a single request. ``data`` may be text, bytes or
+        a file-like object; the Azure SDK streams file-like data rather than
+        buffering it all in memory. Larger uploads are handled transparently
+        by the base class via the multipart path.
         """
         try:
-            self._provider.azure_client.create_blob_from_text(
+            self._provider.azure_client.upload_blob(
                 self._container.id, self.id, data)
             return True
         except AzureException as azureEx:
             log.exception(azureEx)
             return False
 
-    def upload_from_file(self, path):
-        """
-        Store the contents of the file pointed by the "path" variable.
-        """
+    def _upload_multipart(self, stream, config=None):
+        # The Azure SDK's upload_blob stages blocks concurrently (max_concurrency
+        # workers) over a thread-safe client, so the transparent multipart path
+        # delegates to it rather than CloudBridge's generic clone-pool driver.
         try:
-            self._provider.azure_client.create_blob_from_file(
-                self._container.name, self.name, path)
+            self._provider.azure_client.upload_blob(
+                self._container.id, self.id, stream,
+                max_concurrency=self._multipart_max_concurrency(config))
             return True
         except AzureException as azureEx:
             log.exception(azureEx)

+ 44 - 1
cloudbridge/providers/azure/services.py

@@ -4,7 +4,8 @@ import uuid
 
 import cloudbridge.base.helpers as cb_helpers
 from cloudbridge.base.middleware import dispatch
-from cloudbridge.base.resources import (ClientPagedResultList,
+from cloudbridge.base.resources import (BaseMultipartUpload, BaseUploadPart,
+                                        ClientPagedResultList,
                                         ServerPagedResultList)
 from cloudbridge.base.services import (BaseBucketObjectService,
                                        BaseBucketService, BaseComputeService,
@@ -593,6 +594,48 @@ class AzureBucketObjectService(BaseBucketObjectService):
         blob_client.upload_blob('')
         return AzureBucketObject(self.provider, bucket, blob_client.get_blob_properties())
 
+    @staticmethod
+    def _block_id(upload_id, part_number):
+        # Azure requires every block id for a blob to be the same length and
+        # base64-encoded. The upload id is a fixed-length uuid hex, so the
+        # encoded ids are always equal length.
+        raw = "{0}-{1:08d}".format(upload_id, part_number)
+        return base64.b64encode(raw.encode()).decode()
+
+    @dispatch(event="provider.storage._bucket_objects.create_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def create_multipart_upload(self, bucket, object_name):
+        # Azure block blobs have no server-side "initiate" step; the upload id
+        # only namespaces this upload's block ids.
+        return BaseMultipartUpload(self.provider, bucket, object_name,
+                                   uuid.uuid4().hex)
+
+    @dispatch(event="provider.storage._bucket_objects.upload_part",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def upload_part(self, bucket, upload, part_number, data):
+        block_id = self._block_id(upload.id, part_number)
+        self.provider.azure_client.stage_block(
+            bucket.name, upload.object_name, block_id, data)
+        return BaseUploadPart(part_number, block_id)
+
+    @dispatch(event="provider.storage._bucket_objects."
+                    "complete_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def complete_multipart_upload(self, bucket, upload, parts):
+        ordered = sorted(parts, key=lambda p: p.part_number)
+        self.provider.azure_client.commit_block_list(
+            bucket.name, upload.object_name, [p.etag for p in ordered])
+        return self.get(bucket, upload.object_name)
+
+    @dispatch(event="provider.storage._bucket_objects.abort_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def abort_multipart_upload(self, bucket, upload):
+        # Azure has no server-side abort: uncommitted blocks are garbage
+        # collected automatically (after ~7 days), so there is nothing to do.
+        log.debug("Azure has no multipart abort; uncommitted blocks for "
+                  "%s/%s will expire automatically.",
+                  bucket.name, upload.object_name)
+
 
 class AzureComputeService(BaseComputeService):
     def __init__(self, provider):

+ 17 - 5
cloudbridge/providers/gcp/resources.py

@@ -1949,14 +1949,22 @@ class GCPBucketObject(BaseBucketObject):
                                          object=self.name)
                               .execute())
 
-    def upload(self, data):
+    @property
+    def bucket(self):
+        return self._bucket
+
+    def _upload_single_shot(self, data):
         """
-        Set the contents of this object to the given text.
+        Set the contents of this object in a single request. ``data`` may be
+        text, bytes or a seekable file-like object. Larger uploads are handled
+        transparently by the base class via the multipart (compose) path.
         """
-        if type(data) is str:
+        if isinstance(data, str):
             data = data.encode()
+        if isinstance(data, (bytes, bytearray)):
+            data = io.BytesIO(data)
         media_body = googleapiclient.http.MediaIoBaseUpload(
-                io.BytesIO(data), mimetype='plain/text')
+                data, mimetype='application/octet-stream')
         # pylint:disable=protected-access
         response = (self._provider
                         .storage._bucket_objects
@@ -1966,9 +1974,13 @@ class GCPBucketObject(BaseBucketObject):
         if response:
             self._obj = response
 
-    def upload_from_file(self, path):
+    def upload_from_file(self, path, config=None):
         """
         Upload a binary file.
+
+        GCP uses a resumable upload here, which streams the file in chunks on a
+        single session; the ``config`` argument is accepted for interface
+        consistency but does not affect this path.
         """
         with open(path, 'rb') as f:
             media_body = googleapiclient.http.MediaIoBaseUpload(

+ 118 - 0
cloudbridge/providers/gcp/services.py

@@ -9,6 +9,8 @@ import googleapiclient
 
 from cloudbridge.base import helpers as cb_helpers
 from cloudbridge.base.middleware import dispatch
+from cloudbridge.base.resources import BaseMultipartUpload
+from cloudbridge.base.resources import BaseUploadPart
 from cloudbridge.base.resources import ClientPagedResultList
 from cloudbridge.base.resources import ServerPagedResultList
 from cloudbridge.base.services import BaseBucketObjectService
@@ -1497,6 +1499,122 @@ class GCPBucketObjectService(BaseBucketObjectService):
                                bucket,
                                response) if response else None
 
+    # GCS has no independent "upload part" API, so multipart is emulated by
+    # uploading each part as a temporary object and assembling them with the
+    # compose API. ``compose`` accepts at most this many source objects per
+    # call, so larger uploads are composed in chained batches.
+    _MAX_COMPOSE_SOURCES = 32
+
+    @staticmethod
+    def _temp_prefix(upload):
+        return ".cb-mpu/{0}/{1}/".format(upload.object_name, upload.id)
+
+    @classmethod
+    def _temp_part_name(cls, upload, part_number):
+        return "{0}part-{1:05d}".format(cls._temp_prefix(upload), part_number)
+
+    @dispatch(event="provider.storage._bucket_objects.create_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def create_multipart_upload(self, bucket, object_name):
+        # No server-side initiation; the upload id namespaces this upload's
+        # temporary part objects.
+        return BaseMultipartUpload(self.provider, bucket, object_name,
+                                   uuid.uuid4().hex)
+
+    @dispatch(event="provider.storage._bucket_objects.upload_part",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def upload_part(self, bucket, upload, part_number, data):
+        if isinstance(data, str):
+            data = data.encode()
+        if isinstance(data, (bytes, bytearray)):
+            data = io.BytesIO(data)
+        media_body = googleapiclient.http.MediaIoBaseUpload(
+            data, mimetype='application/octet-stream')
+        temp_name = self._temp_part_name(upload, part_number)
+        self._create_object_with_media_body(bucket, temp_name, media_body)
+        return BaseUploadPart(part_number, temp_name)
+
+    @dispatch(event="provider.storage._bucket_objects."
+                    "complete_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def complete_multipart_upload(self, bucket, upload, parts):
+        ordered = sorted(parts, key=lambda p: p.part_number)
+        sources = [p.etag for p in ordered]
+        intermediates = self._compose(bucket, upload, upload.object_name,
+                                      sources)
+        # The temporary part objects and any compose intermediates are real,
+        # billable objects, so remove them once assembled.
+        self._delete_objects(bucket, sources + intermediates,
+                             ignore_missing=True)
+        return self.get(bucket, upload.object_name)
+
+    @dispatch(event="provider.storage._bucket_objects.abort_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def abort_multipart_upload(self, bucket, upload):
+        self._delete_objects(bucket, self._list_temp_objects(bucket, upload),
+                             ignore_missing=True)
+
+    def _compose(self, bucket, upload, destination, sources):
+        """
+        Compose ``sources`` into ``destination``, chaining through
+        intermediate objects when there are more than ``_MAX_COMPOSE_SOURCES``.
+        Returns the list of intermediate object names created (to be cleaned
+        up by the caller).
+        """
+        intermediates = []
+        level = sources
+        batch = 0
+        while len(level) > self._MAX_COMPOSE_SOURCES:
+            next_level = []
+            for i in range(0, len(level), self._MAX_COMPOSE_SOURCES):
+                group = level[i:i + self._MAX_COMPOSE_SOURCES]
+                name = "{0}compose-{1:05d}".format(
+                    self._temp_prefix(upload), batch)
+                self._compose_once(bucket, name, group)
+                intermediates.append(name)
+                next_level.append(name)
+                batch += 1
+            level = next_level
+        self._compose_once(bucket, destination, level)
+        return intermediates
+
+    def _compose_once(self, bucket, destination, sources):
+        (self.provider
+             .gcp_storage
+             .objects()
+             .compose(destinationBucket=bucket.name,
+                      destinationObject=destination,
+                      body={'sourceObjects': [{'name': s} for s in sources]})
+             .execute())
+
+    def _list_temp_objects(self, bucket, upload):
+        prefix = self._temp_prefix(upload)
+        names = []
+        page_token = None
+        while True:
+            response = (self.provider
+                            .gcp_storage
+                            .objects()
+                            .list(bucket=bucket.name, prefix=prefix,
+                                  pageToken=page_token)
+                            .execute())
+            names.extend(o['name'] for o in response.get('items', []))
+            page_token = response.get('nextPageToken')
+            if not page_token:
+                return names
+
+    def _delete_objects(self, bucket, names, ignore_missing=False):
+        for name in names:
+            try:
+                (self.provider
+                     .gcp_storage
+                     .objects()
+                     .delete(bucket=bucket.name, object=name)
+                     .execute())
+            except googleapiclient.errors.HttpError:
+                if not ignore_missing:
+                    raise
+
 
 class GCPGatewayService(BaseGatewayService):
     _DEFAULT_GATEWAY_NAME = 'default-internet-gateway'

+ 14 - 5
cloudbridge/providers/openstack/resources.py

@@ -1314,21 +1314,30 @@ class OpenStackBucketObject(BaseBucketObject):
             self.cbcontainer.name, self.name, resp_chunk_size=65536)
         return content
 
-    def upload(self, data):
+    @property
+    def bucket(self):
+        return self.cbcontainer
+
+    def _upload_single_shot(self, data):
         """
-        Set the contents of this object to the data read from the source
-        string.
+        Set the contents of this object in a single request.
 
-        .. warning:: Will fail if the data is larger than 5 Gig.
+        Inputs larger than the multipart threshold are handled transparently
+        by the base class via the Static Large Object (SLO) multipart path, so
+        this single-request path only runs for smaller payloads.
         """
         self._provider.swift.put_object(self.cbcontainer.name, self.name,
                                         data)
 
-    def upload_from_file(self, path):
+    def upload_from_file(self, path, config=None):
         """
         Stores the contents of the file pointed by the ``path`` variable.
         If the file is bigger than 5 Gig, it will be broken into segments.
 
+        Swift uses ``SwiftService`` here, which manages its own segmenting and
+        concurrency; the ``config`` argument is accepted for interface
+        consistency but does not affect this path.
+
         :type path: ``str``
         :param path: Absolute path to the file to be uploaded to Swift.
         :rtype: ``bool``

+ 59 - 0
cloudbridge/providers/openstack/services.py

@@ -1,7 +1,9 @@
 """
 Services implemented by the OpenStack provider.
 """
+import json
 import logging
+import uuid
 
 from neutronclient.common.exceptions import NeutronClientException
 from neutronclient.common.exceptions import PortNotFoundClient
@@ -18,6 +20,8 @@ from swiftclient import ClientException as SwiftClientException
 import cloudbridge.base.helpers as cb_helpers
 from cloudbridge.base.middleware import dispatch
 from cloudbridge.base.resources import BaseLaunchConfig
+from cloudbridge.base.resources import BaseMultipartUpload
+from cloudbridge.base.resources import BaseUploadPart
 from cloudbridge.base.resources import ClientPagedResultList
 from cloudbridge.base.services import BaseBucketObjectService
 from cloudbridge.base.services import BaseBucketService
@@ -653,6 +657,61 @@ class OpenStackBucketObjectService(BaseBucketObjectService):
         self.provider.swift.put_object(bucket.name, object_name, None)
         return self.get(bucket, object_name)
 
+    @staticmethod
+    def _segment_prefix(upload):
+        return "{0}/slo/{1}/".format(upload.object_name, upload.id)
+
+    @classmethod
+    def _segment_name(cls, upload, part_number):
+        return "{0}{1:08d}".format(cls._segment_prefix(upload), part_number)
+
+    @dispatch(event="provider.storage._bucket_objects.create_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def create_multipart_upload(self, bucket, object_name):
+        # Swift has no server-side initiation; the upload id namespaces this
+        # upload's Static Large Object segments.
+        return BaseMultipartUpload(self.provider, bucket, object_name,
+                                   uuid.uuid4().hex)
+
+    @dispatch(event="provider.storage._bucket_objects.upload_part",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def upload_part(self, bucket, upload, part_number, data):
+        if isinstance(data, str):
+            data = data.encode()
+        if not isinstance(data, (bytes, bytearray)):
+            data = data.read()
+        segment_name = self._segment_name(upload, part_number)
+        etag = self.provider.swift.put_object(
+            bucket.name, segment_name, data)
+        # Retain the manifest entry needed to assemble the SLO on complete.
+        return BaseUploadPart(part_number, {
+            'path': "/{0}/{1}".format(bucket.name, segment_name),
+            'etag': etag,
+            'size_bytes': len(data)})
+
+    @dispatch(event="provider.storage._bucket_objects."
+                    "complete_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def complete_multipart_upload(self, bucket, upload, parts):
+        ordered = sorted(parts, key=lambda p: p.part_number)
+        manifest = [p.etag for p in ordered]
+        self.provider.swift.put_object(
+            bucket.name, upload.object_name, json.dumps(manifest),
+            query_string='multipart-manifest=put')
+        return self.get(bucket, upload.object_name)
+
+    @dispatch(event="provider.storage._bucket_objects.abort_multipart_upload",
+              priority=BaseBucketObjectService.STANDARD_EVENT_PRIORITY)
+    def abort_multipart_upload(self, bucket, upload):
+        prefix = self._segment_prefix(upload)
+        _, object_list = self.provider.swift.get_container(
+            bucket.name, prefix=prefix)
+        for obj in object_list:
+            try:
+                self.provider.swift.delete_object(bucket.name, obj.get('name'))
+            except SwiftClientException:
+                pass  # idempotent: ignore already-deleted segments
+
 
 class OpenStackComputeService(BaseComputeService):
 

+ 223 - 0
tests/test_multipart_driver.py

@@ -0,0 +1,223 @@
+"""
+Provider-agnostic unit tests for the base multipart upload driver
+(``BaseBucketObject._upload_multipart``).
+
+The driver is the engine behind transparent large uploads on providers that do
+not override it (GCP, OpenStack Swift). Because the mock provider is AWS-backed
+and AWS overrides the driver with boto3's native uploader, the driver is
+exercised here directly against in-memory fakes so it has coverage in CI
+without cloud credentials.
+"""
+import threading
+import unittest
+from io import BytesIO
+
+from cloudbridge.base.resources import BaseBucketObject
+from cloudbridge.base.resources import BaseMultipartUpload
+from cloudbridge.base.resources import BaseUploadPart
+from cloudbridge.interfaces.exceptions import InvalidValueException
+from cloudbridge.interfaces.resources import UploadConfig
+
+
+class _Recorder:
+    """Thread-safe sink shared by the original and all cloned fake services."""
+
+    def __init__(self):
+        self._lock = threading.Lock()
+        self.parts = {}            # part_number -> bytes
+        self.services_used = set()  # id() of each service that uploaded a part
+        self.clone_count = 0
+        self.completed_order = None
+        self.aborted = False
+        self.active = 0
+        self.max_active = 0
+        self.fail_on_part = None    # part_number that should raise
+
+    def record_part(self, service, part_number, data):
+        with self._lock:
+            self.active += 1
+            self.max_active = max(self.max_active, self.active)
+        try:
+            if self.fail_on_part == part_number:
+                raise RuntimeError("boom on part %d" % part_number)
+            # Hold briefly so concurrent uploads genuinely overlap.
+            time_to_sleep = 0.02
+            _sleep(time_to_sleep)
+            with self._lock:
+                self.parts[part_number] = bytes(data)
+                self.services_used.add(id(service))
+        finally:
+            with self._lock:
+                self.active -= 1
+
+
+def _sleep(seconds):
+    # Indirection so the deterministic tests can monkeypatch if needed; a plain
+    # sleep is fine here and keeps the overlap window small.
+    threading.Event().wait(seconds)
+
+
+class _FakeService:
+    def __init__(self, recorder, provider):
+        self._recorder = recorder
+        self._provider = provider
+
+    def create_multipart_upload(self, bucket, object_name):
+        return BaseMultipartUpload(self._provider, bucket, object_name, "upl")
+
+    def upload_part(self, bucket, upload, part_number, data):
+        self._recorder.record_part(self, part_number, data)
+        return BaseUploadPart(part_number, "etag-%d" % part_number)
+
+    def complete_multipart_upload(self, bucket, upload, parts):
+        ordered = sorted(parts, key=lambda p: p.part_number)
+        self._recorder.completed_order = [p.part_number for p in ordered]
+        return b"".join(self._recorder.parts[p.part_number] for p in ordered)
+
+    def abort_multipart_upload(self, bucket, upload):
+        self._recorder.aborted = True
+
+
+class _FakeStorage:
+    def __init__(self, service):
+        self._bucket_objects = service
+
+
+class _FakeProvider:
+    def __init__(self, recorder):
+        self._recorder = recorder
+        self.storage = _FakeStorage(_FakeService(recorder, self))
+
+    def clone(self, zone=None):
+        self._recorder.clone_count += 1
+        return _FakeProvider(self._recorder)
+
+    def _get_config_value(self, key, default_value=None):
+        return default_value
+
+
+class _DriverObject(BaseBucketObject):
+    """A BaseBucketObject wired to fakes, with a tiny minimum part size so
+    tests can use small payloads."""
+
+    CB_MULTIPART_MIN_PART_SIZE = 1
+
+    def __init__(self, provider, part_size, concurrency):
+        super(_DriverObject, self).__init__(provider)
+        self._part_size = part_size
+        self._concurrency = concurrency
+
+    @property
+    def id(self):
+        return "obj"
+
+    @property
+    def name(self):
+        return "obj"
+
+    @property
+    def bucket(self):
+        return "BUCKET"
+
+    def _multipart_part_size(self, config=None):
+        if config is not None and config.part_size is not None:
+            return config.part_size
+        return self._part_size
+
+    def _multipart_max_concurrency(self, config=None):
+        if config is not None and config.max_concurrency is not None:
+            return config.max_concurrency
+        return self._concurrency
+
+
+class MultipartDriverTestCase(unittest.TestCase):
+
+    def _driver(self, recorder, part_size, concurrency):
+        return _DriverObject(_FakeProvider(recorder), part_size, concurrency)
+
+    def test_reassembles_payload_in_order(self):
+        recorder = _Recorder()
+        driver = self._driver(recorder, part_size=4, concurrency=3)
+        content = b"abcdefghijABCDEFGHIJ0123456789x"  # 31 bytes -> 8 parts
+        result = driver._upload_multipart(BytesIO(content))
+        self.assertEqual(result, content)
+        self.assertEqual(recorder.completed_order, list(range(1, 9)))
+        # Final part is the short remainder (3 bytes).
+        self.assertEqual(recorder.parts[8], content[28:])
+
+    def test_handles_short_reads_without_undersized_parts(self):
+        recorder = _Recorder()
+        driver = self._driver(recorder, part_size=8, concurrency=2)
+
+        class _DripStream:
+            """Returns at most 3 bytes per read to simulate a socket-like
+            stream; the driver must coalesce reads up to the part size."""
+            def __init__(self, data):
+                self._buf = BytesIO(data)
+
+            def read(self, size):
+                return self._buf.read(min(size, 3))
+
+        content = bytes(range(20))  # 20 bytes, part_size 8 -> 8,8,4
+        result = driver._upload_multipart(_DripStream(content))
+        self.assertEqual(result, content)
+        self.assertEqual(len(recorder.parts[1]), 8)
+        self.assertEqual(len(recorder.parts[2]), 8)
+        self.assertEqual(len(recorder.parts[3]), 4)
+
+    def test_uploads_parts_concurrently_via_cloned_services(self):
+        recorder = _Recorder()
+        concurrency = 4
+        driver = self._driver(recorder, part_size=1, concurrency=concurrency)
+        # 12 parts (one byte each) across a pool of 4 clones.
+        content = b"0123456789ab"
+        driver._upload_multipart(BytesIO(content))
+
+        # A clone per worker, reused across parts.
+        self.assertEqual(recorder.clone_count, concurrency)
+        self.assertEqual(len(recorder.services_used), concurrency)
+        # Real parallelism happened, bounded by the configured concurrency.
+        self.assertGreater(recorder.max_active, 1)
+        self.assertLessEqual(recorder.max_active, concurrency)
+
+    def test_single_concurrency_does_not_clone(self):
+        recorder = _Recorder()
+        driver = self._driver(recorder, part_size=4, concurrency=1)
+        driver._upload_multipart(BytesIO(b"abcdefghij"))
+        self.assertEqual(recorder.clone_count, 0)
+        self.assertEqual(recorder.max_active, 1)
+
+    def test_per_call_config_overrides_concurrency(self):
+        recorder = _Recorder()
+        # The driver's default concurrency is 1 (no clones); a per-call config
+        # bumps it to 3, engaging the clone-per-worker pool.
+        driver = self._driver(recorder, part_size=1, concurrency=1)
+        driver._upload_multipart(BytesIO(b"0123456789ab"),
+                                 UploadConfig(max_concurrency=3))
+        self.assertEqual(recorder.clone_count, 3)
+        self.assertGreater(recorder.max_active, 1)
+        self.assertLessEqual(recorder.max_active, 3)
+
+    def test_aborts_and_raises_on_part_failure(self):
+        recorder = _Recorder()
+        recorder.fail_on_part = 2
+        driver = self._driver(recorder, part_size=4, concurrency=3)
+        with self.assertRaises(Exception):
+            driver._upload_multipart(BytesIO(b"abcdefghijklmnop"))
+        self.assertTrue(recorder.aborted)
+
+    def test_part_size_below_minimum_raises(self):
+        recorder = _Recorder()
+        # Use the real minimum here (not the test override) by going through a
+        # driver whose part size is one below the production floor.
+        driver = _DriverObject(_FakeProvider(recorder), part_size=4,
+                               concurrency=2)
+        driver.CB_MULTIPART_MIN_PART_SIZE = 5
+        with self.assertRaises(InvalidValueException):
+            driver._upload_multipart(BytesIO(b"abc"))
+        # Nothing should have been created before validation failed.
+        self.assertEqual(recorder.completed_order, None)
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 206 - 0
tests/test_object_store_service.py

@@ -3,20 +3,27 @@ import os
 import tempfile
 from datetime import datetime
 from io import BytesIO
+from unittest import mock
 from unittest import skip
 
 import requests
 
 from cloudbridge.base import helpers as cb_helpers
+from cloudbridge.base.resources import BaseBucketObject
 from cloudbridge.interfaces.exceptions import DuplicateResourceException
 from cloudbridge.interfaces.provider import TestMockHelperMixin
 from cloudbridge.interfaces.resources import Bucket
 from cloudbridge.interfaces.resources import BucketObject
+from cloudbridge.interfaces.resources import UploadConfig
 
 from tests import helpers
 from tests.helpers import ProviderTestBase
 from tests.helpers import standard_interface_tests as sit
 
+# S3 (and Swift) require every part except the last to be >= 5 MiB. Tests use
+# this size so they remain valid against real cloud providers, not just moto.
+MIN_PART_SIZE = 5 * 1024 * 1024
+
 
 class CloudObjectStoreServiceTestCase(ProviderTestBase):
 
@@ -240,6 +247,205 @@ class CloudObjectStoreServiceTestCase(ProviderTestBase):
                 with open(test_file, 'rb') as f:
                     self.assertEqual(target_stream.getvalue(), f.read())
 
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_explicit_multipart_upload_roundtrip(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj_name = "mpu-roundtrip.bin"
+            obj = test_bucket.objects.create(obj_name)
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                part1 = b"a" * MIN_PART_SIZE
+                part2 = b"b" * MIN_PART_SIZE
+                part3 = b"c" * 1024  # final part may be smaller than the min
+                expected = part1 + part2 + part3
+
+                upload = obj.create_multipart_upload()
+                parts = [upload.upload_part(1, part1),
+                         upload.upload_part(2, part2),
+                         upload.upload_part(3, part3)]
+                upload.complete(parts)
+
+                stored = test_bucket.objects.get(obj_name)
+                self.assertIsNotNone(
+                    stored, "Object should exist after multipart completion")
+                self.assertEqual(stored.size, len(expected))
+                target_stream = BytesIO()
+                stored.save_content(target_stream)
+                self.assertEqual(target_stream.getvalue(), expected)
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_multipart_upload_out_of_order_parts(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj_name = "mpu-ooo.bin"
+            obj = test_bucket.objects.create(obj_name)
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                part1 = b"1" * MIN_PART_SIZE
+                part2 = b"2" * MIN_PART_SIZE
+                part3 = b"3" * 1024
+                expected = part1 + part2 + part3
+
+                upload = obj.create_multipart_upload()
+                # Upload and collect parts out of order; complete must
+                # assemble them in ascending part-number order regardless.
+                p3 = upload.upload_part(3, part3)
+                p1 = upload.upload_part(1, part1)
+                p2 = upload.upload_part(2, part2)
+                upload.complete([p3, p1, p2])
+
+                stored = test_bucket.objects.get(obj_name)
+                target_stream = BytesIO()
+                stored.save_content(target_stream)
+                self.assertEqual(target_stream.getvalue(), expected)
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_multipart_upload_abort(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj_name = "mpu-abort.bin"
+            obj = test_bucket.objects.create(obj_name)
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                upload = obj.create_multipart_upload()
+                upload.upload_part(1, b"a" * MIN_PART_SIZE)
+                upload.abort()
+
+                # Aborting must not commit any part data. Some providers
+                # pre-materialise an empty placeholder on objects.create(), so
+                # the target may be absent or empty afterwards -- but it must
+                # never hold the uploaded part.
+                stored = test_bucket.objects.get(obj_name)
+                self.assertTrue(
+                    stored is None or stored.size == 0,
+                    "Aborted multipart upload must not commit any part data")
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_transparent_upload_large_stream_uses_multipart(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj_name = "transparent.bin"
+            obj = test_bucket.objects.create(obj_name)
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                content = b"x" * (MIN_PART_SIZE * 2 + 1024)
+
+                # Lower the threshold/part size so a modest stream crosses it,
+                # and assert the multipart path is taken (each provider routes
+                # its own way underneath) and the object round-trips exactly.
+                with mock.patch.object(
+                        BaseBucketObject, 'CB_MULTIPART_THRESHOLD',
+                        MIN_PART_SIZE), \
+                    mock.patch.object(
+                        BaseBucketObject, 'CB_MULTIPART_PART_SIZE',
+                        MIN_PART_SIZE), \
+                    mock.patch.object(
+                        obj, '_upload_multipart',
+                        wraps=obj._upload_multipart) as spy:
+                    obj.upload(BytesIO(content))
+
+                spy.assert_called_once()
+                stored = test_bucket.objects.get(obj_name)
+                self.assertEqual(stored.size, len(content))
+                target_stream = BytesIO()
+                stored.save_content(target_stream)
+                self.assertEqual(target_stream.getvalue(), content)
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_small_upload_stays_single_shot(self):
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj = test_bucket.objects.create("small.txt")
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                content = b"a small payload below the multipart threshold"
+
+                # A payload below the threshold must not trigger multipart.
+                svc = self.provider.storage._bucket_objects
+                with mock.patch.object(
+                        svc, 'create_multipart_upload',
+                        wraps=svc.create_multipart_upload) as spy:
+                    obj.upload(content)
+
+                spy.assert_not_called()
+                target_stream = BytesIO()
+                obj.save_content(target_stream)
+                self.assertEqual(target_stream.getvalue(), content)
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_upload_from_file_uses_multipart_config(self):
+        # AWS drives boto3's TransferManager with CloudBridge's multipart
+        # knobs (not boto3's own defaults). This wiring is AWS-specific.
+        if self.provider.PROVIDER_ID not in ('aws', 'mock'):
+            self.skipTest("TransferConfig wiring is specific to AWS")
+        from boto3.s3.transfer import TransferConfig
+
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj = test_bucket.objects.create("config.bin")
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                test_file = os.path.join(
+                    helpers.get_test_fixtures_folder(), 'logo.jpg')
+                # pylint:disable=protected-access
+                with mock.patch.object(
+                        BaseBucketObject, 'CB_MULTIPART_PART_SIZE',
+                        7 * 1024 * 1024), \
+                    mock.patch.object(
+                        BaseBucketObject, 'CB_MULTIPART_MAX_CONCURRENCY', 3), \
+                    mock.patch.object(
+                        obj._obj, 'upload_file',
+                        wraps=obj._obj.upload_file) as spy:
+                    obj.upload_from_file(test_file)
+
+                spy.assert_called_once()
+                config = spy.call_args.kwargs['Config']
+                self.assertIsInstance(config, TransferConfig)
+                self.assertEqual(config.multipart_chunksize, 7 * 1024 * 1024)
+                self.assertEqual(config.max_concurrency, 3)
+
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_per_call_upload_config_overrides_defaults(self):
+        # A per-call UploadConfig takes precedence over the global knobs.
+        if self.provider.PROVIDER_ID not in ('aws', 'mock'):
+            self.skipTest("TransferConfig wiring is specific to AWS")
+        from boto3.s3.transfer import TransferConfig
+
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj = test_bucket.objects.create("percall.bin")
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                test_file = os.path.join(
+                    helpers.get_test_fixtures_folder(), 'logo.jpg')
+                cfg = UploadConfig(part_size=8 * 1024 * 1024,
+                                   max_concurrency=2)
+                # pylint:disable=protected-access
+                with mock.patch.object(
+                        obj._obj, 'upload_file',
+                        wraps=obj._obj.upload_file) as spy:
+                    obj.upload_from_file(test_file, config=cfg)
+
+                config = spy.call_args.kwargs['Config']
+                self.assertIsInstance(config, TransferConfig)
+                self.assertEqual(config.multipart_chunksize, 8 * 1024 * 1024)
+                self.assertEqual(config.max_concurrency, 2)
+
     @skip("Skip unless you want to test objects bigger than 5GB")
     @helpers.skipIfNoService(['storage.buckets'])
     def test_upload_download_bucket_content_with_large_file(self):