Răsfoiți Sursa

Improve reliability of task cancellation.

Nashwan Azhari 5 ani în urmă
părinte
comite
09dc4d773c
1 a modificat fișierele cu 36 adăugiri și 9 ștergeri
  1. 36 9
      coriolis/conductor/rpc/server.py

+ 36 - 9
coriolis/conductor/rpc/server.py

@@ -2448,16 +2448,24 @@ class ConductorServerEndpoint(object):
         for task in sorted(execution.tasks, key=lambda t: t.index):
 
             # if force is provided, force-cancel tasks directly:
-            if force and task.status in constants.ACTIVE_TASK_STATUSES:
+            if force and task.status in itertools.chain(
+                    constants.ACTIVE_TASK_STATUSES,
+                    [constants.TASK_STATUS_FAILED_TO_CANCEL]):
                 LOG.warn(
                     "Task '%s' is in %s state, but forcibly setting to "
-                    "'%s' because 'force' flag was provided",
+                    "'%s' as part of cancellation of execution '%s' "
+                    "because the 'force' flag was provided.",
                     task.id, task.status,
-                    constants.TASK_STATUS_FORCE_CANCELED)
+                    constants.TASK_STATUS_FORCE_CANCELED,
+                    execution.id)
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_FORCE_CANCELED,
                     exception_details=(
-                        "This task was force-canceled at user request."))
+                        "This task was force-canceled at user request. "
+                        "Its state prior to its cancellation was '%s'. "
+                        "Its error details prior to its cancellation "
+                        "were: %s" % (
+                            task.status, task.exception_details)))
                 continue
 
             if task.status in (
@@ -2478,8 +2486,10 @@ class ConductorServerEndpoint(object):
                         "This task was already pending execution but was "
                         "unscheduled during the cancellation of the parent "
                         "tasks execution."))
-            elif task.status == constants.TASK_STATUS_RUNNING:
-                # cancel any currently running/pending non-error tasks:
+            elif task.status in (
+                    constants.TASK_STATUS_RUNNING,
+                    constants.TASK_STATUS_FAILED_TO_CANCEL):
+                # cancel any currently running non-error tasks:
                 if not task.on_error:
                     LOG.debug(
                         "Sending cancellation request for  %s non-error task  "
@@ -2493,8 +2503,10 @@ class ConductorServerEndpoint(object):
                             # cancellation call to prevent the conductor from
                             # hanging an excessive amount of time:
                             task.host, timeout=10)
+                        # NOTE: the RPC client is already prepped with the
+                        # right topic so we pass 'None' as the server host:
                         worker_rpc.cancel_task(
-                            ctxt, task.host, task.id, task.process_id, force)
+                            ctxt, None, task.id, task.process_id, force)
                     except (Exception, KeyboardInterrupt):
                         msg = (
                             "Failed to send cancellation request for task '%s'"
@@ -3530,7 +3542,7 @@ class ConductorServerEndpoint(object):
                 "cancel this task when it was initially asked to or did not "
                 "receive the cancellation request, but it did eventually "
                 "confirm the task's cancellation with the following "
-                "details: %s" % cancellation_details)
+                "details: %s" % (task.host, cancellation_details))
         elif task.status in constants.FINALIZED_TASK_STATUSES:
             LOG.warn(
                 "Received confirmation of cancellation for already finalized "
@@ -3610,7 +3622,7 @@ class ConductorServerEndpoint(object):
                 "The worker service for this task (%s) has failed to cancel "
                 "this task when it was asked to, and the task eventually "
                 "exited with an error. The error details were: %s" % (
-                    task.id, exception_details))
+                    task.host, exception_details))
         elif task.status in constants.FINALIZED_TASK_STATUSES:
             LOG.error(
                 "Error confirmation on task '%s' arrived despite it being "
@@ -3688,12 +3700,27 @@ class ConductorServerEndpoint(object):
     @task_synchronized
     def task_event(self, ctxt, task_id, level, message):
         LOG.info("Task event: %s", task_id)
+        task = db_api.get_task(ctxt, task_id)
+        if task.status not in constants.ACTIVE_TASK_STATUSES:
+            raise exception.InvalidTaskState(
+                "Task with ID '%s' is in a non-running state ('%s') but it "
+                "has received a task event from its task host ('%s'). "
+                "Refusing task event. The event string was: %s" % (
+                    task.id, task.status, task.host, message))
         db_api.add_task_event(ctxt, task_id, level, message)
 
     @task_synchronized
     def task_progress_update(self, ctxt, task_id, current_step, total_steps,
                              message):
         LOG.info("Task progress update: %s", task_id)
+        task = db_api.get_task(ctxt, task_id)
+        if task.status not in constants.ACTIVE_TASK_STATUSES:
+            raise exception.InvalidTaskState(
+                "Task with ID '%s' is in a non-running state ('%s') but it "
+                "has received a progress update from its task host ('%s'). "
+                "Refusing progress update. The progress update string "
+                "was: %s" % (
+                    task.id, task.status, task.host, message))
         db_api.add_task_progress_update(ctxt, task_id, current_step,
                                         total_steps, message)