Selaa lähdekoodia

Adds integration replica transfer tests

Adds shared utilities for the scsi_debug-backed test provider.
Adds Replica transfer tests, including incremental replica transfer.
Adds Replica deployment test.
Adds transfer failure test.
Claudiu Belu 1 kuukausi sitten
vanhempi
sitoutus
88c2c17350

+ 154 - 0
coriolis/tests/integration/base.py

@@ -13,7 +13,9 @@ Subclasses must be run as root.
 """
 
 import os
+import time
 import unittest
+from unittest import mock
 
 from coriolisclient import client as coriolis_client
 from keystoneauth1 import session as ks_session
@@ -22,7 +24,10 @@ from oslo_config import cfg
 from oslo_log import log as logging
 
 from coriolis import constants
+from coriolis import context
+from coriolis.db import api as db_api
 from coriolis.tests.integration import harness
+from coriolis.tests.integration import utils as test_utils
 from coriolis.tests import test_base
 
 CONF = cfg.CONF
@@ -86,3 +91,152 @@ class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
         self.addCleanup(self._client.transfers.delete, transfer.id)
 
         return transfer
+
+
+class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
+    """Shared fixture for replica transfer integration tests.
+
+    Creates two independent scsi_debug-backed block devices (source and
+    destination), writes a known pattern on the source, creates matching
+    test endpoints over them, and builds a Replica transfer between the
+    two endpoints.
+    """
+
+    def setUp(self):
+        super().setUp()
+
+        # NOTE: addCleanup runs LIFO and remove_scsi_debug_device removes
+        # the most-recently added scsi_debug host, so the pairing below
+        # removes the destination device first, then the source.
+        self._src_device = test_utils.add_scsi_debug_device()
+        self.addCleanup(test_utils.remove_scsi_debug_device)
+        self._dst_device = test_utils.add_scsi_debug_device()
+        self.addCleanup(test_utils.remove_scsi_debug_device)
+
+        # Write a test pattern on the src device.
+        test_utils.write_test_pattern(self._src_device)
+
+        # Create endpoints.
+        self._src_endpoint = self._create_endpoint(
+            name="test-src",
+            endpoint_type="test-src",
+            description="integration source endpoint",
+            connection_info={
+                "block_device_path": self._src_device,
+                "pkey_path": "/home/ubuntu/.ssh/id_rsa",
+            },
+        )
+
+        self._dst_endpoint = self._create_endpoint(
+            name="test-dest",
+            endpoint_type="test-dest",
+            description="integration destination endpoint",
+            connection_info={
+                "devices": [self._dst_device],
+                "pkey_path": "/home/ubuntu/.ssh/id_rsa",
+            },
+        )
+
+        # Create transfer replica.
+        self._transfer = self._create_transfer(
+            self._src_endpoint.id,
+            self._dst_endpoint.id,
+            instances=[self._src_device],
+        )
+
+        # Mock a few commands that would be run through SSH; they would
+        # not succeed in this test environment anyway.
+        bkup = "coriolis.providers.backup_writers.HTTPBackupWriterBootstrapper"
+        repl = "coriolis.providers.replicator.Replicator"
+        for prop in [
+            "coriolis.providers.backup_writers._disable_lvm2_lvmetad",
+            f"{bkup}._add_firewalld_port",
+            f"{bkup}._change_binary_se_context",
+            f"{repl}._change_binary_se_context",
+        ]:
+            mocker = mock.patch(prop)
+            mocker.start()
+            self.addCleanup(mocker.stop)
+
+    def _execute_and_wait(self, transfer_id, timeout=300):
+        """Trigger one execution of *transfer_id* and wait for completion.
+
+        Raises ``AssertionError`` (through ``assertExecutionCompleted``)
+        if the execution does not complete successfully within *timeout*
+        seconds.
+        """
+        execution = self._client.transfer_executions.create(
+            transfer_id, shutdown_instances=False)
+        self.assertExecutionCompleted(execution.id, timeout=timeout)
+
+    def _get_db_context(self):
+        """Return an admin RequestContext suitable for direct DB polling."""
+        return context.RequestContext(
+            user='int-test',
+            project_id=harness._TEST_PROJECT_ID,
+            is_admin=True,
+        )
+
+    def wait_for_execution(self, execution_id, timeout=300):
+        """Block until *execution_id* reaches a terminal state.
+
+        Polls the DB directly and yields on each iteration so in-process
+        services can make progress.
+
+        Returns the finalised TasksExecution ORM object.
+        Raises ``AssertionError`` on timeout.
+        """
+        ctxt = self._get_db_context()
+        deadline = time.monotonic() + timeout
+        while time.monotonic() < deadline:
+            execution = db_api.get_tasks_execution(ctxt, execution_id)
+            if execution.status in constants.FINALIZED_EXECUTION_STATUSES:
+                return execution
+            time.sleep(1)
+        self.fail(
+            "Execution %s did not reach a terminal state within %ds "
+            "(last status: %s)"
+            % (execution_id, timeout, execution.status)
+        )
+
+    def assertExecutionCompleted(self, execution_id, timeout=300):
+        """Assert that *execution_id* completes successfully.
+
+        On failure, the assertion message lists every task that did not
+        finish as COMPLETED or CANCELED, including its exception details.
+        """
+        execution = self.wait_for_execution(execution_id, timeout=timeout)
+        self.assertEqual(
+            constants.EXECUTION_STATUS_COMPLETED,
+            execution.status,
+            "Execution %s ended with status %s; task details: %s"
+            % (
+                execution_id,
+                execution.status,
+                # Only report the tasks that actually went wrong; completed
+                # and canceled ones are expected noise.
+                [
+                    (t.task_type, t.status, t.exception_details)
+                    for t in execution.tasks
+                    if t.status not in (
+                        constants.TASK_STATUS_COMPLETED,
+                        constants.TASK_STATUS_CANCELED,
+                    )
+                ],
+            ),
+        )
+
+    def assertExecutionErrored(self, execution_id, timeout=300):
+        """Assert that *execution_id* ends in an error state.
+
+        Both ERROR and DEADLOCKED are accepted as error terminal states.
+        """
+        execution = self.wait_for_execution(execution_id, timeout=timeout)
+        self.assertIn(
+            execution.status,
+            [
+                constants.EXECUTION_STATUS_ERROR,
+                constants.EXECUTION_STATUS_DEADLOCKED,
+            ],
+            "Expected an error status for execution %s, got %s"
+            % (execution_id, execution.status),
+        )
+
+    def assertDeploymentCompleted(self, deployment_id, timeout=300):
+        """Assert that *deployment_id* finishes with a completed status.
+
+        Polls last_execution_status from the DB (the API view does not expose
+        the execution ID directly, so DB polling is used for status tracking).
+
+        Raises ``AssertionError`` if the deployment ends in a non-completed
+        terminal state or does not terminate within *timeout* seconds.
+        """
+        ctxt = self._get_db_context()
+        deadline = time.monotonic() + timeout
+        while time.monotonic() < deadline:
+            deployment = db_api.get_deployment(ctxt, deployment_id)
+            status = deployment.last_execution_status
+            if status in constants.FINALIZED_EXECUTION_STATUSES:
+                # Terminal state reached: assert it is the successful one.
+                self.assertEqual(
+                    constants.EXECUTION_STATUS_COMPLETED,
+                    status,
+                    "Deployment %s ended with status %s"
+                    % (deployment_id, status),
+                )
+                return
+            time.sleep(1)
+        self.fail(
+            "Deployment %s did not reach a terminal state within %ds"
+            % (deployment_id, timeout)
+        )

+ 8 - 0
coriolis/tests/integration/harness.py

@@ -13,6 +13,8 @@ Tasks are executed in-process as greenlets rather than subprocesses. The
 fake:// oslo.messaging transport is in-memory and process-local; subprocess
 tasks would initialise their own isolated transport with no conductor listener,
 causing every event-handler RPC call from the task to block indefinitely.
+
+Must be run as root (scsi_debug block device setup requires it).
 """
 
 import atexit
@@ -49,6 +51,7 @@ from coriolis import rpc as rpc_module
 from coriolis.scheduler.rpc import server as scheduler_rpc_server
 from coriolis import service
 from coriolis.tasks import factory as task_runners_factory
+from coriolis.tests.integration import utils as test_utils
 from coriolis import utils as coriolis_utils
 from coriolis.worker.rpc import server as worker_rpc_server
 
@@ -212,6 +215,7 @@ class _IntegrationHarness:
         cfg.CONF.set_override(
             'lock_path', self.lock_path, group='oslo_concurrency')
         coriolis_utils.setup_logging()
+        test_utils.init_scsi_debug()
 
         # Policy enforcer: reset so it re-reads the new CONF (no policy file).
         policy_module.reset()
@@ -330,3 +334,7 @@ class _IntegrationHarness:
                 pass
 
         shutil.rmtree(self.workdir, True)
+        try:
+            test_utils.destroy_scsi_debug()
+        except Exception:
+            pass

+ 32 - 0
coriolis/tests/integration/test_deployment.py

@@ -0,0 +1,32 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""
+Integration test for the replica deployment.
+
+Runs a full replica execution to completion, then deploys (instantiates) the
+replica and asserts that the deployment execution reaches COMPLETED.
+
+Must be run as root; requires the scsi_debug kernel module.
+"""
+
+from coriolis.tests.integration import base
+
+
+class ReplicaDeploymentIntegrationTest(base.ReplicaIntegrationTestBase):
+    """Deploy a previously-replicated replica."""
+
+    def test_replica_deployment(self):
+        """Execute a replica, then deploy it and assert COMPLETED."""
+
+        # Replica execution: must fully complete before deployment makes
+        # sense.
+        self._execute_and_wait(self._transfer.id)
+
+        # Deployment; OS morphing is skipped since the test provider's
+        # devices carry a raw byte pattern, not an installed OS.
+        deployment = self._client.deployments.create_from_transfer(
+            self._transfer.id,
+            skip_os_morphing=True,
+        )
+        self.addCleanup(self._client.deployments.delete, deployment.id)
+
+        self.assertDeploymentCompleted(deployment.id)

+ 37 - 0
coriolis/tests/integration/test_failure_recovery.py

@@ -0,0 +1,37 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""
+Integration test for failure handling and cleanup.
+
+Patches TestImportProvider.deploy_replica_target_resources to raise an
+exception, triggers a transfer execution, and asserts that:
+  1. The execution reaches ERROR status.
+  2. Cleanup tasks (delete_replica_source_resources) ran so the replicator
+     process is no longer alive.
+
+Must be run as root; requires the scsi_debug kernel module.
+"""
+
+from unittest import mock
+
+from coriolis.tests.integration import base
+from coriolis.tests.integration.providers.test_provider import imp
+
+
+class TransferFailureIntegrationTest(base.ReplicaIntegrationTestBase):
+    """Error path and resource cleanup."""
+
+    def test_error_status_on_provider_failure(self):
+        """Execution reaches ERROR when target resource deployment fails."""
+
+        injected_error = Exception("injected target resource failure")
+
+        # Patch the import provider so every call to
+        # deploy_replica_target_resources raises; the execution must then
+        # settle in an error terminal state rather than hang.
+        with mock.patch.object(
+            imp.TestImportProvider,
+            "deploy_replica_target_resources",
+            side_effect=injected_error,
+        ):
+            execution = self._client.transfer_executions.create(
+                self._transfer.id, shutdown_instances=False)
+            self.assertExecutionErrored(execution.id)

+ 54 - 0
coriolis/tests/integration/test_transfer.py

@@ -0,0 +1,54 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""
+Integration tests for the replica transfer pipeline.
+
+The test has the following steps:
+  - Write a known byte pattern to the source loop device.
+  - Create source / destination endpoints and a Replica transfer via the
+    Coriolis REST API (using coriolisclient).
+  - Execute the transfer and wait for it to complete.
+  - Assert that the destination device contains the same data as the source.
+  - Overwrite a single chunk on the source device.
+  - Execute a second transfer run (incremental=True).
+  - Assert that the destination now matches the updated source.
+
+Must be run as root.
+"""
+
+from coriolis.tests.integration import base
+from coriolis.tests.integration import utils as test_utils
+
+
+class ReplicaTransferIntegrationTest(base.ReplicaIntegrationTestBase):
+    """Full-pipeline replica transfer integration tests."""
+
+    def test_incremental_replica_transfer(self):
+        """Full transfer followed by incremental after source modification."""
+        # First run: full transfer
+        self._execute_and_wait(self._transfer.id)
+
+        self.assertTrue(
+            test_utils.devices_match(self._src_device, self._dst_device),
+            "Devices do not match after initial full transfer",
+        )
+
+        # Mutate source: write a different pattern at the second chunk
+        # (offset 4096 = one chunk_size past the pattern written in setUp).
+        test_utils.write_bytes_at_offset(
+            self._src_device,
+            offset=4096,
+            data=b"\xff\xfe\xfd\xfc" * 1024,
+        )
+        # Sanity check: the mutation must be observable, otherwise the
+        # incremental assertion below would pass vacuously.
+        self.assertFalse(
+            test_utils.devices_match(self._src_device, self._dst_device),
+            "Devices should differ after mutating the source",
+        )
+
+        # Second run: incremental
+        self._execute_and_wait(self._transfer.id)
+
+        self.assertTrue(
+            test_utils.devices_match(self._src_device, self._dst_device),
+            "Destination does not match source after incremental transfer",
+        )

+ 147 - 0
coriolis/tests/integration/utils.py

@@ -0,0 +1,147 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""
+Integration test utils.
+"""
+
+import json
+import os
+import subprocess
+import tempfile
+import time
+
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+_SETTLE_TIMEOUT = 15
+_POLL_INTERVAL = 1
+
+# Sysfs knob for adding / removing scsi_debug hosts. Writing "1" adds a new
+# host with its own independent backing store (requires per_host_store=1);
+# writing "-1" removes the most-recently added host (LIFO).
+_SCSI_DEBUG_ADD_HOST = "/sys/bus/pseudo/drivers/scsi_debug/add_host"
+
+
+def _lsblk_disk_names() -> set:
+    """Return the set of disk-type block device names visible to lsblk.
+
+    Returns an empty set if ``lsblk`` itself exits non-zero.
+    """
+    result = _run(["lsblk", "-Jb", "-o", "NAME,TYPE"], check=False)
+    # NOTE(review): a failing lsblk is treated as "no disks visible";
+    # presumably this tolerates transient device churn — confirm.
+    if result.returncode != 0:
+        return set()
+
+    data = json.loads(result.stdout)
+    return {
+        d["name"] for d in data.get("blockdevices", [])
+        if d["type"] == "disk"
+    }
+
+
+def _poll_for_new_disks(before, count, timeout=_SETTLE_TIMEOUT):
+    """Block until *count* new disk names appear beyond *before*.
+
+    :param before: set of disk names present before the device was added.
+    :param count: number of new disks to wait for.
+    :param timeout: maximum seconds to poll.
+    :returns: sorted list of new names.
+    :raises: ``AssertionError`` on timeout.
+    """
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
+        # Let udev finish processing device events before re-listing.
+        subprocess.call(["udevadm", "settle"])
+        new = sorted(_lsblk_disk_names() - before)
+        if len(new) >= count:
+            return new[:count]
+        time.sleep(_POLL_INTERVAL)
+    raise AssertionError(
+        "Only %d new disk(s) appeared within %ds (expected %d)"
+        % (len(sorted(_lsblk_disk_names() - before)), timeout, count)
+    )
+
+
+def init_scsi_debug(size_mb=64):
+    """Load scsi_debug with per_host_store=1.
+
+    Must be called once per process before any ``add_scsi_debug_device``
+    calls. With ``per_host_store=1`` every host added via the sysfs knob
+    gets its own independent backing store, so devices never share storage.
+
+    :param size_mb: backing-store size (in MiB) for each scsi_debug device.
+    """
+    _run([
+        "modprobe",
+        "scsi_debug",
+        "per_host_store=1",
+        "num_tgts=1",
+        f"dev_size_mb={size_mb}",
+    ])
+
+
+def destroy_scsi_debug():
+    """Unload the scsi_debug module, removing any remaining hosts/devices."""
+    _run(["modprobe", "-r", "scsi_debug"])
+
+
+def add_scsi_debug_device() -> str:
+    """Add one scsi_debug host and return its /dev/sdX path.
+
+    Each call creates an independent backing store (per_host_store=1), so
+    writing to one device is never visible through another.
+
+    :raises: ``AssertionError`` if the new disk does not appear within the
+        settle timeout.
+    """
+    # Snapshot the disks present now so the new one can be identified by
+    # set difference after the sysfs write.
+    before = _lsblk_disk_names()
+    with open(_SCSI_DEBUG_ADD_HOST, "w") as fh:
+        fh.write("1\n")
+
+    new = _poll_for_new_disks(before, count=1)
+    path = os.path.join("/dev", new[0])
+    LOG.info("scsi_debug device added: %s", path)
+
+    return path
+
+
+def remove_scsi_debug_device():
+    """Remove the most-recently added scsi_debug host (LIFO order)."""
+    # Writing "-1" to the sysfs knob removes the last host added.
+    with open(_SCSI_DEBUG_ADD_HOST, "w") as fh:
+        fh.write("-1\n")
+
+
+def write_test_pattern(device_path, chunk_size=4096):
+    """Write one *chunk_size*-byte block of a repeating 4-byte pattern.
+
+    Note that only the first *chunk_size* bytes of *device_path* are
+    written (the ``dd`` input file is *chunk_size* bytes long), not the
+    whole device.
+
+    Returns the 4-byte pattern so callers can verify the destination
+    later. The write is done with ``dd`` so it works on raw block devices.
+    """
+    pattern = b"\xde\xad\xbe\xef"
+    # Write the pattern to a temp file, then dd it onto the device.
+    with tempfile.NamedTemporaryFile(delete=False) as tmp:
+        tmp_path = tmp.name
+        tmp.write(pattern * (chunk_size // len(pattern)))
+
+    try:
+        _run(
+            ["dd", "if=%s" % tmp_path, "of=%s" % device_path,
+             "bs=%d" % chunk_size, "conv=notrunc"],
+        )
+        # Flush page cache so the data is durably on the device before
+        # any subsequent raw reads/compares.
+        _run(["sync"])
+    finally:
+        os.unlink(tmp_path)
+
+    return pattern
+
+
+def write_bytes_at_offset(device_path, offset, data):
+    """Write *data* at *offset* bytes into *device_path*.
+
+    :param device_path: block device (or file) opened read/write in binary.
+    :param offset: absolute byte offset at which to start writing.
+    :param data: bytes to write.
+    """
+    with open(device_path, "r+b") as fh:
+        fh.seek(offset)
+        fh.write(data)
+
+
+def devices_match(path_a, path_b):
+    """Return True if the contents of two block devices are identical.
+
+    Uses ``cmp --silent``, whose exit code is 0 only on a full byte-wise
+    match.
+    """
+    result = _run(["cmp", "--silent", path_a, path_b], check=False)
+    return result.returncode == 0
+
+
+def _run(cmd, check=True):
+    """Run *cmd*, capturing stdout.
+
+    :param check: when True, raise ``subprocess.CalledProcessError`` on a
+        non-zero exit code.
+
+    NOTE(review): stderr is sent to DEVNULL, so failure diagnostics from
+    the child process are lost — consider capturing it when debugging.
+    """
+    LOG.debug("Running: %s", " ".join(str(c) for c in cmd))
+    return subprocess.run(
+        cmd,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.DEVNULL,
+        check=check,
+    )

+ 3 - 1
tox.ini

@@ -28,11 +28,13 @@ commands =
 
 [testenv:integration]
 # Must be run as root: sudo tox -e integration
+# Requires the scsi_debug kernel module: modinfo scsi_debug
+# Requires kernel version 5.11 or newer (scsi_debug: per_host_store=1 parameter)
 setenv = {[testenv]setenv}
 deps =
   {[testenv]deps}
   git+https://github.com/cloudbase/python-coriolisclient.git
-commands = stestr run --slowest --test-path coriolis/tests/integration/ {posargs}
+commands = stestr run --slowest --concurrency=1 --test-path coriolis/tests/integration/ {posargs}
 
 [testenv:venv]
 commands = {posargs}