Просмотр исходного кода

Simplify task advancement logic.

Nashwan Azhari 6 лет назад
Родитель
Сommit
f3cb1857c8
3 измененных файлов с 284 добавлено и 249 удалено
  1. 263 247
      coriolis/conductor/rpc/server.py
  2. 18 1
      coriolis/constants.py
  3. 3 1
      coriolis/tasks/replica_tasks.py

+ 263 - 247
coriolis/conductor/rpc/server.py

@@ -40,13 +40,6 @@ CONDUCTOR_OPTS = [
 CONF = cfg.CONF
 CONF.register_opts(CONDUCTOR_OPTS, 'conductor')
 
-TASK_LOCK_NAME_FORMAT = "task-%s"
-EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
-ENDPOINT_LOCK_NAME_FORMAT = "endpoint-%s"
-MIGRATION_LOCK_NAME_FORMAT = "migration-%s"
-REPLICA_LOCK_NAME_FORMAT = "replica-%s"
-SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s"
-
 TASK_DEADLOCK_ERROR_MESSAGE = (
     "A fatal deadlock has occurred. Further debugging is required. "
     "Please review the Conductor logs and contact support for assistance.")
@@ -56,7 +49,7 @@ def endpoint_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
         @lockutils.synchronized(
-            ENDPOINT_LOCK_NAME_FORMAT % endpoint_id)
+            constants.ENDPOINT_LOCK_NAME_FORMAT % endpoint_id)
         def inner():
             return func(self, ctxt, endpoint_id, *args, **kwargs)
         return inner()
@@ -67,7 +60,7 @@ def replica_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, *args, **kwargs):
         @lockutils.synchronized(
-            REPLICA_LOCK_NAME_FORMAT % replica_id)
+            constants.REPLICA_LOCK_NAME_FORMAT % replica_id)
         def inner():
             return func(self, ctxt, replica_id, *args, **kwargs)
         return inner()
@@ -78,7 +71,7 @@ def schedule_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, schedule_id, *args, **kwargs):
         @lockutils.synchronized(
-                SCHEDULE_LOCK_NAME_FORMAT % schedule_id)
+                constants.SCHEDULE_LOCK_NAME_FORMAT % schedule_id)
         def inner():
             return func(self, ctxt, replica_id, schedule_id, *args, **kwargs)
         return inner()
@@ -89,7 +82,7 @@ def task_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, task_id, *args, **kwargs):
         @lockutils.synchronized(
-                TASK_LOCK_NAME_FORMAT % task_id)
+                constants.TASK_LOCK_NAME_FORMAT % task_id)
         def inner():
             return func(self, ctxt, task_id, *args, **kwargs)
         return inner()
@@ -101,9 +94,9 @@ def task_and_execution_synchronized(func):
     def wrapper(self, ctxt, task_id, *args, **kwargs):
         task = db_api.get_task(ctxt, task_id)
         @lockutils.synchronized(
-            EXECUTION_LOCK_NAME_FORMAT % task.execution_id)
+            constants.EXECUTION_LOCK_NAME_FORMAT % task.execution_id)
         @lockutils.synchronized(
-            TASK_LOCK_NAME_FORMAT % task_id)
+            constants.TASK_LOCK_NAME_FORMAT % task_id)
         def inner():
             return func(self, ctxt, task_id, *args, **kwargs)
         return inner()
@@ -114,7 +107,7 @@ def migration_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, migration_id, *args, **kwargs):
         @lockutils.synchronized(
-            MIGRATION_LOCK_NAME_FORMAT % migration_id)
+            constants.MIGRATION_LOCK_NAME_FORMAT % migration_id)
         def inner():
             return func(self, ctxt, migration_id, *args, **kwargs)
         return inner()
@@ -125,7 +118,7 @@ def tasks_execution_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
         @lockutils.synchronized(
-            EXECUTION_LOCK_NAME_FORMAT % execution_id)
+            constants.EXECUTION_LOCK_NAME_FORMAT % execution_id)
         def inner():
             return func(self, ctxt, replica_id, execution_id, *args, **kwargs)
         return inner()
@@ -324,17 +317,17 @@ class ConductorServerEndpoint(object):
         task.on_error = on_error
         task.index = len(task.execution.tasks) + 1
 
-        # non-error tasks are automatically set to pending:
+        # non-error tasks are automatically set to scheduled:
         if not on_error and not on_error_only:
             task.status = constants.TASK_STATUS_SCHEDULED
         else:
             task.status = constants.TASK_STATUS_ON_ERROR_ONLY
-            # on-error-only tasks remain marked as such regardless
-            # of dependencies:
+            # on-error-only tasks remain marked as such
+            # regardless of dependencies:
             if on_error_only:
                 task.on_error = True
-            # tasks which are on-error but depend on already-defined
-            # pending tasks count as pending:
+            # plain on-error tasks which depend on already-defined
+            # scheduled tasks count as scheduled:
             elif depends_on:
                 for task_id in depends_on:
                     if [t for t in task.execution.tasks if t.id == task_id and
@@ -360,6 +353,7 @@ class ConductorServerEndpoint(object):
         }
 
     def _begin_tasks(self, ctxt, execution, task_info={}):
+        """ Starts all non-error-only tasks which have no dependencies. """
         if not ctxt.trust_id:
             keystone.create_trust(ctxt)
             ctxt.delete_trust_id = True
@@ -367,9 +361,13 @@ class ConductorServerEndpoint(object):
         origin = self._get_task_origin(ctxt, execution.action)
         destination = self._get_task_destination(ctxt, execution.action)
 
+        newly_started_tasks = []
         for task in execution.tasks:
             if (not task.depends_on and
                     task.status == constants.TASK_STATUS_SCHEDULED):
+                LOG.info(
+                    "Starting dependency-less task '%s' for execution '%s'",
+                    task.id, execution.id)
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_PENDING)
                 self._rpc_worker_client.begin_task(
@@ -380,6 +378,14 @@ class ConductorServerEndpoint(object):
                     destination=destination,
                     instance=task.instance,
                     task_info=task_info.get(task.instance, {}))
+                newly_started_tasks.append(task.id)
+
+        # NOTE: this should never happen if _check_execution_tasks_sanity
+        # was called before this method:
+        if not newly_started_tasks:
+            raise exception.InvalidActionTasksExecutionState(
+                "No tasks were started at the beginning of execution '%s'" % (
+                    execution.id))
 
     def _check_execution_tasks_sanity(
             self, execution, initial_task_info):
@@ -457,7 +463,7 @@ class ConductorServerEndpoint(object):
                         for tid, t in processed_tasks.items()}
                     raise exception.CoriolisException(
                         "Execution '%s' (type '%s') is bound to be deadlocked:"
-                        " there are leftover tasks for instance '%s 'which "
+                        " there are leftover tasks for instance '%s' which "
                         "will never get queued. Already processed tasks are: "
                         "%s. Tasks left: %s" % (
                             execution.id, execution.type, instance,
@@ -533,8 +539,9 @@ class ConductorServerEndpoint(object):
             replica.info[instance].update({
                 "source_environment": replica.source_environment,
                 "target_environment": replica.destination_environment})
-                # TODO(aznashwan): have these passed separately to the tasks
-                # (they're currently passed inside the dest-env)
+                # TODO(aznashwan): have these passed separately to the relevant
+                # provider methods (they're currently passed directly inside
+                # dest-env by the API service when accepting the call)
                 # "network_map": network_map,
                 # "storage_mappings": storage_mappings,
 
@@ -645,14 +652,15 @@ class ConductorServerEndpoint(object):
             ctxt, replica_id, execution_id)
         if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
             raise exception.InvalidReplicaState(
-                "Replica '%s' has no running execution." % replica_id)
+                "Replica '%s' has no running execution to cancel." % (
+                    replica_id))
         if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
                 not force):
             raise exception.InvalidReplicaState(
                 "Replica '%s' is already being cancelled. Please use the "
                 "force option if you'd like to force-cancel it." % (
                     replica_id))
-        self._cancel_tasks_execution(ctxt, execution, force)
+        self._cancel_tasks_execution(ctxt, execution, force=force)
 
     def _get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
         execution = db_api.get_replica_tasks_execution(
@@ -766,15 +774,17 @@ class ConductorServerEndpoint(object):
         if [m.id for m in migrations if m.executions[0].status in (
                 constants.ACTIVE_EXECUTION_STATUSES)]:
             raise exception.InvalidReplicaState(
-                "This replica is currently being migrated")
+                "Replica '%s' is currently being migrated" % replica_id)
 
     @staticmethod
     def _check_running_executions(action):
-        if [
-                e for e in action.executions
-                if e.status in constants.ACTIVE_EXECUTION_STATUSES]:
+        running_executions = [
+            e.id for e in action.executions
+            if e.status in constants.ACTIVE_EXECUTION_STATUSES]
+        if running_executions:
             raise exception.InvalidActionTasksExecutionState(
-                "Another tasks execution is in progress")
+                "Another tasks execution is in progress: %s" % (
+                    running_executions))
 
     def _check_replica_running_executions(self, ctxt, replica):
         self._check_running_executions(replica)
@@ -986,8 +996,9 @@ class ConductorServerEndpoint(object):
                 # NOTE: we must explicitly set this in each VM's info
                 # to prevent the Replica disks from being cloned:
                 "clone_disks": False}
-                # TODO(aznashwan): have these passed separately to the tasks
-                # (they're currently passed inside the dest-env)
+                # TODO(aznashwan): have these passed separately to the relevant
+                # provider methods (they're currently passed directly inside
+                # dest-env by the API service when accepting the call)
                 # "network_map": network_map,
                 # "storage_mappings": storage_mappings,
 
@@ -1168,7 +1179,10 @@ class ConductorServerEndpoint(object):
             raise exception.InvalidMigrationState(
                 "Migration '%s' is already being cancelled. Please use the "
                 "force option if you'd like to force-cancel it.")
-        self._cancel_tasks_execution(ctxt, execution, force)
+
+        with lockutils.lock(
+                constants.EXECUTION_LOCK_NAME_FORMAT % execution.id):
+            self._cancel_tasks_execution(ctxt, execution, force=force)
         self._check_delete_reservation_for_transfer(migration)
 
     def _cancel_tasks_execution(
@@ -1176,24 +1190,41 @@ class ConductorServerEndpoint(object):
         """ Cancels a running Execution by:
         - telling workers to kill any already running non-on-error tasks
         - cancelling any non-on-error tasks which are pending
-        - triggering any on-error-only tasks with no dependencies
+        - marking all on-error-only tasks as scheduled
 
         NOTE: affects whole execution, only call this
         with a lock on the Execution as a whole!
         """
-        LOG.debug(
-            "Cancelling tasks execution %s. Current status before "
-            "cancellation is '%s'", execution.id, execution.status)
-        # mark execution as cancelling:
-        self._set_tasks_execution_status(
-            ctxt, execution.id, constants.EXECUTION_STATUS_CANCELLING)
+        if requery:
+            execution = db_api.get_tasks_execution(ctxt, execution.id)
+        if execution.status == constants.EXECUTION_STATUS_RUNNING:
+            LOG.info(
+                "Cancelling tasks execution %s. Current status before "
+                "cancellation is '%s'", execution.id, execution.status)
+            # mark execution as cancelling:
+            self._set_tasks_execution_status(
+                ctxt, execution.id, constants.EXECUTION_STATUS_CANCELLING)
+        elif execution.status == constants.EXECUTION_STATUS_CANCELLING and (
+                not force):
+            LOG.info(
+                "Execution '%s' is already in CANCELLING status and no "
+                "force flag was provided, skipping re-cancellation.",
+                execution.id)
+            self._advance_execution_state(
+                ctxt, execution, requery=not requery)
+            return
+        elif execution.status in constants.FINALIZED_TASK_STATUSES:
+            LOG.info(
+                "Execution '%s' is in a finalized status '%s'. "
+                "Skipping re-cancellation.", execution.id, execution.status)
+            return
+
         # iterate through and kill/cancel any non-error
         # tasks which are running/pending:
         for task in execution.tasks:
-            if requery:
-                task = db_api.get_task(ctxt, task.id)
 
-            if force and task.status == constants.TASK_STATUS_CANCELLING:
+            # if force is provided, force-cancel tasks directly:
+            if force and task.status in constants.ACTIVE_TASK_STATUSES:
                 LOG.warn(
                     "Task '%s' is in %s state, but forcibly setting to "
                     "'%s' because 'force' flag was provided",
@@ -1204,6 +1235,7 @@ class ConductorServerEndpoint(object):
                 continue
 
             if not task.on_error:
+                # cancel any currently running/pending tasks:
                 if task.status in (
                         constants.TASK_STATUS_RUNNING,
                         constants.TASK_STATUS_PENDING):
@@ -1215,49 +1247,30 @@ class ConductorServerEndpoint(object):
                         ctxt, task.id, constants.TASK_STATUS_CANCELLING)
                     self._rpc_worker_client.cancel_task(
                         ctxt, task.host, task.id, task.process_id, force)
-                # cancel any non-on-error tasks which have been scheduled:
-                elif task.status == constants.TASK_STATUS_SCHEDULED:
-                    LOG.debug(
-                        "Marking scheduled task '%s' as unscheduled as "
-                        "part of cancellation of execution '%s'",
-                        task.id, execution.id)
-                    db_api.set_task_status(
-                        ctxt, task.id, constants.TASK_STATUS_UNSCHEDULED,
-                        exception_details=(
-                            "Unscheduled as part of cancellation procedure."))
-            # NOTE: ideally, all on-error-only tasks should have a
-            # clear-cut dependency task so this inefficient code
-            # should never be called:
-            elif task.on_error and (
-                    task.status == constants.TASK_STATUS_ON_ERROR_ONLY and not (
-                        task.depends_on)):
-                try:
-                    origin = self._get_task_origin(ctxt, execution.action)
-                    destination = self._get_task_destination(
-                        ctxt, execution.action)
-                except exception.NotFound as ex:
-                    LOG.error("A required endpoint could not be found")
-                    LOG.exception(ex)
-
-                action = db_api.get_action(
-                    ctxt, execution.action_id)
+            elif task.status == constants.TASK_STATUS_ON_ERROR_ONLY:
+                # mark all on-error-only tasks as scheduled:
+                LOG.debug(
+                    "Marking on-error-only task '%s' as scheduled following "
+                    "cancellation of execution '%s'",
+                    task.id, execution.id)
                 db_api.set_task_status(
-                    ctxt, task.id, constants.TASK_STATUS_PENDING)
-                self._rpc_worker_client.begin_task(
-                    ctxt, server=None,
-                    task_id=task.id,
-                    task_type=task.task_type,
-                    origin=origin,
-                    destination=destination,
-                    instance=task.instance,
-                    task_info=action.info.get(task.instance, {}))
+                    ctxt, task.id, constants.TASK_STATUS_SCHEDULED)
             else:
                 LOG.debug(
                     "No action currently taken with respect to task '%s' "
                     "(status '%s') during cancellation of execution '%s'",
                     task.id, task.status, execution.id)
 
-        self._advance_execution_state(ctxt, execution)
+        started_tasks = self._advance_execution_state(ctxt, execution)
+        if started_tasks:
+            LOG.info(
+                "The following tasks were started after state advancement "
+                "of execution '%s' after cancellation request: %s",
+                execution.id, started_tasks)
+        else:
+            LOG.debug(
+                "No new tasks were started for execution '%s' following "
+                "state advancement after cancellation.", execution.id)
 
     @staticmethod
     def _set_tasks_execution_status(ctxt, execution_id, execution_status):
@@ -1284,20 +1297,22 @@ class ConductorServerEndpoint(object):
         db_api.set_task_status(
             ctxt, task_id, constants.TASK_STATUS_RUNNING)
 
-    def _check_clean_execution_deadlock(self, ctxt, execution):
+    def _check_clean_execution_deadlock(
+            self, ctxt, execution, task_statuses=None, requery=True):
         """ Checks whether an execution is deadlocked.
         Deadlocked executions have no currently running/pending tasks
         but some remaining scheduled tasks.
         If this occurs, all pending/error-only tasks are marked
         as DEADLOCKED, and the execution is marked as such too.
-        Returns the state of the when the check occured
+        Returns the state of the execution when the check occurred
         (either RUNNING or DEADLOCKED)
         """
-        execution = db_api.get_tasks_execution(ctxt, execution.id)
-        task_statuses = {}
-        for task in execution.tasks:
-            dbtask = db_api.get_task(ctxt, task.id)
-            task_statuses[dbtask.id] = dbtask.status
+        if requery:
+            execution = db_api.get_tasks_execution(ctxt, execution.id)
+        if not task_statuses:
+            task_statuses = {}
+            for task in execution.tasks:
+                task_statuses[task.id] = task.status
 
         determined_state = constants.EXECUTION_STATUS_RUNNING
         status_vals = task_statuses.values()
@@ -1305,19 +1320,22 @@ class ConductorServerEndpoint(object):
                 any([stat in status_vals
                      for stat in constants.ACTIVE_TASK_STATUSES])):
             LOG.warn(
-                "Execution '%s' is deadlocked. Task statuses are: %s",
+                "Execution '%s' is deadlocked. Cleaning up now. "
+                "Task statuses are: %s",
                 execution.id, task_statuses)
             for task_id, stat in task_statuses.items():
                 if stat in (
                         constants.TASK_STATUS_SCHEDULED,
                         constants.TASK_STATUS_ON_ERROR_ONLY):
-                    LOG.warn("Marking deadlocked task '%s' as that ", task_id)
+                    LOG.warn(
+                        "Marking deadlocked task '%s' as that (current "
+                        "state: %s)", task_id, stat)
                     db_api.set_task_status(
                         ctxt, task_id,
                         constants.TASK_STATUS_CANCELED_FROM_DEADLOCK,
                         exception_details=TASK_DEADLOCK_ERROR_MESSAGE)
             LOG.warn(
-                "Marking deadlocked execution '%s' as DEADLOCKED'd", execution.id)
+                "Marking deadlocked execution '%s' as DEADLOCKED", execution.id)
             self._set_tasks_execution_status(
                 ctxt, execution.id, constants.EXECUTION_STATUS_DEADLOCKED)
             LOG.error(
@@ -1332,7 +1350,8 @@ class ConductorServerEndpoint(object):
         RUNNING - at least one task is RUNNING, PENDING or CANCELLING
         COMPLETED - all non-error-only tasks are COMPLETED
         CANCELED - no more RUNNING/PENDING/SCHEDULED tasks but some CANCELED
-        ERROR - no tasks are RUNNING and at least one is ERROR'd
+        CANCELLING - at least one task in CANCELLING status
+        ERROR - not RUNNING and at least one is ERROR'd
         DEADLOCKED - has SCHEDULED tasks but none RUNNING/PENDING/CANCELLING
         """
         is_running = False
@@ -1362,11 +1381,16 @@ class ConductorServerEndpoint(object):
         elif is_cancelling:
             status = constants.EXECUTION_STATUS_CANCELLING
         elif is_running:
-            status = constants.EXECUTION_STATUS_RUNNING
+            if is_canceled:
+                # means that a cancel was issued but some cleanup tasks are
+                # currently being run, so the final status is CANCELLING:
+                status = constants.EXECUTION_STATUS_CANCELLING
+            else:
+                status = constants.EXECUTION_STATUS_RUNNING
         elif is_errord:
             status = constants.EXECUTION_STATUS_ERROR
-        # NOTE: canceled executions should never have ERROR'd tasks
-        # (they should also be CANCELED) so this comes last:
+        # NOTE: user-canceled executions should never have ERROR'd tasks
+        # (they should also be marked as CANCELED) so this comes last:
         elif is_canceled:
             status = constants.EXECUTION_STATUS_CANCELED
 
@@ -1376,27 +1400,44 @@ class ConductorServerEndpoint(object):
             execution.id, status, task_stat_map)
         return status
 
-    def _advance_execution_state(self, ctxt, execution):
+    def _advance_execution_state(self, ctxt, execution, requery=True):
         """ Advances the state of the execution by starting/refreshing
         the state of all child tasks.
-        If the task has finalized (either completed or error'd), updates
-        its state to the finalized one.
+        If the execution has finalized (either completed or error'd),
+        updates its state to the finalized one.
         Returns a list of all the tasks which were started.
         NOTE: should only be called with a lock on the Execution!
 
-        Requirements for a tasks to be started:
-        - any non-error task where any parent task got UNSCHEDULED
-          will also be UNSCHEDULED
+        Requirements for a task to be started:
+        - any SCHEDULED task with no deps will be instantly started
+        - any task where all parent tasks got UNSCHEDULED will
+          also be UNSCHEDULED
         - normal tasks (task.on_error==False & task.status==SCHEDULED):
-            * instantly started if they have no parent dependencies
-            * all parent dependency tasks must be COMPLETED
+            * started if all parent dependency tasks have been COMPLETED
+            * instantly unscheduled if all parents finalized but some
+              didn't complete successfully.
         - on-error tasks (task.on_error==True & task.status==SCHEDULED):
-            * all parent tasks must have reached a terminal state
-        - on-error-only tasks (task.status==ON_ERROR_ONLY):
-            * at least one non-error parent tasks must have been COMPLETED
-            * all non-error parent tasks must have reached a terminal state
-            * all on-error parent tasks must have reached a terminal state
+            * all parent tasks (including on-error parents) must have
+              reached a terminal state
+            * at least one non-error parent task must have been COMPLETED
         """
+        if requery:
+            execution = db_api.get_tasks_execution(ctxt, execution.id)
+        if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
+            LOG.warn(
+                "Execution state advancement called on Execution '%s' which "
+                "is not in an active status in the DB (it's currently '%s'). "
+                "Double-checking for deadlock and returning early.")
+            if self._check_clean_execution_deadlock(
+                    ctxt, execution, task_statuses=None,
+                    requery=not requery) == (
+                        constants.EXECUTION_STATUS_DEADLOCKED):
+                LOG.error(
+                    "Execution '%s' deadlocked even before Replica state "
+                    "advancement . Cleanup has been perfomed. Returning.",
+                    execution.id)
+            return []
+
         LOG.debug(
             "State of execution '%s' before state advancement is: %s",
             execution.id, execution.status)
@@ -1420,26 +1461,47 @@ class ConductorServerEndpoint(object):
                 task_info=action.info.get(task.instance, {}))
             started_tasks.append(task.id)
 
+        # aggregate tasks and statuses:
+        task_statuses = {}
+        task_deps = {}
+        on_error_tasks = []
         for task in execution.tasks:
+            task_statuses[task.id] = task.status
 
-            if task.status == constants.TASK_STATUS_SCHEDULED:
-                # immediately start pending tasks with no deps:
-                if not task.depends_on:
+            if task.depends_on:
+                task_deps[task.id] = task.depends_on
+            else:
+                task_deps[task.id] = []
+
+            if task.on_error:
+                on_error_tasks.append(task.id)
+
+        LOG.debug(
+            "Task statuses before execution '%s' lifecycle iteration: %s",
+            execution.id, task_statuses)
+
+        # NOTE: the tasks are saved in a random order in the DB, which
+        # complicates the processing logic so we just pre-sort:
+        for task in sorted(
+                execution.tasks, key=lambda t: t.index):
+
+            if task_statuses[task.id] == constants.TASK_STATUS_SCHEDULED:
+
+                # immediately start dependency-less tasks (on-error or not)
+                if not task_deps[task.id]:
                     LOG.info(
                         "Starting depency-less task '%s'", task.id)
                     _start_task(task)
+                    task_statuses[task.id] = constants.TASK_STATUS_PENDING
                     continue
 
-                # gather statuses of dependent tasks:
-                parent_task_statuses = {}
-                if task.depends_on:
-                    for dep_task_id in task.depends_on:
-                        depend_task = db_api.get_task(ctxt, dep_task_id)
-                        parent_task_statuses[dep_task_id] = depend_task.status
+                parent_task_statuses = {
+                    dep_id: task_statuses[dep_id]
+                    for dep_id in task_deps[task.id]}
 
                 # immediately unschedule tasks (on-error or otherwise)
                 # if all of their parent tasks got un-scheduled:
-                if all([
+                if task_deps[task.id] and all([
                         dep_stat == constants.TASK_STATUS_UNSCHEDULED
                         for dep_stat in parent_task_statuses.values()]):
                     LOG.info(
@@ -1451,156 +1513,102 @@ class ConductorServerEndpoint(object):
                         exception_details=(
                             "Unscheduled due to the unscheduling of all "
                             "parent tasks."))
+                    task_statuses[task.id] = constants.TASK_STATUS_UNSCHEDULED
                     continue
 
-                if not task.on_error:
+                # check all parents have finalized:
+                if all([
+                        dep_stat in constants.FINALIZED_TASK_STATUSES
+                        for dep_stat in parent_task_statuses.values()]):
 
-                    # check if all parent tasks have reached a terminal state:
-                    if all([
-                            dep_stat in constants.FINALIZED_TASK_STATUSES
-                            for dep_stat in parent_task_statuses.values()]):
-                        # if all parents completed, start the task:
+                    # handle non-error tasks:
+                    if task.id not in on_error_tasks:
+                        # start non-error tasks whose parents have
+                        # all completed successfully:
                         if all([
                                 dep_stat == constants.TASK_STATUS_COMPLETED
                                 for dep_stat in (
-                                    parent_task_statuses.values())]):
+                                        parent_task_statuses.values())]):
                             LOG.info(
                                 "Starting task '%s' as all dependencies have "
-                                "completed: %s", task.id, parent_task_statuses)
+                                "completed successfully: %s",
+                                task.id, parent_task_statuses)
                             _start_task(task)
-                        # if even one parent did not complete, unschedule:
+                            task_statuses[task.id] = (
+                                constants.TASK_STATUS_PENDING)
                         else:
+                            # it means one/more parents error'd/unscheduled
+                            # so we mark this task as unscheduled:
                             LOG.info(
-                                "Unscheduling task '%s' as it has one or more "
-                                "parent tasks which got canceled/unscheduled/"
-                                "errord: %s",
+                                "Unscheduling plain task '%s' as not all "
+                                "parent tasks completed successfully: %s",
                                 task.id, parent_task_statuses)
                             db_api.set_task_status(
                                 ctxt, task.id,
                                 constants.TASK_STATUS_UNSCHEDULED,
                                 exception_details=(
-                                    "Unscheduled due to the unsuccessful "
-                                    "execution of one or more parent tasks."))
-                    else:
-                        # still has active deps, wait for next hook:
-                        LOG.debug(
-                            "Skipping starting task '%s' as some parent "
-                            "tasks have not yet finalized: %s",
-                            task.id, parent_task_statuses)
-                    continue
-
-                elif task.on_error:
+                                    "Unscheduled due to some parent tasks not "
+                                    "having completed successfully."))
+                            task_statuses[task.id] = (
+                                constants.TASK_STATUS_UNSCHEDULED)
 
-                    # check if all parents have reached a finalized state:
-                    if all([
-                            dep_stat in constants.FINALIZED_TASK_STATUSES
-                            for dep_stat in parent_task_statuses.values()]):
-                        # unschedule the task if no parent has completed:
-                        if constants.TASK_STATUS_COMPLETED not in (
-                                parent_task_statuses.values()):
+                    # handle on-error tasks:
+                    else:
+                        non_error_parents = {
+                            dep_id: task_statuses[dep_id]
+                            for dep_id in parent_task_statuses.keys()
+                            if dep_id not in on_error_tasks}
+
+                        # start on-error tasks only if at least one non-error
+                        # parent task has completed successfully:
+                        if constants.TASK_STATUS_COMPLETED in (
+                                non_error_parents.values()):
                             LOG.info(
-                                "Unscheduling plain on-error task '%s' as "
-                                "all of its dependencies have finalized "
-                                "but no parent had executed successfully: %s",
-                                task.id, parent_task_statuses)
+                                "Starting on-error task '%s' as all parent "
+                                "tasks have been finalized and at least one "
+                                "non-error parent (%s) was completed: %s",
+                                task.id, list(non_error_parents.keys()),
+                                parent_task_statuses)
+                            _start_task(task)
+                            task_statuses[task.id] = (
+                                constants.TASK_STATUS_PENDING)
+                        else:
+                            LOG.info(
+                                "Unscheduling on-error task '%s' as none of "
+                                "its parent non-error tasks (%s) have "
+                                "completed successfully: %s",
+                                task.id, list(non_error_parents.keys()),
+                                parent_task_statuses)
                             db_api.set_task_status(
                                 ctxt, task.id,
                                 constants.TASK_STATUS_UNSCHEDULED,
                                 exception_details=(
-                                    "Unscheduled due to no parent task "
-                                    "having executed successfully."))
-                            continue
-                        LOG.info(
-                            "Starting plain on-error task '%s' as all dependencies "
-                            "have been finalized: %s", task.id, parent_task_statuses)
-                        _start_task(task)
-                    else:
-                        LOG.debug(
-                            "Skipping starting on-error task '%s' as some "
-                            "parent tasks have not yet finalized: %s",
-                            task.id, parent_task_statuses)
-                    continue
-
-                LOG.debug(
-                    "No lifecycle decision was taken for SCHEDULED task '%s'. "
-                    "Current status: %s. Parent tasks: %s",
-                    task.id, task.status, parent_task_statuses)
+                                    "Unscheduled due to no non-error parent "
+                                    "tasks having completed successfully."))
+                            task_statuses[task.id] = (
+                                constants.TASK_STATUS_UNSCHEDULED)
 
-            elif task.status == constants.TASK_STATUS_ON_ERROR_ONLY:
-                if not task.depends_on:
-                    LOG.warn(
-                        "Encountered on-error-only task '%s' with no "
-                        "dependencies. These types of tasks should ideally "
-                        "always be declared with dependencies.", task.id)
-                    # skipping as it was `_cancel_task_execution`'s job
-                    # to trigger such tasks.
-                    continue
-
-                error_deps = {}
-                non_error_deps = {}
-                # gather statuses for non-error and on-error parents:
-                for dep_task_id in task.depends_on:
-                    depend_task = db_api.get_task(ctxt, dep_task_id)
-                    if depend_task.on_error:
-                        error_deps[depend_task.id] = depend_task.status
-                    else:
-                        non_error_deps[depend_task.id] = depend_task.status
-
-                # do not trigger on-error-only tasks if not all non-error
-                # parent tasks have reached a final state:
-                if not all([
-                        dep_stat in constants.FINALIZED_TASK_STATUSES
-                        for dep_stat in non_error_deps.values()]):
-                    LOG.debug(
-                        "Not triggering on-error-only task '%s' as some "
-                        "of its parent non-error tasks have not reached a "
-                        "terminal status: %s", task.id, non_error_deps)
-                    continue
-
-                # unschedule any on-error-only task if no non-on-error parent
-                # tasks completed successfully:
-                if constants.TASK_STATUS_COMPLETED not in (
-                        non_error_deps.values()):
-                    LOG.info(
-                        "Marking on-error-only task '%s' as unscheduled as all"
-                        " of its parent non-error tasks have finalized but "
-                        "none has completed successfully: %s",
-                        task.id, non_error_deps)
-                    db_api.set_task_status(
-                        ctxt, task.id, constants.TASK_STATUS_UNSCHEDULED,
-                        exception_details=(
-                            "Unscheduled because all parent non-error tasks "
-                            "have finalized but none have completed "
-                            "successfully."))
-                    continue
-
-                # do not trigger on-error-only tasks unless all on-error
-                # parent tasks have reached a final state:
-                if not all([
-                        dep_stat in constants.FINALIZED_TASK_STATUSES
-                        for dep_stat in error_deps.values()]):
+                else:
                     LOG.debug(
-                        "Not triggering on-error-only task '%s' as it "
-                        "has parent on-error tasks which have not yet "
-                        "reached a terminal state: %s", task.id, error_deps)
-                    continue
-
+                        "No lifecycle decision was taken with respect to task "
+                        "%s of execution %s as not all parent tasks have "
+                        "reached a terminal state: %s",
+                        task.id, execution.id, parent_task_statuses)
+            else:
                 LOG.debug(
-                    "Starting on-error-only task '%s'. Dependency status: %s",
-                    task.id, {
-                        "on-error-dependencies": error_deps,
-                        "non-error-dependencies": non_error_deps})
-                _start_task(task)
-                continue
+                    "No lifecycle decision to make for task '%s' of execution "
+                    "'%s' as it is not in a position to be scheduled: %s",
+                    task.id, execution.id, task_statuses[task.id])
 
         if started_tasks:
-            LOG.info(
+            LOG.debug(
                 "Started the following tasks for execution '%s': %s",
                 execution.id, started_tasks)
         else:
             # check for deadlock:
-            if self._check_clean_execution_deadlock(ctxt, execution) == (
-                    constants.EXECUTION_STATUS_DEADLOCKED):
+            if self._check_clean_execution_deadlock(
+                    ctxt, execution, task_statuses=task_statuses) == (
+                        constants.EXECUTION_STATUS_DEADLOCKED):
                 LOG.error(
                     "Execution '%s' deadlocked after Replica state advancement"
                     ". Cleanup has been perfomed. Returning early.",
@@ -1612,14 +1620,17 @@ class ConductorServerEndpoint(object):
             ctxt, execution, requery=True)
         if latest_execution_status != execution.status:
             LOG.info(
-                "Execution '%s' transitioned from status %s to %s",
-                execution.id, execution.status, latest_execution_status)
+                "Execution '%s' transitioned from status %s to %s "
+                "following the updated task statuses: %s",
+                execution.id, execution.status,
+                latest_execution_status, task_statuses)
             self._set_tasks_execution_status(
                 ctxt, execution.id, latest_execution_status)
         else:
             LOG.debug(
                 "Execution '%s' has remained in status '%s' following "
-                "state advancement.", execution.id, latest_execution_status)
+                "state advancement. task statuses are: %s",
+                execution.id, latest_execution_status, task_statuses)
 
         return started_tasks
 
@@ -1635,15 +1646,15 @@ class ConductorServerEndpoint(object):
         migration = db_api.get_migration(ctxt, migration_id)
         replica_id = migration.replica_id
 
-        with lockutils.lock(replica_id):
+        with lockutils.lock(
+                constants.REPLICA_LOCK_NAME_FORMAT % replica_id):
             LOG.debug(
                 "Updating volume_info in replica due to snapshot "
                 "restore during migration. replica id: %s", replica_id)
             self._update_replica_volumes_info(
                 ctxt, replica_id, instance, updated_task_info)
 
-    def _handle_post_task_actions(
-            self, ctxt, task, execution, task_info):
+    def _handle_post_task_actions(self, ctxt, task, execution, task_info):
         task_type = task.task_type
 
         if task_type == constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
@@ -1738,7 +1749,10 @@ class ConductorServerEndpoint(object):
 
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
         action_id = execution.action_id
-        with lockutils.lock(action_id):
+        action = db_api.get_action(ctxt, action_id)
+        with lockutils.lock(
+                constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
+                    execution.type] % action_id):
             updated_task_info = None
             if task_result:
                 LOG.info(
@@ -1821,7 +1835,9 @@ class ConductorServerEndpoint(object):
 
         action_id = execution.action_id
         action = db_api.get_action(ctxt, action_id)
-        with lockutils.lock(action_id):
+        with lockutils.lock(
+                constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
+                    execution.type] % action_id):
             if task.task_type == constants.TASK_TYPE_OS_MORPHING and (
                     CONF.conductor.debug_os_morphing_errors):
                 LOG.debug(
@@ -1854,10 +1870,10 @@ class ConductorServerEndpoint(object):
             else:
                 self._cancel_tasks_execution(ctxt, execution)
 
-        # NOTE: if this is a migration, make sure to delete
-        # its associated reservation.
-        if action.type == constants.EXECUTION_TYPE_MIGRATION:
-            self._check_delete_reservation_for_transfer(action)
+            # NOTE: if this was a migration, make sure to delete
+            # its associated reservation.
+            if execution.type == constants.EXECUTION_TYPE_MIGRATION:
+                self._check_delete_reservation_for_transfer(action)
 
     @task_synchronized
     def task_event(self, ctxt, task_id, level, message):

+ 18 - 1
coriolis/constants.py

@@ -17,7 +17,9 @@ ACTIVE_EXECUTION_STATUSES = [
 FINALIZED_EXECUTION_STATUSES = [
     EXECUTION_STATUS_COMPLETED,
     EXECUTION_STATUS_CANCELED,
-    EXECUTION_STATUS_ERROR
+    EXECUTION_STATUS_ERROR,
+    EXECUTION_STATUS_CANCELED_FOR_DEBUGGING,
+    EXECUTION_STATUS_DEADLOCKED
 ]
 
 TASK_STATUS_SCHEDULED = "SCHEDULED"
@@ -176,3 +178,18 @@ EXECUTION_TYPE_REPLICA_DISKS_DELETE = "replica_disks_delete"
 EXECUTION_TYPE_REPLICA_DEPLOY = "replica_deploy"
 EXECUTION_TYPE_MIGRATION = "migration"
 EXECUTION_TYPE_REPLICA_UPDATE = "replica_update"
+
+TASK_LOCK_NAME_FORMAT = "task-%s"
+EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
+ENDPOINT_LOCK_NAME_FORMAT = "endpoint-%s"
+MIGRATION_LOCK_NAME_FORMAT = "migration-%s"
+REPLICA_LOCK_NAME_FORMAT = "replica-%s"
+SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s"
+
+EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP = {
+    EXECUTION_TYPE_MIGRATION: MIGRATION_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_REPLICA_EXECUTION: REPLICA_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_REPLICA_DEPLOY: REPLICA_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_REPLICA_UPDATE: REPLICA_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_REPLICA_DISKS_DELETE: REPLICA_LOCK_NAME_FORMAT
+}

+ 3 - 1
coriolis/tasks/replica_tasks.py

@@ -243,7 +243,9 @@ class DeleteReplicaDisksTask(base.TaskRunner):
         if volumes_info:
             LOG.warn(
                 "'volumes_info' should have been void after disk "
-                "deletion: %s" % volumes_info)
+                "deletion task but it is: %s" % volumes_info)
+        elif volumes_info is None:
+            volumes_info = []
 
         return {
             'volumes_info': volumes_info}