Преглед изворни кода

Adds transfer schedule, execution, deployment integration tests

Tests CRUD APIs for transfer schedules. Adds a test that asserts that a
transfer is executed as scheduled.

Tests deployments.list(), get(), clone_disks=False deployment, and
deployments.cancel().
Claudiu Belu пре 3 недеља
родитељ
комит
7db8a2ec57

+ 55 - 8
coriolis/tests/integration/base.py

@@ -105,6 +105,8 @@ class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
 
 class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
 
+    # Whether setUp() should allocate scsi_debug source/destination devices.
+    # Subclasses that only exercise management APIs (no actual disk
+    # replication) set this to False to skip the root-only device setup.
+    _CREATE_SCSI_DBG_DEVS = True
+
     @classmethod
     def setUpClass(cls):
         result = subprocess.run(
@@ -124,10 +126,14 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
     def setUp(self):
         super().setUp()
 
-        self._src_device = test_utils.add_scsi_debug_device()
-        self.addCleanup(test_utils.remove_scsi_debug_device)
-        self._dst_device = test_utils.add_scsi_debug_device()
-        self.addCleanup(test_utils.remove_scsi_debug_device)
+        self._src_device = None
+        self._dst_device = None
+
+        if self._CREATE_SCSI_DBG_DEVS:
+            self._src_device = test_utils.add_scsi_debug_device()
+            self.addCleanup(test_utils.remove_scsi_debug_device)
+            self._dst_device = test_utils.add_scsi_debug_device()
+            self.addCleanup(test_utils.remove_scsi_debug_device)
 
         # Write a test pattern on the src device.
         test_utils.write_test_pattern(self._src_device)
@@ -187,7 +193,8 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
             is_admin=True,
         )
 
-    def wait_for_execution(self, execution_id, timeout=300):
+    def wait_for_execution(self, execution_id, timeout=300,
+                           desired_statuses=None):
         """Block until *execution_id* reaches a terminal state.
 
         Polls the DB directly and yields on each iteration so in-process
@@ -196,17 +203,20 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
         Returns the finalised TasksExecution ORM object.
         Raises ``AssertionError`` on timeout.
         """
+        if not desired_statuses:
+            desired_statuses = constants.FINALIZED_EXECUTION_STATUSES
+
         ctxt = self._get_db_context()
         deadline = time.monotonic() + timeout
         while time.monotonic() < deadline:
             execution = db_api.get_tasks_execution(ctxt, execution_id)
-            if execution.status in constants.FINALIZED_EXECUTION_STATUSES:
+            if execution.status in desired_statuses:
                 return execution
             time.sleep(1)
         self.fail(
-            "Execution %s did not reach a terminal state within %ds "
+            "Execution %s did not reach one of the states %r within %ds"
             "(last status: %s)"
-            % (execution_id, timeout, execution.status)
+            % (execution_id, timeout, desired_statuses, execution.status)
         )
 
     def assertExecutionCompleted(self, execution_id, timeout=300):
@@ -243,6 +253,28 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
             % (execution_id, execution.status),
         )
 
+    def wait_for_deployment(self, deployment_id, timeout=300,
+                            desired_statuses=None):
+        """Block until *deployment_id* reaches any terminal state.
+
+        Returns the finalised deployment ORM object.
+        Raises ``AssertionError`` on timeout.
+        """
+        if not desired_statuses:
+            desired_statuses = constants.FINALIZED_EXECUTION_STATUSES
+
+        ctxt = self._get_db_context()
+        deadline = time.monotonic() + timeout
+        while time.monotonic() < deadline:
+            deployment = db_api.get_deployment(ctxt, deployment_id)
+            if deployment.last_execution_status in desired_statuses:
+                return deployment
+            time.sleep(1)
+        self.fail(
+            "Deployment %s did not reach one of the states %r within %ds"
+            % (deployment_id, desired_statuses, timeout)
+        )
+
     def assertDeploymentCompleted(self, deployment_id, timeout=300):
         """Assert that *deployment_id* finishes with a completed status.
 
@@ -267,3 +299,18 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
             "Deployment %s did not reach a terminal state within %ds"
             % (deployment_id, timeout)
         )
+
+    def _patch_add_delay(self, obj, method_name):
+        _orig = getattr(obj, method_name)
+
+        def _slow_call(*args, **kwargs):
+            time.sleep(10)
+            return _orig(*args, **kwargs)
+
+        self._execute_and_wait(self._transfer.id)
+
+        patcher = mock.patch.object(
+            obj, method_name, side_effect=_slow_call, autospec=True,
+        )
+        patcher.start()
+        self.addCleanup(patcher.stop)

+ 30 - 0
coriolis/tests/integration/harness.py

@@ -46,6 +46,7 @@ from coriolis import context
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import api as sqlalchemy_api
 from coriolis.db.sqlalchemy import migration as db_migration
+from coriolis.deployer_manager.rpc import server as deployer_manager_rpc_server
 from coriolis import exception
 from coriolis import policy as policy_module
 from coriolis import rpc as rpc_module
@@ -53,6 +54,7 @@ from coriolis.scheduler.rpc import server as scheduler_rpc_server
 from coriolis import service
 from coriolis.tasks import factory as task_runners_factory
 from coriolis.tests.integration import utils as test_utils
+from coriolis.transfer_cron.rpc import server as transfer_cron_rpc_server
 from coriolis import utils as coriolis_utils
 from coriolis.worker.rpc import server as worker_rpc_server
 
@@ -252,6 +254,9 @@ class _IntegrationHarness:
         coriolis_utils.setup_logging()
         test_utils.init_scsi_debug()
 
+        self._send_signal_patcher = mock.patch("psutil.Process.send_signal")
+        self._send_signal_patcher.start()
+
         # Policy enforcer: reset so it re-reads the new CONF (no policy file).
         policy_module.reset()
 
@@ -260,6 +265,8 @@ class _IntegrationHarness:
         self.api_port = None
         self._conductor_svc = None
         self._scheduler_svc = None
+        self._transfer_cron_svc = None
+        self._deployer_manager_svc = None
         self._worker_svc = None
         self._worker_host_svc = None
 
@@ -319,6 +326,26 @@ class _IntegrationHarness:
         )
         self._scheduler_svc.start()
 
+        # Transfer-cron: constructor makes an RPC call to the conductor to load
+        # existing schedules, so the conductor must be running first.
+        self._transfer_cron_svc = service.MessagingService(
+            constants.TRANSFER_CRON_MAIN_MESSAGING_TOPIC,
+            [transfer_cron_rpc_server.TransferCronServerEndpoint()],
+            transfer_cron_rpc_server.VERSION,
+            worker_count=1,
+            init_rpc=False,
+        )
+        self._transfer_cron_svc.start()
+
+        self._deployer_manager_svc = service.MessagingService(
+            constants.DEPLOYER_MANAGER_MAIN_MESSAGING_TOPIC,
+            [deployer_manager_rpc_server.DeployerManagerServerEndpoint()],
+            deployer_manager_rpc_server.VERSION,
+            worker_count=1,
+            init_rpc=False,
+        )
+        self._deployer_manager_svc.start()
+
         # Worker: constructor calls _register_worker_service() which makes a
         # blocking RPC call to the conductor, so the conductor must already be
         # listening.
@@ -380,6 +407,8 @@ class _IntegrationHarness:
     def _teardown(self):
         LOG.info("Teardown initiated.")
 
+        self._send_signal_patcher.stop()
+
         try:
             coriolis_utils.exec_process(
                 [
@@ -397,6 +426,7 @@ class _IntegrationHarness:
             pass
 
         for svc in [self._worker_host_svc, self._worker_svc,
+                    self._deployer_manager_svc, self._transfer_cron_svc,
                     self._scheduler_svc, self._conductor_svc]:
             if not svc:
                 continue

+ 64 - 8
coriolis/tests/integration/test_deployment.py

@@ -7,26 +7,82 @@ Integration test for the replica deployment.
 Runs a full replica execution to completion, then deploys (instantiates) the
 replica and asserts that the deployment execution reaches COMPLETED.
 
+Covers deployments.list(), get(), clone_disks=False deployment, and
+deployments.cancel().
+
 Must be run as root; requires the scsi_debug kernel module.
 """
 
+from coriolis import constants
 from coriolis.tests.integration import base
+from coriolis.tests.integration.providers.test_provider import imp
 
 
 class ReplicaDeploymentIntegrationTest(base.ReplicaIntegrationTestBase):
-    """Deploy a previously-replicated replica."""
+    """Deployment lifecycle tests for a previously-replicated transfer."""
-
-    def test_replica_deployment(self):
-        """Execute a replica, then deploy it and assert COMPLETED."""
-
+    def _make_deployment(self, **kwargs):
+        """Run a transfer execution, then create and return a deployment.
+
+        *kwargs* are forwarded to deployments.create_from_transfer().
+        Deletion of the deployment is registered as a best-effort cleanup
+        (failures ignored, as the test itself may delete it first).
+        """
         # replica execution
         self._execute_and_wait(self._transfer.id)
 
-        # deployment
         deployment = self._client.deployments.create_from_transfer(
-            self._transfer.id,
-            skip_os_morphing=True,
+            self._transfer.id, **kwargs)
+        self.addCleanup(
+            self._ignoreExc(self._client.deployments.delete), deployment.id)
+
+        return deployment
+
+    def test_replica_deployment_crud(self):
+        """End-to-end create/get/list/delete of a deployment."""
+        # Create.
+        deployment = self._make_deployment(skip_os_morphing=True)
+        self.assertDeploymentCompleted(deployment.id)
+
+        # Get.
+        fetched = self._client.deployments.get(deployment.id)
+        self.assertEqual(deployment.id, fetched.id)
+
+        # List.
+        deployments = self._client.deployments.list()
+        ids = [d.id for d in deployments]
+        self.assertIn(deployment.id, ids)
+
+        # Delete.
+        self._client.deployments.delete(deployment.id)
+
+        deployments = self._client.deployments.list()
+        ids = [s.id for s in deployments]
+        self.assertNotIn(deployment.id, ids)
+
+    def test_replica_deployment_without_disk_cloning(self):
+        """Deploy with clone_disks=False and assert it completes."""
+        # clone_disks=False reuses the already-transferred replica disks
+        # directly instead of cloning them first.
+        deployment = self._make_deployment(
+            clone_disks=False, skip_os_morphing=True)
+
+        self.assertDeploymentCompleted(deployment.id)
+
+    def test_cancel_deployment(self):
+        # Artificially bump the execution time of a deployment.
+        self._patch_add_delay(
+            imp.TestImportProvider,
+            "finalize_replica_instance_deployment"
         )
+
+        deployment = self._client.deployments.create_from_transfer(
+            self._transfer.id, skip_os_morphing=True)
         self.addCleanup(self._client.deployments.delete, deployment.id)
 
-        self.assertDeploymentCompleted(deployment.id)
+        # Wait until the deployment is RUNNING before issuing the cancel.
+        self.wait_for_deployment(
+            deployment.id, 30, [constants.EXECUTION_STATUS_RUNNING])
+
+        # Cancel the deployment.
+        self._client.deployments.cancel(deployment.id)
+        final = self.wait_for_deployment(deployment.id)
+        self.assertIn(
+            final.last_execution_status,
+            [
+                constants.EXECUTION_STATUS_CANCELED,
+                constants.EXECUTION_STATUS_ERROR,
+                constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING,
+            ],
+        )

+ 22 - 31
coriolis/tests/integration/transfers/test_executions.py

@@ -5,14 +5,8 @@
 Integration tests for the transfer executions.
 """
 
-import time
-from unittest import mock
-
 from coriolis import constants
-from coriolis import context
-from coriolis.db import api as db_api
 from coriolis.tests.integration import base
-from coriolis.tests.integration import harness
 from coriolis.tests.integration.providers.test_provider import imp
 
 
@@ -59,6 +53,25 @@ class TransferExecutionsTests(base.ReplicaIntegrationTestBase):
 
         self.assertExecutionCompleted(execution.id)
 
+    def test_execution_auto_deploy(self):
+        execution = self._client.transfer_executions.create(
+            self._transfer.id,
+            shutdown_instances=False,
+            auto_deploy=True,
+        )
+        self.addCleanup(
+            self._client.transfer_executions.delete,
+            self._transfer.id, execution.id)
+        self.assertExecutionCompleted(execution.id)
+
+        deployments = self._client.deployments.list()
+        transfer_deployments = [
+            d for d in deployments if d.transfer_id == self._transfer.id
+        ]
+        self.assertEqual(1, len(transfer_deployments))
+        self.addCleanup(
+            self._client.deployments.delete, transfer_deployments[0].id)
+
     def test_cancel_running_execution(self):
         self._test_cancel_running_execution(False)
 
@@ -72,20 +85,10 @@ class TransferExecutionsTests(base.ReplicaIntegrationTestBase):
         and that the execution reaches a finalized (CANCELED or ERROR) state.
         """
         # Artificially bump the execution time of a transfer.
-        _orig = imp.TestImportProvider.deploy_replica_target_resources
-
-        def _slow_deploy(*args, **kwargs):
-            time.sleep(10)
-            return _orig(*args, **kwargs)
-
-        patcher = mock.patch.object(
+        self._patch_add_delay(
             imp.TestImportProvider,
             "deploy_replica_target_resources",
-            side_effect=_slow_deploy,
-            autospec=True,
         )
-        patcher.start()
-        self.addCleanup(patcher.stop)
 
         execution = self._client.transfer_executions.create(
             self._transfer.id, shutdown_instances=False)
@@ -94,20 +97,8 @@ class TransferExecutionsTests(base.ReplicaIntegrationTestBase):
             self._transfer.id, execution.id)
 
         # Wait until the execution is RUNNING before issuing the cancel.
-        ctxt = context.RequestContext(
-            user='int-test',
-            project_id=harness._TEST_PROJECT_ID,
-            is_admin=True)
-        deadline = time.monotonic() + 30
-        while time.monotonic() < deadline:
-            db_exec = db_api.get_tasks_execution(ctxt, execution.id)
-            if db_exec.status == constants.EXECUTION_STATUS_RUNNING:
-                break
-            time.sleep(0.5)
-        else:
-            self.fail(
-                "Execution %s did not reach RUNNING within 30s "
-                "(last status: %s)" % (execution.id, db_exec.status))
+        self.wait_for_execution(
+            execution.id, 30, [constants.EXECUTION_STATUS_RUNNING])
 
         # Cancel the execution.
         self._client.transfer_executions.cancel(

+ 102 - 0
coriolis/tests/integration/transfers/test_schedules.py

@@ -0,0 +1,102 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""Integration tests for transfer schedule CRUD.
+
+Creates, lists, gets, updates, and deletes schedules attached to a replica
+transfer. Does not require the transfer-cron service: schedule execution is
+not tested here, only the management API.
+"""
+
+import datetime
+import time
+
+from oslo_utils import timeutils
+
+from coriolis.tests.integration import base
+
+
+class _TransferScheduleTestBase(base.ReplicaIntegrationTestBase):
+    """Shared helper base for the transfer schedule test cases below."""
+
+    def _create_schedule(self, **overrides):
+        """Create a schedule on the test transfer; register its cleanup.
+
+        *overrides* replace any of the default creation arguments. Deletion
+        failures during cleanup are ignored (the test itself may already
+        have deleted the schedule).
+        """
+        defaults = {
+            "transfer": self._transfer.id,
+            "schedule": {"hour": 3, "minute": 0},
+            "enabled": True,
+            "expiration_date": None,
+            "shutdown_instance": False,
+            "auto_deploy": False,
+        }
+        defaults.update(overrides)
+        sched = self._client.transfer_schedules.create(**defaults)
+        self.addCleanup(
+            self._ignoreExc(self._client.transfer_schedules.delete),
+            self._transfer.id, sched.id)
+
+        return sched
+
+
+class TransferScheduleBasicTests(_TransferScheduleTestBase):
+
+    # Schedule CRUD only needs the management API, not actual disk
+    # replication, so skip creating the scsi_debug devices.
+    _CREATE_SCSI_DBG_DEVS = False
+
+    def test_schedule_crud(self):
+        """End-to-end create/get/list/update/delete of a schedule."""
+        # Create.
+        sched = self._create_schedule()
+
+        # Get.
+        fetched = self._client.transfer_schedules.get(
+            self._transfer.id, sched.id)
+        self.assertEqual(sched.id, fetched.id)
+        self.assertTrue(fetched.enabled)
+
+        # List.
+        schedules = self._client.transfer_schedules.list(self._transfer.id)
+        ids = [s.id for s in schedules]
+        self.assertIn(sched.id, ids)
+
+        # Update.
+        updated = self._client.transfer_schedules.update(
+            self._transfer.id, sched.id, {"enabled": False})
+        self.assertFalse(updated.enabled)
+
+        # Delete.
+        self._client.transfer_schedules.delete(self._transfer.id, sched.id)
+
+        schedules = self._client.transfer_schedules.list(self._transfer.id)
+        ids = [s.id for s in schedules]
+        self.assertNotIn(sched.id, ids)
+
+
+class TransferScheduleTests(_TransferScheduleTestBase):
+
+    def test_scheduled_execution(self):
+        """Verify that a schedule triggers a transfer execution.
+
+        Creates a schedule targeting the next wall-clock minute so the
+        in-process transfer-cron service fires it within ~120 seconds.
+        The cron checks jobs every 60s.
+        """
+        now = timeutils.utcnow()
+        target = now + datetime.timedelta(seconds=10)
+        self._create_schedule(
+            schedule={"minute": target.minute, "hour": target.hour},
+            enabled=True,
+        )
+
+        # Poll until an execution appears (up to 120 s).
+        deadline = time.monotonic() + 120
+        execution = None
+        while time.monotonic() < deadline:
+            executions = self._client.transfer_executions.list(
+                self._transfer.id)
+            if executions:
+                execution = executions[0]
+                break
+            time.sleep(2)
+
+        self.assertIsNotNone(
+            execution,
+            "No transfer execution was triggered within 120s by the schedule")
+
+        self.assertExecutionCompleted(execution.id)