Просмотр исходного кода

Fix task state conflict issues.

This PR makes tasks return only the values which they updated, instead
of directly manipulating the global task_info, in order to avoid any
state conflicts and to allow for the parallelization of tasks.

This is done by:
    - defining clear required and returned parameters for task classes
    - initializing the task info on Replica/Migration execution creation
    - validating that the declared tasks of an execution which are run
    in parallel do not manipulate the same shared fields in the task_info
Nashwan Azhari 6 лет назад
Родитель
Commit
fac30e7c7c

+ 4 - 4
coriolis/conductor/rpc/client.py

@@ -236,9 +236,9 @@ class ConductorClient(object):
             ctxt, 'set_task_host', task_id=task_id, host=host,
             process_id=process_id)
 
-    def task_completed(self, ctxt, task_id, task_info):
+    def task_completed(self, ctxt, task_id, task_result):
         self._client.call(
-            ctxt, 'task_completed', task_id=task_id, task_info=task_info)
+            ctxt, 'task_completed', task_id=task_id, task_result=task_result)
 
     def set_task_error(self, ctxt, task_id, exception_details):
         self._client.call(ctxt, 'set_task_error', task_id=task_id,
@@ -292,11 +292,11 @@ class ConductorClient(object):
             schedule_id=schedule_id,
             expired=expired)
 
-    def update_replica(self, ctxt, replica_id, properties):
+    def update_replica(self, ctxt, replica_id, updated_properties):
         return self._client.call(
             ctxt, 'update_replica',
             replica_id=replica_id,
-            properties=properties)
+            updated_properties=updated_properties)
 
     def get_diagnostics(self, ctxt):
         return self._client.call(ctxt, 'get_diagnostics')

+ 316 - 99
coriolis/conductor/rpc/server.py

@@ -19,6 +19,7 @@ from coriolis import keystone
 from coriolis.licensing import client as licensing_client
 from coriolis.replica_cron.rpc import client as rpc_cron_client
 from coriolis import schemas
+from coriolis.tasks import factory as tasks_factory
 from coriolis import utils
 from coriolis.worker.rpc import client as rpc_worker_client
 
@@ -28,7 +29,7 @@ VERSION = "1.0"
 LOG = logging.getLogger(__name__)
 
 
-conductor_opts = [
+CONDUCTOR_OPTS = [
     cfg.BoolOpt("debug_os_morphing_errors",
                 default=False,
                 help="If set, any OSMorphing task which errors out will have "
@@ -37,7 +38,7 @@ conductor_opts = [
 ]
 
 CONF = cfg.CONF
-CONF.register_opts(conductor_opts, 'conductor')
+CONF.register_opts(CONDUCTOR_OPTS, 'conductor')
 
 TASK_LOCK_NAME_FORMAT = "task-%s"
 EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
@@ -55,7 +56,7 @@ def endpoint_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
         @lockutils.synchronized(
-                ENDPOINT_LOCK_NAME_FORMAT % endpoint_id)
+            ENDPOINT_LOCK_NAME_FORMAT % endpoint_id)
         def inner():
             return func(self, ctxt, endpoint_id, *args, **kwargs)
         return inner()
@@ -66,7 +67,7 @@ def replica_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, *args, **kwargs):
         @lockutils.synchronized(
-                REPLICA_LOCK_NAME_FORMAT % replica_id)
+            REPLICA_LOCK_NAME_FORMAT % replica_id)
         def inner():
             return func(self, ctxt, replica_id, *args, **kwargs)
         return inner()
@@ -100,9 +101,9 @@ def task_and_execution_synchronized(func):
     def wrapper(self, ctxt, task_id, *args, **kwargs):
         task = db_api.get_task(ctxt, task_id)
         @lockutils.synchronized(
-                EXECUTION_LOCK_NAME_FORMAT % task.execution_id)
+            EXECUTION_LOCK_NAME_FORMAT % task.execution_id)
         @lockutils.synchronized(
-                TASK_LOCK_NAME_FORMAT % task_id)
+            TASK_LOCK_NAME_FORMAT % task_id)
         def inner():
             return func(self, ctxt, task_id, *args, **kwargs)
         return inner()
@@ -113,7 +114,7 @@ def migration_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, migration_id, *args, **kwargs):
         @lockutils.synchronized(
-                MIGRATION_LOCK_NAME_FORMAT % migration_id)
+            MIGRATION_LOCK_NAME_FORMAT % migration_id)
         def inner():
             return func(self, ctxt, migration_id, *args, **kwargs)
         return inner()
@@ -124,7 +125,7 @@ def tasks_execution_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
         @lockutils.synchronized(
-                EXECUTION_LOCK_NAME_FORMAT % execution_id)
+            EXECUTION_LOCK_NAME_FORMAT % execution_id)
         def inner():
             return func(self, ctxt, replica_id, execution_id, *args, **kwargs)
         return inner()
@@ -373,6 +374,139 @@ class ConductorServerEndpoint(object):
                     instance=task.instance,
                     task_info=task_info.get(task.instance, {}))
 
+    def _check_execution_tasks_sanity(
+            self, execution, initial_task_info):
+        """ Checks whether the given execution's tasks are:
+        - properly ordered and not set to deadlock off the bat
+        - properly manipulate the task_info in the right order
+        """
+        all_instances_in_tasks = {
+            t.instance for t in execution.tasks}
+        instances_tasks_mapping = {
+            instance: [
+                t for t in execution.tasks if t.instance == instance]
+            for instance in all_instances_in_tasks}
+
+        def _check_task_cls_param_requirements(task, instance_task_info_keys):
+            task_cls = tasks_factory.get_task_runner_class(task.task_type)()
+            missing_params = [
+                p for p in task_cls.required_task_info_properties
+                if p not in instance_task_info_keys]
+            if missing_params:
+                raise exception.CoriolisException(
+                    "The following task parameters for instance '%s' "
+                    "are missing from the task_info for task '%s' of "
+                    "type '%s': %s" % (
+                        task.instance, task.id, task.task_type,
+                        missing_params))
+            return task_cls.returned_task_info_properties
+
+        for instance, instance_tasks in instances_tasks_mapping.items():
+            task_info_keys = set(initial_task_info.get(
+                instance, {}).keys())
+            # mapping between the ID and associated object of processed tasks:
+            processed_tasks = {}
+            tasks_to_process = {
+                t.id: t for t in instance_tasks}
+            while tasks_to_process:
+                queued_tasks = []
+                # gather all tasks which will be queued to run in parallel:
+                for task in tasks_to_process.values():
+                    if task.status in (
+                            constants.TASK_STATUS_PENDING,
+                            constants.TASK_STATUS_ON_ERROR_ONLY):
+                        if not task.depends_on:
+                            queued_tasks.append(task)
+                        else:
+                            missing_deps = [
+                                dep_id
+                                for dep_id in task.depends_on
+                                if dep_id not in tasks_to_process and (
+                                    dep_id not in processed_tasks)]
+                            if missing_deps:
+                                raise exception.CoriolisException(
+                                    "Task '%s' (type '%s') for instance '%s' "
+                                    "has non-exitent tasks referenced as "
+                                    "dependencies: %s" % (
+                                        task.id, task.task_type,
+                                        instance, missing_deps))
+                            if all(
+                                    [dep_id in processed_tasks
+                                     for dep_id in task.depends_on]):
+                                queued_tasks.append(task)
+                    else:
+                        raise exception.CoriolisException(
+                            "Invalid initial state '%s' for task '%s' "
+                            "of type '%s'."% (
+                                task.status, task.id, task.task_type))
+
+                # check if nothing was left queued:
+                if not queued_tasks:
+                    remaining_tasks_deps_map = {
+                        (tid, t.task_type): t.depends_on
+                        for tid, t in tasks_to_process.items()}
+                    processed_tasks_type_map = {
+                        tid: t.task_type
+                        for tid, t in processed_tasks.items()}
+                    raise exception.CoriolisException(
+                        "Execution '%s' (type '%s') is bound to be deadlocked:"
+                        " there are leftover tasks for instance '%s 'which "
+                        "will never get queued. Already processed tasks are: "
+                        "%s. Tasks left: %s" % (
+                            execution.id, execution.type, instance,
+                            processed_tasks_type_map, remaining_tasks_deps_map))
+
+                # mapping for task_info fields modified by each task:
+                modified_fields_by_queued_tasks = {}
+                # check that each task has what it needs and
+                # register what they return/modify:
+                for task in queued_tasks:
+                    for new_field in _check_task_cls_param_requirements(
+                            task, task_info_keys):
+                        if new_field not in modified_fields_by_queued_tasks:
+                            modified_fields_by_queued_tasks[new_field] = [task]
+                        else:
+                            modified_fields_by_queued_tasks[new_field].append(
+                                task)
+
+                # check if any queued tasks would manipulate the same fields:
+                conflicting_fields = {
+                    new_field: [t.task_type for t in tasks]
+                    for new_field, tasks in (
+                        modified_fields_by_queued_tasks.items())
+                    if len(tasks) > 1}
+                if conflicting_fields:
+                    raise exception.CoriolisException(
+                        "There are fields which will encounter a state "
+                        "conflict following the parallelized execution of "
+                        "tasks for execution '%s' (type '%s') for instance "
+                        "'%s'. Conflicting fields and tasks will be: : %s" % (
+                            execution.id, execution.type, instance,
+                            conflicting_fields))
+
+                # register queued tasks as processed before continuing:
+                for task in queued_tasks:
+                    processed_tasks[task.id] = task
+                    tasks_to_process.pop(task.id)
+                # update current state fields at this point:
+                task_info_keys = task_info_keys.union(set(
+                    modified_fields_by_queued_tasks.keys()))
+                LOG.debug(
+                    "Successfully processed following tasks for instance '%s' "
+                    "for execution %s (type '%s') for any state conflict "
+                    "checks: %s",
+                    instance, execution.id, execution.type, [
+                        (t.id, t.task_type) for t in queued_tasks])
+            LOG.debug(
+                "Successfully checked all tasks for instance '%s' as part of "
+                "execution '%s' (type '%s') for any state conflicts: %s",
+                instance, execution.id, execution.type,
+                [(t.id, t.task_type) for t in instance_tasks])
+        LOG.debug(
+            "Successfully checked all tasks for execution '%s' (type '%s') "
+            "for ordering or state conflicts.",
+            execution.id, execution.type)
+
     @replica_synchronized
     def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances):
         replica = self._get_replica(ctxt, replica_id)
@@ -385,6 +519,18 @@ class ConductorServerEndpoint(object):
         execution.type = constants.EXECUTION_TYPE_REPLICA_EXECUTION
 
         for instance in execution.action.instances:
+            # NOTE: we update all of the param values before triggering an
+            # execution to ensure that the latest parameters are used:
+            if instance not in replica.info:
+                replica.info[instance] = {'volumes_info': []}
+            replica.info[instance].update({
+                "source_environment": replica.source_environment,
+                "target_environment": replica.destination_environment})
+                # TODO(aznashwan): have these passed separately to the tasks
+                # (they're currently passed inside the dest-env)
+                # "network_map": network_map,
+                # "storage_mappings": storage_mappings,
+
             validate_replica_source_inputs_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS,
@@ -393,8 +539,7 @@ class ConductorServerEndpoint(object):
             get_instance_info_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_GET_INSTANCE_INFO,
-                execution,
-                depends_on=[validate_replica_source_inputs_task.id])
+                execution)
 
             validate_replica_destination_inputs_task = self._create_task(
                 instance,
@@ -402,18 +547,11 @@ class ConductorServerEndpoint(object):
                 execution,
                 depends_on=[get_instance_info_task.id])
 
-            depends_on = [
-                validate_replica_source_inputs_task.id,
-                validate_replica_destination_inputs_task.id]
-            if shutdown_instances:
-                shutdown_instance_task = self._create_task(
-                    instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
-                    execution, depends_on=depends_on)
-                depends_on = [shutdown_instance_task.id]
-
             deploy_replica_disks_task = self._create_task(
                 instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
-                execution, depends_on=depends_on)
+                execution, depends_on=[
+                    validate_replica_source_inputs_task.id,
+                    validate_replica_destination_inputs_task.id])
 
             deploy_replica_source_resources_task = self._create_task(
                 instance,
@@ -424,16 +562,22 @@ class ConductorServerEndpoint(object):
                 instance,
                 constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES,
                 execution, depends_on=[
-                    deploy_replica_disks_task.id,
-                    deploy_replica_source_resources_task.id])
+                    deploy_replica_disks_task.id])
+
+            depends_on = [
+                deploy_replica_source_resources_task.id,
+                deploy_replica_target_resources_task.id]
+            if shutdown_instances:
+                shutdown_instance_task = self._create_task(
+                    instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
+                    execution, depends_on=depends_on)
+                depends_on = [shutdown_instance_task.id]
 
             replicate_disks_task = self._create_task(
                 instance, constants.TASK_TYPE_REPLICATE_DISKS,
-                execution, depends_on=[
-                    deploy_replica_source_resources_task.id,
-                    deploy_replica_target_resources_task.id])
+                execution, depends_on=depends_on)
 
-            delete_source_resources_task = self._create_task(
+            self._create_task(
                 instance,
                 constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
                 execution,
@@ -443,14 +587,22 @@ class ConductorServerEndpoint(object):
             self._create_task(
                 instance,
                 constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
-                execution, depends_on=[
-                    replicate_disks_task.id, delete_source_resources_task.id],
+                execution, depends_on=[replicate_disks_task.id],
                 on_error=True)
 
+        self._check_execution_tasks_sanity(
+            execution, replica.info)
+
+        # update the action info for each of the Replica's instances:
+        for instance in execution.action.instances:
+            db_api.update_transfer_action_info_for_instance(
+                ctxt, replica.id, instance, replica.info[instance])
+
+        # add new execution to DB:
         db_api.add_replica_tasks_execution(ctxt, execution)
         LOG.info("Replica tasks execution created: %s", execution.id)
 
-        self._begin_tasks(ctxt, execution, replica.info)
+        self._begin_tasks(ctxt, execution, task_info=replica.info)
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
 
     @replica_synchronized
@@ -534,14 +686,14 @@ class ConductorServerEndpoint(object):
 
         if not has_tasks:
             raise exception.InvalidReplicaState(
-                "This replica does not have volumes information for any "
+                "Replica '%s' does not have volumes information for any "
                 "instance. Ensure that the replica has been executed "
-                "successfully priorly")
+                "successfully priorly" % replica_id)
 
         db_api.add_replica_tasks_execution(ctxt, execution)
         LOG.info("Replica tasks execution created: %s", execution.id)
 
-        self._begin_tasks(ctxt, execution, replica.info)
+        self._begin_tasks(ctxt, execution, task_info=replica.info)
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
 
     @staticmethod
@@ -567,7 +719,8 @@ class ConductorServerEndpoint(object):
         replica.source_environment = source_environment
         replica.instances = instances
         replica.executions = []
-        replica.info = {}
+        replica.info = {instance: {
+            'volumes_info': []} for instance in instances}
         replica.notes = notes
         replica.network_map = network_map
         replica.storage_mappings = storage_mappings
@@ -599,15 +752,16 @@ class ConductorServerEndpoint(object):
     @staticmethod
     def _check_running_replica_migrations(ctxt, replica_id):
         migrations = db_api.get_replica_migrations(ctxt, replica_id)
-        if [m.id for m in migrations if m.executions[0].status ==
-                constants.EXECUTION_STATUS_RUNNING]:
+        if [m.id for m in migrations if m.executions[0].status in (
+                constants.ACTIVE_EXECUTION_STATUSES)]:
             raise exception.InvalidReplicaState(
                 "This replica is currently being migrated")
 
     @staticmethod
     def _check_running_executions(action):
-        if [e for e in action.executions
-                if e.status == constants.EXECUTION_STATUS_RUNNING]:
+        if [
+                e for e in action.executions
+                if e.status in constants.ACTIVE_EXECUTION_STATUSES]:
             raise exception.InvalidActionTasksExecutionState(
                 "Another tasks execution is in progress")
 
@@ -753,10 +907,11 @@ class ConductorServerEndpoint(object):
                     depends_on=[cleanup_deployment_task.id],
                     on_error=True)
 
+        self._check_execution_tasks_sanity(execution, migration.info)
         db_api.add_migration(ctxt, migration)
         LOG.info("Migration created: %s", migration.id)
 
-        self._begin_tasks(ctxt, execution, migration.info)
+        self._begin_tasks(ctxt, execution, task_info=migration.info)
 
         return self.get_migration(ctxt, migration.id)
 
@@ -809,22 +964,24 @@ class ConductorServerEndpoint(object):
             migration, licensing_client.RESERVATION_TYPE_MIGRATION)
 
         for instance in instances:
-            # NOTE: we must explicitly set this in each VM's info
-            # to prevent the Replica disks from being cloned:
-            migration.info[instance] = {"clone_disks": False}
-            scripts = self._get_instance_scripts(user_scripts, instance)
-            migration.info[instance]["user_scripts"] = scripts
-
-            validate_migration_source_inputs_task = self._create_task(
-                instance,
-                constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS,
-                execution)
+            migration.info[instance] = {
+                "volumes_info": [],
+                "source_environment": source_environment,
+                "target_environment": destination_environment,
+                "user_scripts": self._get_instance_scripts(
+                    user_scripts, instance),
+                # NOTE: we must explicitly set this in each VM's info
+                # to prevent the Replica disks from being cloned:
+                "clone_disks": False}
+                # TODO(aznashwan): have these passed separately to the tasks
+                # (they're currently passed inside the dest-env)
+                # "network_map": network_map,
+                # "storage_mappings": storage_mappings,
 
             get_instance_info_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_GET_INSTANCE_INFO,
-                execution,
-                depends_on=[validate_migration_source_inputs_task.id])
+                execution)
 
             validate_migration_destination_inputs_task = self._create_task(
                 instance,
@@ -832,33 +989,35 @@ class ConductorServerEndpoint(object):
                 execution,
                 depends_on=[get_instance_info_task.id])
 
-            depends_on = [
-                validate_migration_source_inputs_task.id,
-                validate_migration_destination_inputs_task.id]
-
-            create_instance_disks_task = self._create_task(
-                instance, constants.TASK_TYPE_CREATE_INSTANCE_DISKS,
-                execution, depends_on=depends_on)
+            validate_migration_source_inputs_task = self._create_task(
+                instance,
+                constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS,
+                execution)
 
             deploy_migration_source_resources_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES,
-                execution, depends_on=[create_instance_disks_task.id])
+                execution, depends_on=[validate_migration_source_inputs_task.id])
+
+            create_instance_disks_task = self._create_task(
+                instance, constants.TASK_TYPE_CREATE_INSTANCE_DISKS,
+                execution, depends_on=[
+                    validate_migration_source_inputs_task.id,
+                    validate_migration_destination_inputs_task.id])
 
             deploy_migration_target_resources_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES,
-                execution, depends_on=[
-                    create_instance_disks_task.id,
-                    deploy_migration_source_resources_task.id])
+                execution, depends_on=[create_instance_disks_task.id])
 
             # NOTE(aznashwan): re-executing the REPLICATE_DISKS task only works
             # if all the source disk snapshotting and worker setup steps are
             # performed by the source plugin in REPLICATE_DISKS.
             # This should no longer be a problem when worker pooling lands.
-            # Alternatively, if the DEPLOY_REPLICA_SOURCE/DEST_RESOURCES tasks
-            # will no longer have a state conflict, iterating through and
-            # re-executing DEPLOY_REPLICA_SOURCE_RESOURCES will be required:
+            # Alternatively, REPLICATE_DISKS could be modified to re-use the
+            # resources deployed during 'DEPLOY_SOURCE_RESOURCES'.
+            # These are currently not even passed to REPLICATE_DISKS (just
+            # their connection info), and should be fixed later.
             last_migration_task = None
             migration_resources_tasks = [
                 deploy_migration_source_resources_task.id,
@@ -892,17 +1051,16 @@ class ConductorServerEndpoint(object):
                 instance,
                 constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES,
                 execution, depends_on=[
-                    last_migration_task.id,
-                    delete_source_resources_task.id],
+                    last_migration_task.id],
                 on_error=True)
 
             deploy_instance_task = self._create_task(
                 instance, constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES,
                 execution, depends_on=[
-                    delete_source_resources_task.id,
                     delete_destination_resources_task.id])
 
             last_task = deploy_instance_task
+            task_delete_os_morphing_resources = None
             if not skip_os_morphing:
                 task_deploy_os_morphing_resources = self._create_task(
                     instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
@@ -938,17 +1096,23 @@ class ConductorServerEndpoint(object):
                 execution, depends_on=[finalize_deployment_task.id],
                 on_error_only=True)
 
+            cleanup_deps = [
+                create_instance_disks_task.id,
+                delete_destination_resources_task.id,
+                cleanup_failed_deployment_task.id]
+            if not skip_os_morphing:
+                cleanup_deps.append(task_delete_os_morphing_resources.id)
             self._create_task(
                 instance, constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE,
-                execution, depends_on=[
-                    create_instance_disks_task.id,
-                    cleanup_failed_deployment_task.id],
+                execution, depends_on=cleanup_deps,
                 on_error_only=True)
 
+        self._check_execution_tasks_sanity(
+            execution, migration.info)
         db_api.add_migration(ctxt, migration)
-        LOG.info("Migration created: %s", migration.id)
 
-        self._begin_tasks(ctxt, execution)
+        LOG.info("Migration created: %s", migration.id)
+        self._begin_tasks(ctxt, execution, task_info=migration.info)
 
         return self.get_migration(ctxt, migration.id)
 
@@ -983,7 +1147,8 @@ class ConductorServerEndpoint(object):
         self._cancel_tasks_execution(ctxt, execution, force)
         self._check_delete_reservation_for_transfer(migration)
 
-    def _cancel_tasks_execution(self, ctxt, execution, force=False):
+    def _cancel_tasks_execution(
+            self, ctxt, execution, requery=True, force=False):
         """ Cancels a running Execution by:
         - telling workers to kill any already running non-on-error tasks
         - cancelling any non-on-error tasks which are pending
@@ -995,13 +1160,15 @@ class ConductorServerEndpoint(object):
         LOG.debug(
             "Cancelling tasks execution %s. Current status before "
             "cancellation is '%s'", execution.id, execution.status)
-
         # mark execution as cancelling:
         self._set_tasks_execution_status(
             ctxt, execution.id, constants.EXECUTION_STATUS_CANCELLING)
         # iterate through and kill/cancel any non-error
         # tasks which are running/pending:
         for task in execution.tasks:
+            if requery:
+                task = db_api.get_task(ctxt, task.id)
+
             if force and task.status == constants.TASK_STATUS_CANCELLING:
                 LOG.warn(
                     "Task '%s' is in %s state, but forcibly setting to "
@@ -1080,10 +1247,12 @@ class ConductorServerEndpoint(object):
         """ Saves the ID of the worker host which has accepted and started
         the task to the DB and marks the task as 'RUNNING'. """
         task = db_api.get_task(ctxt, task_id)
-        if task.status != constants.TASK_STATUS_SCHEDULED:
+        if task.status == constants.TASK_STATUS_CANCELLING:
+            raise exception.TaskIsCancelling(task_id=task_id)
+        elif task.status != constants.TASK_STATUS_SCHEDULED:
             raise exception.InvalidTaskState(
                 "Task with ID '%s' is in '%s' status instead of the "
-                "expected '%s' required for it to be executed." % (
+                "expected '%s' required for it to have a task host set." % (
                     task_id, task.status, constants.TASK_STATUS_SCHEDULED))
         db_api.set_task_host(ctxt, task_id, host, process_id)
         db_api.set_task_status(
@@ -1107,7 +1276,7 @@ class ConductorServerEndpoint(object):
         determined_state = constants.EXECUTION_STATUS_RUNNING
         status_vals = task_statuses.values()
         if constants.TASK_STATUS_PENDING in status_vals and (
-                constants.TASK_STATUS_RUNNING not in status_vals or (
+                constants.TASK_STATUS_RUNNING not in status_vals and (
                     constants.TASK_STATUS_SCHEDULED not in status_vals)):
             LOG.warn(
                 "Execution '%s' is deadlocked. Task statuses are: %s",
@@ -1382,7 +1551,7 @@ class ConductorServerEndpoint(object):
     def _update_replica_volumes_info(self, ctxt, replica_id, instance,
                                      updated_task_info):
         """ WARN: the lock for the Replica must be pre-acquired. """
-        db_api.set_transfer_action_info(
+        db_api.update_transfer_action_info_for_instance(
             ctxt, replica_id, instance,
             updated_task_info)
 
@@ -1398,7 +1567,8 @@ class ConductorServerEndpoint(object):
             self._update_replica_volumes_info(
                 ctxt, replica_id, instance, updated_task_info)
 
-    def _handle_post_task_actions(self, ctxt, task, execution, task_info):
+    def _handle_post_task_actions(
+            self, ctxt, task, execution, task_info):
         task_type = task.task_type
 
         if task_type == constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
@@ -1451,8 +1621,6 @@ class ConductorServerEndpoint(object):
         elif task_type in (
                 constants.TASK_TYPE_UPDATE_SOURCE_REPLICA,
                 constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA):
-            # NOTE: perform the actual db update on the Replica's properties:
-            db_api.update_replica(ctxt, execution.action_id, task_info)
             # NOTE: remember to update the `volumes_info`:
             # NOTE: considering this method is only called with a lock on the
             # `execution.action_id` (in a Replica update tasks' case that's the
@@ -1460,25 +1628,54 @@ class ConductorServerEndpoint(object):
             # `_update_replica_volumes_info` below:
             self._update_replica_volumes_info(
                 ctxt, execution.action_id, task.instance,
-                {"volumes_info": task_info.get("volumes_info")})
+                {"volumes_info": task_info.get("volumes_info", [])})
+
+            if task_type == constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
+                # check if this was the last task in the update execution:
+                still_running = False
+                for other_task in execution.tasks:
+                    if other_task.id == task.id:
+                        continue
+                    if other_task.status in constants.ACTIVE_TASK_STATUSES:
+                        still_running = True
+                        break
+                if not still_running:
+                    # it means this was the last update task in the Execution
+                    # and we may safely update the params of the Replica
+                    # as they are in the DB:
+                    db_api.update_replica(
+                        ctxt, execution.action_id, task_info)
 
     @task_and_execution_synchronized
-    def task_completed(self, ctxt, task_id, task_info):
+    def task_completed(self, ctxt, task_id, task_result):
         LOG.info("Task completed: %s", task_id)
 
         db_api.set_task_status(
             ctxt, task_id, constants.TASK_STATUS_COMPLETED)
-
         task = db_api.get_task(ctxt, task_id)
 
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
         action_id = execution.action_id
         with lockutils.lock(action_id):
-            LOG.info("Setting instance %(instance)s "
-                     "action info: %(task_info)s",
-                     {"instance": task.instance, "task_info": task_info})
-            updated_task_info = db_api.set_transfer_action_info(
-                ctxt, action_id, task.instance, task_info)
+            updated_task_info = None
+            if task_result:
+                LOG.info(
+                    "Setting task %(task_id)s result for instance %(instance)s "
+                    "into action %(action_id)s info: %(task_result)s", {
+                        "task_id": task_id,
+                        "instance": task.instance,
+                        "action_id": action_id,
+                        "task_result": task_result})
+                updated_task_info = (
+                    db_api.update_transfer_action_info_for_instance(
+                        ctxt, action_id, task.instance, task_result))
+            else:
+                action = db_api.get_action(ctxt, action_id)
+                updated_task_info = action.info[task.instance]
+                LOG.info(
+                    "Task '%s' for instance '%s' of transfer action '%s' "
+                    "has completed successfuly but has not returned "
+                    "any result.", task.id, task.instance, action_id)
 
             self._handle_post_task_actions(
                 ctxt, task, execution, updated_task_info)
@@ -1564,7 +1761,8 @@ class ConductorServerEndpoint(object):
                         action_id, action.info.get(task.instance, {}).get(
                             'osmorphing_connection_info', {}))
                     self._set_tasks_execution_status(
-                        ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
+                        ctxt, execution.id,
+                        constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING)
                 else:
                     LOG.warn(
                         "Some tasks are running in parallel with the "
@@ -1660,7 +1858,7 @@ class ConductorServerEndpoint(object):
 
     @replica_synchronized
     def update_replica(
-            self, ctxt, replica_id, properties):
+            self, ctxt, replica_id, updated_properties):
         replica = self._get_replica(ctxt, replica_id)
         self._check_replica_running_executions(ctxt, replica)
         self._check_valid_replica_tasks_execution(replica, force=True)
@@ -1670,15 +1868,32 @@ class ConductorServerEndpoint(object):
         execution.action = replica
         execution.type = constants.EXECUTION_TYPE_REPLICA_UPDATE
 
-        LOG.debug(
-            "Replica '%s' info pre-replica-update: %s",
-            replica_id, replica.info)
         for instance in execution.action.instances:
+            LOG.debug(
+                "Pre-replica-update task_info for instance '%s' of Replica "
+                "'%s': %s", instance, replica_id,
+                utils.filter_chunking_info_for_task(
+                    replica.info[instance]))
             # NOTE: "circular assignment" would lead to a `None` value
             # so we must operate on a copy:
             inst_info_copy = copy.deepcopy(replica.info[instance])
-            inst_info_copy.update(properties)
+            # NOTE: we update the various values in the task info itself
+            # As a result, the values within the task_info will be the updated
+            # values which will be checked. The old values will be sent to the
+            # tasks through the origin/destination parameters for them to be
+            # compared to the new ones.
+            # The actual values on the Replica object itself will be set
+            # during _handle_post_task_actions once the final destination-side
+            # update task will be completed.
+            inst_info_copy.update(updated_properties)
             replica.info[instance] = inst_info_copy
+
+            LOG.debug(
+                "Updated task_info for instance '%s' of Replica "
+                "'%s' which will be verified during update procedure: %s",
+                instance, replica_id, utils.filter_chunking_info_for_task(
+                    replica.info[instance]))
+
             get_instance_info_task = self._create_task(
                 instance, constants.TASK_TYPE_GET_INSTANCE_INFO,
                 execution)
@@ -1690,14 +1905,16 @@ class ConductorServerEndpoint(object):
                 execution,
                 depends_on=[
                     get_instance_info_task.id,
+                    # NOTE: the dest-side update task must be done after
+                    # the source-side one as both can potentially modify
+                    # the 'volumes_info' together:
                     update_source_replica_task.id])
-        LOG.debug(
-            "Replica '%s' info post-replica-update: %s",
-            replica_id, replica.info)
+
+        self._check_execution_tasks_sanity(execution, replica.info)
         db_api.add_replica_tasks_execution(ctxt, execution)
         LOG.debug("Execution for Replica update tasks created: %s",
                   execution.id)
-        self._begin_tasks(ctxt, execution, replica.info)
+        self._begin_tasks(ctxt, execution, task_info=replica.info)
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
 
     def get_diagnostics(self, ctxt):

+ 22 - 4
coriolis/db/api.py

@@ -5,6 +5,7 @@ from oslo_config import cfg
 from oslo_db import api as db_api
 from oslo_db import options as db_options
 from oslo_db.sqlalchemy import enginefacade
+from oslo_log import log as logging
 from oslo_utils import timeutils
 from sqlalchemy import func
 from sqlalchemy import or_
@@ -17,6 +18,7 @@ from coriolis import exception
 CONF = cfg.CONF
 db_options.set_defaults(CONF)
 
+LOG = logging.getLogger(__name__)
 
 _BACKEND_MAPPING = {'sqlalchemy': 'coriolis.db.sqlalchemy.api'}
 IMPL = db_api.DBAPI.from_config(CONF, backend_mapping=_BACKEND_MAPPING)
@@ -449,17 +451,33 @@ def get_action(context, action_id):
 
 
 @enginefacade.writer
-def set_transfer_action_info(context, action_id, instance, instance_info):
+def update_transfer_action_info_for_instance(
+        context, action_id, instance, new_instance_info):
+    """ Updates the info for the given action with the provided dict.
+    Returns the updated value.
+    Sub-fields of the dict already in the info will get overwritten entirely!
+    """
     action = get_action(context, action_id)
+    if not new_instance_info:
+        LOG.debug(
+            "No new info provided for action '%s' and instance '%s'",
+            action_id, instance)
+        return action.info.get(instance, {})
 
     # Copy is needed, otherwise sqlalchemy won't save the changes
     action_info = action.info.copy()
     if instance in action_info:
         instance_info_old = action_info[instance].copy()
-        instance_info_old.update(instance_info)
+        overwritten_keys = [
+            k for k in new_instance_info.keys()
+            if k in instance_info_old.keys()]
+        if overwritten_keys:
+            LOG.debug(
+                "Overwriting the values of the following keys for info of "
+                "instance '%s' of action with ID '%s': %s",
+                instance, action_id, overwritten_keys)
+        instance_info_old.update(new_instance_info)
         action_info[instance] = instance_info_old
-    else:
-        action_info[instance] = instance_info
     action.info = action_info
 
     return action_info[instance]

+ 8 - 0
coriolis/exception.py

@@ -32,6 +32,10 @@ LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
 
 
+TASK_ALREADY_CANCELLING_EXCEPTION_FMT = (
+    "Task %(task_id)s is in CANCELLING status.")
+
+
 class ConvertedException(webob.exc.WSGIHTTPException):
 
     def __init__(self, code=500, title="", explanation=""):
@@ -185,6 +189,10 @@ class InvalidTaskState(Invalid):
         'Task "%(task_id)s" in in an invalid state: %(task_state)s')
 
 
+class TaskIsCancelling(InvalidTaskState):
+    message = _(TASK_ALREADY_CANCELLING_EXCEPTION_FMT)
+
+
 class InvalidActionTasksExecutionState(Invalid):
     message = _("Invalid tasks execution state: %(reason)s")
 

+ 2 - 2
coriolis/replicas/api.py

@@ -16,9 +16,9 @@ class API(object):
             source_environment, destination_environment, instances,
             network_map, storage_mappings, notes)
 
-    def update(self, ctxt, replica_id, properties):
+    def update(self, ctxt, replica_id, updated_properties):
         return self._rpc_client.update_replica(
-            ctxt, replica_id, properties)
+            ctxt, replica_id, updated_properties)
 
     def delete(self, ctxt, replica_id):
         self._rpc_client.delete_replica(ctxt, replica_id)

+ 67 - 1
coriolis/tasks/base.py

@@ -9,6 +9,7 @@ from oslo_log import log as logging
 from six import with_metaclass
 
 from coriolis import constants
+from coriolis import exception
 from coriolis import utils
 from coriolis.providers import factory as providers_factory
 
@@ -51,10 +52,75 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
 
         return required_libs
 
+    @property
     @abc.abstractmethod
+    def required_task_info_properties(self):
+        """ Returns a list of the string fields which are required
+        to be present during the tasks' run method. """
+        pass
+
+    @property
+    @abc.abstractmethod
+    def returned_task_info_properties(self):
+        """ Returns a list of the string fields which are returned by the
+        tasks' run method to be added to the task info.
+        """
+        pass
+
+    @abc.abstractmethod
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
+        """ The actual logic run by the task.
+        Should return a dict with all the fields declared by
+        'self.returned_task_info_properties'.
+        Must be implemented in all child classes.
+        """
+        pass
+
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
-        pass
+        """ Validates task_info, runs `_run` and returns its result dict;
+        raises CoriolisException when either contract is not respected.
+        NOTE: This should NOT modify the existing task_info in any way.
+        """
+        missing_info_props = [
+            prop for prop in self.required_task_info_properties
+            if prop not in task_info]
+        if missing_info_props:
+            raise exception.CoriolisException(
+                "Task type '%s' asked to run on task info with "
+                "missing properties: %s" % (
+                    self.__class__, missing_info_props))
+
+        result = self._run(
+            ctxt, instance, origin, destination, task_info, event_handler)
+
+        if not isinstance(result, dict):
+            raise exception.CoriolisException(
+                "Task type '%s' returned result of type %s "
+                "instead of a dict: %s" % (
+                    self.__class__, type(result), result))
+
+        missing_returns = [
+            prop for prop in self.returned_task_info_properties
+            if prop not in result]
+        if missing_returns:
+            raise exception.CoriolisException(
+                "Task type '%s' failed to return the following "
+                "declared return values in its result: %s. "
+                "Result was: %s" % (
+                    self.__class__, missing_returns, result))
+
+        undeclared_returns = [
+            prop for prop in result
+            if prop not in self.returned_task_info_properties]
+        if undeclared_returns:
+            raise exception.CoriolisException(
+                "Task type '%s' returned the following undeclared "
+                "keys in its result: %s" % (
+                    self.__class__, undeclared_returns))
+
+        return result
 
 
 def get_connection_info(ctxt, data):

+ 2 - 2
coriolis/tasks/factory.py

@@ -81,9 +81,9 @@ _TASKS_MAP = {
 }
 
 
-def get_task_runner(task_type):
+def get_task_runner_class(task_type):
     cls = _TASKS_MAP.get(task_type)
     if not cls:
         raise exception.NotFound(
             "TaskRunner not found for task type: %s" % task_type)
-    return cls()
+    return cls

+ 18 - 9
coriolis/tasks/migration_tasks.py

@@ -13,29 +13,38 @@ LOG = logging.getLogger(__name__)
 
 
 class GetOptimalFlavorTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_INSTANCE_FLAVOR,
             event_handler)
 
         connection_info = base.get_connection_info(ctxt, destination)
-        target_environment = destination.get("target_environment") or {}
+        target_environment = task_info["target_environment"]
         export_info = task_info["export_info"]
 
         flavor = provider.get_optimal_flavor(
             ctxt, connection_info, target_environment, export_info)
 
-        if task_info.get("instance_deployment_info") is None:
-            task_info["instance_deployment_info"] = {}
-        task_info["instance_deployment_info"]["selected_flavor"] = flavor
+        # NOTE: copy the dict so the passed-in task_info is left unmodified:
+        instance_deployment_info = dict(
+            task_info.get("instance_deployment_info") or {})
+        instance_deployment_info["selected_flavor"] = flavor
 
         events.EventManager(event_handler).progress_update(
             "Selected flavor: %s" % flavor)
 
-        task_info["retain_export_path"] = True
-
-        return task_info
+        return {
+            "instance_deployment_info": instance_deployment_info}
 
 
 class DeployMigrationSourceResourcesTask(

+ 53 - 17
coriolis/tasks/osmorphing_tasks.py

@@ -9,17 +9,27 @@ from coriolis.tasks import base
 
 
 class OSMorphingTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
 
-        origin_provider_type = task_info["origin_provider_type"]
-        destination_provider_type = task_info["destination_provider_type"]
+    @property
+    def required_task_info_properties(self):
+        return [
+            "osmorphing_info", "osmorphing_connection_info",
+            "user_scripts"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
 
         origin_provider = providers_factory.get_provider(
-            origin["type"], origin_provider_type, event_handler)
+            origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
+            event_handler)
 
         destination_provider = providers_factory.get_provider(
-            destination["type"], destination_provider_type, event_handler)
+            destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
+            event_handler)
 
         osmorphing_connection_info = base.unmarshal_migr_conn_info(
             task_info['osmorphing_connection_info'])
@@ -43,12 +53,23 @@ class OSMorphingTask(base.TaskRunner):
             instance_script,
             event_handler)
 
-        return task_info
+        return {}
 
 
 class DeployOSMorphingResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return [
+            "os_morphing_resources", "osmorphing_info",
+            "osmorphing_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_OS_MORPHING,
             event_handler)
@@ -58,21 +79,34 @@ class DeployOSMorphingResourcesTask(base.TaskRunner):
         import_info = provider.deploy_os_morphing_resources(
             ctxt, connection_info, instance_deployment_info)
 
-        task_info["os_morphing_resources"] = import_info.get(
+        result = {}
+        result["os_morphing_resources"] = import_info.get(
             "os_morphing_resources")
-        task_info["osmorphing_info"] = import_info.get("osmorphing_info", {})
-        task_info["osmorphing_connection_info"] = base.marshal_migr_conn_info(
+        result["osmorphing_info"] = import_info.get("osmorphing_info", {})
+        result["osmorphing_connection_info"] = base.marshal_migr_conn_info(
             import_info["osmorphing_connection_info"])
 
         schemas.validate_value(
-            task_info, schemas.CORIOLIS_OS_MORPHING_RESOURCES_SCHEMA)
+            result, schemas.CORIOLIS_OS_MORPHING_RESOURCES_SCHEMA,
+            # NOTE: we avoid raising so that the cleanup task
+            # can [try] to deal with the temporary resources.
+            raise_on_error=False)
 
-        return task_info
+        return result
 
 
 class DeleteOSMorphingResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["os_morphing_resources"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["os_morphing_resources", "osmorphing_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_OS_MORPHING,
             event_handler)
@@ -82,4 +116,6 @@ class DeleteOSMorphingResourcesTask(base.TaskRunner):
         provider.delete_os_morphing_resources(
             ctxt, connection_info, os_morphing_resources)
 
-        return task_info
+        return {
+            "os_morphing_resources": None,
+            "osmorphing_connection_info": None}

+ 297 - 129
coriolis/tasks/replica_tasks.py

@@ -64,42 +64,74 @@ def _check_ensure_volumes_info_ordering(export_info, volumes_info):
 
 
 class GetInstanceInfoTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+    """ Task which gathers the export info for a VM.  """
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["export_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
 
-        source_environment = origin.get('source_environment') or {}
+        source_environment = task_info['source_environment']
         export_info = provider.get_replica_instance_info(
             ctxt, connection_info, source_environment, instance)
 
         # Validate the output
         schemas.validate_value(
             export_info, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
-        task_info["export_info"] = export_info
 
-        return task_info
+        return {
+            'export_info': export_info}
 
 
 class ShutdownInstanceTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+    """ Task which shuts down a VM. """
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
 
-        source_environment = origin.get('source_environment') or {}
+        source_environment = task_info['source_environment']
         provider.shutdown_instance(ctxt, connection_info, source_environment,
                                    instance)
-
-        return task_info
+        return {}
 
 
 class ReplicateDisksTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+
+    @property
+    def required_task_info_properties(self):
+        return [
+            "export_info", "volumes_info", "source_environment",
+            "source_resources",
+            "source_resources_connection_info",
+            "target_resources_connection_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
@@ -112,7 +144,7 @@ class ReplicateDisksTask(base.TaskRunner):
             {"volumes_info": volumes_info},
             schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA)
 
-        migr_source_conn_info = task_info["migr_source_connection_info"]
+        migr_source_conn_info = task_info["source_resources_connection_info"]
         if migr_source_conn_info:
             schemas.validate_value(
                 migr_source_conn_info,
@@ -120,11 +152,17 @@ class ReplicateDisksTask(base.TaskRunner):
         migr_source_conn_info = base.unmarshal_migr_conn_info(
             migr_source_conn_info)
 
-        migr_target_conn_info = task_info["migr_target_connection_info"]
+        migr_target_conn_info = task_info["target_resources_connection_info"]
         incremental = task_info.get("incremental", True)
 
-        source_environment = origin.get('source_environment') or {}
+        source_environment = task_info['source_environment']
 
+        # TODO(aznashwan): in order to facilitate parallelized setups,
+        # we should modify the replicate_disks provider method to allow for the
+        # passing in of source_resources info as well.
+        # This could be used to for example pass in the ID/info of a
+        # pre-created source worker VM which can then be (re)used by the
+        # Replicate disks task during PMR.
         volumes_info = provider.replicate_disks(
             ctxt, connection_info, source_environment, instance,
             migr_source_conn_info, migr_target_conn_info, volumes_info,
@@ -135,15 +173,24 @@ class ReplicateDisksTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            'volumes_info': volumes_info}
 
 
 class DeployReplicaDisksTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+
+    @property
+    def required_task_info_properties(self):
+        return [
+            "export_info", "volumes_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
-        target_environment = destination.get("target_environment") or {}
+        target_environment = task_info['target_environment']
         export_info = task_info["export_info"]
 
         provider = providers_factory.get_provider(
@@ -151,11 +198,7 @@ class DeployReplicaDisksTask(base.TaskRunner):
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
 
-        volumes_info = task_info.get("volumes_info", [])
-        if volumes_info is None:
-            # In case Replica disks were deleted:
-            volumes_info = []
-
+        volumes_info = task_info.get("volumes_info")
         volumes_info = provider.deploy_replica_disks(
             ctxt, connection_info, target_environment, instance, export_info,
             volumes_info)
@@ -165,13 +208,22 @@ class DeployReplicaDisksTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            'volumes_info': volumes_info}
 
 
 class DeleteReplicaDisksTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+
+    @property
+    def required_task_info_properties(self):
+        return [
+            "volumes_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         if not task_info.get("volumes_info"):
             LOG.debug(
@@ -185,6 +237,7 @@ class DeleteReplicaDisksTask(base.TaskRunner):
 
         volumes_info = _get_volumes_info(task_info)
 
+        # TODO (aznashwan): add target_env options to `delete_replica_disks`:
         volumes_info = provider.delete_replica_disks(
             ctxt, connection_info, volumes_info)
         if volumes_info:
@@ -192,25 +245,31 @@ class DeleteReplicaDisksTask(base.TaskRunner):
                 "'volumes_info' should have been void after disk "
                 "deletion: %s" % volumes_info)
 
-        task_info["volumes_info"] = []
-
-        return task_info
+        return {
+            'volumes_info': volumes_info or []}  # normalize None to []
 
 
 class DeployReplicaSourceResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["source_resources", "source_resources_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
 
-        source_environment = origin.get('source_environment') or {}
+        source_environment = task_info['source_environment'] or {}
         replica_resources_info = provider.deploy_replica_source_resources(
             ctxt, connection_info, source_environment)
 
-        task_info["migr_source_resources"] = replica_resources_info[
-            "migr_resources"]
         migr_connection_info = replica_resources_info.get("connection_info")
         if migr_connection_info:
             migr_connection_info = base.marshal_migr_conn_info(
@@ -222,37 +281,55 @@ class DeployReplicaSourceResourcesTask(base.TaskRunner):
                 # can [try] to deal with the temporary resources.
                 raise_on_error=False)
 
-        task_info["migr_source_connection_info"] = migr_connection_info
-
-        return task_info
+        return {
+            "source_resources": replica_resources_info["migr_resources"],
+            "source_resources_connection_info": migr_connection_info}
 
 
 class DeleteReplicaSourceResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment", "source_resources"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["source_resources", "source_resources_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
 
-        migr_resources = task_info.get("migr_source_resources")
-
-        source_environment = origin.get("source_environment", {})
+        migr_resources = task_info["source_resources"]
+        source_environment = task_info["source_environment"]
 
         if migr_resources:
             provider.delete_replica_source_resources(
                 ctxt, connection_info, source_environment, migr_resources)
 
-        task_info["migr_source_resources"] = None
-        task_info["migr_source_connection_info"] = None
-
-        return task_info
+        return {
+            "source_resources": None,
+            "source_resources_connection_info": None}
 
 
 class DeployReplicaTargetResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
-        target_environment = destination.get("target_environment") or {}
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return [
+            "volumes_info", "target_resources",
+            "target_resources_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
+        target_environment = task_info["target_environment"]
         export_info = task_info['export_info']
 
         provider = providers_factory.get_provider(
@@ -274,47 +351,64 @@ class DeployReplicaTargetResourcesTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, replica_resources_info["volumes_info"])
 
-        task_info["volumes_info"] = volumes_info
-        task_info["migr_target_resources"] = replica_resources_info[
-            "migr_resources"]
-
         migr_connection_info = replica_resources_info["connection_info"]
         try:
             backup_writers.BackupWritersFactory(
                 migr_connection_info, None).get_writer()
         except BaseException as err:
-            LOG.exception(
-                "Invalid connection info: %s" % err)
-
-        task_info["migr_target_connection_info"] = migr_connection_info
+            LOG.warn(
+                "Seemingly invalid connection info. Replica will likely "
+                "fail during disk Replication. Error is: %s" % err)
 
-        return task_info
+        return {
+            "volumes_info": volumes_info,
+            "target_resources": replica_resources_info["migr_resources"],
+            "target_resources_connection_info": migr_connection_info}
 
 
 class DeleteReplicaTargetResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+
+    @property
+    def required_task_info_properties(self):
+        return ["target_resources", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return [
+            "target_resources", "target_resources_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
 
-        migr_resources = task_info.get("migr_target_resources")
+        migr_resources = task_info.get("target_resources")
 
         if migr_resources:
+            # TODO (aznashwan): add 'target_env' param to call:
             provider.delete_replica_target_resources(
                 ctxt, connection_info, migr_resources)
 
-        task_info["migr_target_resources"] = None
-        task_info["migr_target_connection_info"] = None
-
-        return task_info
+        return {
+            "target_resources": None,
+            "target_resources_connection_info": None}
 
 
 class DeployReplicaInstanceTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
-        target_environment = destination.get("target_environment") or {}
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "target_environment", "clone_disks"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
+        target_environment = task_info["target_environment"]
         export_info = task_info["export_info"]
 
         provider = providers_factory.get_provider(
@@ -330,23 +424,23 @@ class DeployReplicaInstanceTask(base.TaskRunner):
             ctxt, connection_info, target_environment, instance,
             export_info, volumes_info, clone_disks)
 
-        if task_info.get("instance_deployment_info") is None:
-            task_info["instance_deployment_info"] = {}
-        task_info["instance_deployment_info"].update(import_info[
-            "instance_deployment_info"])
+        return {
+            "instance_deployment_info": import_info[
+                'instance_deployment_info']}
 
-        task_info[
-            "origin_provider_type"] = constants.PROVIDER_TYPE_REPLICA_EXPORT
-        task_info[
-            "destination_provider_type"
-        ] = constants.PROVIDER_TYPE_REPLICA_IMPORT
 
-        return task_info
+class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
 
+    @property
+    def required_task_info_properties(self):
+        return ["instance_deployment_info"]
 
-class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+    @property
+    def returned_task_info_properties(self):
+        return ["transfer_result"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
@@ -355,19 +449,27 @@ class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
 
         result = provider.finalize_replica_instance_deployment(
             ctxt, connection_info, instance_deployment_info)
-        if result is not None:
-            task_info["transfer_result"] = result
-        else:
+        if result is None:
             LOG.warn(
                 "'None' was returned as result for Finalize Replica Instance "
                 "deployment task '%s'.", task_info)
 
-        return task_info
+        return {
+            "transfer_result": result}
 
 
 class CleanupFailedReplicaInstanceDeploymentTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
@@ -378,12 +480,22 @@ class CleanupFailedReplicaInstanceDeploymentTask(base.TaskRunner):
         provider.cleanup_failed_replica_instance_deployment(
             ctxt, connection_info, instance_deployment_info)
 
-        return task_info
+        return {
+            "instance_deployment_info": None}
 
 
 class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
@@ -400,14 +512,22 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            "volumes_info": volumes_info}
 
 
 class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         export_info = task_info['export_info']
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
@@ -424,14 +544,22 @@ class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            "volumes_info": volumes_info}
 
 
 class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
@@ -448,14 +576,22 @@ class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            "volumes_info": volumes_info}
 
 
 class ValidateReplicaExecutionSourceInputsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         event_manager = events.EventManager(event_handler)
         origin_type = origin["type"]
         source_provider = providers_factory.get_provider(
@@ -469,12 +605,21 @@ class ValidateReplicaExecutionSourceInputsTask(base.TaskRunner):
         else:
             source_provider.validate_replica_export_input(
                 ctxt, origin_connection_info, instance,
-                source_environment=origin.get("source_environment", {}))
+                source_environment=task_info["source_environment"])
 
-        return task_info
+        return {}
 
 
 class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
     def _validate_provider_replica_import_input(
             self, provider, ctxt, conn_info, target_environment, export_info):
         provider.validate_replica_import_input(
@@ -482,8 +627,8 @@ class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
             check_os_morphing_resources=False,
             check_final_vm_params=False)
 
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         event_manager = events.EventManager(event_handler)
         destination_type = destination["type"]
 
@@ -506,19 +651,26 @@ class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
                 "Replica Import validation for destination platform "
                 "'%s'" % destination_type)
 
-        # NOTE: the target environment JSON schema should have been validated
-        # upon accepting the Replica API creation request.
-        target_environment = destination.get("target_environment", {})
+        target_environment = task_info["target_environment"]
         self._validate_provider_replica_import_input(
             destination_provider, ctxt, destination_connection_info,
             target_environment, export_info)
 
-        return task_info
+        return {}
 
 
 class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         event_manager = events.EventManager(event_handler)
         destination_connection_info = base.get_connection_info(
             ctxt, destination)
@@ -542,16 +694,25 @@ class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
 
         # NOTE: the target environment JSON schema should have been validated
         # upon accepting the Replica API creation request.
-        target_environment = destination.get("target_environment", {})
+        target_environment = task_info['target_environment']
         destination_provider.validate_replica_deployment_input(
             ctxt, destination_connection_info, target_environment, export_info)
 
-        return task_info
+        return {}
 
 
 class UpdateSourceReplicaTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["volumes_info", "source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         event_manager = events.EventManager(event_handler)
         new_source_env = task_info.get('source_environment', {})
         if not new_source_env:
@@ -574,7 +735,7 @@ class UpdateSourceReplicaTask(base.TaskRunner):
         # NOTE: the `source_environment` in the `origin` is the one set
         # in the dedicated DB column of the Replica and thus stores
         # the previous value of it:
-        old_source_env = origin.get('source_environment', {})
+        old_source_env = origin.get('source_environment')
         volumes_info = (
             source_provider.check_update_source_environment_params(
                 ctxt, origin_connection_info, instance, volumes_info,
@@ -583,16 +744,24 @@ class UpdateSourceReplicaTask(base.TaskRunner):
             schemas.validate_value(
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
 
-        task_info['volumes_info'] = volumes_info
-
-        return task_info
+        return {
+            "volumes_info": volumes_info}
 
 
 class UpdateDestinationReplicaTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         event_manager = events.EventManager(event_handler)
-        new_destination_env = task_info.get('destination_environment', {})
+        new_destination_env = task_info.get('target_environment', {})
         if not new_destination_env:
             event_manager.progress_update(
                 "No new destination environment options provided")
@@ -628,6 +797,5 @@ class UpdateDestinationReplicaTask(base.TaskRunner):
             volumes_info = _check_ensure_volumes_info_ordering(
                 export_info, volumes_info)
 
-        task_info['volumes_info'] = volumes_info
-
-        return task_info
+        return {
+            "volumes_info": volumes_info}

+ 29 - 48
coriolis/worker/rpc/server.py

@@ -25,14 +25,8 @@ from coriolis.tasks import factory as task_runners_factory
 from coriolis import utils
 
 
-worker_opts = [
-    cfg.StrOpt('export_base_path',
-               default='/tmp',
-               help='The path used for hosting exported disks.'),
-]
-
 CONF = cfg.CONF
-CONF.register_opts(worker_opts, 'worker')
+CONF.register_opts([], 'worker')
 
 LOG = logging.getLogger(__name__)
 
@@ -146,7 +140,8 @@ class WorkerServerEndpoint(object):
         libraries needed by the source/destination providers.
         """
         event_handler = _ConductorProviderEventHandler(ctxt, task_id)
-        task_runner = task_runners_factory.get_task_runner(task_type)
+        task_runner = task_runners_factory.get_task_runner_class(
+            task_type)()
 
         return task_runner.get_shared_libs_for_providers(
             ctxt, origin, destination, event_handler)
@@ -184,8 +179,18 @@ class WorkerServerEndpoint(object):
 
         self._start_process_with_custom_library_paths(p, extra_library_paths)
         LOG.info("Task process started: %s", task_id)
-        self._rpc_conductor_client.set_task_host(
-            ctxt, task_id, self._server, p.pid)
+        try:
+            self._rpc_conductor_client.set_task_host(
+                ctxt, task_id, self._server, p.pid)
+        except (Exception, KeyboardInterrupt) as ex:
+            # NOTE: because the task error classes are wrapped,
+            # it's easiest to just check that the messages align:
+            cancelling_msg = exception.TASK_ALREADY_CANCELLING_EXCEPTION_FMT % {
+                "task_id": task_id}
+            if cancelling_msg in str(ex):
+                raise exception.TaskIsCancelling(
+                    "Task '%s' was already in cancelling status." % task_id)
+            raise
 
         evt = eventlet.spawn(self._wait_for_process, p, mp_q)
         eventlet.spawn(self._handle_mp_log_events, p, mp_log_q)
@@ -193,7 +198,7 @@ class WorkerServerEndpoint(object):
         result = evt.wait()
         p.join()
 
-        if not result:
+        if result is None:
             raise exception.CoriolisException("Task canceled")
 
         if isinstance(result, str):
@@ -202,39 +207,21 @@ class WorkerServerEndpoint(object):
 
     def exec_task(self, ctxt, task_id, task_type, origin, destination,
                   instance, task_info):
-        export_path = task_info.get("export_path")
-        if not export_path:
-            export_path = _get_task_export_path(task_id, create=True)
-            task_info["export_path"] = export_path
-        retain_export_path = False
-        task_info["retain_export_path"] = retain_export_path
-
         try:
-            new_task_info = self._exec_task_process(
+            task_result = self._exec_task_process(
                 ctxt, task_id, task_type, origin, destination,
                 instance, task_info)
 
-            if new_task_info:
-                LOG.info(
-                    "Task info: %s",
-                    utils.filter_chunking_info_for_task(new_task_info))
-
-            # TODO(alexpilotti): replace the temp storage with a host
-            # independent option
-            retain_export_path = new_task_info.get("retain_export_path", False)
-            if not retain_export_path:
-                del new_task_info["export_path"]
+            LOG.info(
+                "Output of completed %s task with ID %s: %s",
+                task_type, task_id,
+                utils.filter_chunking_info_for_task(task_result))
 
-            LOG.info("Task completed: %s", task_id)
-            self._rpc_conductor_client.task_completed(ctxt, task_id,
-                                                      new_task_info)
+            self._rpc_conductor_client.task_completed(
+                ctxt, task_id, task_result)
         except Exception as ex:
             LOG.exception(ex)
-            self._check_remove_dir(export_path)
             self._rpc_conductor_client.set_task_error(ctxt, task_id, str(ex))
-        finally:
-            if not retain_export_path:
-                self._check_remove_dir(export_path)
 
     def get_endpoint_instances(self, ctxt, platform_name, connection_info,
                                source_environment, marker, limit,
@@ -429,18 +416,11 @@ class WorkerServerEndpoint(object):
             schemas["source_environment_schema"] = schema
 
         return schemas
-    
+
     def get_diagnostics(self, ctxt):
         return utils.get_diagnostics_info()
 
 
-def _get_task_export_path(task_id, create=False):
-    export_path = os.path.join(CONF.worker.export_base_path, task_id)
-    if create and not os.path.exists(export_path):
-        os.makedirs(export_path)
-    return export_path
-
-
 def _setup_task_process(mp_log_q):
     # Setting up logging and cfg, needed since this is a new process
     cfg.CONF(sys.argv[1:], project='coriolis', version="1.0.0")
@@ -458,7 +438,8 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
     try:
         _setup_task_process(mp_log_q)
 
-        task_runner = task_runners_factory.get_task_runner(task_type)
+        task_runner = task_runners_factory.get_task_runner_class(
+            task_type)()
         event_handler = _ConductorProviderEventHandler(ctxt, task_id)
 
         LOG.debug("Executing task: %(task_id)s, type: %(task_type)s, "
@@ -470,11 +451,11 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
                    "task_info": utils.filter_chunking_info_for_task(
                        task_info)})
 
-        new_task_info = task_runner.run(
+        task_result = task_runner.run(
             ctxt, instance, origin, destination, task_info, event_handler)
         # mp_q.put() doesn't raise if task_result is not serializable
-        utils.is_serializable(new_task_info)
-        mp_q.put(new_task_info)
+        utils.is_serializable(task_result)
+        mp_q.put(task_result)
     except Exception as ex:
         mp_q.put(str(ex))
         LOG.exception(ex)