Parcourir la source

Add SCHEDULED/UNSCHEDULED task statuses.

Add task statuses to indicate which tasks are scheduled and which were
unscheduled.
The old PENDING status is now used for tasks which are confirmed by the
conductor to be started, up to the point where a worker confirms
picking them up (at which point they transition to RUNNING).
Nashwan Azhari il y a 6 ans
Parent
commit
a7a385b32b
2 fichiers modifiés avec 186 ajouts et 101 suppressions
  1. 181 99
      coriolis/conductor/rpc/server.py
  2. 5 2
      coriolis/constants.py

+ 181 - 99
coriolis/conductor/rpc/server.py

@@ -308,6 +308,13 @@ class ConductorServerEndpoint(object):
     @staticmethod
     @staticmethod
     def _create_task(instance, task_type, execution, depends_on=None,
     def _create_task(instance, task_type, execution, depends_on=None,
                      on_error=False, on_error_only=False):
                      on_error=False, on_error_only=False):
+        """ Creates a task with the given properties.
+
+        NOTE: for on_error and on_error_only tasks, the parent dependencies who
+        are the ones which require cleanup should also be included!
+        Ex: for the DELETE_OS_MORPHING_RESOURCES task, include both the
+        OSMorphing task, as well as the DEPLOY_OS_MORPHING_RESOURCES one!
+        """
         task = models.Task()
         task = models.Task()
         task.id = str(uuid.uuid4())
         task.id = str(uuid.uuid4())
         task.instance = instance
         task.instance = instance
@@ -319,7 +326,7 @@ class ConductorServerEndpoint(object):
 
 
         # non-error tasks are automatically set to pending:
         # non-error tasks are automatically set to pending:
         if not on_error and not on_error_only:
         if not on_error and not on_error_only:
-            task.status = constants.TASK_STATUS_PENDING
+            task.status = constants.TASK_STATUS_SCHEDULED
         else:
         else:
             task.status = constants.TASK_STATUS_ON_ERROR_ONLY
             task.status = constants.TASK_STATUS_ON_ERROR_ONLY
             # on-error-only tasks remain marked as such regardless
             # on-error-only tasks remain marked as such regardless
@@ -332,7 +339,7 @@ class ConductorServerEndpoint(object):
                 for task_id in depends_on:
                 for task_id in depends_on:
                     if [t for t in task.execution.tasks if t.id == task_id and
                     if [t for t in task.execution.tasks if t.id == task_id and
                             t.status != constants.TASK_STATUS_ON_ERROR_ONLY]:
                             t.status != constants.TASK_STATUS_ON_ERROR_ONLY]:
-                        task.status = constants.TASK_STATUS_PENDING
+                        task.status = constants.TASK_STATUS_SCHEDULED
                         break
                         break
         return task
         return task
 
 
@@ -362,9 +369,9 @@ class ConductorServerEndpoint(object):
 
 
         for task in execution.tasks:
         for task in execution.tasks:
             if (not task.depends_on and
             if (not task.depends_on and
-                    task.status == constants.TASK_STATUS_PENDING):
+                    task.status == constants.TASK_STATUS_SCHEDULED):
                 db_api.set_task_status(
                 db_api.set_task_status(
-                    ctxt, task.id, constants.TASK_STATUS_SCHEDULED)
+                    ctxt, task.id, constants.TASK_STATUS_PENDING)
                 self._rpc_worker_client.begin_task(
                 self._rpc_worker_client.begin_task(
                     ctxt, server=None,
                     ctxt, server=None,
                     task_id=task.id,
                     task_id=task.id,
@@ -413,7 +420,7 @@ class ConductorServerEndpoint(object):
                 # gather all tasks which will be queued to run in parallel:
                 # gather all tasks which will be queued to run in parallel:
                 for task in tasks_to_process.values():
                 for task in tasks_to_process.values():
                     if task.status in (
                     if task.status in (
-                            constants.TASK_STATUS_PENDING,
+                            constants.TASK_STATUS_SCHEDULED,
                             constants.TASK_STATUS_ON_ERROR_ONLY):
                             constants.TASK_STATUS_ON_ERROR_ONLY):
                         if not task.depends_on:
                         if not task.depends_on:
                             queued_tasks.append(task)
                             queued_tasks.append(task)
@@ -581,13 +588,17 @@ class ConductorServerEndpoint(object):
                 instance,
                 instance,
                 constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
                 constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
                 execution,
                 execution,
-                depends_on=[replicate_disks_task.id],
+                depends_on=[
+                    deploy_replica_source_resources_task.id,
+                    replicate_disks_task.id],
                 on_error=True)
                 on_error=True)
 
 
             self._create_task(
             self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
                 constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
-                execution, depends_on=[replicate_disks_task.id],
+                execution, depends_on=[
+                    deploy_replica_target_resources_task.id,
+                    replicate_disks_task.id],
                 on_error=True)
                 on_error=True)
 
 
         self._check_execution_tasks_sanity(
         self._check_execution_tasks_sanity(
@@ -839,26 +850,15 @@ class ConductorServerEndpoint(object):
         execution.type = constants.EXECUTION_TYPE_REPLICA_DEPLOY
         execution.type = constants.EXECUTION_TYPE_REPLICA_DEPLOY
 
 
         for instance in instances:
         for instance in instances:
-            validate_replica_desployment_inputs_task = self._create_task(
+            validate_replica_deployment_inputs_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS,
                 constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS,
                 execution)
                 execution)
 
 
-            create_snapshot_task_depends_on = [
-                validate_replica_desployment_inputs_task.id]
-
-            if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
-                    destination_provider_types):
-                get_optimal_flavor_task = self._create_task(
-                    instance, constants.TASK_TYPE_GET_OPTIMAL_FLAVOR,
-                    execution, depends_on=[
-                        validate_replica_desployment_inputs_task.id])
-                create_snapshot_task_depends_on.append(
-                    get_optimal_flavor_task.id)
-
             create_snapshot_task = self._create_task(
             create_snapshot_task = self._create_task(
                 instance, constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS,
                 instance, constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS,
-                execution, depends_on=create_snapshot_task_depends_on)
+                execution, depends_on=[
+                    validate_replica_deployment_inputs_task.id])
 
 
             deploy_replica_task = self._create_task(
             deploy_replica_task = self._create_task(
                 instance, constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE,
                 instance, constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE,
@@ -876,13 +876,22 @@ class ConductorServerEndpoint(object):
 
 
                 task_delete_os_morphing_resources = self._create_task(
                 task_delete_os_morphing_resources = self._create_task(
                     instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
                     instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
-                    execution, depends_on=[task_osmorphing.id],
+                    execution, depends_on=[
+                        task_deploy_os_morphing_resources.id,
+                        task_osmorphing.id],
                     on_error=True)
                     on_error=True)
 
 
                 next_task = task_delete_os_morphing_resources
                 next_task = task_delete_os_morphing_resources
             else:
             else:
                 next_task = deploy_replica_task
                 next_task = deploy_replica_task
 
 
+            if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
+                    destination_provider_types):
+                get_optimal_flavor_task = self._create_task(
+                    instance, constants.TASK_TYPE_GET_OPTIMAL_FLAVOR,
+                    execution, depends_on=[next_task.id])
+                next_task = get_optimal_flavor_task
+
             finalize_deployment_task = self._create_task(
             finalize_deployment_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT,
                 constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT,
@@ -897,7 +906,11 @@ class ConductorServerEndpoint(object):
             cleanup_deployment_task = self._create_task(
             cleanup_deployment_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT,
                 constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT,
-                execution, on_error_only=True)
+                execution,
+                depends_on=[
+                    deploy_replica_task.id,
+                    finalize_deployment_task.id],
+                on_error_only=True)
 
 
             if not clone_disks:
             if not clone_disks:
                 self._create_task(
                 self._create_task(
@@ -1044,13 +1057,16 @@ class ConductorServerEndpoint(object):
             delete_source_resources_task = self._create_task(
             delete_source_resources_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES,
                 constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES,
-                execution, depends_on=[last_migration_task.id],
+                execution, depends_on=[
+                    deploy_migration_source_resources_task.id,
+                    last_migration_task.id],
                 on_error=True)
                 on_error=True)
 
 
             delete_destination_resources_task = self._create_task(
             delete_destination_resources_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES,
                 constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES,
                 execution, depends_on=[
                 execution, depends_on=[
+                    deploy_migration_target_resources_task.id,
                     last_migration_task.id],
                     last_migration_task.id],
                 on_error=True)
                 on_error=True)
 
 
@@ -1073,7 +1089,9 @@ class ConductorServerEndpoint(object):
 
 
                 task_delete_os_morphing_resources = self._create_task(
                 task_delete_os_morphing_resources = self._create_task(
                     instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
                     instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
-                    execution, depends_on=[task_osmorphing.id],
+                    execution, depends_on=[
+                        task_deploy_os_morphing_resources.id,
+                        task_osmorphing.id],
                     on_error=True)
                     on_error=True)
 
 
                 last_task = task_delete_os_morphing_resources
                 last_task = task_delete_os_morphing_resources
@@ -1093,14 +1111,16 @@ class ConductorServerEndpoint(object):
             cleanup_failed_deployment_task = self._create_task(
             cleanup_failed_deployment_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT,
                 constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT,
-                execution, depends_on=[finalize_deployment_task.id],
+                execution, depends_on=[
+                    deploy_instance_task.id,
+                    finalize_deployment_task.id],
                 on_error_only=True)
                 on_error_only=True)
 
 
             cleanup_deps = [
             cleanup_deps = [
                 create_instance_disks_task.id,
                 create_instance_disks_task.id,
                 delete_destination_resources_task.id,
                 delete_destination_resources_task.id,
                 cleanup_failed_deployment_task.id]
                 cleanup_failed_deployment_task.id]
-            if not skip_os_morphing:
+            if task_delete_os_morphing_resources:
                 cleanup_deps.append(task_delete_os_morphing_resources.id)
                 cleanup_deps.append(task_delete_os_morphing_resources.id)
             self._create_task(
             self._create_task(
                 instance, constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE,
                 instance, constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE,
@@ -1135,10 +1155,14 @@ class ConductorServerEndpoint(object):
     @migration_synchronized
     @migration_synchronized
     def cancel_migration(self, ctxt, migration_id, force):
     def cancel_migration(self, ctxt, migration_id, force):
         migration = self._get_migration(ctxt, migration_id)
         migration = self._get_migration(ctxt, migration_id)
+        if len(migration.executions) != 1:
+            raise exception.InvalidMigrationState(
+                "Migration '%s' has in improper number of tasks "
+                "executions: %d" % (migration_id, len(migration.executions)))
         execution = migration.executions[0]
         execution = migration.executions[0]
         if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
         if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
             raise exception.InvalidMigrationState(
             raise exception.InvalidMigrationState(
-                "Migration '%s' is not running" % migration_id)
+                "Migration '%s' is not currently running" % migration_id)
         if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
         if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
                 not force):
                 not force):
             raise exception.InvalidMigrationState(
             raise exception.InvalidMigrationState(
@@ -1182,7 +1206,7 @@ class ConductorServerEndpoint(object):
             if not task.on_error:
             if not task.on_error:
                 if task.status in (
                 if task.status in (
                         constants.TASK_STATUS_RUNNING,
                         constants.TASK_STATUS_RUNNING,
-                        constants.TASK_STATUS_SCHEDULED):
+                        constants.TASK_STATUS_PENDING):
                     LOG.debug(
                     LOG.debug(
                         "Killing %s task '%s' as part of "
                         "Killing %s task '%s' as part of "
                         "cancellation of execution '%s'",
                         "cancellation of execution '%s'",
@@ -1191,14 +1215,16 @@ class ConductorServerEndpoint(object):
                         ctxt, task.id, constants.TASK_STATUS_CANCELLING)
                         ctxt, task.id, constants.TASK_STATUS_CANCELLING)
                     self._rpc_worker_client.cancel_task(
                     self._rpc_worker_client.cancel_task(
                         ctxt, task.host, task.id, task.process_id, force)
                         ctxt, task.host, task.id, task.process_id, force)
-                # cancel any non-on-error tasks which are aleady pending:
-                elif task.status == constants.TASK_STATUS_PENDING:
+                # cancel any non-on-error tasks which have been scheduled:
+                elif task.status == constants.TASK_STATUS_SCHEDULED:
                     LOG.debug(
                     LOG.debug(
-                        "Marking pending task '%s' as cancelled as "
+                        "Marking scheduled task '%s' as unscheduled as "
                         "part of cancellation of execution '%s'",
                         "part of cancellation of execution '%s'",
                         task.id, execution.id)
                         task.id, execution.id)
                     db_api.set_task_status(
                     db_api.set_task_status(
-                        ctxt, task.id, constants.TASK_STATUS_CANCELED)
+                        ctxt, task.id, constants.TASK_STATUS_UNSCHEDULED,
+                        exception_details=(
+                            "Unscheduled as part of cancellation procedure."))
             # NOTE: ideally, all on-error-only tasks should have a
             # NOTE: ideally, all on-error-only tasks should have a
             # clear-cut dependency task so this inefficient code
             # clear-cut dependency task so this inefficient code
             # should never be called:
             # should never be called:
@@ -1216,7 +1242,7 @@ class ConductorServerEndpoint(object):
                 action = db_api.get_action(
                 action = db_api.get_action(
                     ctxt, execution.action_id)
                     ctxt, execution.action_id)
                 db_api.set_task_status(
                 db_api.set_task_status(
-                    ctxt, task.id, constants.TASK_STATUS_SCHEDULED)
+                    ctxt, task.id, constants.TASK_STATUS_PENDING)
                 self._rpc_worker_client.begin_task(
                 self._rpc_worker_client.begin_task(
                     ctxt, server=None,
                     ctxt, server=None,
                     task_id=task.id,
                     task_id=task.id,
@@ -1228,8 +1254,8 @@ class ConductorServerEndpoint(object):
             else:
             else:
                 LOG.debug(
                 LOG.debug(
                     "No action currently taken with respect to task '%s' "
                     "No action currently taken with respect to task '%s' "
-                    "during cancellation of execution '%s'",
-                    task.id, execution.id)
+                    "(status '%s') during cancellation of execution '%s'",
+                    task.id, task.status, execution.id)
 
 
         self._advance_execution_state(ctxt, execution)
         self._advance_execution_state(ctxt, execution)
 
 
@@ -1249,21 +1275,21 @@ class ConductorServerEndpoint(object):
         task = db_api.get_task(ctxt, task_id)
         task = db_api.get_task(ctxt, task_id)
         if task.status == constants.TASK_STATUS_CANCELLING:
         if task.status == constants.TASK_STATUS_CANCELLING:
             raise exception.TaskIsCancelling(task_id=task_id)
             raise exception.TaskIsCancelling(task_id=task_id)
-        elif task.status != constants.TASK_STATUS_SCHEDULED:
+        elif task.status != constants.TASK_STATUS_PENDING:
             raise exception.InvalidTaskState(
             raise exception.InvalidTaskState(
                 "Task with ID '%s' is in '%s' status instead of the "
                 "Task with ID '%s' is in '%s' status instead of the "
                 "expected '%s' required for it to have a task host set." % (
                 "expected '%s' required for it to have a task host set." % (
-                    task_id, task.status, constants.TASK_STATUS_SCHEDULED))
+                    task_id, task.status, constants.TASK_STATUS_PENDING))
         db_api.set_task_host(ctxt, task_id, host, process_id)
         db_api.set_task_host(ctxt, task_id, host, process_id)
         db_api.set_task_status(
         db_api.set_task_status(
             ctxt, task_id, constants.TASK_STATUS_RUNNING)
             ctxt, task_id, constants.TASK_STATUS_RUNNING)
 
 
     def _check_clean_execution_deadlock(self, ctxt, execution):
     def _check_clean_execution_deadlock(self, ctxt, execution):
         """ Checks whether an execution is deadlocked.
         """ Checks whether an execution is deadlocked.
-        Deadlocked execution have no currently running/scheduled tasks
-        but some remaining pending tasks.
-        If this occurs, all pending tasks are marked as deadlocked,
-        and the execution is marked as ERROR'd.
+        Deadlocked executions have no currently running/pending tasks
+        but some remaining scheduled tasks.
+        If this occurs, all pending/error-only tasks are marked
+        as DEADLOCKED, and the execution is marked as such too.
         Returns the state of the when the check occured
         Returns the state of the when the check occured
         (either RUNNING or DEADLOCKED)
         (either RUNNING or DEADLOCKED)
         """
         """
@@ -1275,23 +1301,25 @@ class ConductorServerEndpoint(object):
 
 
         determined_state = constants.EXECUTION_STATUS_RUNNING
         determined_state = constants.EXECUTION_STATUS_RUNNING
         status_vals = task_statuses.values()
         status_vals = task_statuses.values()
-        if constants.TASK_STATUS_PENDING in status_vals and (
-                constants.TASK_STATUS_RUNNING not in status_vals and (
-                    constants.TASK_STATUS_SCHEDULED not in status_vals)):
+        if constants.TASK_STATUS_SCHEDULED in status_vals and not (
+                any([stat in status_vals
+                     for stat in constants.ACTIVE_TASK_STATUSES])):
             LOG.warn(
             LOG.warn(
                 "Execution '%s' is deadlocked. Task statuses are: %s",
                 "Execution '%s' is deadlocked. Task statuses are: %s",
                 execution.id, task_statuses)
                 execution.id, task_statuses)
             for task_id, stat in task_statuses.items():
             for task_id, stat in task_statuses.items():
-                if stat == constants.TASK_STATUS_PENDING:
+                if stat in (
+                        constants.TASK_STATUS_SCHEDULED,
+                        constants.TASK_STATUS_ON_ERROR_ONLY):
                     LOG.warn("Marking deadlocked task '%s' as that ", task_id)
                     LOG.warn("Marking deadlocked task '%s' as that ", task_id)
                     db_api.set_task_status(
                     db_api.set_task_status(
                         ctxt, task_id,
                         ctxt, task_id,
                         constants.TASK_STATUS_CANCELED_FROM_DEADLOCK,
                         constants.TASK_STATUS_CANCELED_FROM_DEADLOCK,
                         exception_details=TASK_DEADLOCK_ERROR_MESSAGE)
                         exception_details=TASK_DEADLOCK_ERROR_MESSAGE)
             LOG.warn(
             LOG.warn(
-                "Marking deadlocked execution '%s' as ERROR'd", execution.id)
+                "Marking deadlocked execution '%s' as DEADLOCKED'd", execution.id)
             self._set_tasks_execution_status(
             self._set_tasks_execution_status(
-                ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
+                ctxt, execution.id, constants.EXECUTION_STATUS_DEADLOCKED)
             LOG.error(
             LOG.error(
                 "Execution '%s' is deadlocked. Cleanup has been performed. "
                 "Execution '%s' is deadlocked. Cleanup has been performed. "
                 "Task statuses at time of deadlock were: %s",
                 "Task statuses at time of deadlock were: %s",
@@ -1301,17 +1329,17 @@ class ConductorServerEndpoint(object):
 
 
     def _get_execution_status(self, ctxt, execution, requery=False):
     def _get_execution_status(self, ctxt, execution, requery=False):
         """ Returns the global status of an execution.
         """ Returns the global status of an execution.
-        RUNNING - at least one task is RUNNING, SCHEDULED or CANCELLING
+        RUNNING - at least one task is RUNNING, PENDING or CANCELLING
         COMPLETED - all non-error-only tasks are COMPLETED
         COMPLETED - all non-error-only tasks are COMPLETED
-        CANCELED - no more RUNNING or PENDING tasks but some CANCELED
+        CANCELED - no more RUNNING/PENDING/SCHEDULED tasks but some CANCELED
         ERROR - no tasks are RUNNING and at least one is ERROR'd
         ERROR - no tasks are RUNNING and at least one is ERROR'd
-        DEADLOCKED - has PENDING tasks but none RUNNING/SCHEDULED/CANCELLING
+        DEADLOCKED - has SCHEDULED tasks but none RUNNING/PENDING/CANCELLING
         """
         """
         is_running = False
         is_running = False
         is_canceled = False
         is_canceled = False
         is_cancelling = False
         is_cancelling = False
         is_errord = False
         is_errord = False
-        has_pending_tasks = False
+        has_scheduled_tasks = False
         task_stat_map = {}
         task_stat_map = {}
         if requery:
         if requery:
             execution = db_api.get_tasks_execution(ctxt, execution.id)
             execution = db_api.get_tasks_execution(ctxt, execution.id)
@@ -1325,11 +1353,11 @@ class ConductorServerEndpoint(object):
                 is_errord = True
                 is_errord = True
             if task.status == constants.TASK_STATUS_CANCELLING:
             if task.status == constants.TASK_STATUS_CANCELLING:
                 is_cancelling = True
                 is_cancelling = True
-            if task.status == constants.TASK_STATUS_PENDING:
-                has_pending_tasks = True
+            if task.status == constants.TASK_STATUS_SCHEDULED:
+                has_scheduled_tasks = True
 
 
         status = constants.EXECUTION_STATUS_COMPLETED
         status = constants.EXECUTION_STATUS_COMPLETED
-        if has_pending_tasks and not is_running:
+        if has_scheduled_tasks and not is_running:
             status = constants.EXECUTION_STATUS_DEADLOCKED
             status = constants.EXECUTION_STATUS_DEADLOCKED
         elif is_cancelling:
         elif is_cancelling:
             status = constants.EXECUTION_STATUS_CANCELLING
             status = constants.EXECUTION_STATUS_CANCELLING
@@ -1357,12 +1385,13 @@ class ConductorServerEndpoint(object):
         NOTE: should only be called with a lock on the Execution!
         NOTE: should only be called with a lock on the Execution!
 
 
         Requirements for a tasks to be started:
         Requirements for a tasks to be started:
-        - any task where any parent task got CANCELED will also be CANCELED
-        - normal tasks (task.on_error==False & task.status==PENDING):
+        - any non-error task where any parent task got UNSCHEDULED
+          will also be UNSCHEDULED
+        - normal tasks (task.on_error==False & task.status==SCHEDULED):
             * instantly started if they have no parent dependencies
             * instantly started if they have no parent dependencies
             * all parent dependency tasks must be COMPLETED
             * all parent dependency tasks must be COMPLETED
-        - on-error tasks (task.on_error==True):
-            * all parent tasks must be COMPLETED or ERROR'd
+        - on-error tasks (task.on_error==True & task.status==SCHEDULED):
+            * all parent tasks must have reached a terminal state
         - on-error-only tasks (task.status==ON_ERROR_ONLY):
         - on-error-only tasks (task.status==ON_ERROR_ONLY):
             * at least one non-error parent tasks must have been COMPLETED
             * at least one non-error parent tasks must have been COMPLETED
             * all non-error parent tasks must have reached a terminal state
             * all non-error parent tasks must have reached a terminal state
@@ -1380,7 +1409,7 @@ class ConductorServerEndpoint(object):
 
 
         def _start_task(task):
         def _start_task(task):
             db_api.set_task_status(
             db_api.set_task_status(
-                ctxt, task.id, constants.TASK_STATUS_SCHEDULED)
+                ctxt, task.id, constants.TASK_STATUS_PENDING)
             self._rpc_worker_client.begin_task(
             self._rpc_worker_client.begin_task(
                 ctxt, server=None,
                 ctxt, server=None,
                 task_id=task.id,
                 task_id=task.id,
@@ -1392,7 +1421,8 @@ class ConductorServerEndpoint(object):
             started_tasks.append(task.id)
             started_tasks.append(task.id)
 
 
         for task in execution.tasks:
         for task in execution.tasks:
-            if task.status == constants.TASK_STATUS_PENDING:
+
+            if task.status == constants.TASK_STATUS_SCHEDULED:
                 # immediately start pending tasks with no deps:
                 # immediately start pending tasks with no deps:
                 if not task.depends_on:
                 if not task.depends_on:
                     LOG.info(
                     LOG.info(
@@ -1407,56 +1437,92 @@ class ConductorServerEndpoint(object):
                         depend_task = db_api.get_task(ctxt, dep_task_id)
                         depend_task = db_api.get_task(ctxt, dep_task_id)
                         parent_task_statuses[dep_task_id] = depend_task.status
                         parent_task_statuses[dep_task_id] = depend_task.status
 
 
-                # immediately cancel any task whose parent task(s)
-                # got canceled (including on_error=True tasks):
-                if any([
-                        dep_stat in constants.CANCELED_TASK_STATUSES or (
-                            dep_stat == constants.TASK_STATUS_CANCELLING)
+                # immediately unschedule tasks (on-error or otherwise)
+                # if all of their parent tasks got un-scheduled:
+                if all([
+                        dep_stat == constants.TASK_STATUS_UNSCHEDULED
                         for dep_stat in parent_task_statuses.values()]):
                         for dep_stat in parent_task_statuses.values()]):
                     LOG.info(
                     LOG.info(
-                        "Marking task '%s' as cancelled since it has one or "
-                        "more parent tasks which got canceled: %s",
+                        "Unscheduling task '%s' as all parent "
+                        "tasks got unscheduled: %s",
                         task.id, parent_task_statuses)
                         task.id, parent_task_statuses)
                     db_api.set_task_status(
                     db_api.set_task_status(
-                        ctxt, task.id, constants.TASK_STATUS_CANCELED)
+                        ctxt, task.id, constants.TASK_STATUS_UNSCHEDULED,
+                        exception_details=(
+                            "Unscheduled due to the unscheduling of all "
+                            "parent tasks."))
                     continue
                     continue
 
 
                 if not task.on_error:
                 if not task.on_error:
-                    # start all non-error tasks whose parent tasks have completed:
+
+                    # check if all parent tasks have reached a terminal state:
                     if all([
                     if all([
-                            dep_stat == constants.TASK_STATUS_COMPLETED
+                            dep_stat in constants.FINALIZED_TASK_STATUSES
                             for dep_stat in parent_task_statuses.values()]):
                             for dep_stat in parent_task_statuses.values()]):
-                        LOG.info(
-                            "Starting task '%s' as all dependencies have "
-                            "completed: %s", task.id, parent_task_statuses)
-                        _start_task(task)
+                        # if all parents completed, start the task:
+                        if all([
+                                dep_stat == constants.TASK_STATUS_COMPLETED
+                                for dep_stat in (
+                                    parent_task_statuses.values())]):
+                            LOG.info(
+                                "Starting task '%s' as all dependencies have "
+                                "completed: %s", task.id, parent_task_statuses)
+                            _start_task(task)
+                        # if even one parent did not complete, unschedule:
+                        else:
+                            LOG.info(
+                                "Unscheduling task '%s' as it has one or more "
+                                "parent tasks which got canceled/unscheduled/"
+                                "errord: %s",
+                                task.id, parent_task_statuses)
+                            db_api.set_task_status(
+                                ctxt, task.id,
+                                constants.TASK_STATUS_UNSCHEDULED,
+                                exception_details=(
+                                    "Unscheduled due to the unsuccessful "
+                                    "execution of one or more parent tasks."))
                     else:
                     else:
+                        # still has active deps, wait for next hook:
                         LOG.debug(
                         LOG.debug(
                             "Skipping starting task '%s' as some parent "
                             "Skipping starting task '%s' as some parent "
-                            "tasks have not completed: %s",
+                            "tasks have not yet finalized: %s",
                             task.id, parent_task_statuses)
                             task.id, parent_task_statuses)
                     continue
                     continue
 
 
                 elif task.on_error:
                 elif task.on_error:
-                    # start all on-error tasks whose parents have
-                    # either completed or error'd:
+
+                    # check if all parents have reached a finalized state:
                     if all([
                     if all([
-                            dep_stat in (
-                                constants.TASK_STATUS_COMPLETED,
-                                constants.TASK_STATUS_ERROR)
+                            dep_stat in constants.FINALIZED_TASK_STATUSES
                             for dep_stat in parent_task_statuses.values()]):
                             for dep_stat in parent_task_statuses.values()]):
+                        # unschedule the task if no parent has completed:
+                        if constants.TASK_STATUS_COMPLETED not in (
+                                parent_task_statuses.values()):
+                            LOG.info(
+                                "Unscheduling plain on-error task '%s' as "
+                                "all of its dependencies have finalized "
+                                "but no parent had executed successfully: %s",
+                                task.id, parent_task_statuses)
+                            db_api.set_task_status(
+                                ctxt, task.id,
+                                constants.TASK_STATUS_UNSCHEDULED,
+                                exception_details=(
+                                    "Unscheduled due to no parent task "
+                                    "having executed successfully."))
+                            continue
                         LOG.info(
                         LOG.info(
-                            "Starting task '%s' as all dependencies have "
-                            "completed: %s", task.id, parent_task_statuses)
+                            "Starting plain on-error task '%s' as all dependencies "
+                            "have been finalized: %s", task.id, parent_task_statuses)
                         _start_task(task)
                         _start_task(task)
                     else:
                     else:
                         LOG.debug(
                         LOG.debug(
                             "Skipping starting on-error task '%s' as some "
                             "Skipping starting on-error task '%s' as some "
-                            "parent tasks have not completed: %s",
+                            "parent tasks have not yet finalized: %s",
                             task.id, parent_task_statuses)
                             task.id, parent_task_statuses)
                     continue
                     continue
+
                 LOG.debug(
                 LOG.debug(
-                    "No lifecycle decision was taken for pending task '%s'. "
+                    "No lifecycle decision was taken for SCHEDULED task '%s'. "
                     "Current status: %s. Parent tasks: %s",
                     "Current status: %s. Parent tasks: %s",
                     task.id, task.status, parent_task_statuses)
                     task.id, task.status, parent_task_statuses)
 
 
@@ -1491,17 +1557,21 @@ class ConductorServerEndpoint(object):
                         "terminal status: %s", task.id, non_error_deps)
                         "terminal status: %s", task.id, non_error_deps)
                     continue
                     continue
 
 
-                # cancel any on-error-only task if no non-on-error parent
+                # unschedule any on-error-only task if no non-on-error parent
                 # tasks completed successfully:
                 # tasks completed successfully:
                 if constants.TASK_STATUS_COMPLETED not in (
                 if constants.TASK_STATUS_COMPLETED not in (
                         non_error_deps.values()):
                         non_error_deps.values()):
                     LOG.info(
                     LOG.info(
-                        "Marking on-error-only task '%s' as canceled as all of "
-                        "its parent non-error tasks have finalized but none "
-                        "has completed successfully: %s",
+                        "Marking on-error-only task '%s' as unscheduled as all"
+                        " of its parent non-error tasks have finalized but "
+                        "none has completed successfully: %s",
                         task.id, non_error_deps)
                         task.id, non_error_deps)
                     db_api.set_task_status(
                     db_api.set_task_status(
-                        ctxt, task.id, constants.TASK_STATUS_CANCELED)
+                        ctxt, task.id, constants.TASK_STATUS_UNSCHEDULED,
+                        exception_details=(
+                            "Unscheduled because all parent non-error tasks "
+                            "have finalized but none have completed "
+                            "successfully."))
                     continue
                     continue
 
 
                 # do not trigger on-error-only tasks unless all on-error
                 # do not trigger on-error-only tasks unless all on-error
@@ -1511,11 +1581,15 @@ class ConductorServerEndpoint(object):
                         for dep_stat in error_deps.values()]):
                         for dep_stat in error_deps.values()]):
                     LOG.debug(
                     LOG.debug(
                         "Not triggering on-error-only task '%s' as it "
                         "Not triggering on-error-only task '%s' as it "
-                        "has parent on-error tasks which didn't "
-                        "complete/error: %s", task.id, error_deps)
+                        "has parent on-error tasks which have not yet "
+                        "reached a terminal state: %s", task.id, error_deps)
                     continue
                     continue
 
 
-                LOG.debug("Starting on-error-only task '%s'", task.id)
+                LOG.debug(
+                    "Starting on-error-only task '%s'. Dependency status: %s",
+                    task.id, {
+                        "on-error-dependencies": error_deps,
+                        "non-error-dependencies": non_error_deps})
                 _start_task(task)
                 _start_task(task)
                 continue
                 continue
 
 
@@ -1529,7 +1603,8 @@ class ConductorServerEndpoint(object):
                     constants.EXECUTION_STATUS_DEADLOCKED):
                     constants.EXECUTION_STATUS_DEADLOCKED):
                 LOG.error(
                 LOG.error(
                     "Execution '%s' deadlocked after Replica state advancement"
                     "Execution '%s' deadlocked after Replica state advancement"
-                    ". Cleanup has been perfomed. Returning early.")
+                    ". Cleanup has been perfomed. Returning early.",
+                    execution.id)
                 return []
                 return []
 
 
         # check if execution status has changed:
         # check if execution status has changed:
@@ -1650,6 +1725,13 @@ class ConductorServerEndpoint(object):
     def task_completed(self, ctxt, task_id, task_result):
     def task_completed(self, ctxt, task_id, task_result):
         LOG.info("Task completed: %s", task_id)
         LOG.info("Task completed: %s", task_id)
 
 
+        task = db_api.get_task(ctxt, task_id)
+        if task.status != constants.TASK_STATUS_RUNNING:
+            LOG.warn(
+                "Task '%s' was in '%s' state instead of the expected "
+                "RUNNING state. Marking as COMPLETED anyway.",
+                task_id, task.status)
+
         db_api.set_task_status(
         db_api.set_task_status(
             ctxt, task_id, constants.TASK_STATUS_COMPLETED)
             ctxt, task_id, constants.TASK_STATUS_COMPLETED)
         task = db_api.get_task(ctxt, task_id)
         task = db_api.get_task(ctxt, task_id)
@@ -1699,13 +1781,13 @@ class ConductorServerEndpoint(object):
             if subtask.task_type == constants.TASK_TYPE_OS_MORPHING:
             if subtask.task_type == constants.TASK_TYPE_OS_MORPHING:
                 continue
                 continue
 
 
-            if subtask.status == constants.TASK_STATUS_RUNNING:
+            if subtask.status in constants.ACTIVE_TASK_STATUSES:
                 raise exception.CoriolisException(
                 raise exception.CoriolisException(
-                    "Task %s is still running although it should not!" % (
-                        subtask.id))
+                    "Task %s is still in an active state (%s) although "
+                    "it should not!" % (subtask.id, subtask.status))
 
 
             if subtask.status in [
             if subtask.status in [
-                    constants.TASK_STATUS_PENDING,
+                    constants.TASK_STATUS_SCHEDULED,
                     constants.TASK_STATUS_ON_ERROR_ONLY]:
                     constants.TASK_STATUS_ON_ERROR_ONLY]:
                 db_api.set_task_status(
                 db_api.set_task_status(
                     ctxt, subtask.id,
                     ctxt, subtask.id,
@@ -1749,7 +1831,7 @@ class ConductorServerEndpoint(object):
                 # further tasks should be running, but we double-check here:
                 # further tasks should be running, but we double-check here:
                 running = [
                 running = [
                     t for t in execution.tasks
                     t for t in execution.tasks
-                    if t.status == constants.TASK_STATUS_RUNNING
+                    if t.status in constants.ACTIVE_TASK_STATUSES
                     and t.task_type != constants.TASK_TYPE_OS_MORPHING]
                     and t.task_type != constants.TASK_TYPE_OS_MORPHING]
                 if not running:
                 if not running:
                     self._cancel_execution_for_osmorphing_debugging(
                     self._cancel_execution_for_osmorphing_debugging(

+ 5 - 2
coriolis/constants.py

@@ -20,8 +20,9 @@ FINALIZED_EXECUTION_STATUSES = [
     EXECUTION_STATUS_ERROR
     EXECUTION_STATUS_ERROR
 ]
 ]
 
 
-TASK_STATUS_PENDING = "PENDING"
 TASK_STATUS_SCHEDULED = "SCHEDULED"
 TASK_STATUS_SCHEDULED = "SCHEDULED"
+TASK_STATUS_PENDING = "PENDING"
+TASK_STATUS_UNSCHEDULED = "UNSCHEDULED"
 TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_ERROR = "ERROR"
@@ -33,13 +34,14 @@ TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 
 
 ACTIVE_TASK_STATUSES = [
 ACTIVE_TASK_STATUSES = [
-    TASK_STATUS_SCHEDULED,
+    TASK_STATUS_PENDING,
     TASK_STATUS_RUNNING,
     TASK_STATUS_RUNNING,
     TASK_STATUS_CANCELLING
     TASK_STATUS_CANCELLING
 ]
 ]
 
 
 CANCELED_TASK_STATUSES = [
 CANCELED_TASK_STATUSES = [
     TASK_STATUS_CANCELED,
     TASK_STATUS_CANCELED,
+    TASK_STATUS_UNSCHEDULED,
     TASK_STATUS_FORCE_CANCELED,
     TASK_STATUS_FORCE_CANCELED,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FROM_DEADLOCK
     TASK_STATUS_CANCELED_FROM_DEADLOCK
@@ -48,6 +50,7 @@ CANCELED_TASK_STATUSES = [
 FINALIZED_TASK_STATUSES = [
 FINALIZED_TASK_STATUSES = [
     TASK_STATUS_COMPLETED,
     TASK_STATUS_COMPLETED,
     TASK_STATUS_ERROR,
     TASK_STATUS_ERROR,
+    TASK_STATUS_UNSCHEDULED,
     TASK_STATUS_CANCELED,
     TASK_STATUS_CANCELED,
     TASK_STATUS_FORCE_CANCELED,
     TASK_STATUS_FORCE_CANCELED,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,