Procházet zdrojové kódy

Add per-call UploadConfig for upload() and upload_from_file()

Introduce a provider-agnostic UploadConfig value object (threshold,
part_size, max_concurrency) that callers may pass to upload() and
upload_from_file() to tune a single transfer. It is deliberately not boto3's
TransferConfig, so the abstraction stays provider-neutral; each provider maps
the fields onto its native mechanism (boto3 TransferConfig, Azure
max_concurrency, the base clone-pool driver for GCP/Swift).

The three multipart resolvers now follow precedence: explicit UploadConfig
field -> provider/global config -> class default. Providers whose
upload_from_file uses a native uploader that manages its own segmenting
(GCP resumable, Swift SwiftService) accept the argument for interface
consistency but document that it does not affect that path.
Nuwan Goonasekera před 7 hodinami
rodič
revize
70f0f84488

+ 21 - 15
cloudbridge/base/resources.py

@@ -802,20 +802,26 @@ class BaseBucketObject(BaseCloudResource, BucketObject):
     def save_content(self, target_stream):
         shutil.copyfileobj(self.iter_content(), target_stream)
 
-    @property
-    def _multipart_threshold(self):
+    # The three resolvers below pick, in order of precedence: an explicit
+    # per-call UploadConfig field, the provider/global config, then the class
+    # default constant.
+    def _multipart_threshold(self, config=None):
+        if config is not None and config.threshold is not None:
+            return int(config.threshold)
         # pylint:disable=protected-access
         return int(self._provider._get_config_value(
             'multipart_threshold', self.CB_MULTIPART_THRESHOLD))
 
-    @property
-    def _multipart_part_size(self):
+    def _multipart_part_size(self, config=None):
+        if config is not None and config.part_size is not None:
+            return int(config.part_size)
         # pylint:disable=protected-access
         return int(self._provider._get_config_value(
             'multipart_part_size', self.CB_MULTIPART_PART_SIZE))
 
-    @property
-    def _multipart_max_concurrency(self):
+    def _multipart_max_concurrency(self, config=None):
+        if config is not None and config.max_concurrency is not None:
+            return int(config.max_concurrency)
         # pylint:disable=protected-access
         return int(self._provider._get_config_value(
             'multipart_max_concurrency', self.CB_MULTIPART_MAX_CONCURRENCY))
@@ -849,19 +855,19 @@ class BaseBucketObject(BaseCloudResource, BucketObject):
             return io.BytesIO(data)
         return data
 
-    def upload(self, data):
+    def upload(self, data, config=None):
         size = self._data_size(data)
-        if size is not None and size > self._multipart_threshold:
-            return self._upload_multipart(self._as_stream(data))
+        if size is not None and size > self._multipart_threshold(config):
+            return self._upload_multipart(self._as_stream(data), config)
         return self._upload_single_shot(data)
 
-    def upload_from_file(self, path):
-        if os.path.getsize(path) > self._multipart_threshold:
+    def upload_from_file(self, path, config=None):
+        if os.path.getsize(path) > self._multipart_threshold(config):
             with open(path, 'rb') as f:
-                return self._upload_multipart(f)
+                return self._upload_multipart(f, config)
         return self._upload_from_file_single_shot(path)
 
-    def _upload_multipart(self, stream):
+    def _upload_multipart(self, stream, config=None):
         """
         Drive the explicit multipart lifecycle over a stream, reading it one
         part at a time so the whole payload is never held in memory.
@@ -875,11 +881,11 @@ class BaseBucketObject(BaseCloudResource, BucketObject):
         Providers with an efficient, thread-safe native uploader (e.g. AWS via
         boto3's ``upload_fileobj``) override this method to use it directly.
         """
-        part_size = self._multipart_part_size
+        part_size = self._multipart_part_size(config)
         if part_size < self.CB_MULTIPART_MIN_PART_SIZE:
             raise InvalidValueException('multipart_part_size', part_size)
 
-        concurrency = max(1, self._multipart_max_concurrency)
+        concurrency = max(1, self._multipart_max_concurrency(config))
         upload = self.create_multipart_upload()
         try:
             if concurrency == 1:

+ 45 - 2
cloudbridge/interfaces/resources.py

@@ -2125,6 +2125,39 @@ class VMFirewallRule(CloudResource):
         pass
 
 
+class UploadConfig(object):
+    """
+    Provider-agnostic, per-call tuning for an object upload.
+
+    Passed optionally to :meth:`.BucketObject.upload` and
+    :meth:`.BucketObject.upload_from_file`. Any field left as ``None`` falls
+    back to the provider/global configuration (the ``CB_MULTIPART_*`` settings).
+    Each provider maps these fields onto its native transfer mechanism.
+    """
+
+    def __init__(self, threshold=None, part_size=None, max_concurrency=None):
+        """
+        :type threshold: ``int``
+        :param threshold: Size in bytes above which the upload is split into
+            multiple parts.
+
+        :type part_size: ``int``
+        :param part_size: Size in bytes of each part. Must be at least the
+            provider minimum (5 MiB on S3) for all but the final part.
+
+        :type max_concurrency: ``int``
+        :param max_concurrency: Maximum number of parts to upload in parallel.
+        """
+        self.threshold = threshold
+        self.part_size = part_size
+        self.max_concurrency = max_concurrency
+
+    def __repr__(self):
+        return ("<CB-UploadConfig: threshold={0}, part_size={1}, "
+                "max_concurrency={2}>".format(
+                    self.threshold, self.part_size, self.max_concurrency))
+
+
 class UploadPart(object):
     """
     A handle for a single part uploaded as part of a multipart upload.
@@ -2297,17 +2330,22 @@ class BucketObject(CloudResource):
         pass
 
     @abstractmethod
-    def upload(self, source_stream):
+    def upload(self, source_stream, config=None):
         """
         Set the contents of the object to the data read from the source stream.
 
+        :type config: :class:`.UploadConfig`
+        :param config: Optional per-call upload tuning (multipart threshold,
+            part size, concurrency). Any field left unset falls back to the
+            provider/global configuration.
+
         :rtype: ``bool``
         :return: ``True`` if successful.
         """
         pass
 
     @abstractmethod
-    def upload_from_file(self, path):
+    def upload_from_file(self, path, config=None):
         """
         Store the contents of the file pointed by the "path" variable.
 
@@ -2316,6 +2354,11 @@ class BucketObject(CloudResource):
 
         :type path: ``str``
         :param path: Absolute path to the file to be uploaded to S3.
+
+        :type config: :class:`.UploadConfig`
+        :param config: Optional per-call upload tuning (multipart threshold,
+            part size, concurrency). Any field left unset falls back to the
+            provider/global configuration.
         """
         pass
 

+ 12 - 12
cloudbridge/providers/aws/resources.py

@@ -879,26 +879,26 @@ class AWSBucketObject(BaseBucketObject):
     def _upload_single_shot(self, data):
         self._obj.put(Body=data)
 
-    def _upload_multipart(self, stream):
+    def _upload_multipart(self, stream, config=None):
         # boto3's TransferManager uploads parts concurrently with a thread-safe
         # client, so the transparent multipart path delegates to it rather than
         # CloudBridge's generic clone-pool driver.
-        config = TransferConfig(
-            multipart_threshold=self._multipart_part_size,
-            multipart_chunksize=self._multipart_part_size,
-            max_concurrency=self._multipart_max_concurrency)
-        self._obj.upload_fileobj(stream, Config=config)
+        transfer_config = TransferConfig(
+            multipart_threshold=self._multipart_part_size(config),
+            multipart_chunksize=self._multipart_part_size(config),
+            max_concurrency=self._multipart_max_concurrency(config))
+        self._obj.upload_fileobj(stream, Config=transfer_config)
 
-    def upload_from_file(self, path):
+    def upload_from_file(self, path, config=None):
         # boto3's upload_file streams large files in parts via its
         # TransferManager. Drive it with CloudBridge's multipart knobs so that
         # upload_from_file and upload() honour the same configuration rather
         # than boto3's own defaults.
-        config = TransferConfig(
-            multipart_threshold=self._multipart_threshold,
-            multipart_chunksize=self._multipart_part_size,
-            max_concurrency=self._multipart_max_concurrency)
-        self._obj.upload_file(path, Config=config)
+        transfer_config = TransferConfig(
+            multipart_threshold=self._multipart_threshold(config),
+            multipart_chunksize=self._multipart_part_size(config),
+            max_concurrency=self._multipart_max_concurrency(config))
+        self._obj.upload_file(path, Config=transfer_config)
 
     def delete(self):
         self._obj.delete()

+ 2 - 2
cloudbridge/providers/azure/resources.py

@@ -247,14 +247,14 @@ class AzureBucketObject(BaseBucketObject):
             log.exception(azureEx)
             return False
 
-    def _upload_multipart(self, stream):
+    def _upload_multipart(self, stream, config=None):
         # The Azure SDK's upload_blob stages blocks concurrently (max_concurrency
         # workers) over a thread-safe client, so the transparent multipart path
         # delegates to it rather than CloudBridge's generic clone-pool driver.
         try:
             self._provider.azure_client.upload_blob(
                 self._container.id, self.id, stream,
-                max_concurrency=self._multipart_max_concurrency)
+                max_concurrency=self._multipart_max_concurrency(config))
             return True
         except AzureException as azureEx:
             log.exception(azureEx)

+ 5 - 1
cloudbridge/providers/gcp/resources.py

@@ -1974,9 +1974,13 @@ class GCPBucketObject(BaseBucketObject):
         if response:
             self._obj = response
 
-    def upload_from_file(self, path):
+    def upload_from_file(self, path, config=None):
         """
         Upload a binary file.
+
+        GCP uses a resumable upload here, which streams the file in chunks on a
+        single session; the ``config`` argument is accepted for interface
+        consistency but does not affect this path.
         """
         with open(path, 'rb') as f:
             media_body = googleapiclient.http.MediaIoBaseUpload(

+ 5 - 1
cloudbridge/providers/openstack/resources.py

@@ -1329,11 +1329,15 @@ class OpenStackBucketObject(BaseBucketObject):
         self._provider.swift.put_object(self.cbcontainer.name, self.name,
                                         data)
 
-    def upload_from_file(self, path):
+    def upload_from_file(self, path, config=None):
         """
         Stores the contents of the file pointed by the ``path`` variable.
         If the file is bigger than 5 Gig, it will be broken into segments.
 
+        Swift uses ``SwiftService`` here, which manages its own segmenting and
+        concurrency; the ``config`` argument is accepted for interface
+        consistency but does not affect this path.
+
         :type path: ``str``
         :param path: Absolute path to the file to be uploaded to Swift.
         :rtype: ``bool``

+ 18 - 4
tests/test_multipart_driver.py

@@ -16,6 +16,7 @@ from cloudbridge.base.resources import BaseBucketObject
 from cloudbridge.base.resources import BaseMultipartUpload
 from cloudbridge.base.resources import BaseUploadPart
 from cloudbridge.interfaces.exceptions import InvalidValueException
+from cloudbridge.interfaces.resources import UploadConfig
 
 
 class _Recorder:
@@ -118,12 +119,14 @@ class _DriverObject(BaseBucketObject):
     def bucket(self):
         return "BUCKET"
 
-    @property
-    def _multipart_part_size(self):
+    def _multipart_part_size(self, config=None):
+        if config is not None and config.part_size is not None:
+            return config.part_size
         return self._part_size
 
-    @property
-    def _multipart_max_concurrency(self):
+    def _multipart_max_concurrency(self, config=None):
+        if config is not None and config.max_concurrency is not None:
+            return config.max_concurrency
         return self._concurrency
 
 
@@ -184,6 +187,17 @@ class MultipartDriverTestCase(unittest.TestCase):
         self.assertEqual(recorder.clone_count, 0)
         self.assertEqual(recorder.max_active, 1)
 
+    def test_per_call_config_overrides_concurrency(self):
+        recorder = _Recorder()
+        # The driver's default concurrency is 1 (no clones); a per-call config
+        # bumps it to 3, engaging the clone-per-worker pool.
+        driver = self._driver(recorder, part_size=1, concurrency=1)
+        driver._upload_multipart(BytesIO(b"0123456789ab"),
+                                 UploadConfig(max_concurrency=3))
+        self.assertEqual(recorder.clone_count, 3)
+        self.assertGreater(recorder.max_active, 1)
+        self.assertLessEqual(recorder.max_active, 3)
+
     def test_aborts_and_raises_on_part_failure(self):
         recorder = _Recorder()
         recorder.fail_on_part = 2

+ 30 - 0
tests/test_object_store_service.py

@@ -14,6 +14,7 @@ from cloudbridge.interfaces.exceptions import DuplicateResourceException
 from cloudbridge.interfaces.provider import TestMockHelperMixin
 from cloudbridge.interfaces.resources import Bucket
 from cloudbridge.interfaces.resources import BucketObject
+from cloudbridge.interfaces.resources import UploadConfig
 
 from tests import helpers
 from tests.helpers import ProviderTestBase
@@ -416,6 +417,35 @@ class CloudObjectStoreServiceTestCase(ProviderTestBase):
                 self.assertEqual(config.multipart_chunksize, 7 * 1024 * 1024)
                 self.assertEqual(config.max_concurrency, 3)
 
+    @helpers.skipIfNoService(['storage.buckets'])
+    def test_per_call_upload_config_overrides_defaults(self):
+        # A per-call UploadConfig takes precedence over the global knobs.
+        if self.provider.PROVIDER_ID not in ('aws', 'mock'):
+            self.skipTest("TransferConfig wiring is specific to AWS")
+        from boto3.s3.transfer import TransferConfig
+
+        name = "cbtest-mpu-{0}".format(helpers.get_uuid())
+        test_bucket = self.provider.storage.buckets.create(name)
+
+        with cb_helpers.cleanup_action(lambda: test_bucket.delete()):
+            obj = test_bucket.objects.create("percall.bin")
+
+            with cb_helpers.cleanup_action(lambda: obj.delete()):
+                test_file = os.path.join(
+                    helpers.get_test_fixtures_folder(), 'logo.jpg')
+                cfg = UploadConfig(part_size=8 * 1024 * 1024,
+                                   max_concurrency=2)
+                # pylint:disable=protected-access
+                with mock.patch.object(
+                        obj._obj, 'upload_file',
+                        wraps=obj._obj.upload_file) as spy:
+                    obj.upload_from_file(test_file, config=cfg)
+
+                config = spy.call_args.kwargs['Config']
+                self.assertIsInstance(config, TransferConfig)
+                self.assertEqual(config.multipart_chunksize, 8 * 1024 * 1024)
+                self.assertEqual(config.max_concurrency, 2)
+
     @skip("Skip unless you want to test objects bigger than 5GB")
     @helpers.skipIfNoService(['storage.buckets'])
     def test_upload_download_bucket_content_with_large_file(self):