View source

Adds integration test providers

Adds TestExportProvider and TestImportProvider.

Implements BaseReplicaExportProvider / BaseReplicaExportValidationProvider
backed by a local scsi_debug block device.

Implements BaseReplicaImportProvider backed by pre-allocated scsi_debug block
devices.
Claudiu Belu, 1 month ago
Commit
da2c6da7d4

+ 0 - 1
coriolis/tests/integration/base.py

@@ -60,7 +60,6 @@ class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
 
     def _create_endpoint(self, **kwargs):
         endpoint_kwargs = {
-            "endpoint_type": "foo",
             "description": "",
             "regions": [],
         }

+ 9 - 4
coriolis/tests/integration/harness.py

@@ -55,9 +55,13 @@ from coriolis.worker.rpc import server as worker_rpc_server
 CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
-# Path to the test provider class.
-TEST_PROVIDER_CLASS = (
-    "coriolis.tests.integration.providers.test_provider.TestProvider"
+# Dotted paths to the export (source) and import (destination) provider
+# classes.
+_TEST_EXPORT_PROVIDER = (
+    "coriolis.tests.integration.providers.test_provider.exp.TestExportProvider"
+)
+_TEST_IMPORT_PROVIDER = (
+    "coriolis.tests.integration.providers.test_provider.imp.TestImportProvider"
 )
 
 # Fixed project used for all test requests.
@@ -201,7 +205,8 @@ class _IntegrationHarness:
         cfg.CONF([], project='coriolis', version='1.0.0',
                  default_config_files=[], default_config_dirs=[])
         cfg.CONF.set_override('messaging_transport_url', 'fake://')
-        cfg.CONF.set_override('providers', [TEST_PROVIDER_CLASS])
+        cfg.CONF.set_override(
+            'providers', [_TEST_EXPORT_PROVIDER, _TEST_IMPORT_PROVIDER])
         cfg.CONF.set_override(
             'connection', 'sqlite:///%s' % self.db_path, group='database')
         cfg.CONF.set_override(

+ 0 - 31
coriolis/tests/integration/providers/test_provider/__init__.py

@@ -1,31 +0,0 @@
-# Copyright 2026 Cloudbase Solutions Srl
-# All Rights Reserved.
-
-"""
-TestProvider registered in CONF.providers for integration tests.
-
-The Coriolis provider factory resolves a single dotted class path and queries
-it for every provider type it implements. A single entry in CONF.providers
-satisfies both the source and destination side of a replica execution.
-"""
-
-
-from coriolis.providers import base
-
-
-class TestProvider(
-    base.BaseEndpointProvider,
-    base.BaseReplicaExportProvider,
-    base.BaseReplicaImportProvider,
-):
-    platform = "foo"
-
-    def __new__(cls, *args, **kwargs):
-        cls.__abstractmethods__ = set()
-        return super().__new__(cls)
-
-    def get_source_environment_schema(self):
-        return {}
-
-    def get_target_environment_schema(self):
-        return {}

+ 218 - 0
coriolis/tests/integration/providers/test_provider/exp.py

@@ -0,0 +1,218 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""
+Export-side (source) implementation of the test provider.
+
+Uses Replicator (via SSH to 127.0.0.1) to deploy and manage the
+coriolis-replicator service and perform disk replication.
+"""
+
+import os
+
+from oslo_config import cfg
+from oslo_log import log as logging
+import paramiko
+
+from coriolis import events
+from coriolis.providers import backup_writers
+from coriolis.providers.base import BaseEndpointProvider
+from coriolis.providers.base import BaseReplicaExportProvider
+from coriolis.providers.base import BaseReplicaExportValidationProvider
+from coriolis.providers import replicator as replicator_module
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+class TestExportProvider(
+        BaseEndpointProvider,
+        BaseReplicaExportProvider,
+        BaseReplicaExportValidationProvider):
+    """Source-side provider backed by a local `scsi_debug` block device.
+
+    ``connection_info`` (the source endpoint's connection info) has the form::
+
+        {
+            "block_device_path": "/dev/sdX",           # source block device
+            "pkey_path":         "/root/.ssh/id_rsa",  # key for localhost SSH
+        }
+    """
+
+    platform = "test-src"
+
+    def __init__(self, event_handler):
+        self._event_handler = event_handler
+
+    def _event_manager(self):
+        return events.EventManager(self._event_handler)
+
+    def _make_replicator(self, pkey_path, event_mgr, volumes_info, repl_state):
+        # TODO(claudiub): Use containers instead of using 127.0.0.1.
+        pkey = paramiko.RSAKey.from_private_key_file(pkey_path)
+        conn_info = {
+            "ip": "127.0.0.1",
+            "username": "root",
+            "pkey": pkey,
+        }
+        return replicator_module.Replicator(
+            conn_info, event_mgr, volumes_info, repl_state)
+
+    # BaseProvider / BaseEndpointProvider
+
+    def get_connection_info_schema(self):
+        return {
+            "type": "object",
+            "properties": {
+                "block_device_path": {"type": "string"},
+                "pkey_path": {"type": "string"},
+            },
+            "required": ["block_device_path", "pkey_path"],
+        }
+
+    def validate_connection(self, ctxt, connection_info):
+        block_device_path = connection_info["block_device_path"]
+        if not os.path.exists(block_device_path):
+            raise ValueError("Source device not found: %s" % block_device_path)
+        pkey_path = connection_info["pkey_path"]
+        if not os.path.exists(pkey_path):
+            raise ValueError("SSH private key not found: %s" % pkey_path)
+
+    # BaseExportInstanceProvider
+
+    def get_source_environment_schema(self):
+        return {"type": "object", "properties": {}}
+
+    def get_os_morphing_tools(self, os_type, osmorphing_info):
+        return []
+
+    # BaseReplicaExportProvider
+
+    def get_replica_instance_info(
+            self, ctxt, connection_info, source_environment, instance_name):
+        """Return minimal export info describing the source block device."""
+        block_device_path = connection_info["block_device_path"]
+        size_bytes = _get_block_device_size(block_device_path)
+        disk_id = os.path.basename(block_device_path)
+
+        return {
+            "id": instance_name,
+            "name": instance_name,
+            "instance_name": instance_name,
+            "num_cpu": 1,
+            "memory_mb": 512,
+            "os_type": "linux",
+            "nested_virtualization": False,
+            "devices": {
+                "disks": [
+                    {
+                        "id": disk_id,
+                        "format": "raw",
+                        "size_bytes": size_bytes,
+                    }
+                ],
+                "nics": [],
+                "cdroms": [],
+                "serial_ports": [],
+                "floppies": [],
+                "controllers": [],
+            },
+        }
+
+    def deploy_replica_source_resources(
+            self, ctxt, connection_info, export_info, source_environment):
+        block_device_path = connection_info["block_device_path"]
+        pkey_path = connection_info["pkey_path"]
+
+        replicator = self._make_replicator(
+            pkey_path, self._event_manager(), [], None)
+        replicator.init_replicator()
+
+        disk_id = os.path.basename(block_device_path)
+        return {
+            "connection_info": {
+                "ip": "127.0.0.1",
+                "port": 22,
+                "username": "root",
+                "pkey_path": pkey_path,
+            },
+            "migr_resources": {
+                "disk_mappings": {disk_id: block_device_path},
+            },
+        }
+
+    def delete_replica_source_resources(
+            self, ctxt, connection_info, source_environment,
+            migr_resources_dict):
+        pkey_path = connection_info.get("pkey_path")
+        if not pkey_path:
+            return
+        replicator = self._make_replicator(
+            pkey_path, self._event_manager(), [], None)
+        replicator.stop()
+
+    def replicate_disks(
+            self, ctxt, connection_info, source_environment, instance_name,
+            source_resources, source_conn_info, target_conn_info,
+            volumes_info, incremental):
+        pkey_path = source_conn_info["pkey_path"]
+        repl_state = _extract_repl_state(volumes_info) if incremental else None
+
+        replicator = self._make_replicator(
+            pkey_path, self._event_manager(), volumes_info, repl_state)
+        replicator.init_replicator()
+        replicator.wait_for_chunks()
+
+        disk_mappings = source_resources.get("disk_mappings", {})
+        source_volumes_info = [
+            {
+                "disk_id": vol["disk_id"],
+                "disk_path": disk_mappings.get(vol["disk_id"], vol["disk_id"]),
+            }
+            for vol in volumes_info
+        ]
+
+        backup_writer = backup_writers.BackupWritersFactory(
+            target_conn_info, volumes_info).get_writer()
+
+        replicator.replicate_disks(source_volumes_info, backup_writer)
+        return volumes_info
+
+    def delete_replica_source_snapshots(
+            self, ctxt, connection_info, source_environment, volumes_info):
+        # scsi_debug devices have no snapshots.
+        return volumes_info
+
+    def shutdown_instance(
+            self, ctxt, connection_info, source_environment, instance_name):
+        # Nothing to shut down for a block device.
+        pass
+
+    # BaseReplicaExportValidationProvider
+
+    def validate_replica_export_input(
+            self, ctxt, connection_info, instance_name, source_environment):
+        return {}
+
+
+# Helpers
+def _get_block_device_size(device):
+    """Return the size in bytes of *device* using its sysfs entry."""
+    dev_name = os.path.basename(device)
+
+    size_sectors_path = "/sys/block/%s/size" % dev_name
+    with open(size_sectors_path) as fh:
+        sectors = int(fh.read().strip())
+
+    return sectors * 512
+
+
+def _extract_repl_state(volumes_info):
+    """Collect per-disk replicator state stored in volumes_info entries."""
+    state = []
+    for vol in volumes_info:
+        rs = vol.get("replica_state")
+        if rs:
+            state.append(rs)
+
+    return state

+ 232 - 0
coriolis/tests/integration/providers/test_provider/imp.py

@@ -0,0 +1,232 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""
+Import-side (destination) implementation of the test provider.
+
+Uses HTTPBackupWriterBootstrapper (via SSH to 127.0.0.1) to deploy and manage
+the coriolis-writer service and provides the target_conn_info that
+BackupWritersFactory expects.
+"""
+
+import os
+
+from oslo_log import log as logging
+import paramiko
+
+from coriolis.providers import backup_writers
+from coriolis.providers.base import BaseEndpointProvider
+from coriolis.providers.base import BaseReplicaImportProvider
+from coriolis.providers.base import BaseReplicaImportValidationProvider
+from coriolis import utils
+
+LOG = logging.getLogger(__name__)
+
+# Port used by the test writer binary. Chosen to avoid collision with the
+# production default (6677).
+WRITER_TEST_PORT = 16677
+
+
+class TestImportProvider(
+        BaseEndpointProvider,
+        BaseReplicaImportProvider,
+        BaseReplicaImportValidationProvider):
+    """Destination-side provider backed by a local `scsi_debug` block device.
+
+    ``connection_info`` (the destination endpoint's connection info) has the
+    form::
+
+        {
+            "devices":   ["/dev/sdY", ...],   # pre-allocated destination devs
+            "pkey_path": "/root/.ssh/id_rsa", # key for localhost SSH
+        }
+    """
+
+    platform = "test-dest"
+
+    def __init__(self, event_handler):
+        self._event_handler = event_handler
+
+    # BaseProvider / BaseEndpointProvider
+
+    def get_connection_info_schema(self):
+        return {
+            "type": "object",
+            "properties": {
+                "devices": {
+                    "type": "array",
+                    "items": {"type": "string"},
+                },
+                "pkey_path": {"type": "string"},
+            },
+            "required": ["devices", "pkey_path"],
+        }
+
+    def validate_connection(self, ctxt, connection_info):
+        for dev in connection_info.get("devices", []):
+            if not os.path.exists(dev):
+                raise ValueError("Destination device not found: %s" % dev)
+        pkey_path = connection_info["pkey_path"]
+        if not os.path.exists(pkey_path):
+            raise ValueError("SSH private key not found: %s" % pkey_path)
+
+    # BaseImportInstanceProvider
+
+    def get_target_environment_schema(self):
+        return {"type": "object", "properties": {}}
+
+    # BaseReplicaImportProvider
+
+    def deploy_replica_disks(
+            self, ctxt, connection_info, target_environment, instance_name,
+            export_info, volumes_info):
+        """Map each source disk in export_info to a destination device.
+
+        Returns a volumes_info list where each entry has ``disk_id`` (from
+        the source) and ``volume_dev`` (the destination block device path).
+        """
+        dest_devices = list(connection_info["devices"])
+        src_disks = export_info.get("devices", {}).get("disks", [])
+
+        if len(src_disks) > len(dest_devices):
+            raise ValueError(
+                "Not enough destination devices (%d) for %d source disks"
+                % (len(dest_devices), len(src_disks))
+            )
+
+        result = []
+        for i, disk in enumerate(src_disks):
+            result.append({
+                "disk_id": disk["id"],
+                "volume_dev": dest_devices[i],
+            })
+
+        return result
+
+    def deploy_replica_target_resources(
+            self, ctxt, connection_info, target_environment, volumes_info):
+        pkey_path = connection_info["pkey_path"]
+        pkey = paramiko.RSAKey.from_private_key_file(pkey_path)
+        ssh_conn_info = {
+            "ip": "127.0.0.1",
+            "port": 22,
+            "username": "root",
+            "pkey": pkey,
+        }
+
+        bootstrapper = backup_writers.HTTPBackupWriterBootstrapper(
+            ssh_conn_info, WRITER_TEST_PORT)
+        writer_conn_details = bootstrapper.setup_writer()
+
+        return {
+            "volumes_info": volumes_info,
+            "connection_info": {
+                "backend": "http_backup_writer",
+                "connection_details": writer_conn_details,
+            },
+            "migr_resources": {},
+        }
+
+    def delete_replica_target_resources(
+            self, ctxt, connection_info, target_environment,
+            migr_resources_dict):
+        pkey_path = connection_info.get("pkey_path")
+        if not pkey_path:
+            return
+        ssh = _ssh_connect(pkey_path)
+        try:
+            utils.stop_service(
+                ssh, backup_writers._CORIOLIS_HTTP_WRITER_CMD)
+        finally:
+            ssh.close()
+
+    def delete_replica_disks(
+            self, ctxt, connection_info, target_environment, volumes_info):
+        # scsi_debug devices are managed externally; nothing to delete here.
+        return volumes_info
+
+    def create_replica_disk_snapshots(
+            self, ctxt, connection_info, target_environment, volumes_info):
+        # scsi_debug has no snapshot support.
+        return volumes_info
+
+    def delete_replica_target_disk_snapshots(
+            self, ctxt, connection_info, target_environment, volumes_info):
+        return volumes_info
+
+    def restore_replica_disk_snapshots(
+            self, ctxt, connection_info, target_environment, volumes_info):
+        return volumes_info
+
+    def deploy_replica_instance(
+            self, ctxt, connection_info, target_environment, instance_name,
+            export_info, volumes_info, clone_disks):
+        return {"instance_deployment_info": {}}
+
+    def finalize_replica_instance_deployment(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
+        return {
+            "id": "test-instance",
+            "name": "test-instance",
+            "num_cpu": 1,
+            "memory_mb": 512,
+            "os_type": "linux",
+            "nested_virtualization": False,
+            "devices": {
+                "disks": [],
+                "cdroms": [],
+                "nics": [],
+                "serial_ports": [],
+                "floppies": [],
+                "controllers": [],
+            },
+        }
+
+    def cleanup_failed_replica_instance_deployment(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
+        pass
+
+    # BaseInstanceProvider
+
+    def get_os_morphing_tools(self, os_type, osmorphing_info):
+        return []
+
+    # BaseImportInstanceProvider
+
+    def deploy_os_morphing_resources(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
+        return {}
+
+    def delete_os_morphing_resources(
+            self, ctxt, connection_info, target_environment,
+            os_morphing_resources):
+        pass
+
+    # BaseReplicaImportValidationProvider
+
+    def validate_replica_import_input(
+            self, ctxt, connection_info, target_environment, export_info,
+            check_os_morphing_resources=False, check_final_vm_params=False):
+        return {}
+
+    def validate_replica_deployment_input(
+            self, ctxt, connection_info, target_environment, export_info):
+        return {}
+
+
+# Helpers
+def _ssh_connect(pkey_path):
+    pkey = paramiko.RSAKey.from_private_key_file(pkey_path)
+    ssh = paramiko.SSHClient()
+    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    ssh.connect(hostname="127.0.0.1", username="root", pkey=pkey)
+    return ssh
+
+
+def _read_file(path):
+    """Return the contents of *path* as a string."""
+    with open(path) as fh:
+        return fh.read()

+ 3 - 0
coriolis/tests/integration/test_smoke.py

@@ -24,6 +24,7 @@ class HarnessSmokeTest(base.CoriolisIntegrationTestBase):
         """Endpoint create / get exercises."""
         endpoint = self._create_endpoint(
             name="smoke-test-endpoint",
+            endpoint_type="test-src",
             description="harness smoke test",
             connection_info={"key": "value"},
         )
@@ -37,11 +38,13 @@ class HarnessSmokeTest(base.CoriolisIntegrationTestBase):
 
         src = self._create_endpoint(
             name="smoke-src",
+            endpoint_type="test-src",
             connection_info={"foo": "lish"},
         )
 
         dst = self._create_endpoint(
             name="smoke-dst",
+            endpoint_type="test-dest",
             connection_info={"bar": "tender"},
         )