Просмотр исходного кода

conductor: fix on-error cleanup skipped when source task cancelled-after-completion

When concurrently deploying resources on the source (deploy_source)
and on the target (deploy_target), if deploy_target fails while deploy_source
is still running, the conductor marks deploy_source as `CANCELLING` and
sends it a cancel signal.

If deploy_source finishes its work before the signal kills it,
`task_completed()` is sent for a task in the `CANCELLING` state. The
conductor saves the result (so source_resources, including the resource
ID is saved in the DB) and marks the task `CANCELED_AFTER_COMPLETION`.

The on-error cleanup scheduling logic in _advance_execution_state checks:

```
elif TASK_STATUS_COMPLETED in non_error_parents.values():
```

`CANCELED_AFTER_COMPLETION` != `COMPLETED`, so the check is False.
`DELETE_TRANSFER_SOURCE_RESOURCES` is unscheduled and the source
resource is leaked.

Adding `CLEANUP_TASK_TRIGGER_STATUSES`, covering both `COMPLETED` and
`CANCELED_AFTER_COMPLETION`, since both mean the task ran to completion and
may have created resources that need cleaning up. Use that constant in the
on-error scheduling check.

Adding assertions in `coriolis/tests/integration/test_failure_recovery.py`,
that check in the DB that `DELETE_TRANSFER_SOURCE_RESOURCES` /
`DELETE_TRANSFER_TARGET_RESOURCES` reached a completed status, and that
the source /target resources were zeroed out in the action info.
Claudiu Belu 2 дней назад
Родитель
Сommit
8dc0bd6dbe

+ 7 - 3
coriolis/conductor/rpc/server.py

@@ -2722,9 +2722,13 @@ class ConductorServerEndpoint(object):
                                 parent_task_statuses)
                             task_statuses[task.id] = _start_task(task)
                         # start on-error tasks only if at least one non-error
-                        # parent task has completed successfully:
-                        elif constants.TASK_STATUS_COMPLETED in (
-                                non_error_parents.values()):
+                        # parent task has completed (COMPLETED or
+                        # CANCELED_AFTER_COMPLETION both mean the task
+                        # finished its work and may have created resources
+                        # that need cleaning up):
+                        elif any(
+                                s in constants.CLEANUP_TASK_TRIGGER_STATUSES
+                                for s in non_error_parents.values()):
                             LOG.info(
                                 "Starting on-error task '%s' as all parent "
                                 "tasks have been finalized and at least one "

+ 10 - 0
coriolis/constants.py

@@ -75,6 +75,16 @@ CANCELED_TASK_STATUSES = [
     TASK_STATUS_FAILED_TO_CANCEL
 ]
 
+# Statuses that indicate a non-error parent task completed its work and
+# may have created resources that on-error cleanup tasks must destroy.
+# CANCELED_AFTER_COMPLETION means the task ran to completion (result saved,
+# resources created) but was marked cancelled because a cancellation was
+# already in flight when the completion arrived.
+CLEANUP_TASK_TRIGGER_STATUSES = [
+    TASK_STATUS_COMPLETED,
+    TASK_STATUS_CANCELED_AFTER_COMPLETION,
+]
+
 FINALIZED_TASK_STATUSES = [
     TASK_STATUS_COMPLETED,
     TASK_STATUS_ERROR,

+ 99 - 8
coriolis/tests/integration/test_failure_recovery.py

@@ -4,32 +4,123 @@
 """
 Integration test for failure handling and cleanup.
 
-Patches the active import provider's deploy_replica_target_resources to raise
-an exception, triggers a transfer execution, and asserts that:
-  1. The execution reaches ERROR status.
-  2. Cleanup tasks (delete_replica_source_resources) ran so the replicator
-     process is no longer alive.
+Tests two symmetrical error-path scenarios:
+
+- deploy_replica_target_resources is delayed then raises -> source task
+  completes first (creating real source resources) -> execution errors ->
+  source cleanup (DELETE_TRANSFER_SOURCE_RESOURCES) must run,
+  source_resources zeroed out.
+
+- deploy_replica_source_resources is delayed then raises -> target task
+  completes first (creating real target resources) -> execution errors ->
+  destination cleanup (DELETE_TRANSFER_TARGET_RESOURCES) must run,
+  target_resources zeroed out.
 
 Must be run as root; requires the scsi_debug kernel module.
 """
 
+import time
 from unittest import mock
 
+from coriolis import constants
+from coriolis.db import api as db_api
 from coriolis.tests.integration import base
 
 
 class TransferFailureIntegrationTest(base.ReplicaIntegrationTestBase):
     """Error path and resource cleanup."""
 
-    def test_error_status_on_provider_failure(self):
-        """Execution reaches ERROR when target resource deployment fails."""
+    def _assertResourcesCleaned(self, execution_id, task_type, resource_key):
+        ctxt = self._get_db_context()
+        execution = db_api.get_tasks_execution(ctxt, execution_id)
+
+        cleanup_task = next(
+            (t for t in execution.tasks if t.task_type == task_type),
+            None,
+        )
+        self.assertIsNotNone(
+            cleanup_task,
+            "%s task not found in execution %s" % (task_type, execution_id),
+        )
+
+        self.assertIn(
+            cleanup_task.status,
+            constants.CLEANUP_TASK_TRIGGER_STATUSES,
+            "%s task did not complete (status: %s); %s may have leaked"
+            % (task_type, cleanup_task.status, resource_key),
+        )
+
+        action = db_api.get_action(
+            ctxt, execution.action_id, include_task_info=True)
+        for instance in execution.action.instances:
+            self.assertIsNone(
+                action.info.get(instance, {}).get(resource_key),
+                "%s not cleared for instance '%s' after cleanup task completed"
+                % (resource_key, instance),
+            )
+
+    def assertSourceResourcesCleaned(self, execution_id):
+        self._assertResourcesCleaned(
+            execution_id,
+            constants.TASK_TYPE_DELETE_TRANSFER_SOURCE_RESOURCES,
+            "source_resources",
+        )
+
+    def assertTargetResourcesCleaned(self, execution_id):
+        self._assertResourcesCleaned(
+            execution_id,
+            constants.TASK_TYPE_DELETE_TRANSFER_TARGET_RESOURCES,
+            "target_resources",
+        )
+
+    def test_source_resources_cleaned_on_target_failure(self):
+        """Source cleanup runs when target resource deployment fails.
+
+        deploy_replica_target_resources is delayed so that
+        deploy_replica_source_resources has time to finish (creating real
+        source resources) before the target task eventually raises. The
+        conductor must schedule DELETE_TRANSFER_SOURCE_RESOURCES and zero out
+        source_resources in the action info.
+        """
         injected_error = Exception("injected target resource failure")
 
+        def _slow_then_fail(self_provider, *args, **kwargs):
+            time.sleep(15)
+            raise injected_error
+
         with mock.patch.object(
             self._harness.imp_provider_class,
             "deploy_replica_target_resources",
-            side_effect=injected_error,
+            _slow_then_fail,
         ):
             execution = self._client.transfer_executions.create(
                 self._transfer.id, shutdown_instances=False)
             self.assertExecutionErrored(execution.id)
+
+        self.assertSourceResourcesCleaned(execution.id)
+
+    def test_target_resources_cleaned_on_source_failure(self):
+        """Target cleanup runs when source resource deployment fails.
+
+        deploy_replica_source_resources is delayed so that
+        deploy_replica_target_resources has time to finish (creating real
+        target resources) before the source task eventually raises. The
+        conductor must schedule DELETE_TRANSFER_TARGET_RESOURCES and zero out
+        target_resources in the action info.
+        """
+        injected_error = Exception("injected source resource failure")
+
+        def _slow_then_fail(self_provider, *args, **kwargs):
+            time.sleep(15)
+            raise injected_error
+
+        with mock.patch.object(
+            self._harness.exp_provider_class,
+            "deploy_replica_source_resources",
+            _slow_then_fail,
+        ):
+            execution = self._client.transfer_executions.create(
+                self._transfer.id, shutdown_instances=False)
+            self.assertExecutionErrored(execution.id)
+
+        self.assertTargetResourcesCleaned(execution.id)