Quellcode durchsuchen

Always allow on_error=True tasks to execute.

Without this PR, if there are multiple on_error tasks running in
parallel, and one of them fails, it would lead to the cancellation of
the other cleanup jobs.
Nashwan Azhari vor 6 Jahren
Ursprung
Commit
7886d9a14e
1 geänderte Dateien mit 14 neuen und 3 gelöschten Zeilen
  1. 14 3
      coriolis/conductor/rpc/server.py

+ 14 - 3
coriolis/conductor/rpc/server.py

@@ -882,14 +882,20 @@ class ConductorServerEndpoint(object):
         self._check_delete_reservation_for_transfer(migration)
 
     def _cancel_tasks_execution(self, ctxt, execution, force=False):
+        has_error_tasks = False
         has_running_tasks = False
         for task in execution.tasks:
+            if task.on_error:
+                # NOTE: always allow on_error tasks to execute
+                # as they may do required cleanup:
+                has_error_tasks = True
+                continue
+
             if task.status == constants.TASK_STATUS_RUNNING:
                 self._rpc_worker_client.cancel_task(
                     ctxt, task.host, task.id, task.process_id, force)
                 has_running_tasks = True
-            elif (task.status == constants.TASK_STATUS_PENDING and
-                    not task.on_error):
+            elif task.status == constants.TASK_STATUS_PENDING:
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_CANCELED)
 
@@ -921,7 +927,12 @@ class ConductorServerEndpoint(object):
                 LOG.error("A required endpoint could not be found")
                 LOG.exception(ex)
 
-        if not has_running_tasks:
+        # NOTE: only set the whole execution to 'ERROR' if nothing's
+        # running and no on_error=True tasks are pending.
+        # Otherwise, the lifecycle of the rest of the execution will
+        # be governed in `task_completed` when any of the
+        # running/pending tasks finish:
+        if not has_running_tasks and not has_error_tasks:
             self._set_tasks_execution_status(
                 ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)