Browse Source

Correctly cancel execution if cancellation is done during on-error tasks.

Nashwan Azhari 6 years ago
parent
commit
3ba776feb7
4 changed files with 131 additions and 34 deletions
  1. + 97 - 22
      coriolis/conductor/rpc/server.py
  2. + 7 - 2
      coriolis/constants.py
  3. + 2 - 1
      coriolis/tasks/base.py
  4. + 25 - 9
      coriolis/utils.py

+ 97 - 22
coriolis/conductor/rpc/server.py

@@ -1243,19 +1243,35 @@ class ConductorServerEndpoint(object):
                     ctxt, task.id, constants.TASK_STATUS_FORCE_CANCELED)
                     ctxt, task.id, constants.TASK_STATUS_FORCE_CANCELED)
                 continue
                 continue
 
 
-            if not task.on_error:
-                # cancel any currently running/pending tasks:
-                if task.status in (
-                        constants.TASK_STATUS_RUNNING,
-                        constants.TASK_STATUS_PENDING):
+            if task.status in (
+                    constants.TASK_STATUS_RUNNING,
+                    constants.TASK_STATUS_PENDING):
+                # cancel any currently running/pending non-error tasks:
+                if not task.on_error:
                     LOG.debug(
                     LOG.debug(
-                        "Killing %s task '%s' as part of "
+                        "Killing %s non-error task '%s' as part of "
                         "cancellation of execution '%s'",
                         "cancellation of execution '%s'",
                         task.status, task.id, execution.id)
                         task.status, task.id, execution.id)
                     db_api.set_task_status(
                     db_api.set_task_status(
                         ctxt, task.id, constants.TASK_STATUS_CANCELLING)
                         ctxt, task.id, constants.TASK_STATUS_CANCELLING)
                     self._rpc_worker_client.cancel_task(
                     self._rpc_worker_client.cancel_task(
                         ctxt, task.host, task.id, task.process_id, force)
                         ctxt, task.host, task.id, task.process_id, force)
+                # let any on-error tasks run to completion but mark
+                # them as CANCELLING_AFTER_COMPLETION so they will
+                # be marked as cancelled once they are completed:
+                else:
+                    LOG.debug(
+                        "Marking %s on-error task %s as %s as part of "
+                        "cancellation of execution  %s",
+                        task.status, task.id,
+                        constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION,
+                        execution.id)
+                    db_api.set_task_status(
+                        ctxt, task.id,
+                        constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION,
+                        exception_details=(
+                            "Task will be marked as cancelled after completion"
+                            " as it is a cleanup task."))
             elif task.status == constants.TASK_STATUS_ON_ERROR_ONLY:
             elif task.status == constants.TASK_STATUS_ON_ERROR_ONLY:
                 # mark all on-error-only tasks as scheduled:
                 # mark all on-error-only tasks as scheduled:
                 LOG.debug(
                 LOG.debug(
@@ -1296,8 +1312,24 @@ class ConductorServerEndpoint(object):
         """ Saves the ID of the worker host which has accepted and started
         """ Saves the ID of the worker host which has accepted and started
         the task to the DB and marks the task as 'RUNNING'. """
         the task to the DB and marks the task as 'RUNNING'. """
         task = db_api.get_task(ctxt, task_id)
         task = db_api.get_task(ctxt, task_id)
+        new_status = constants.TASK_STATUS_RUNNING
+        exception_details = None
         if task.status == constants.TASK_STATUS_CANCELLING:
         if task.status == constants.TASK_STATUS_CANCELLING:
             raise exception.TaskIsCancelling(task_id=task_id)
             raise exception.TaskIsCancelling(task_id=task_id)
+        elif task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
+            if not task.on_error:
+                LOG.warn(
+                    "Non-error task '%s' was in '%s' status although it should"
+                    " not have been. Setting a task host for it anyway.",
+                    task.id, task.status)
+            LOG.debug(
+                "Task '%s' is in %s status, so it will be allowed to "
+                "have a host set for it and run to completion.",
+                task.id, task.status)
+            new_status = constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION
+            exception_details = (
+                "This is a cleanup task so it will be allowed to run to "
+                "completion despite user-cancellation.")
         elif task.status != constants.TASK_STATUS_PENDING:
         elif task.status != constants.TASK_STATUS_PENDING:
             raise exception.InvalidTaskState(
             raise exception.InvalidTaskState(
                 "Task with ID '%s' is in '%s' status instead of the "
                 "Task with ID '%s' is in '%s' status instead of the "
@@ -1305,7 +1337,8 @@ class ConductorServerEndpoint(object):
                     task_id, task.status, constants.TASK_STATUS_PENDING))
                     task_id, task.status, constants.TASK_STATUS_PENDING))
         db_api.set_task_host(ctxt, task_id, host, process_id)
         db_api.set_task_host(ctxt, task_id, host, process_id)
         db_api.set_task_status(
         db_api.set_task_status(
-            ctxt, task_id, constants.TASK_STATUS_RUNNING)
+            ctxt, task_id, new_status,
+            exception_details=exception_details)
 
 
     def _check_clean_execution_deadlock(
     def _check_clean_execution_deadlock(
             self, ctxt, execution, task_statuses=None, requery=True):
             self, ctxt, execution, task_statuses=None, requery=True):
@@ -1380,7 +1413,9 @@ class ConductorServerEndpoint(object):
                 is_canceled = True
                 is_canceled = True
             if task.status == constants.TASK_STATUS_ERROR:
             if task.status == constants.TASK_STATUS_ERROR:
                 is_errord = True
                 is_errord = True
-            if task.status == constants.TASK_STATUS_CANCELLING:
+            if task.status in (
+                    constants.TASK_STATUS_CANCELLING,
+                    constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION):
                 is_cancelling = True
                 is_cancelling = True
             if task.status == constants.TASK_STATUS_SCHEDULED:
             if task.status == constants.TASK_STATUS_SCHEDULED:
                 has_scheduled_tasks = True
                 has_scheduled_tasks = True
@@ -1747,22 +1782,39 @@ class ConductorServerEndpoint(object):
         LOG.info("Task completed: %s", task_id)
         LOG.info("Task completed: %s", task_id)
 
 
         task = db_api.get_task(ctxt, task_id)
         task = db_api.get_task(ctxt, task_id)
-        if task.status != constants.TASK_STATUS_RUNNING:
-            LOG.warn(
-                "Task '%s' was in '%s' state instead of the expected "
-                "RUNNING state. Marking as COMPLETED anyway.",
-                task_id, task.status)
-
-        db_api.set_task_status(
-            ctxt, task_id, constants.TASK_STATUS_COMPLETED)
-        task = db_api.get_task(ctxt, task_id)
+        if task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
+            if not task.on_error:
+                LOG.warn(
+                    "Non-error task '%s' was marked as %s although it should "
+                    "not have. Running to completion anyway.",
+                    task.id, task.status)
+            LOG.info(
+                "On-error task '%s' which was in '%s' has just completed "
+                "successfully.  Marking it as '%s' as a final status, "
+                "but processing its result as if it completed successfully.",
+                task_id, task.status,
+                constants.TASK_STATUS_CANCELED_AFTER_COMPLETION)
+            db_api.set_task_status(
+                ctxt, task_id, constants.TASK_STATUS_CANCELED_AFTER_COMPLETION,
+                exception_details=(
+                    "This is a cleanup task so it was allowed to run to "
+                    "completion after user-cancellation."))
+        else:
+            if task.status != constants.TASK_STATUS_RUNNING:
+                LOG.warn(
+                    "Task '%s' was in '%s' state instead of the expected "
+                    "RUNNING state. Marking as COMPLETED anyway.",
+                    task_id, task.status)
+            db_api.set_task_status(
+                ctxt, task_id, constants.TASK_STATUS_COMPLETED)
 
 
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
-        action_id = execution.action_id
-        action = db_api.get_action(ctxt, action_id)
         with lockutils.lock(
         with lockutils.lock(
                 constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
                 constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
-                    execution.type] % action_id):
+                    execution.type] % execution.action_id):
+            action_id = execution.action_id
+            action = db_api.get_action(ctxt, action_id)
+
             updated_task_info = None
             updated_task_info = None
             if task_result:
             if task_result:
                 LOG.info(
                 LOG.info(
@@ -1771,10 +1823,13 @@ class ConductorServerEndpoint(object):
                         "task_id": task_id,
                         "task_id": task_id,
                         "instance": task.instance,
                         "instance": task.instance,
                         "action_id": action_id,
                         "action_id": action_id,
-                        "task_result": task_result})
+                        "task_result": utils.filter_chunking_info_for_task(
+                            task_result)})
                 updated_task_info = (
                 updated_task_info = (
                     db_api.update_transfer_action_info_for_instance(
                     db_api.update_transfer_action_info_for_instance(
-                        ctxt, action_id, task.instance, task_result))
+                        ctxt, action_id, task.instance,
+                        utils.filter_chunking_info_for_task(
+                            task_result)))
             else:
             else:
                 action = db_api.get_action(ctxt, action_id)
                 action = db_api.get_action(ctxt, action_id)
                 updated_task_info = action.info[task.instance]
                 updated_task_info = action.info[task.instance]
@@ -1783,6 +1838,8 @@ class ConductorServerEndpoint(object):
                     "has completed successfuly but has not returned "
                     "has completed successfuly but has not returned "
                     "any result.", task.id, task.instance, action_id)
                     "any result.", task.id, task.instance, action_id)
 
 
+            # NOTE: refresh the execution just in case:
+            execution = db_api.get_tasks_execution(ctxt, task.execution_id)
             self._handle_post_task_actions(
             self._handle_post_task_actions(
                 ctxt, task, execution, updated_task_info)
                 ctxt, task, execution, updated_task_info)
 
 
@@ -1827,6 +1884,24 @@ class ConductorServerEndpoint(object):
         final_status = constants.TASK_STATUS_ERROR
         final_status = constants.TASK_STATUS_ERROR
         if task.status == constants.TASK_STATUS_CANCELLING:
         if task.status == constants.TASK_STATUS_CANCELLING:
             final_status = constants.TASK_STATUS_CANCELED
             final_status = constants.TASK_STATUS_CANCELED
+        elif task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
+            if not task.on_error:
+                LOG.warn(
+                    "Non-error '%s' was in '%s' status although it should "
+                    "never have been marked as such. Marking as '%s' anyway.",
+                    task.id, task.status,
+                    constants.TASK_STATUS_CANCELED_AFTER_COMPLETION)
+            else:
+                LOG.warn(
+                    "On-error task '%s' which was in '%s' status ended up "
+                    "error-ing. Marking as '%s'",
+                    task.id, task.status,
+                    constants.TASK_STATUS_CANCELED_AFTER_COMPLETION)
+                exception_details = (
+                    "This is a cleanup task so was allowed to complete "
+                    "following user-cancellation, but encountered an "
+                    "error: %s" % exception_details)
+            final_status = constants.TASK_STATUS_CANCELED_AFTER_COMPLETION
         elif task.status == constants.TASK_STATUS_FORCE_CANCELED:
         elif task.status == constants.TASK_STATUS_FORCE_CANCELED:
             # it means a force cancel has been issued before the
             # it means a force cancel has been issued before the
             # confirmation that the task was canceled came in:
             # confirmation that the task was canceled came in:

+ 7 - 2
coriolis/constants.py

@@ -30,7 +30,9 @@ TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_FORCE_CANCELED = "FORCE_CANCELED"
 TASK_STATUS_FORCE_CANCELED = "FORCE_CANCELED"
 TASK_STATUS_CANCELED = "CANCELED"
 TASK_STATUS_CANCELED = "CANCELED"
+TASK_STATUS_CANCELED_AFTER_COMPLETION = "CANCELED_AFTER_COMPLETION"
 TASK_STATUS_CANCELLING = "CANCELLING"
 TASK_STATUS_CANCELLING = "CANCELLING"
+TASK_STATUS_CANCELLING_AFTER_COMPLETION = "CANCELLING_AFTER_COMPLETION"
 TASK_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
 TASK_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
 TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK"
 TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
@@ -38,13 +40,15 @@ TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 ACTIVE_TASK_STATUSES = [
 ACTIVE_TASK_STATUSES = [
     TASK_STATUS_PENDING,
     TASK_STATUS_PENDING,
     TASK_STATUS_RUNNING,
     TASK_STATUS_RUNNING,
-    TASK_STATUS_CANCELLING
+    TASK_STATUS_CANCELLING,
+    TASK_STATUS_CANCELLING_AFTER_COMPLETION
 ]
 ]
 
 
 CANCELED_TASK_STATUSES = [
 CANCELED_TASK_STATUSES = [
     TASK_STATUS_CANCELED,
     TASK_STATUS_CANCELED,
     TASK_STATUS_UNSCHEDULED,
     TASK_STATUS_UNSCHEDULED,
     TASK_STATUS_FORCE_CANCELED,
     TASK_STATUS_FORCE_CANCELED,
+    TASK_STATUS_CANCELED_AFTER_COMPLETION,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FROM_DEADLOCK
     TASK_STATUS_CANCELED_FROM_DEADLOCK
 ]
 ]
@@ -56,7 +60,8 @@ FINALIZED_TASK_STATUSES = [
     TASK_STATUS_CANCELED,
     TASK_STATUS_CANCELED,
     TASK_STATUS_FORCE_CANCELED,
     TASK_STATUS_FORCE_CANCELED,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
-    TASK_STATUS_CANCELED_FROM_DEADLOCK
+    TASK_STATUS_CANCELED_FROM_DEADLOCK,
+    TASK_STATUS_CANCELED_AFTER_COMPLETION
 ]
 ]
 
 
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (

+ 2 - 1
coriolis/tasks/base.py

@@ -109,7 +109,8 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
                 "Task type '%s' failed to return the following "
                 "Task type '%s' failed to return the following "
                 "declared return values in its result: %s. "
                 "declared return values in its result: %s. "
                 "Result was: %s" % (
                 "Result was: %s" % (
-                    self.__class__, missing_returns, result))
+                    self.__class__, missing_returns,
+                    utils.filter_chunking_info_for_task(result)))
 
 
         undeclared_returns = [
         undeclared_returns = [
             prop for prop in result.keys()
             prop for prop in result.keys()

+ 25 - 9
coriolis/utils.py

@@ -580,15 +580,31 @@ def filter_chunking_info_for_task(task_info):
     """ Returns a copy of the given task info with any chunking
     """ Returns a copy of the given task info with any chunking
     info on volumes removed.
     info on volumes removed.
     """
     """
-    cpy = copy.deepcopy(task_info)
-    if not cpy.get("volumes_info"):
-        return cpy
-
-    for vol in cpy['volumes_info']:
-        if vol.get("replica_state", {}).get("chunks"):
-            vol["replica_state"]["chunks"] = "<redacted>"
-
-    return cpy
+    new = {}
+    for key in task_info.keys():
+        if key != "volumes_info":
+            new[key] = copy.deepcopy(task_info[key])
+
+    if 'volumes_info' in task_info:
+        new['volumes_info'] = []
+        for vol in task_info['volumes_info']:
+            vol_cpy = {}
+            for key in vol:
+                if key != "replica_state":
+                    vol_cpy[key] = copy.deepcopy(vol[key])
+                else:
+                    vol_cpy['replica_state'] = {}
+                    for statekey in vol['replica_state']:
+                        if statekey != "chunks":
+                            vol_cpy['replica_state'][statekey] = (
+                                copy.deepcopy(
+                                    vol['replica_state'][statekey]))
+                        else:
+                            vol_cpy['replica_state']["chunks"] = (
+                                ["<redacted>"])
+            new['volumes_info'].append(vol_cpy)
+
+    return new
 
 
 
 
 def _write_systemd(ssh, cmdline, svcname, run_as=None, start=True):
 def _write_systemd(ssh, cmdline, svcname, run_as=None, start=True):