Przeglądaj źródła

Adds transfer-related integration tests

Adds a basic transfer test, exercising transfers CRUD.
Adds executions test, exercising executions CRUD.
Adds a test for canceling execution. For it, we artificially bump the
execution time of a transfer, so we have enough time to cancel it
through the Coriolis API.
Claudiu Belu 3 tygodni temu
rodzic
commit
42accd7471

+ 10 - 0
coriolis/tests/integration/base.py

@@ -97,6 +97,16 @@ class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
 
         return transfer
 
+    def _ignoreExc(self, func):
+        """Wrap the given function, ignoring exceptions."""
+        def f(*args, **kwargs):
+            try:
+                return func(*args, **kwargs)
+            except Exception as ex:
+                LOG.warn("Exception encountered: %s", ex)
+
+        return f
+
 
 class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
     def setUp(self):

+ 23 - 0
coriolis/tests/integration/harness.py

@@ -103,6 +103,29 @@ class _TestAPIRouter(api_v1_router.APIRouter):
         self._setup_extensions(ext_mgr)
         base_wsgi.Router.__init__(self, mapper)
 
+    def _setup_routes(self, mapper, ext_mgr):
+        super()._setup_routes(mapper, ext_mgr)
+
+        # super()._setup_routes registers action routes with a hardcoded
+        # /{project_id}/ prefix via mapper.connect(). The test client sends
+        # paths without that prefix, so add a matching prefix-less route here.
+        action_url_pairs = [
+            ('minion_pool_actions', '/minion_pools/{id}/actions'),
+            ('endpoint_actions', '/endpoints/{id}/actions'),
+            ('deployment_actions', '/deployments/{id}/actions'),
+            ('transfer_actions', '/transfers/{id}/actions'),
+            ('transfer_tasks_execution_actions',
+             '/transfers/{transfer_id}/executions/{id}/actions'),
+        ]
+        for action, url in action_url_pairs:
+            mapper.connect(
+                action,
+                url,
+                controller=self.resources[action],
+                action='action',
+                conditions={'method': 'POST'},
+            )
+
 
 class _InProcessWorkerServerEndpoint(worker_rpc_server.WorkerServerEndpoint):
     """Worker endpoint that runs tasks as greenlets instead of subprocesses.

+ 0 - 0
coriolis/tests/integration/transfers/__init__.py


+ 127 - 0
coriolis/tests/integration/transfers/test_executions.py

@@ -0,0 +1,127 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""
+Integration tests for the transfer executions.
+"""
+
+import time
+from unittest import mock
+
+from coriolis import constants
+from coriolis import context
+from coriolis.db import api as db_api
+from coriolis.tests.integration import base
+from coriolis.tests.integration import harness
+from coriolis.tests.integration.providers.test_provider import imp
+
+
+class TransferExecutionsTests(base.ReplicaIntegrationTestBase):
+
+    def test_executions(self):
+        # We didn't start the execution yet.
+        executions = self._client.transfer_executions.list(self._transfer.id)
+        self.assertIsInstance(executions, list)
+        self.assertEqual(0, len(executions))
+
+        # Start the execution.
+        execution = self._client.transfer_executions.create(
+            self._transfer.id, shutdown_instances=False)
+        self.addCleanup(
+            self._ignoreExc(self._client.transfer_executions.delete),
+            self._transfer.id, execution.id)
+
+        self.assertExecutionCompleted(execution.id)
+        executions = self._client.transfer_executions.list(self._transfer.id)
+        ids = [e.id for e in executions]
+        self.assertIn(execution.id, ids)
+
+        # Get the execution.
+        fetched = self._client.transfer_executions.get(
+            self._transfer.id, execution.id)
+        self.assertEqual(execution.id, fetched.id)
+
+        # Delete the execution.
+        self._client.transfer_executions.delete(
+            self._transfer.id, execution.id)
+
+        executions = self._client.transfer_executions.list(self._transfer.id)
+        ids = [e.id for e in executions]
+        self.assertNotIn(execution.id, ids)
+
+    def test_shutdown_instances(self):
+        # shutdown_instances=True calls provider.shutdown_instance().
+        execution = self._client.transfer_executions.create(
+            self._transfer.id, shutdown_instances=True)
+        self.addCleanup(
+            self._client.transfer_executions.delete,
+            self._transfer.id, execution.id)
+
+        self.assertExecutionCompleted(execution.id)
+
+    def test_cancel_running_execution(self):
+        self._test_cancel_running_execution(False)
+
+    def test_force_cancel_running_execution(self):
+        self._test_cancel_running_execution(True)
+
+    def _test_cancel_running_execution(self, force):
+        """Test execution cancellation.
+
+        Verifies that a RUNNING transfer execution can be cancelled via the API
+        and that the execution reaches a finalized (CANCELED or ERROR) state.
+        """
+        # Artificially bump the execution time of a transfer.
+        _orig = imp.TestImportProvider.deploy_replica_target_resources
+
+        def _slow_deploy(*args, **kwargs):
+            time.sleep(10)
+            return _orig(*args, **kwargs)
+
+        patcher = mock.patch.object(
+            imp.TestImportProvider,
+            "deploy_replica_target_resources",
+            side_effect=_slow_deploy,
+            autospec=True,
+        )
+        patcher.start()
+        self.addCleanup(patcher.stop)
+
+        execution = self._client.transfer_executions.create(
+            self._transfer.id, shutdown_instances=False)
+        self.addCleanup(
+            self._client.transfer_executions.delete,
+            self._transfer.id, execution.id)
+
+        # Wait until the execution is RUNNING before issuing the cancel.
+        ctxt = context.RequestContext(
+            user='int-test',
+            project_id=harness._TEST_PROJECT_ID,
+            is_admin=True)
+        deadline = time.monotonic() + 30
+        while time.monotonic() < deadline:
+            db_exec = db_api.get_tasks_execution(ctxt, execution.id)
+            if db_exec.status == constants.EXECUTION_STATUS_RUNNING:
+                break
+            time.sleep(0.5)
+        else:
+            self.fail(
+                "Execution %s did not reach RUNNING within 30s "
+                "(last status: %s)" % (execution.id, db_exec.status))
+
+        # Cancel the execution.
+        self._client.transfer_executions.cancel(
+            self._transfer.id, execution.id, force=force)
+
+        final = self.wait_for_execution(execution.id)
+        expected_statuses = [
+            constants.EXECUTION_STATUS_CANCELED,
+            constants.EXECUTION_STATUS_ERROR,
+            constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING,
+        ]
+        self.assertIn(
+            final.status,
+            expected_statuses,
+            "Expected a canceled/error status after cancel, got %s"
+            % final.status,
+        )

+ 32 - 11
coriolis/tests/integration/test_transfer.py → coriolis/tests/integration/transfers/test_transfer.py

@@ -4,16 +4,6 @@
 """
 Integration tests for the replica transfer pipeline.
 
-The test has the following steps:
-  - Write a known byte pattern to the source loop device.
-  - Create source / destination endpoints and a Replica transfer via the
-    Coriolis REST API (using coriolisclient).
-  - Execute the transfer and wait for it to complete.
-  - Assert that the destination device contains the same data as the source.
-  - Overwrite a single chunk on the source device.
-  - Execute a second transfer run (incremental=True).
-  - Assert that the destination now matches the updated source.
-
 Must be run as root.
 """
 
@@ -24,8 +14,39 @@ from coriolis.tests.integration import utils as test_utils
 class ReplicaTransferIntegrationTest(base.ReplicaIntegrationTestBase):
     """Full-pipeline replica transfer integration tests."""
 
+    def test_transfer(self):
+        # List the transfer
+        transfers = self._client.transfers.list(detail=True)
+        ids = [t.id for t in transfers]
+        self.assertIn(self._transfer.id, ids)
+
+        self._execute_and_wait(self._transfer.id)
+
+        # Update the transfer
+        execution = self._client.transfers.update(
+            self._transfer.id, {"notes": "updated by integration test"})
+        self.assertExecutionCompleted(execution.id)
+
+        updated = self._client.transfers.get(self._transfer.id)
+        self.assertEqual("updated by integration test", updated.notes)
+
+        # Delete the disk
+        execution = self._client.transfers.delete_disks(self._transfer.id)
+        self.assertExecutionCompleted(execution.id)
+
     def test_incremental_replica_transfer(self):
-        """Full transfer followed by incremental after source modification."""
+        """Full transfer followed by incremental after source modification.
+
+        - Write a known byte pattern to the source loop device.
+        - Create source / destination endpoints and a Replica transfer via the
+          Coriolis REST API (using coriolisclient).
+        - Execute the transfer and wait for it to complete.
+        - Assert that the destination device contains the same data as the
+          source.
+        - Overwrite a single chunk on the source device.
+        - Execute a second transfer run (incremental=True).
+        - Assert that the destination now matches the updated source.
+        """
         # First run: full transfer
         self._execute_and_wait(self._transfer.id)