Parcourir la source

Add 'confirm_task_cancellation' conductor call.

Nashwan Azhari il y a 6 ans
Parent
commit
6ba1389665

+ 5 - 0
coriolis/conductor/rpc/client.py

@@ -240,6 +240,11 @@ class ConductorClient(object):
         self._client.call(
             ctxt, 'task_completed', task_id=task_id, task_result=task_result)
 
+    def confirm_task_cancellation(self, ctxt, task_id, cancellation_details):
+        self._client.call(
+            ctxt, 'confirm_task_cancellation', task_id=task_id,
+            cancellation_details=cancellation_details)
+
     def set_task_error(self, ctxt, task_id, exception_details):
         self._client.call(ctxt, 'set_task_error', task_id=task_id,
                           exception_details=exception_details)

+ 73 - 18
coriolis/conductor/rpc/server.py

@@ -1275,7 +1275,9 @@ class ConductorServerEndpoint(object):
                     task.id, task.status,
                     constants.TASK_STATUS_FORCE_CANCELED)
                 db_api.set_task_status(
-                    ctxt, task.id, constants.TASK_STATUS_FORCE_CANCELED)
+                    ctxt, task.id, constants.TASK_STATUS_FORCE_CANCELED,
+                    exception_details=(
+                        "This task was force-canceled at user request."))
                 continue
 
             if task.status in (
@@ -1877,10 +1879,10 @@ class ConductorServerEndpoint(object):
             if not task.on_error:
                 LOG.warn(
                     "Non-error task '%s' was marked as %s although it should "
-                    "not have. Running to completion anyway.",
+                    "not have. It was run to completion anyway.",
                     task.id, task.status)
             LOG.info(
-                "On-error task '%s' which was in '%s' has just completed "
+                "On-error task '%s' which was '%s' has just completed "
                 "successfully.  Marking it as '%s' as a final status, "
                 "but processing its result as if it completed successfully.",
                 task_id, task.status,
@@ -1922,9 +1924,10 @@ class ConductorServerEndpoint(object):
         else:
             if task.status != constants.TASK_STATUS_RUNNING:
                 LOG.warn(
-                    "Task '%s' was in '%s' state instead of the expected "
-                    "RUNNING state. Marking as COMPLETED anyway.",
-                    task_id, task.status)
+                    "Just-completed task '%s' was in '%s' state instead of "
+                    "the expected '%s' state. Marking as '%s' anyway.",
+                    task_id, task.status, constants.TASK_STATUS_RUNNING,
+                    constants.TASK_STATUS_COMPLETED)
             db_api.set_task_status(
                 ctxt, task_id, constants.TASK_STATUS_COMPLETED)
 
@@ -1996,10 +1999,64 @@ class ConductorServerEndpoint(object):
                     ctxt, subtask.id,
                     constants.TASK_STATUS_CANCELED_FOR_DEBUGGING)
 
+    @tasks_execution_synchronized
+    def confirm_task_cancellation(self, ctxt, task_id, cancellation_details):
+        LOG.info(
+            "Received confirmation of cancellation for task '%s': %s",
+            task_id, cancellation_details)
+        task = db_api.get_task(ctxt, task_id)
+
+        final_status = constants.TASK_STATUS_CANCELED
+        exception_details = (
+            "This task was user-cancelled. Additional cancellation "
+            "info: '%s'", cancellation_details)
+        if task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
+            LOG.error(
+                "Received cancellation confirmation for task '%s' which was "
+                "in '%s' state. This likely means that a double-cancellation "
+                "occurred. Marking task as '%s' either way.",
+                task.id, task.status, final_status)
+        elif task.status == constants.TASK_STATUS_FORCE_CANCELED:
+            # it means a force cancel has been issued before the
+            # confirmation that the task was canceled came in:
+            LOG.warn(
+                "Only just received error confirmation for force-cancelled "
+                "task '%s'. Leaving marked as force-cancelled.", task.id)
+            final_status = constants.TASK_STATUS_FORCE_CANCELED
+            exception_details = (
+                "This task was force-cancelled. Confirmation of its "
+                "cancellation did eventually come in. Additional details on "
+                "the cancellation: %s" % cancellation_details)
+        elif task.status in constants.FINALIZED_TASK_STATUSES:
+            LOG.warn(
+                "Received confirmation of cancellation for already finalized "
+                "task '%s' (status '%s') from host '%s'. NOT modifying "
+                "its status.", task.id, task.status, task.host)
+            final_status = task.status
+
+        if final_status == task.status:
+            LOG.debug(
+                "NOT altering state of task '%s' ('%s') following confirmation"
+                " of cancellation. Updating its exception details though.")
+            db_api.set_task_status(
+                ctxt, task.id, final_status,
+                exception_details=exception_details)
+        else:
+            LOG.info(
+                "Transitioning canceled task '%s' from '%s' to '%s' following "
+                "confirmation of its cancellation.",
+                task.id, task.status, final_status)
+            db_api.set_task_status(
+                ctxt, task.id, final_status,
+                exception_details=exception_details)
+            execution = db_api.get_tasks_execution(ctxt, task.execution_id)
+            self._advance_execution_state(ctxt, execution, requery=False)
+
     @task_and_execution_synchronized
     def set_task_error(self, ctxt, task_id, exception_details):
-        LOG.error("Task error: %(task_id)s - %(ex)s",
-                  {"task_id": task_id, "ex": exception_details})
+        LOG.error(
+            "Received error confirmation for task: %(task_id)s - %(ex)s",
+            {"task_id": task_id, "ex": exception_details})
 
         task = db_api.get_task(ctxt, task_id)
 
@@ -2007,34 +2064,31 @@ class ConductorServerEndpoint(object):
         if task.status == constants.TASK_STATUS_CANCELLING:
             final_status = constants.TASK_STATUS_CANCELED
         elif task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
+            final_status = constants.TASK_STATUS_CANCELED
             if not task.on_error:
                 LOG.warn(
                     "Non-error '%s' was in '%s' status although it should "
                     "never have been marked as such. Marking as '%s' anyway.",
-                    task.id, task.status,
-                    constants.TASK_STATUS_CANCELED_AFTER_COMPLETION)
+                    task.id, task.status, final_status)
             else:
                 LOG.warn(
                     "On-error task '%s' which was in '%s' status ended up "
                     "error-ing. Marking as '%s'",
-                    task.id, task.status,
-                    constants.TASK_STATUS_CANCELED_AFTER_COMPLETION)
+                    task.id, task.status, final_status)
                 exception_details = (
                     "This is a cleanup task so was allowed to complete "
                     "following user-cancellation, but encountered an "
                     "error: %s" % exception_details)
-            final_status = constants.TASK_STATUS_CANCELED_AFTER_COMPLETION
         elif task.status == constants.TASK_STATUS_FORCE_CANCELED:
-            # it means a force cancel has been issued before the
-            # confirmation that the task was canceled came in:
+            # it means a force cancel has been issued priorly but
+            # the task has error'd anyway:
             LOG.warn(
                 "Only just received error confirmation for force-cancelled "
                 "task '%s'. Leaving marked as force-cancelled.", task.id)
             final_status = constants.TASK_STATUS_FORCE_CANCELED
             exception_details = (
-                "This task was force-cancelled. Confirmation of its "
-                "cancellation did eventually come in. The exception message "
-                "was: %s" % exception_details)
+                "This task was force-cancelled but ended up errorring anyway. "
+                "The error details were: '%s'" % exception_details)
         elif task.status in constants.FINALIZED_TASK_STATUSES:
             LOG.error(
                 "Error confirmation on task '%s' arrived despite it being "
@@ -2047,6 +2101,7 @@ class ConductorServerEndpoint(object):
                 "occurence and share the Conductor and Worker logs. "
                 "Error message in confirmation was: %s" % (
                     task.status, exception_details))
+
         LOG.debug(
             "Transitioning errored task '%s' from '%s' to '%s'",
             task.id, task.status, final_status)

+ 4 - 0
coriolis/exception.py

@@ -326,6 +326,10 @@ class TaskProcessException(CoriolisException):
     safe = True
 
 
+class TaskProcessCanceledException(TaskProcessException):
+    pass
+
+
 class OperatingSystemNotFound(NotFound):
     pass
 

+ 14 - 4
coriolis/worker/rpc/server.py

@@ -89,9 +89,14 @@ class WorkerServerEndpoint(object):
                 LOG.info("Sending SIGINT to process: %s", process_id)
                 p.send_signal(signal.SIGINT)
         except psutil.NoSuchProcess:
-            err_msg = "Task process not found: %s" % process_id
-            LOG.info(err_msg)
-            self._rpc_conductor_client.set_task_error(ctxt, task_id, err_msg)
+            msg = (
+                "Unable to find process '%s' for task '%s' for cancellation. "
+                "Presuming it was already canceled or had "
+                "completed/error'd." % (
+                    process_id, task_id))
+            LOG.error(msg)
+            self._rpc_conductor_client.confirm_task_cancellation(
+                ctxt, task_id, cancellation_details=msg)
 
     def _handle_mp_log_events(self, p, mp_log_q):
         while True:
@@ -199,7 +204,8 @@ class WorkerServerEndpoint(object):
         p.join()
 
         if result is None:
-            raise exception.CoriolisException("Task canceled")
+            raise exception.TaskProcessCanceledException(
+                "Task was canceled.")
 
         if isinstance(result, str):
             raise exception.TaskProcessException(result)
@@ -219,6 +225,10 @@ class WorkerServerEndpoint(object):
 
             self._rpc_conductor_client.task_completed(
                 ctxt, task_id, task_result)
+        except exception.TaskProcessCanceledException as ex:
+            LOG.exception(ex)
+            self._rpc_conductor_client.confirm_task_cancellation(
+                ctxt, task_id, cancellation_details=str(ex))
         except Exception as ex:
             LOG.exception(ex)
             self._rpc_conductor_client.set_task_error(ctxt, task_id, str(ex))