Преглед изворни кода

integration: Move destination device initialization to the provider

Currently, we create a destination device in the test base. However, the
import provider is supposed to create the disks for transfered as required.

Moving this part into the test provider will make it easier for us to
swap in other providers later on.
Claudiu Belu пре 2 недеља
родитељ
комит
f7a9019993

+ 48 - 3
coriolis/tests/integration/base.py

@@ -67,8 +67,10 @@ class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
         cls._exp_platform = cls._harness.exp_provider_platform
         cls._exp_conn_info = cls._harness.exp_conn_info
 
+        cls._imp_provider = cls._harness.imp_provider
         cls._imp_platform = cls._harness.imp_provider_platform
         cls._imp_conn_info = cls._harness.imp_conn_info
+        cls._imp_env_options = cls._harness.imp_env_options
 
         cls._client = cls.get_client()
 
@@ -289,8 +291,6 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
 
         self._src_device = test_utils.add_scsi_debug_device()
         self.addCleanup(test_utils.remove_scsi_debug_device)
-        self._dst_device = test_utils.add_scsi_debug_device()
-        self.addCleanup(test_utils.remove_scsi_debug_device)
 
         # Write a test pattern on the src device.
         # Incremental transfer tests update the second chunk (offset=4096).
@@ -308,8 +308,11 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
             instances=[self._instance_name],
             destination_minion_pool_id=self._pool_id,
             source_environment={"block_device_path": self._src_device},
-            destination_environment={"devices": [self._dst_device]},
         )
+        # Safety-net cleanup for destination devices allocated by the provider.
+        # Must be registered after the transfer, so it runs (LIFO) before the
+        # transfer delete, while the volumes_info is still in the DB.
+        self.addCleanup(self._cleanup_provider_dst_devices)
 
         # mock a few commands that are going to be ran through ssh; they won't
         # pass anyway.
@@ -325,6 +328,48 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
             mocker.start()
             self.addCleanup(mocker.stop)
 
+    @property
+    def _dst_device(self):
+        """First destination dev path from the transfer's volumes_info."""
+        ctxt = self._get_db_context()
+
+        transfer = db_api.get_transfer(
+            ctxt, self._transfer.id, include_task_info=True)
+        info = transfer.get("info", {}).get(self._instance_name, {})
+        for vol in info.get("volumes_info", []):
+            if vol.get("volume_dev"):
+                return vol["volume_dev"]
+
+        return None
+
+    def _cleanup_provider_dst_devices(self):
+        """Remove any devices the provider allocated for this test."""
+        ctxt = self._get_db_context()
+
+        try:
+            transfer = db_api.get_transfer(
+                ctxt, self._transfer.id, include_task_info=True)
+            volumes_info = transfer.get("info", {}).get(
+                self._instance_name, {}).get('volumes_info', [])
+        except Exception as ex:
+            LOG.warn("Could not get volumes info for cleanup. Ex: %s", ex)
+            return
+
+        if not volumes_info:
+            LOG.info("No volume info. Nothing to cleanup.")
+            return
+
+        try:
+            self._imp_provider.delete_replica_disks(
+                ctxt,
+                self._imp_conn_info,
+                self._imp_env_options,
+                volumes_info,
+            )
+        except Exception as ex:
+            LOG.warn(
+                "Could not clean up provider dst devices. Ex: %s", ex)
+
     def _execute_and_wait(self, transfer_id, timeout=300):
         """Trigger one execution of *transfer_id* and wait for completion."""
         execution = self._client.transfer_executions.create(

+ 8 - 0
coriolis/tests/integration/harness.py

@@ -25,6 +25,7 @@ import shutil
 import socket
 import subprocess
 import tempfile
+from unittest import mock
 import uuid
 
 from cheroot.workers import threadpool as cheroot_threadpool
@@ -50,6 +51,7 @@ from coriolis.deployer_manager.rpc import server as deployer_manager_rpc_server
 from coriolis import exception
 from coriolis.minion_manager.rpc import server as minion_manager_rpc_server
 from coriolis import policy as policy_module
+from coriolis.providers import factory as providers_factory
 from coriolis import rpc as rpc_module
 from coriolis.scheduler.rpc import server as scheduler_rpc_server
 from coriolis import service
@@ -311,10 +313,16 @@ class _IntegrationHarness:
 
         self.imp_provider_class = _get_provider(_TEST_IMPORT_PROVIDER)
         self.imp_provider_platform = self.imp_provider_class.platform
+        self.imp_provider = providers_factory.get_provider(
+            self.imp_provider_platform,
+            constants.PROVIDER_TYPE_TRANSFER_IMPORT,
+            event_handler=mock.MagicMock(),
+        )
         self.imp_conn_info = {
             "pkey_path": self.ssh_key_path,
             "role": "destination",
         }
+        self.imp_env_options = {}
 
         self._wsgi_server = None
         self._wsgi_server_thread = None

+ 16 - 23
coriolis/tests/integration/test_provider/imp.py

@@ -55,7 +55,6 @@ class TestImportProvider(
     ``target_environment`` (per-transfer destination settings) has the form::
 
         {
-            "devices": ["/dev/sdY", ...],  # pre-allocated destination devs
         }
     """
 
@@ -86,12 +85,7 @@ class TestImportProvider(
     def get_target_environment_schema(self):
         return {
             "type": "object",
-            "properties": {
-                "devices": {
-                    "type": "array",
-                    "items": {"type": "string"},
-                },
-            },
+            "properties": {},
             "required": [],
         }
 
@@ -131,25 +125,14 @@ class TestImportProvider(
     def deploy_replica_disks(
             self, ctxt, connection_info, target_environment, instance_name,
             export_info, volumes_info):
-        """Map each source disk in export_info to a destination device.
-
-        Returns a volumes_info list where each entry has ``disk_id`` (from
-        the source) and ``volume_dev`` (the destination block device path).
-        """
-        dest_devices = list(target_environment["devices"])
+        """Allocate disks and return volumes_info."""
         src_disks = export_info.get("devices", {}).get("disks", [])
 
-        if len(src_disks) > len(dest_devices):
-            raise ValueError(
-                "Not enough destination devices (%d) for %d source disks"
-                % (len(dest_devices), len(src_disks))
-            )
-
         result = []
         for i, disk in enumerate(src_disks):
             result.append({
                 "disk_id": disk["id"],
-                "volume_dev": dest_devices[i],
+                "volume_dev": test_utils.add_scsi_debug_device(),
             })
 
         return result
@@ -221,7 +204,10 @@ class TestImportProvider(
 
     def delete_replica_disks(
             self, ctxt, connection_info, target_environment, volumes_info):
-        # scsi_debug devices are managed externally; nothing to delete here.
+        for vol in volumes_info:
+            device = vol.get('volume_dev')
+            if device and os.path.exists(device):
+                test_utils.remove_scsi_debug_device()
         return volumes_info
 
     def create_replica_disk_snapshots(
@@ -240,7 +226,14 @@ class TestImportProvider(
     def deploy_replica_instance(
             self, ctxt, connection_info, target_environment, instance_name,
             export_info, volumes_info, clone_disks):
-        return {"instance_deployment_info": {}}
+        devices = [
+            vol["volume_dev"] for vol in volumes_info if vol.get("volume_dev")
+        ]
+        return {
+            "instance_deployment_info": {
+                "devices": devices,
+            },
+        }
 
     def finalize_replica_instance_deployment(
             self, ctxt, connection_info, target_environment,
@@ -277,7 +270,7 @@ class TestImportProvider(
     def deploy_os_morphing_resources(
             self, ctxt, connection_info, target_environment,
             instance_deployment_info):
-        devices = list(target_environment.get("devices", []))
+        devices = list(instance_deployment_info.get("devices", []))
 
         # lsblk inside the container sees all the host block devices because
         # Docker containers share the host kernel's sysfs (/sys/block/).