Explorar o código

Allow for dep. scheduling on on_error tasks.

This patch allows for the definition of dependencies between on_error
tasks to ensure cleanup actions are executed in the correct order.
Nashwan Azhari %!s(int64=6) %!d(string=hai) anos
pai
achega
cea401deef
Modificáronse 3 ficheiros con 536 adicións e 127 borrados
  1. 490 127
      coriolis/conductor/rpc/server.py
  2. 41 0
      coriolis/constants.py
  3. 5 0
      coriolis/exception.py

+ 490 - 127
coriolis/conductor/rpc/server.py

@@ -3,6 +3,7 @@
 
 
 import copy
 import copy
 import functools
 import functools
+import itertools
 import uuid
 import uuid
 
 
 from oslo_concurrency import lockutils
 from oslo_concurrency import lockutils
@@ -38,11 +39,23 @@ conductor_opts = [
 CONF = cfg.CONF
 CONF = cfg.CONF
 CONF.register_opts(conductor_opts, 'conductor')
 CONF.register_opts(conductor_opts, 'conductor')
 
 
+TASK_LOCK_NAME_FORMAT = "task-%s"
+EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
+ENDPOINT_LOCK_NAME_FORMAT = "endpoint-%s"
+MIGRATION_LOCK_NAME_FORMAT = "migration-%s"
+REPLICA_LOCK_NAME_FORMAT = "replica-%s"
+SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s"
+
+TASK_DEADLOCK_ERROR_MESSAGE = (
+    "A fatal deadlock has occurred. Further debugging is required. "
+    "Please review the Conductor logs and contact support for assistance.")
+
 
 
 def endpoint_synchronized(func):
 def endpoint_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
     def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
-        @lockutils.synchronized(endpoint_id)
+        @lockutils.synchronized(
+                ENDPOINT_LOCK_NAME_FORMAT % endpoint_id)
         def inner():
         def inner():
             return func(self, ctxt, endpoint_id, *args, **kwargs)
             return func(self, ctxt, endpoint_id, *args, **kwargs)
         return inner()
         return inner()
@@ -52,7 +65,8 @@ def endpoint_synchronized(func):
 def replica_synchronized(func):
 def replica_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, *args, **kwargs):
     def wrapper(self, ctxt, replica_id, *args, **kwargs):
-        @lockutils.synchronized(replica_id)
+        @lockutils.synchronized(
+                REPLICA_LOCK_NAME_FORMAT % replica_id)
         def inner():
         def inner():
             return func(self, ctxt, replica_id, *args, **kwargs)
             return func(self, ctxt, replica_id, *args, **kwargs)
         return inner()
         return inner()
@@ -62,7 +76,8 @@ def replica_synchronized(func):
 def schedule_synchronized(func):
 def schedule_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, schedule_id, *args, **kwargs):
     def wrapper(self, ctxt, replica_id, schedule_id, *args, **kwargs):
-        @lockutils.synchronized(schedule_id)
+        @lockutils.synchronized(
+                SCHEDULE_LOCK_NAME_FORMAT % schedule_id)
         def inner():
         def inner():
             return func(self, ctxt, replica_id, schedule_id, *args, **kwargs)
             return func(self, ctxt, replica_id, schedule_id, *args, **kwargs)
         return inner()
         return inner()
@@ -72,7 +87,22 @@ def schedule_synchronized(func):
 def task_synchronized(func):
 def task_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, task_id, *args, **kwargs):
     def wrapper(self, ctxt, task_id, *args, **kwargs):
-        @lockutils.synchronized(task_id)
+        @lockutils.synchronized(
+                TASK_LOCK_NAME_FORMAT % task_id)
+        def inner():
+            return func(self, ctxt, task_id, *args, **kwargs)
+        return inner()
+    return wrapper
+
+
+def task_and_execution_synchronized(func):
+    @functools.wraps(func)
+    def wrapper(self, ctxt, task_id, *args, **kwargs):
+        task = db_api.get_task(ctxt, task_id)
+        @lockutils.synchronized(
+                EXECUTION_LOCK_NAME_FORMAT % task.execution_id)
+        @lockutils.synchronized(
+                TASK_LOCK_NAME_FORMAT % task_id)
         def inner():
         def inner():
             return func(self, ctxt, task_id, *args, **kwargs)
             return func(self, ctxt, task_id, *args, **kwargs)
         return inner()
         return inner()
@@ -82,7 +112,8 @@ def task_synchronized(func):
 def migration_synchronized(func):
 def migration_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, migration_id, *args, **kwargs):
     def wrapper(self, ctxt, migration_id, *args, **kwargs):
-        @lockutils.synchronized(migration_id)
+        @lockutils.synchronized(
+                MIGRATION_LOCK_NAME_FORMAT % migration_id)
         def inner():
         def inner():
             return func(self, ctxt, migration_id, *args, **kwargs)
             return func(self, ctxt, migration_id, *args, **kwargs)
         return inner()
         return inner()
@@ -92,7 +123,8 @@ def migration_synchronized(func):
 def tasks_execution_synchronized(func):
 def tasks_execution_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
     def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
-        @lockutils.synchronized(execution_id)
+        @lockutils.synchronized(
+                EXECUTION_LOCK_NAME_FORMAT % execution_id)
         def inner():
         def inner():
             return func(self, ctxt, replica_id, execution_id, *args, **kwargs)
             return func(self, ctxt, replica_id, execution_id, *args, **kwargs)
         return inner()
         return inner()
@@ -274,7 +306,7 @@ class ConductorServerEndpoint(object):
 
 
     @staticmethod
     @staticmethod
     def _create_task(instance, task_type, execution, depends_on=None,
     def _create_task(instance, task_type, execution, depends_on=None,
-                     on_error=False):
+                     on_error=False, on_error_only=False):
         task = models.Task()
         task = models.Task()
         task.id = str(uuid.uuid4())
         task.id = str(uuid.uuid4())
         task.instance = instance
         task.instance = instance
@@ -284,11 +316,18 @@ class ConductorServerEndpoint(object):
         task.on_error = on_error
         task.on_error = on_error
         task.index = len(task.execution.tasks) + 1
         task.index = len(task.execution.tasks) + 1
 
 
-        if not on_error:
+        # non-error tasks are automatically set to pending:
+        if not on_error and not on_error_only:
             task.status = constants.TASK_STATUS_PENDING
             task.status = constants.TASK_STATUS_PENDING
         else:
         else:
             task.status = constants.TASK_STATUS_ON_ERROR_ONLY
             task.status = constants.TASK_STATUS_ON_ERROR_ONLY
-            if depends_on:
+            # on-error-only tasks remain marked as such regardless
+            # of dependencies:
+            if on_error_only:
+                task.on_error = True
+            # tasks which are on-error but depend on already-defined
+            # pending tasks count as pending:
+            elif depends_on:
                 for task_id in depends_on:
                 for task_id in depends_on:
                     if [t for t in task.execution.tasks if t.id == task_id and
                     if [t for t in task.execution.tasks if t.id == task_id and
                             t.status != constants.TASK_STATUS_ON_ERROR_ONLY]:
                             t.status != constants.TASK_STATUS_ON_ERROR_ONLY]:
@@ -323,6 +362,8 @@ class ConductorServerEndpoint(object):
         for task in execution.tasks:
         for task in execution.tasks:
             if (not task.depends_on and
             if (not task.depends_on and
                     task.status == constants.TASK_STATUS_PENDING):
                     task.status == constants.TASK_STATUS_PENDING):
+                db_api.set_task_status(
+                    ctxt, task.id, constants.TASK_STATUS_SCHEDULED)
                 self._rpc_worker_client.begin_task(
                 self._rpc_worker_client.begin_task(
                     ctxt, server=None,
                     ctxt, server=None,
                     task_id=task.id,
                     task_id=task.id,
@@ -427,9 +468,11 @@ class ConductorServerEndpoint(object):
     def delete_replica_tasks_execution(self, ctxt, replica_id, execution_id):
     def delete_replica_tasks_execution(self, ctxt, replica_id, execution_id):
         execution = self._get_replica_tasks_execution(
         execution = self._get_replica_tasks_execution(
             ctxt, replica_id, execution_id)
             ctxt, replica_id, execution_id)
-        if execution.status == constants.EXECUTION_STATUS_RUNNING:
+        if execution.status in constants.ACTIVE_EXECUTION_STATUSES:
             raise exception.InvalidMigrationState(
             raise exception.InvalidMigrationState(
-                "Cannot delete a running replica tasks execution")
+                "Cannot delete execution '%s' for Replica '%s' as it is "
+                "currently in '%s' state." % (
+                    execution_id, replica_id, execution.status))
         db_api.delete_replica_tasks_execution(ctxt, execution_id)
         db_api.delete_replica_tasks_execution(ctxt, execution_id)
 
 
     @tasks_execution_synchronized
     @tasks_execution_synchronized
@@ -437,9 +480,15 @@ class ConductorServerEndpoint(object):
                                        force):
                                        force):
         execution = self._get_replica_tasks_execution(
         execution = self._get_replica_tasks_execution(
             ctxt, replica_id, execution_id)
             ctxt, replica_id, execution_id)
-        if execution.status != constants.EXECUTION_STATUS_RUNNING:
+        if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
+            raise exception.InvalidReplicaState(
+                "Replica '%s' has no running execution." % replica_id)
+        if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
+                not force):
             raise exception.InvalidReplicaState(
             raise exception.InvalidReplicaState(
-                "The replica tasks execution is not running")
+                "Replica '%s' is already being cancelled. Please use the "
+                "force option if you'd like to force-cancel it." % (
+                    replica_id))
         self._cancel_tasks_execution(ctxt, execution, force)
         self._cancel_tasks_execution(ctxt, execution, force)
 
 
     def _get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
     def _get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
@@ -694,7 +743,7 @@ class ConductorServerEndpoint(object):
             cleanup_deployment_task = self._create_task(
             cleanup_deployment_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT,
                 constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT,
-                execution, on_error=True)
+                execution, on_error_only=True)
 
 
             if not clone_disks:
             if not clone_disks:
                 self._create_task(
                 self._create_task(
@@ -878,19 +927,23 @@ class ConductorServerEndpoint(object):
                     execution, depends_on=[last_task.id])
                     execution, depends_on=[last_task.id])
                 last_task = get_optimal_flavor_task
                 last_task = get_optimal_flavor_task
 
 
-            self._create_task(
+            finalize_deployment_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT,
                 constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT,
                 execution, depends_on=[last_task.id])
                 execution, depends_on=[last_task.id])
 
 
-            self._create_task(
+            cleanup_failed_deployment_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT,
                 constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT,
-                execution, on_error=True)
+                execution, depends_on=[finalize_deployment_task.id],
+                on_error_only=True)
 
 
             self._create_task(
             self._create_task(
                 instance, constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE,
                 instance, constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE,
-                execution, on_error=True)
+                execution, depends_on=[
+                    create_instance_disks_task.id,
+                    cleanup_failed_deployment_task.id],
+                on_error_only=True)
 
 
         db_api.add_migration(ctxt, migration)
         db_api.add_migration(ctxt, migration)
         LOG.info("Migration created: %s", migration.id)
         LOG.info("Migration created: %s", migration.id)
@@ -909,118 +962,422 @@ class ConductorServerEndpoint(object):
     def delete_migration(self, ctxt, migration_id):
     def delete_migration(self, ctxt, migration_id):
         migration = self._get_migration(ctxt, migration_id)
         migration = self._get_migration(ctxt, migration_id)
         execution = migration.executions[0]
         execution = migration.executions[0]
-        if execution.status == constants.EXECUTION_STATUS_RUNNING:
+        if execution.status in constants.ACTIVE_EXECUTION_STATUSES:
             raise exception.InvalidMigrationState(
             raise exception.InvalidMigrationState(
-                "Cannot delete a running migration")
+                "Cannot delete Migration '%s' as it is currently in "
+                "'%s' state." % (migration_id, execution.status))
         db_api.delete_migration(ctxt, migration_id)
         db_api.delete_migration(ctxt, migration_id)
 
 
     @migration_synchronized
     @migration_synchronized
     def cancel_migration(self, ctxt, migration_id, force):
     def cancel_migration(self, ctxt, migration_id, force):
         migration = self._get_migration(ctxt, migration_id)
         migration = self._get_migration(ctxt, migration_id)
         execution = migration.executions[0]
         execution = migration.executions[0]
-        if execution.status != constants.EXECUTION_STATUS_RUNNING:
+        if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
             raise exception.InvalidMigrationState(
             raise exception.InvalidMigrationState(
-                "The migration is not running")
-        execution = migration.executions[0]
+                "Migration '%s' is not running" % migration_id)
+        if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
+                not force):
+            raise exception.InvalidMigrationState(
+                "Migration '%s' is already being cancelled. Please use the "
+                "force option if you'd like to force-cancel it.")
         self._cancel_tasks_execution(ctxt, execution, force)
         self._cancel_tasks_execution(ctxt, execution, force)
         self._check_delete_reservation_for_transfer(migration)
         self._check_delete_reservation_for_transfer(migration)
 
 
     def _cancel_tasks_execution(self, ctxt, execution, force=False):
     def _cancel_tasks_execution(self, ctxt, execution, force=False):
-        has_error_tasks = False
-        has_running_tasks = False
+        """ Cancels a running Execution by:
+        - telling workers to kill any already running non-on-error tasks
+        - cancelling any non-on-error tasks which are pending
+        - triggering any on-error-only tasks with no dependencies
+
+        NOTE: affects whole execution, only call this
+        with a lock on the Execution as a whole!
+        """
+        LOG.debug(
+            "Cancelling tasks execution %s. Current status before "
+            "cancellation is '%s'", execution.id, execution.status)
+
+        # mark execution as cancelling:
+        self._set_tasks_execution_status(
+            ctxt, execution.id, constants.EXECUTION_STATUS_CANCELLING)
+        # iterate through and kill/cancel any non-error
+        # tasks which are running/pending:
         for task in execution.tasks:
         for task in execution.tasks:
-            if task.on_error and task.status in (
-                    constants.TASK_STATUS_RUNNING,
-                    constants.TASK_STATUS_PENDING):
-                # NOTE: always allow on_error tasks to execute
-                # as they may be required for cleanup:
-                has_error_tasks = True
+            if force and task.status == constants.TASK_STATUS_CANCELLING:
+                LOG.warn(
+                    "Task '%s' is in %s state, but forcibly setting to "
+                    "'%s' because 'force' flag was provided",
+                    task.id, task.status,
+                    constants.TASK_STATUS_FORCE_CANCELED)
+                db_api.set_task_status(
+                    ctxt, task.id, constants.TASK_STATUS_FORCE_CANCELED)
                 continue
                 continue
 
 
-            if task.status == constants.TASK_STATUS_RUNNING:
-                self._rpc_worker_client.cancel_task(
-                    ctxt, task.host, task.id, task.process_id, force)
-                has_running_tasks = True
-            elif task.status == constants.TASK_STATUS_PENDING:
+            if not task.on_error:
+                if task.status in (
+                        constants.TASK_STATUS_RUNNING,
+                        constants.TASK_STATUS_SCHEDULED):
+                    LOG.debug(
+                        "Killing %s task '%s' as part of "
+                        "cancellation of execution '%s'",
+                        task.status, task.id, execution.id)
+                    db_api.set_task_status(
+                        ctxt, task.id, constants.TASK_STATUS_CANCELLING)
+                    self._rpc_worker_client.cancel_task(
+                        ctxt, task.host, task.id, task.process_id, force)
+                # cancel any non-on-error tasks which are aleady pending:
+                elif task.status == constants.TASK_STATUS_PENDING:
+                    LOG.debug(
+                        "Marking pending task '%s' as cancelled as "
+                        "part of cancellation of execution '%s'",
+                        task.id, execution.id)
+                    db_api.set_task_status(
+                        ctxt, task.id, constants.TASK_STATUS_CANCELED)
+            # NOTE: ideally, all on-error-only tasks should have a
+            # clear-cut dependency task so this inefficient code
+            # should never be called:
+            elif task.on_error and (
+                    task.status == constants.TASK_STATUS_ON_ERROR_ONLY and not (
+                        task.depends_on)):
+                try:
+                    origin = self._get_task_origin(ctxt, execution.action)
+                    destination = self._get_task_destination(
+                        ctxt, execution.action)
+                except exception.NotFound as ex:
+                    LOG.error("A required endpoint could not be found")
+                    LOG.exception(ex)
+
+                action = db_api.get_action(
+                    ctxt, execution.action_id)
                 db_api.set_task_status(
                 db_api.set_task_status(
-                    ctxt, task.id, constants.TASK_STATUS_CANCELED)
+                    ctxt, task.id, constants.TASK_STATUS_SCHEDULED)
+                self._rpc_worker_client.begin_task(
+                    ctxt, server=None,
+                    task_id=task.id,
+                    task_type=task.task_type,
+                    origin=origin,
+                    destination=destination,
+                    instance=task.instance,
+                    task_info=action.info.get(task.instance, {}))
+            else:
+                LOG.debug(
+                    "No action currently taken with respect to task '%s' "
+                    "during cancellation of execution '%s'",
+                    task.id, execution.id)
 
 
-        if not has_running_tasks:
-            try:
-                origin = self._get_task_origin(ctxt, execution.action)
-                destination = self._get_task_destination(
-                    ctxt, execution.action)
-
-                for task in execution.tasks:
-                    if task.status in [constants.TASK_STATUS_PENDING,
-                                       constants.TASK_STATUS_ON_ERROR_ONLY]:
-                        if task.on_error:
-                            action = db_api.get_action(
-                                ctxt, execution.action_id)
-                            task_info = action.info.get(task.instance, {})
-
-                            self._rpc_worker_client.begin_task(
-                                ctxt, server=None,
-                                task_id=task.id,
-                                task_type=task.task_type,
-                                origin=origin,
-                                destination=destination,
-                                instance=task.instance,
-                                task_info=task_info)
-
-                            has_running_tasks = True
-            except exception.NotFound as ex:
-                LOG.error("A required endpoint could not be found")
-                LOG.exception(ex)
-
-        # NOTE: only set the whole execution to 'ERROR' if nothing's
-        # running and no on_error=True tasks are pending.
-        # Otherwise, the lifecycle of the rest of the execution will
-        # be governed in `task_completed` when any of the
-        # running/pending tasks finish:
-        if not has_running_tasks and not has_error_tasks:
-            self._set_tasks_execution_status(
-                ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
+        self._advance_execution_state(ctxt, execution)
 
 
     @staticmethod
     @staticmethod
     def _set_tasks_execution_status(ctxt, execution_id, execution_status):
     def _set_tasks_execution_status(ctxt, execution_id, execution_status):
-        LOG.info("Tasks execution %(id)s completed with status: %(status)s",
-                 {"id": execution_id, "status": execution_status})
+        LOG.info(
+            "Tasks execution %(id)s status updated to: %(status)s",
+            {"id": execution_id, "status": execution_status})
         db_api.set_execution_status(ctxt, execution_id, execution_status)
         db_api.set_execution_status(ctxt, execution_id, execution_status)
         if ctxt.delete_trust_id:
         if ctxt.delete_trust_id:
             keystone.delete_trust(ctxt)
             keystone.delete_trust(ctxt)
 
 
     @task_synchronized
     @task_synchronized
     def set_task_host(self, ctxt, task_id, host, process_id):
     def set_task_host(self, ctxt, task_id, host, process_id):
+        """ Saves the ID of the worker host which has accepted and started
+        the task to the DB and marks the task as 'RUNNING'. """
+        task = db_api.get_task(ctxt, task_id)
+        if task.status != constants.TASK_STATUS_SCHEDULED:
+            raise exception.InvalidTaskState(
+                "Task with ID '%s' is in '%s' status instead of the "
+                "expected '%s' required for it to be executed." % (
+                    task_id, task.status, constants.TASK_STATUS_SCHEDULED))
         db_api.set_task_host(ctxt, task_id, host, process_id)
         db_api.set_task_host(ctxt, task_id, host, process_id)
         db_api.set_task_status(
         db_api.set_task_status(
             ctxt, task_id, constants.TASK_STATUS_RUNNING)
             ctxt, task_id, constants.TASK_STATUS_RUNNING)
 
 
-    def _start_pending_tasks(self, ctxt, execution, parent_task, task_info):
+    def _check_clean_execution_deadlock(self, ctxt, execution):
+        """ Checks whether an execution is deadlocked.
+        Deadlocked execution have no currently running/scheduled tasks
+        but some remaining pending tasks.
+        If this occurs, all pending tasks are marked as deadlocked,
+        and the execution is marked as ERROR'd.
+        Returns the state of the when the check occured
+        (either RUNNING or DEADLOCKED)
+        """
+        execution = db_api.get_tasks_execution(ctxt, execution.id)
+        task_statuses = {}
+        for task in execution.tasks:
+            dbtask = db_api.get_task(ctxt, task.id)
+            task_statuses[dbtask.id] = dbtask.status
+
+        determined_state = constants.EXECUTION_STATUS_RUNNING
+        status_vals = task_statuses.values()
+        if constants.TASK_STATUS_PENDING in status_vals and (
+                constants.TASK_STATUS_RUNNING not in status_vals or (
+                    constants.TASK_STATUS_SCHEDULED not in status_vals)):
+            LOG.warn(
+                "Execution '%s' is deadlocked. Task statuses are: %s",
+                execution.id, task_statuses)
+            for task_id, stat in task_statuses.items():
+                if stat == constants.TASK_STATUS_PENDING:
+                    LOG.warn("Marking deadlocked task '%s' as that ", task_id)
+                    db_api.set_task_status(
+                        ctxt, task_id,
+                        constants.TASK_STATUS_CANCELED_FROM_DEADLOCK,
+                        exception_details=TASK_DEADLOCK_ERROR_MESSAGE)
+            LOG.warn(
+                "Marking deadlocked execution '%s' as ERROR'd", execution.id)
+            self._set_tasks_execution_status(
+                ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
+            LOG.error(
+                "Execution '%s' is deadlocked. Cleanup has been performed. "
+                "Task statuses at time of deadlock were: %s",
+                execution.id, task_statuses)
+            determined_state = constants.EXECUTION_STATUS_DEADLOCKED
+        return determined_state
+
+    def _get_execution_status(self, ctxt, execution, requery=False):
+        """ Returns the global status of an execution.
+        RUNNING - at least one task is RUNNING, SCHEDULED or CANCELLING
+        COMPLETED - all non-error-only tasks are COMPLETED
+        CANCELED - no more RUNNING or PENDING tasks but some CANCELED
+        ERROR - no tasks are RUNNING and at least one is ERROR'd
+        DEADLOCKED - has PENDING tasks but none RUNNING/SCHEDULED/CANCELLING
+        """
+        is_running = False
+        is_canceled = False
+        is_cancelling = False
+        is_errord = False
+        has_pending_tasks = False
+        task_stat_map = {}
+        if requery:
+            execution = db_api.get_tasks_execution(ctxt, execution.id)
+        for task in execution.tasks:
+            task_stat_map[task.id] = task.status
+            if task.status in constants.ACTIVE_TASK_STATUSES:
+                is_running = True
+            if task.status in constants.CANCELED_TASK_STATUSES:
+                is_canceled = True
+            if task.status == constants.TASK_STATUS_ERROR:
+                is_errord = True
+            if task.status == constants.TASK_STATUS_CANCELLING:
+                is_cancelling = True
+            if task.status == constants.TASK_STATUS_PENDING:
+                has_pending_tasks = True
+
+        status = constants.EXECUTION_STATUS_COMPLETED
+        if has_pending_tasks and not is_running:
+            status = constants.EXECUTION_STATUS_DEADLOCKED
+        elif is_cancelling:
+            status = constants.EXECUTION_STATUS_CANCELLING
+        elif is_running:
+            status = constants.EXECUTION_STATUS_RUNNING
+        elif is_errord:
+            status = constants.EXECUTION_STATUS_ERROR
+        # NOTE: canceled executions should never have ERROR'd tasks
+        # (they should also be CANCELED) so this comes last:
+        elif is_canceled:
+            status = constants.EXECUTION_STATUS_CANCELED
+
+        LOG.debug(
+            "Overall status for Execution '%s' determined to be '%s'."
+            "Task statuses at time of decision: %s",
+            execution.id, status, task_stat_map)
+        return status
+
+    def _advance_execution_state(self, ctxt, execution):
+        """ Advances the state of the execution by starting/refreshing
+        the state of all child tasks.
+        If the task has finalized (either completed or error'd), updates
+        its state to the finalized one.
+        Returns a list of all the tasks which were started.
+        NOTE: should only be called with a lock on the Execution!
+
+        Requirements for a tasks to be started:
+        - any task where any parent task got CANCELED will also be CANCELED
+        - normal tasks (task.on_error==False & task.status==PENDING):
+            * instantly started if they have no parent dependencies
+            * all parent dependency tasks must be COMPLETED
+        - on-error tasks (task.on_error==True):
+            * all parent tasks must be COMPLETED or ERROR'd
+        - on-error-only tasks (task.status==ON_ERROR_ONLY):
+            * at least one non-error parent tasks must have been COMPLETED
+            * all non-error parent tasks must have reached a terminal state
+            * all on-error parent tasks must have reached a terminal state
+        """
+        LOG.debug(
+            "State of execution '%s' before state advancement is: %s",
+            execution.id, execution.status)
+
+        started_tasks = []
+
         origin = self._get_task_origin(ctxt, execution.action)
         origin = self._get_task_origin(ctxt, execution.action)
         destination = self._get_task_destination(ctxt, execution.action)
         destination = self._get_task_destination(ctxt, execution.action)
+        action = db_api.get_action(ctxt, execution.action_id)
+
+        def _start_task(task):
+            db_api.set_task_status(
+                ctxt, task.id, constants.TASK_STATUS_SCHEDULED)
+            self._rpc_worker_client.begin_task(
+                ctxt, server=None,
+                task_id=task.id,
+                task_type=task.task_type,
+                origin=origin,
+                destination=destination,
+                instance=task.instance,
+                task_info=action.info.get(task.instance, {}))
+            started_tasks.append(task.id)
 
 
         for task in execution.tasks:
         for task in execution.tasks:
             if task.status == constants.TASK_STATUS_PENDING:
             if task.status == constants.TASK_STATUS_PENDING:
-                if task.depends_on and parent_task.id in task.depends_on:
-                    start_task = True
-                    for depend_task_id in task.depends_on:
-                        if depend_task_id != parent_task.id:
-                            depend_task = db_api.get_task(ctxt, depend_task_id)
-                            if (depend_task.status !=
-                                    constants.TASK_STATUS_COMPLETED):
-                                start_task = False
-                                break
-                    if start_task:
-                        server = None
-                        self._rpc_worker_client.begin_task(
-                            ctxt, server=server,
-                            task_id=task.id,
-                            task_type=task.task_type,
-                            origin=origin,
-                            destination=destination,
-                            instance=task.instance,
-                            task_info=task_info)
+                # immediately start pending tasks with no deps:
+                if not task.depends_on:
+                    LOG.info(
+                        "Starting depency-less task '%s'", task.id)
+                    _start_task(task)
+                    continue
+
+                # gather statuses of dependent tasks:
+                parent_task_statuses = {}
+                if task.depends_on:
+                    for dep_task_id in task.depends_on:
+                        depend_task = db_api.get_task(ctxt, dep_task_id)
+                        parent_task_statuses[dep_task_id] = depend_task.status
+
+                # immediately cancel any task whose parent task(s)
+                # got canceled (including on_error=True tasks):
+                if any([
+                        dep_stat in constants.CANCELED_TASK_STATUSES or (
+                            dep_stat == constants.TASK_STATUS_CANCELLING)
+                        for dep_stat in parent_task_statuses.values()]):
+                    LOG.info(
+                        "Marking task '%s' as cancelled since it has one or "
+                        "more parent tasks which got canceled: %s",
+                        task.id, parent_task_statuses)
+                    db_api.set_task_status(
+                        ctxt, task.id, constants.TASK_STATUS_CANCELED)
+                    continue
+
+                if not task.on_error:
+                    # start all non-error tasks whose parent tasks have completed:
+                    if all([
+                            dep_stat == constants.TASK_STATUS_COMPLETED
+                            for dep_stat in parent_task_statuses.values()]):
+                        LOG.info(
+                            "Starting task '%s' as all dependencies have "
+                            "completed: %s", task.id, parent_task_statuses)
+                        _start_task(task)
+                    else:
+                        LOG.debug(
+                            "Skipping starting task '%s' as some parent "
+                            "tasks have not completed: %s",
+                            task.id, parent_task_statuses)
+                    continue
+
+                elif task.on_error:
+                    # start all on-error tasks whose parents have
+                    # either completed or error'd:
+                    if all([
+                            dep_stat in (
+                                constants.TASK_STATUS_COMPLETED,
+                                constants.TASK_STATUS_ERROR)
+                            for dep_stat in parent_task_statuses.values()]):
+                        LOG.info(
+                            "Starting task '%s' as all dependencies have "
+                            "completed: %s", task.id, parent_task_statuses)
+                        _start_task(task)
+                    else:
+                        LOG.debug(
+                            "Skipping starting on-error task '%s' as some "
+                            "parent tasks have not completed: %s",
+                            task.id, parent_task_statuses)
+                    continue
+                LOG.debug(
+                    "No lifecycle decision was taken for pending task '%s'. "
+                    "Current status: %s. Parent tasks: %s",
+                    task.id, task.status, parent_task_statuses)
+
+            elif task.status == constants.TASK_STATUS_ON_ERROR_ONLY:
+                if not task.depends_on:
+                    LOG.warn(
+                        "Encountered on-error-only task '%s' with no "
+                        "dependencies. These types of tasks should ideally "
+                        "always be declared with dependencies.", task.id)
+                    # skipping as it was `_cancel_task_execution`'s job
+                    # to trigger such tasks.
+                    continue
+
+                error_deps = {}
+                non_error_deps = {}
+                # gather statuses for non-error and on-error parents:
+                for dep_task_id in task.depends_on:
+                    depend_task = db_api.get_task(ctxt, dep_task_id)
+                    if depend_task.on_error:
+                        error_deps[depend_task.id] = depend_task.status
+                    else:
+                        non_error_deps[depend_task.id] = depend_task.status
+
+                # do not trigger on-error-only tasks if not all non-error
+                # parent tasks have reached a final state:
+                if not all([
+                        dep_stat in constants.FINALIZED_TASK_STATUSES
+                        for dep_stat in non_error_deps.values()]):
+                    LOG.debug(
+                        "Not triggering on-error-only task '%s' as some "
+                        "of its parent non-error tasks have not reached a "
+                        "terminal status: %s", task.id, non_error_deps)
+                    continue
+
+                # cancel any on-error-only task if no non-on-error parent
+                # tasks completed successfully:
+                if constants.TASK_STATUS_COMPLETED not in (
+                        non_error_deps.values()):
+                    LOG.info(
+                        "Marking on-error-only task '%s' as canceled as all of "
+                        "its parent non-error tasks have finalized but none "
+                        "has completed successfully: %s",
+                        task.id, non_error_deps)
+                    db_api.set_task_status(
+                        ctxt, task.id, constants.TASK_STATUS_CANCELED)
+                    continue
+
+                # do not trigger on-error-only tasks unless all on-error
+                # parent tasks have reached a final state:
+                if not all([
+                        dep_stat in constants.FINALIZED_TASK_STATUSES
+                        for dep_stat in error_deps.values()]):
+                    LOG.debug(
+                        "Not triggering on-error-only task '%s' as it "
+                        "has parent on-error tasks which didn't "
+                        "complete/error: %s", task.id, error_deps)
+                    continue
+
+                LOG.debug("Starting on-error-only task '%s'", task.id)
+                _start_task(task)
+                continue
+
+        if started_tasks:
+            LOG.info(
+                "Started the following tasks for execution '%s': %s",
+                execution.id, started_tasks)
+        else:
+            # check for deadlock:
+            if self._check_clean_execution_deadlock(ctxt, execution) == (
+                    constants.EXECUTION_STATUS_DEADLOCKED):
+                LOG.error(
+                    "Execution '%s' deadlocked after Replica state advancement"
+                    ". Cleanup has been perfomed. Returning early.")
+                return []
+
+        # check if execution status has changed:
+        latest_execution_status = self._get_execution_status(
+            ctxt, execution, requery=True)
+        if latest_execution_status != execution.status:
+            LOG.info(
+                "Execution '%s' transitioned from status %s to %s",
+                execution.id, execution.status, latest_execution_status)
+            self._set_tasks_execution_status(
+                ctxt, execution.id, latest_execution_status)
+        else:
+            LOG.debug(
+                "Execution '%s' has remained in status '%s' following "
+                "state advancement.", execution.id, latest_execution_status)
+
+        return started_tasks
 
 
     def _update_replica_volumes_info(self, ctxt, replica_id, instance,
     def _update_replica_volumes_info(self, ctxt, replica_id, instance,
                                      updated_task_info):
                                      updated_task_info):
@@ -1105,7 +1462,7 @@ class ConductorServerEndpoint(object):
                 ctxt, execution.action_id, task.instance,
                 ctxt, execution.action_id, task.instance,
                 {"volumes_info": task_info.get("volumes_info")})
                 {"volumes_info": task_info.get("volumes_info")})
 
 
-    @task_synchronized
+    @task_and_execution_synchronized
     def task_completed(self, ctxt, task_id, task_info):
     def task_completed(self, ctxt, task_id, task_info):
         LOG.info("Task completed: %s", task_id)
         LOG.info("Task completed: %s", task_id)
 
 
@@ -1113,13 +1470,8 @@ class ConductorServerEndpoint(object):
             ctxt, task_id, constants.TASK_STATUS_COMPLETED)
             ctxt, task_id, constants.TASK_STATUS_COMPLETED)
 
 
         task = db_api.get_task(ctxt, task_id)
         task = db_api.get_task(ctxt, task_id)
-        execution = db_api.get_tasks_execution(ctxt, task.execution_id)
-
-        task_error_states = [
-            constants.TASK_STATUS_ERROR,
-            constants.TASK_STATUS_CANCELED,
-            constants.TASK_STATUS_CANCELED_FOR_DEBUGGING]
 
 
+        execution = db_api.get_tasks_execution(ctxt, task.execution_id)
         action_id = execution.action_id
         action_id = execution.action_id
         with lockutils.lock(action_id):
         with lockutils.lock(action_id):
             LOG.info("Setting instance %(instance)s "
             LOG.info("Setting instance %(instance)s "
@@ -1131,23 +1483,18 @@ class ConductorServerEndpoint(object):
             self._handle_post_task_actions(
             self._handle_post_task_actions(
                 ctxt, task, execution, updated_task_info)
                 ctxt, task, execution, updated_task_info)
 
 
-            if execution.status == constants.EXECUTION_STATUS_RUNNING:
-                self._start_pending_tasks(
-                    ctxt, execution, task, updated_task_info)
-
-                if not [t for t in execution.tasks
-                        if t.status in [constants.TASK_STATUS_RUNNING,
-                                        constants.TASK_STATUS_PENDING]]:
-                    # The execution is in error status if there's one or more
-                    # tasks in error or canceled status
-                    if [t for t in execution.tasks
-                            if t.status in task_error_states]:
-                        execution_status = constants.EXECUTION_STATUS_ERROR
-                    else:
-                        execution_status = constants.EXECUTION_STATUS_COMPLETED
-
-                    self._set_tasks_execution_status(
-                        ctxt, execution.id, execution_status)
+            newly_started_tasks = self._advance_execution_state(
+                ctxt, execution)
+            if newly_started_tasks:
+                LOG.info(
+                    "The following tasks were started for execution '%s' "
+                    "following the completion of task '%s': %s" % (
+                        execution.id, task.id, newly_started_tasks))
+            else:
+                LOG.debug(
+                    "No new tasks were started for execution '%s' following "
+                    "the successful completion of task '%s'.",
+                    execution.id, task.id)
 
 
     def _cancel_execution_for_osmorphing_debugging(self, ctxt, execution):
     def _cancel_execution_for_osmorphing_debugging(self, ctxt, execution):
         # go through all scheduled tasks and cancel them:
         # go through all scheduled tasks and cancel them:
@@ -1156,9 +1503,9 @@ class ConductorServerEndpoint(object):
                 continue
                 continue
 
 
             if subtask.status == constants.TASK_STATUS_RUNNING:
             if subtask.status == constants.TASK_STATUS_RUNNING:
-                raise execution.CoriolisException(
-                    "Task %s is still running although it should not!",
-                    subtask.id)
+                raise exception.CoriolisException(
+                    "Task %s is still running although it should not!" % (
+                        subtask.id))
 
 
             if subtask.status in [
             if subtask.status in [
                     constants.TASK_STATUS_PENDING,
                     constants.TASK_STATUS_PENDING,
@@ -1167,13 +1514,28 @@ class ConductorServerEndpoint(object):
                     ctxt, subtask.id,
                     ctxt, subtask.id,
                     constants.TASK_STATUS_CANCELED_FOR_DEBUGGING)
                     constants.TASK_STATUS_CANCELED_FOR_DEBUGGING)
 
 
-    @task_synchronized
+    @task_and_execution_synchronized
     def set_task_error(self, ctxt, task_id, exception_details):
     def set_task_error(self, ctxt, task_id, exception_details):
         LOG.error("Task error: %(task_id)s - %(ex)s",
         LOG.error("Task error: %(task_id)s - %(ex)s",
                   {"task_id": task_id, "ex": exception_details})
                   {"task_id": task_id, "ex": exception_details})
 
 
+        task = db_api.get_task(ctxt, task_id)
+
+        final_status = constants.TASK_STATUS_ERROR
+        if task.status == constants.TASK_STATUS_CANCELLING:
+            final_status = constants.TASK_STATUS_CANCELED
+        elif task.status == constants.TASK_STATUS_FORCE_CANCELED:
+            # it means a force cancel has been issued before the
+            # confirmation that the task was canceled came in:
+            LOG.warn(
+                "Only just received error confirmation for force-cancelled "
+                "task '%s'. Leaving marked as force-cancelled.", task.id)
+            final_status = constants.TASK_STATUS_FORCE_CANCELED
+        LOG.debug(
+            "Transitioning errored task '%s' from '%s' to '%s'",
+            task.id, task.status, final_status)
         db_api.set_task_status(
         db_api.set_task_status(
-            ctxt, task_id, constants.TASK_STATUS_ERROR, exception_details)
+            ctxt, task_id, final_status, exception_details)
 
 
         task = db_api.get_task(ctxt, task_id)
         task = db_api.get_task(ctxt, task_id)
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
@@ -1183,8 +1545,9 @@ class ConductorServerEndpoint(object):
         with lockutils.lock(action_id):
         with lockutils.lock(action_id):
             if task.task_type == constants.TASK_TYPE_OS_MORPHING and (
             if task.task_type == constants.TASK_TYPE_OS_MORPHING and (
                     CONF.conductor.debug_os_morphing_errors):
                     CONF.conductor.debug_os_morphing_errors):
-                LOG.debug("Attempting to cancel execution '%s' for OSMorphing "
-                          "debugging.", execution.id)
+                LOG.debug(
+                    "Attempting to cancel execution '%s' of action '%s' "
+                    "for OSMorphing debugging.", execution.id, action_id)
                 # NOTE: the OSMorphing task always runs by itself so no
                 # NOTE: the OSMorphing task always runs by itself so no
                 # further tasks should be running, but we double-check here:
                 # further tasks should be running, but we double-check here:
                 running = [
                 running = [
@@ -1213,7 +1576,7 @@ class ConductorServerEndpoint(object):
 
 
         # NOTE: if this is a migration, make sure to delete
         # NOTE: if this is a migration, make sure to delete
         # its associated reservation.
         # its associated reservation.
-        if action.type == "migration":
+        if action.type == constants.EXECUTION_TYPE_MIGRATION:
             self._check_delete_reservation_for_transfer(action)
             self._check_delete_reservation_for_transfer(action)
 
 
     @task_synchronized
     @task_synchronized

+ 41 - 0
coriolis/constants.py

@@ -4,15 +4,56 @@
 EXECUTION_STATUS_RUNNING = "RUNNING"
 EXECUTION_STATUS_RUNNING = "RUNNING"
 EXECUTION_STATUS_COMPLETED = "COMPLETED"
 EXECUTION_STATUS_COMPLETED = "COMPLETED"
 EXECUTION_STATUS_ERROR = "ERROR"
 EXECUTION_STATUS_ERROR = "ERROR"
+EXECUTION_STATUS_DEADLOCKED = "DEADLOCKED"
+EXECUTION_STATUS_CANCELED = "CANCELED"
+EXECUTION_STATUS_CANCELLING = "CANCELLING"
+EXECUTION_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
+
+ACTIVE_EXECUTION_STATUSES = [
+    EXECUTION_STATUS_RUNNING,
+    EXECUTION_STATUS_CANCELLING
+]
+
+FINALIZED_EXECUTION_STATUSES = [
+    EXECUTION_STATUS_COMPLETED,
+    EXECUTION_STATUS_CANCELED,
+    EXECUTION_STATUS_ERROR
+]
 
 
 TASK_STATUS_PENDING = "PENDING"
 TASK_STATUS_PENDING = "PENDING"
+TASK_STATUS_SCHEDULED = "SCHEDULED"
 TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_ERROR = "ERROR"
+TASK_STATUS_FORCE_CANCELED = "FORCE_CANCELED"
 TASK_STATUS_CANCELED = "CANCELED"
 TASK_STATUS_CANCELED = "CANCELED"
+TASK_STATUS_CANCELLING = "CANCELLING"
 TASK_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
 TASK_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
+TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 
 
+ACTIVE_TASK_STATUSES = [
+    TASK_STATUS_SCHEDULED,
+    TASK_STATUS_RUNNING,
+    TASK_STATUS_CANCELLING
+]
+
+CANCELED_TASK_STATUSES = [
+    TASK_STATUS_CANCELED,
+    TASK_STATUS_FORCE_CANCELED,
+    TASK_STATUS_CANCELED_FOR_DEBUGGING,
+    TASK_STATUS_CANCELED_FROM_DEADLOCK
+]
+
+FINALIZED_TASK_STATUSES = [
+    TASK_STATUS_COMPLETED,
+    TASK_STATUS_ERROR,
+    TASK_STATUS_CANCELED,
+    TASK_STATUS_FORCE_CANCELED,
+    TASK_STATUS_CANCELED_FOR_DEBUGGING,
+    TASK_STATUS_CANCELED_FROM_DEADLOCK
+]
+
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (
     "DEPLOY_MIGRATION_SOURCE_RESOURCES")
     "DEPLOY_MIGRATION_SOURCE_RESOURCES")
 TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES = (
 TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES = (

+ 5 - 0
coriolis/exception.py

@@ -180,6 +180,11 @@ class InvalidConfigurationValue(Invalid):
                 'configuration option "%(option)s"')
                 'configuration option "%(option)s"')
 
 
 
 
+class InvalidTaskState(Invalid):
+    message = _(
+        'Task "%(task_id)s" in in an invalid state: %(task_state)s')
+
+
 class InvalidActionTasksExecutionState(Invalid):
 class InvalidActionTasksExecutionState(Invalid):
     message = _("Invalid tasks execution state: %(reason)s")
     message = _("Invalid tasks execution state: %(reason)s")