Kaynağa Gözat

Merge pull request #114 from aznashwan/on-error-always-tasks

Overhaul task execution loop.
Nashwan Azhari 6 yıl önce
ebeveyn
işleme
bc19aea3b6

+ 11 - 25
coriolis/api/v1/views/replica_tasks_execution_view.py

@@ -23,31 +23,17 @@ CONF = conf.CONF
 CONF.register_opts(REPLICA_EXECUTION_API_OPTS)
 CONF.register_opts(REPLICA_EXECUTION_API_OPTS)
 
 
 
 
-def _sort_tasks(tasks):
-    non_error_only_tasks = [t for t in tasks if
-                            not t["depends_on"] and not t["on_error"]]
-
-    def _add_non_error_tasks(task_id):
-        for t in tasks:
-            if (t["depends_on"] and task_id in t["depends_on"] and
-                    t not in non_error_only_tasks):
-                non_error_only_tasks.append(t)
-                _add_non_error_tasks(t["id"])
-
-    for t in non_error_only_tasks:
-        _add_non_error_tasks(t["id"])
-
-    # Include error only tasks only if executed
-    error_only_tasks = [t for t in tasks if t["status"] !=
-                        constants.TASK_STATUS_ON_ERROR_ONLY and
-                        t not in non_error_only_tasks]
-
-    sorted_tasks = sorted(
-        non_error_only_tasks, key=lambda t: t.get('index', 0))
-    sorted_tasks.extend(sorted(
-        error_only_tasks, key=lambda t: t.get('index', 0)))
-
-    return sorted_tasks
+def _sort_tasks(tasks, filter_error_only_tasks=True):
+    """ Sorts the given list of dicts representing tasks.
+    Tasks are sorted primarily based on their index.
+    """
+    if filter_error_only_tasks:
+        tasks = [
+            t for t in tasks
+            if t['status'] != (
+                constants.TASK_STATUS_ON_ERROR_ONLY)]
+    return sorted(
+        tasks, key=lambda t: t.get('index', 0))
 
 
 
 
 def format_replica_tasks_execution(req, execution, keys=None):
 def format_replica_tasks_execution(req, execution, keys=None):

+ 1 - 0
coriolis/cmd/conductor.py

@@ -16,6 +16,7 @@ def main():
     CONF(sys.argv[1:], project='coriolis',
     CONF(sys.argv[1:], project='coriolis',
          version="1.0.0")
          version="1.0.0")
     utils.setup_logging()
     utils.setup_logging()
+    service.check_locks_dir_empty()
 
 
     server = service.MessagingService(
     server = service.MessagingService(
         'coriolis_conductor', [rpc_server.ConductorServerEndpoint()],
         'coriolis_conductor', [rpc_server.ConductorServerEndpoint()],

+ 9 - 4
coriolis/conductor/rpc/client.py

@@ -236,9 +236,14 @@ class ConductorClient(object):
             ctxt, 'set_task_host', task_id=task_id, host=host,
             ctxt, 'set_task_host', task_id=task_id, host=host,
             process_id=process_id)
             process_id=process_id)
 
 
-    def task_completed(self, ctxt, task_id, task_info):
+    def task_completed(self, ctxt, task_id, task_result):
         self._client.call(
         self._client.call(
-            ctxt, 'task_completed', task_id=task_id, task_info=task_info)
+            ctxt, 'task_completed', task_id=task_id, task_result=task_result)
+
+    def confirm_task_cancellation(self, ctxt, task_id, cancellation_details):
+        self._client.call(
+            ctxt, 'confirm_task_cancellation', task_id=task_id,
+            cancellation_details=cancellation_details)
 
 
     def set_task_error(self, ctxt, task_id, exception_details):
     def set_task_error(self, ctxt, task_id, exception_details):
         self._client.call(ctxt, 'set_task_error', task_id=task_id,
         self._client.call(ctxt, 'set_task_error', task_id=task_id,
@@ -292,11 +297,11 @@ class ConductorClient(object):
             schedule_id=schedule_id,
             schedule_id=schedule_id,
             expired=expired)
             expired=expired)
 
 
-    def update_replica(self, ctxt, replica_id, properties):
+    def update_replica(self, ctxt, replica_id, updated_properties):
         return self._client.call(
         return self._client.call(
             ctxt, 'update_replica',
             ctxt, 'update_replica',
             replica_id=replica_id,
             replica_id=replica_id,
-            properties=properties)
+            updated_properties=updated_properties)
 
 
     def get_diagnostics(self, ctxt):
     def get_diagnostics(self, ctxt):
         return self._client.call(ctxt, 'get_diagnostics')
         return self._client.call(ctxt, 'get_diagnostics')

Dosya farkı çok büyük olduğundan ihmal edildi
+ 942 - 214
coriolis/conductor/rpc/server.py


+ 66 - 0
coriolis/constants.py

@@ -4,15 +4,66 @@
 EXECUTION_STATUS_RUNNING = "RUNNING"
 EXECUTION_STATUS_RUNNING = "RUNNING"
 EXECUTION_STATUS_COMPLETED = "COMPLETED"
 EXECUTION_STATUS_COMPLETED = "COMPLETED"
 EXECUTION_STATUS_ERROR = "ERROR"
 EXECUTION_STATUS_ERROR = "ERROR"
+EXECUTION_STATUS_DEADLOCKED = "DEADLOCKED"
+EXECUTION_STATUS_CANCELED = "CANCELED"
+EXECUTION_STATUS_CANCELLING = "CANCELLING"
+EXECUTION_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
+
+ACTIVE_EXECUTION_STATUSES = [
+    EXECUTION_STATUS_RUNNING,
+    EXECUTION_STATUS_CANCELLING
+]
+
+FINALIZED_EXECUTION_STATUSES = [
+    EXECUTION_STATUS_COMPLETED,
+    EXECUTION_STATUS_CANCELED,
+    EXECUTION_STATUS_ERROR,
+    EXECUTION_STATUS_CANCELED_FOR_DEBUGGING,
+    EXECUTION_STATUS_DEADLOCKED
+]
 
 
+TASK_STATUS_SCHEDULED = "SCHEDULED"
 TASK_STATUS_PENDING = "PENDING"
 TASK_STATUS_PENDING = "PENDING"
+TASK_STATUS_UNSCHEDULED = "UNSCHEDULED"
 TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_ERROR = "ERROR"
+TASK_STATUS_FORCE_CANCELED = "FORCE_CANCELED"
 TASK_STATUS_CANCELED = "CANCELED"
 TASK_STATUS_CANCELED = "CANCELED"
+TASK_STATUS_CANCELED_AFTER_COMPLETION = "CANCELED_AFTER_COMPLETION"
+TASK_STATUS_CANCELLING = "CANCELLING"
+TASK_STATUS_CANCELLING_AFTER_COMPLETION = "CANCELLING_AFTER_COMPLETION"
 TASK_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
 TASK_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
+TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 
 
+ACTIVE_TASK_STATUSES = [
+    TASK_STATUS_PENDING,
+    TASK_STATUS_RUNNING,
+    TASK_STATUS_CANCELLING,
+    TASK_STATUS_CANCELLING_AFTER_COMPLETION
+]
+
+CANCELED_TASK_STATUSES = [
+    TASK_STATUS_CANCELED,
+    TASK_STATUS_UNSCHEDULED,
+    TASK_STATUS_FORCE_CANCELED,
+    TASK_STATUS_CANCELED_AFTER_COMPLETION,
+    TASK_STATUS_CANCELED_FOR_DEBUGGING,
+    TASK_STATUS_CANCELED_FROM_DEADLOCK
+]
+
+FINALIZED_TASK_STATUSES = [
+    TASK_STATUS_COMPLETED,
+    TASK_STATUS_ERROR,
+    TASK_STATUS_UNSCHEDULED,
+    TASK_STATUS_CANCELED,
+    TASK_STATUS_FORCE_CANCELED,
+    TASK_STATUS_CANCELED_FOR_DEBUGGING,
+    TASK_STATUS_CANCELED_FROM_DEADLOCK,
+    TASK_STATUS_CANCELED_AFTER_COMPLETION
+]
+
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (
     "DEPLOY_MIGRATION_SOURCE_RESOURCES")
     "DEPLOY_MIGRATION_SOURCE_RESOURCES")
 TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES = (
 TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES = (
@@ -132,3 +183,18 @@ EXECUTION_TYPE_REPLICA_DISKS_DELETE = "replica_disks_delete"
 EXECUTION_TYPE_REPLICA_DEPLOY = "replica_deploy"
 EXECUTION_TYPE_REPLICA_DEPLOY = "replica_deploy"
 EXECUTION_TYPE_MIGRATION = "migration"
 EXECUTION_TYPE_MIGRATION = "migration"
 EXECUTION_TYPE_REPLICA_UPDATE = "replica_update"
 EXECUTION_TYPE_REPLICA_UPDATE = "replica_update"
+
+TASK_LOCK_NAME_FORMAT = "task-%s"
+EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
+ENDPOINT_LOCK_NAME_FORMAT = "endpoint-%s"
+MIGRATION_LOCK_NAME_FORMAT = "migration-%s"
+REPLICA_LOCK_NAME_FORMAT = "replica-%s"
+SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s"
+
+EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP = {
+    EXECUTION_TYPE_MIGRATION: MIGRATION_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_REPLICA_EXECUTION: REPLICA_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_REPLICA_DEPLOY: REPLICA_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_REPLICA_UPDATE: REPLICA_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_REPLICA_DISKS_DELETE: REPLICA_LOCK_NAME_FORMAT
+}

+ 58 - 10
coriolis/db/api.py

@@ -5,6 +5,7 @@ from oslo_config import cfg
 from oslo_db import api as db_api
 from oslo_db import api as db_api
 from oslo_db import options as db_options
 from oslo_db import options as db_options
 from oslo_db.sqlalchemy import enginefacade
 from oslo_db.sqlalchemy import enginefacade
+from oslo_log import log as logging
 from oslo_utils import timeutils
 from oslo_utils import timeutils
 from sqlalchemy import func
 from sqlalchemy import func
 from sqlalchemy import or_
 from sqlalchemy import or_
@@ -17,6 +18,7 @@ from coriolis import exception
 CONF = cfg.CONF
 CONF = cfg.CONF
 db_options.set_defaults(CONF)
 db_options.set_defaults(CONF)
 
 
+LOG = logging.getLogger(__name__)
 
 
 _BACKEND_MAPPING = {'sqlalchemy': 'coriolis.db.sqlalchemy.api'}
 _BACKEND_MAPPING = {'sqlalchemy': 'coriolis.db.sqlalchemy.api'}
 IMPL = db_api.DBAPI.from_config(CONF, backend_mapping=_BACKEND_MAPPING)
 IMPL = db_api.DBAPI.from_config(CONF, backend_mapping=_BACKEND_MAPPING)
@@ -449,17 +451,42 @@ def get_action(context, action_id):
 
 
 
 
 @enginefacade.writer
 @enginefacade.writer
-def set_transfer_action_info(context, action_id, instance, instance_info):
+def update_transfer_action_info_for_instance(
+        context, action_id, instance, new_instance_info):
+    """ Updates the info for the given action with the provided dict.
+    Returns the updated value.
+    Sub-fields of the dict already in the info will get overwritten entirely!
+    """
     action = get_action(context, action_id)
     action = get_action(context, action_id)
+    if not new_instance_info:
+        LOG.debug(
+            "No new info provided for action '%s' and instance '%s'. "
+            "Nothing to update in the DB.",
+            action_id, instance)
+        return action.info.get(instance, {})
 
 
     # Copy is needed, otherwise sqlalchemy won't save the changes
     # Copy is needed, otherwise sqlalchemy won't save the changes
     action_info = action.info.copy()
     action_info = action.info.copy()
     if instance in action_info:
     if instance in action_info:
-        instance_info_old = action_info[instance].copy()
-        instance_info_old.update(instance_info)
-        action_info[instance] = instance_info_old
-    else:
-        action_info[instance] = instance_info
+        instance_info_old = action_info[instance]
+        old_keys = set(instance_info_old.keys())
+        new_keys = set(new_instance_info.keys())
+        overwritten_keys = old_keys.intersection(new_keys)
+        if overwritten_keys:
+            LOG.debug(
+                "Overwriting the values of the following keys for info of "
+                "instance '%s' of action with ID '%s': %s",
+                instance, action_id, overwritten_keys)
+        newly_added_keys = new_keys.difference(old_keys)
+        if newly_added_keys:
+            LOG.debug(
+                "The following new keys will be added for info of instance "
+                "'%s' in action with ID '%s': %s",
+                instance, action_id, newly_added_keys)
+
+        instance_info_old_copy = instance_info_old.copy()
+        instance_info_old_copy.update(new_instance_info)
+        action_info[instance] = instance_info_old_copy
     action.info = action_info
     action.info = action_info
 
 
     return action_info[instance]
     return action_info[instance]
@@ -561,10 +588,31 @@ def update_replica(context, replica_id, updated_values):
     replica = get_replica(context, replica_id)
     replica = get_replica(context, replica_id)
     if not replica:
     if not replica:
         raise exception.NotFound("Replica not found")
         raise exception.NotFound("Replica not found")
-    for n in ["source_environment", "destination_environment", "notes",
-              "network_map", "storage_mappings"]:
-        if n in updated_values:
-            setattr(replica, n, updated_values[n])
+
+    mapped_info_fields = {
+        'destination_environment': 'target_environment'}
+
+    updateable_fields = [
+        "source_environment", "destination_environment", "notes",
+        "network_map", "storage_mappings"]
+    for field in updateable_fields:
+        if mapped_info_fields.get(field, field) in updated_values:
+            LOG.debug(
+                "Updating the '%s' field of Replica '%s' to: '%s'",
+                field, replica_id, updated_values[
+                    mapped_info_fields.get(field, field)])
+            setattr(
+                replica, field,
+                updated_values[mapped_info_fields.get(field, field)])
+
+    non_updateable_fields = set(
+        updated_values.keys()).difference({
+            mapped_info_fields.get(field, field)
+            for field in updateable_fields})
+    if non_updateable_fields:
+        LOG.warn(
+            "The following Replica fields can NOT be updated: %s",
+            non_updateable_fields)
 
 
     # the oslo_db library uses this method for both the `created_at` and
     # the oslo_db library uses this method for both the `created_at` and
     # `updated_at` fields
     # `updated_at` fields

+ 21 - 0
coriolis/exception.py

@@ -32,6 +32,10 @@ LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
 CONF = cfg.CONF
 
 
 
 
+TASK_ALREADY_CANCELLING_EXCEPTION_FMT = (
+    "Task %(task_id)s is in CANCELLING status.")
+
+
 class ConvertedException(webob.exc.WSGIHTTPException):
 class ConvertedException(webob.exc.WSGIHTTPException):
 
 
     def __init__(self, code=500, title="", explanation=""):
     def __init__(self, code=500, title="", explanation=""):
@@ -180,6 +184,19 @@ class InvalidConfigurationValue(Invalid):
                 'configuration option "%(option)s"')
                 'configuration option "%(option)s"')
 
 
 
 
+class InvalidTaskState(Invalid):
+    message = _(
+        'Task "%(task_id)s" in in an invalid state: %(task_state)s')
+
+
+class TaskIsCancelling(InvalidTaskState):
+    message = _(TASK_ALREADY_CANCELLING_EXCEPTION_FMT)
+
+
+class InvalidTaskResult(InvalidTaskState):
+    message = _('Task returned an invalid result.')
+
+
 class InvalidActionTasksExecutionState(Invalid):
 class InvalidActionTasksExecutionState(Invalid):
     message = _("Invalid tasks execution state: %(reason)s")
     message = _("Invalid tasks execution state: %(reason)s")
 
 
@@ -309,6 +326,10 @@ class TaskProcessException(CoriolisException):
     safe = True
     safe = True
 
 
 
 
+class TaskProcessCanceledException(TaskProcessException):
+    pass
+
+
 class OperatingSystemNotFound(NotFound):
 class OperatingSystemNotFound(NotFound):
     pass
     pass
 
 

+ 2 - 2
coriolis/replicas/api.py

@@ -16,9 +16,9 @@ class API(object):
             source_environment, destination_environment, instances,
             source_environment, destination_environment, instances,
             network_map, storage_mappings, notes)
             network_map, storage_mappings, notes)
 
 
-    def update(self, ctxt, replica_id, properties):
+    def update(self, ctxt, replica_id, updated_properties):
         return self._rpc_client.update_replica(
         return self._rpc_client.update_replica(
-            ctxt, replica_id, properties)
+            ctxt, replica_id, updated_properties)
 
 
     def delete(self, ctxt, replica_id):
     def delete(self, ctxt, replica_id):
         self._rpc_client.delete_replica(ctxt, replica_id)
         self._rpc_client.delete_replica(ctxt, replica_id)

+ 41 - 0
coriolis/service.py

@@ -1,6 +1,7 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 # All Rights Reserved.
 
 
+import os
 import platform
 import platform
 
 
 from oslo_concurrency import processutils
 from oslo_concurrency import processutils
@@ -34,6 +35,46 @@ CONF.register_opts(service_opts)
 LOG = logging.getLogger(__name__)
 LOG = logging.getLogger(__name__)
 
 
 
 
+def check_locks_dir_empty():
+    """ Checks whether the locks dir is empty and warns otherwise.
+
+    NOTE: external oslo_concurrency locks work based on listing open file
+    descriptors so this check is not necessarily conclusive, though all freshly
+    started/restarted conductor services should ideally be given a clean slate.
+    """
+    oslo_concurrency_group = getattr(CONF, 'oslo_concurrency', {})
+    if not oslo_concurrency_group:
+        LOG.warn("No 'oslo_concurrency' group defined in config file!")
+        return
+
+    locks_dir = oslo_concurrency_group.get('lock_path', "")
+    if not locks_dir:
+        LOG.warn("No locks directory path was configured!")
+        return
+
+    if not os.path.exists(locks_dir):
+        LOG.warn(
+            "Configured 'lock_path' directory '%s' does NOT exist!", locks_dir)
+        return
+
+    if not os.path.isdir(locks_dir):
+        LOG.warn(
+            "Configured 'lock_path' directory '%s' is NOT a directory!",
+            locks_dir)
+        return
+
+    locks_dir_contents = os.listdir(locks_dir)
+    if locks_dir_contents:
+        LOG.warn(
+            "Configured 'lock_path' directory '%s' is NOT empty: %s",
+            locks_dir, locks_dir_contents)
+        return
+
+    LOG.info(
+        "Successfully checked 'lock_path' directory '%s' exists and is empty.",
+        locks_dir)
+
+
 class WSGIService(service.ServiceBase):
 class WSGIService(service.ServiceBase):
     def __init__(self, name):
     def __init__(self, name):
         self._host = CONF.api_migration_listen
         self._host = CONF.api_migration_listen

+ 68 - 1
coriolis/tasks/base.py

@@ -9,6 +9,7 @@ from oslo_log import log as logging
 from six import with_metaclass
 from six import with_metaclass
 
 
 from coriolis import constants
 from coriolis import constants
+from coriolis import exception
 from coriolis import utils
 from coriolis import utils
 from coriolis.providers import factory as providers_factory
 from coriolis.providers import factory as providers_factory
 
 
@@ -51,10 +52,76 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
 
 
         return required_libs
         return required_libs
 
 
+    @property
     @abc.abstractmethod
     @abc.abstractmethod
+    def required_task_info_properties(self):
+        """ Returns a list of the string fields which are required
+        to be present during the tasks' run method. """
+        pass
+
+    @property
+    @abc.abstractmethod
+    def returned_task_info_properties(self):
+        """ Returns a list of the string fields which are returned by the
+        tasks' run method to be added to the task info.
+        """
+        pass
+
+    @abc.abstractmethod
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
+        """ The actual logic run by the task.
+        Should return a dict with all the fields declared by
+        'self.returned_task_info_properties'.
+        Must be implemented in all child classes.
+        """
+        pass
+
     def run(self, ctxt, instance, origin, destination, task_info,
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
             event_handler):
-        pass
+        """ Runs the task with the given params and returns
+        a dict with the results.
+        NOTE: This should NOT modify the existing task_info in any way.
+        """
+        missing_info_props = [
+            prop for prop in self.required_task_info_properties
+            if prop not in task_info]
+        if missing_info_props:
+            raise exception.CoriolisException(
+                "Task type '%s' asked to run on task info with "
+                "missing properties: %s" % (
+                    self.__class__, missing_info_props))
+
+        result = self._run(
+            ctxt, instance, origin, destination, task_info, event_handler)
+
+        if type(result) is not dict:
+            raise exception.CoriolisException(
+                "Task type '%s' returned result of type %s "
+                "instead of a dict: %s" % (
+                    self.__class__, type(result), result))
+
+        missing_returns = [
+            prop for prop in self.returned_task_info_properties
+            if prop not in result.keys()]
+        if missing_returns:
+            raise exception.CoriolisException(
+                "Task type '%s' failed to return the following "
+                "declared return values in its result: %s. "
+                "Result was: %s" % (
+                    self.__class__, missing_returns,
+                    utils.filter_chunking_info_for_task(result)))
+
+        undeclared_returns = [
+            prop for prop in result.keys()
+            if prop not in self.returned_task_info_properties]
+        if undeclared_returns:
+            raise exception.CoriolisException(
+                "Task type '%s' returned the following undeclared "
+                "keys in its result: %s" % (
+                    self.__class__, undeclared_returns))
+
+        return result
 
 
 
 
 def get_connection_info(ctxt, data):
 def get_connection_info(ctxt, data):

+ 2 - 2
coriolis/tasks/factory.py

@@ -81,9 +81,9 @@ _TASKS_MAP = {
 }
 }
 
 
 
 
-def get_task_runner(task_type):
+def get_task_runner_class(task_type):
     cls = _TASKS_MAP.get(task_type)
     cls = _TASKS_MAP.get(task_type)
     if not cls:
     if not cls:
         raise exception.NotFound(
         raise exception.NotFound(
             "TaskRunner not found for task type: %s" % task_type)
             "TaskRunner not found for task type: %s" % task_type)
-    return cls()
+    return cls

+ 18 - 9
coriolis/tasks/migration_tasks.py

@@ -13,29 +13,38 @@ LOG = logging.getLogger(__name__)
 
 
 
 
 class GetOptimalFlavorTask(base.TaskRunner):
 class GetOptimalFlavorTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_INSTANCE_FLAVOR,
             destination["type"], constants.PROVIDER_TYPE_INSTANCE_FLAVOR,
             event_handler)
             event_handler)
 
 
         connection_info = base.get_connection_info(ctxt, destination)
         connection_info = base.get_connection_info(ctxt, destination)
-        target_environment = destination.get("target_environment") or {}
+        target_environment = task_info["target_environment"]
         export_info = task_info["export_info"]
         export_info = task_info["export_info"]
 
 
         flavor = provider.get_optimal_flavor(
         flavor = provider.get_optimal_flavor(
             ctxt, connection_info, target_environment, export_info)
             ctxt, connection_info, target_environment, export_info)
 
 
-        if task_info.get("instance_deployment_info") is None:
-            task_info["instance_deployment_info"] = {}
-        task_info["instance_deployment_info"]["selected_flavor"] = flavor
+        instance_deployment_info = task_info.get("instance_deployment_info")
+        if instance_deployment_info is None:
+            instance_deployment_info = {}
+        instance_deployment_info["selected_flavor"] = flavor
 
 
         events.EventManager(event_handler).progress_update(
         events.EventManager(event_handler).progress_update(
             "Selected flavor: %s" % flavor)
             "Selected flavor: %s" % flavor)
 
 
-        task_info["retain_export_path"] = True
-
-        return task_info
+        return {
+            "instance_deployment_info": instance_deployment_info}
 
 
 
 
 class DeployMigrationSourceResourcesTask(
 class DeployMigrationSourceResourcesTask(

+ 82 - 22
coriolis/tasks/osmorphing_tasks.py

@@ -1,25 +1,41 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 # All Rights Reserved.
 
 
+from oslo_log import log as logging
+
 from coriolis import constants
 from coriolis import constants
+from coriolis import exception
+from coriolis import schemas
 from coriolis.osmorphing import manager as osmorphing_manager
 from coriolis.osmorphing import manager as osmorphing_manager
 from coriolis.providers import factory as providers_factory
 from coriolis.providers import factory as providers_factory
-from coriolis import schemas
 from coriolis.tasks import base
 from coriolis.tasks import base
 
 
 
 
+LOG = logging.getLogger(__name__)
+
+
 class OSMorphingTask(base.TaskRunner):
 class OSMorphingTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
 
 
-        origin_provider_type = task_info["origin_provider_type"]
-        destination_provider_type = task_info["destination_provider_type"]
+    @property
+    def required_task_info_properties(self):
+        return [
+            "osmorphing_info", "osmorphing_connection_info",
+            "user_scripts"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
 
 
         origin_provider = providers_factory.get_provider(
         origin_provider = providers_factory.get_provider(
-            origin["type"], origin_provider_type, event_handler)
+            origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
+            event_handler)
 
 
         destination_provider = providers_factory.get_provider(
         destination_provider = providers_factory.get_provider(
-            destination["type"], destination_provider_type, event_handler)
+            destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
+            event_handler)
 
 
         osmorphing_connection_info = base.unmarshal_migr_conn_info(
         osmorphing_connection_info = base.unmarshal_migr_conn_info(
             task_info['osmorphing_connection_info'])
             task_info['osmorphing_connection_info'])
@@ -43,12 +59,23 @@ class OSMorphingTask(base.TaskRunner):
             instance_script,
             instance_script,
             event_handler)
             event_handler)
 
 
-        return task_info
+        return {}
 
 
 
 
 class DeployOSMorphingResourcesTask(base.TaskRunner):
 class DeployOSMorphingResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return [
+            "os_morphing_resources", "osmorphing_info",
+            "osmorphing_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_OS_MORPHING,
             destination["type"], constants.PROVIDER_TYPE_OS_MORPHING,
             event_handler)
             event_handler)
@@ -58,21 +85,52 @@ class DeployOSMorphingResourcesTask(base.TaskRunner):
         import_info = provider.deploy_os_morphing_resources(
         import_info = provider.deploy_os_morphing_resources(
             ctxt, connection_info, instance_deployment_info)
             ctxt, connection_info, instance_deployment_info)
 
 
-        task_info["os_morphing_resources"] = import_info.get(
-            "os_morphing_resources")
-        task_info["osmorphing_info"] = import_info.get("osmorphing_info", {})
-        task_info["osmorphing_connection_info"] = base.marshal_migr_conn_info(
-            import_info["osmorphing_connection_info"])
-
         schemas.validate_value(
         schemas.validate_value(
-            task_info, schemas.CORIOLIS_OS_MORPHING_RESOURCES_SCHEMA)
-
-        return task_info
+            import_info, schemas.CORIOLIS_OS_MORPHING_RESOURCES_SCHEMA,
+            # NOTE: we avoid raising so that the cleanup task
+            # can [try] to deal with the temporary resources.
+            raise_on_error=False)
+
+        os_morphing_resources = import_info.get('os_morphing_resources')
+        if not os_morphing_resources:
+            raise exception.InvalidTaskResult(
+                "Target provider for '%s' did NOT return any "
+                "'os_morphing_resources'." % (
+                    destination["type"]))
+
+        osmorphing_connection_info = import_info.get(
+            'osmorphing_connection_info')
+        if not osmorphing_connection_info:
+            raise exception.InvalidTaskResult(
+                "Target provider '%s' did NOT return any "
+                "'osmorphing_connection_info'." % (
+                    destination["type"]))
+
+        os_morphing_info = import_info.get("osmorphing_info", {})
+        if not os_morphing_info:
+            LOG.warn(
+                "Target provider for '%s' did NOT return any "
+                "'osmorphing_info'. Defaulting to %s",
+                destination["type"], os_morphing_info)
+
+        return {
+            "os_morphing_resources": os_morphing_resources,
+            "osmorphing_connection_info": osmorphing_connection_info,
+            "osmorphing_info": os_morphing_info}
 
 
 
 
 class DeleteOSMorphingResourcesTask(base.TaskRunner):
 class DeleteOSMorphingResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["os_morphing_resources"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["os_morphing_resources", "osmorphing_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_OS_MORPHING,
             destination["type"], constants.PROVIDER_TYPE_OS_MORPHING,
             event_handler)
             event_handler)
@@ -82,4 +140,6 @@ class DeleteOSMorphingResourcesTask(base.TaskRunner):
         provider.delete_os_morphing_resources(
         provider.delete_os_morphing_resources(
             ctxt, connection_info, os_morphing_resources)
             ctxt, connection_info, os_morphing_resources)
 
 
-        return task_info
+        return {
+            "os_morphing_resources": None,
+            "osmorphing_connection_info": None}

+ 402 - 168
coriolis/tasks/replica_tasks.py

@@ -16,7 +16,7 @@ LOG = logging.getLogger(__name__)
 
 
 
 
 def _get_volumes_info(task_info):
 def _get_volumes_info(task_info):
-    volumes_info = task_info.get("volumes_info")
+    volumes_info = task_info.get("volumes_info", [])
     if not volumes_info:
     if not volumes_info:
         raise exception.InvalidActionTasksExecutionState(
         raise exception.InvalidActionTasksExecutionState(
             "No volumes information present")
             "No volumes information present")
@@ -37,21 +37,21 @@ def _check_ensure_volumes_info_ordering(export_info, volumes_info):
         matching_volumes = [
         matching_volumes = [
             vol for vol in volumes_info if vol['disk_id'] == disk_id]
             vol for vol in volumes_info if vol['disk_id'] == disk_id]
         if not matching_volumes:
         if not matching_volumes:
-            raise exception.CoriolisException(
+            raise exception.InvalidActionTasksExecutionState(
                 "Could not find source disk '%s' (ID '%s') in Replica "
                 "Could not find source disk '%s' (ID '%s') in Replica "
                 "volumes info: %s" % (disk, disk_id, volumes_info))
                 "volumes info: %s" % (disk, disk_id, volumes_info))
         elif len(matching_volumes) > 1:
         elif len(matching_volumes) > 1:
-            raise exception.CoriolisException(
+            raise exception.InvalidActionTasksExecutionState(
                 "Multiple disks with ID '%s' foind in Replica "
                 "Multiple disks with ID '%s' foind in Replica "
                 "volumes info: %s" % (disk_id, volumes_info))
                 "volumes info: %s" % (disk_id, volumes_info))
 
 
         ordered_volumes_info.append(matching_volumes[0])
         ordered_volumes_info.append(matching_volumes[0])
 
 
     vol_info_cpy = utils.filter_chunking_info_for_task(
     vol_info_cpy = utils.filter_chunking_info_for_task(
-        {"volumes_info": volumes_info}).get("volumes_info")
+        {"volumes_info": volumes_info}).get("volumes_info", [])
 
 
     ordered_vol_info_cpy = utils.filter_chunking_info_for_task(
     ordered_vol_info_cpy = utils.filter_chunking_info_for_task(
-        {"volumes_info": ordered_volumes_info}).get("volumes_info")
+        {"volumes_info": ordered_volumes_info}).get("volumes_info", [])
 
 
     LOG.debug(
     LOG.debug(
         "volumes_info returned by provider for instance "
         "volumes_info returned by provider for instance "
@@ -64,42 +64,74 @@ def _check_ensure_volumes_info_ordering(export_info, volumes_info):
 
 
 
 
 class GetInstanceInfoTask(base.TaskRunner):
 class GetInstanceInfoTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+    """ Task which gathers the export info for a VM.  """
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["export_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
         connection_info = base.get_connection_info(ctxt, origin)
 
 
-        source_environment = origin.get('source_environment') or {}
+        source_environment = task_info['source_environment']
         export_info = provider.get_replica_instance_info(
         export_info = provider.get_replica_instance_info(
             ctxt, connection_info, source_environment, instance)
             ctxt, connection_info, source_environment, instance)
 
 
         # Validate the output
         # Validate the output
         schemas.validate_value(
         schemas.validate_value(
             export_info, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
             export_info, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
-        task_info["export_info"] = export_info
 
 
-        return task_info
+        return {
+            'export_info': export_info}
 
 
 
 
 class ShutdownInstanceTask(base.TaskRunner):
 class ShutdownInstanceTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+    """ Task which shuts down a VM. """
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
         connection_info = base.get_connection_info(ctxt, origin)
 
 
-        source_environment = origin.get('source_environment') or {}
+        source_environment = task_info['source_environment']
         provider.shutdown_instance(ctxt, connection_info, source_environment,
         provider.shutdown_instance(ctxt, connection_info, source_environment,
                                    instance)
                                    instance)
-
-        return task_info
+        return {}
 
 
 
 
 class ReplicateDisksTask(base.TaskRunner):
 class ReplicateDisksTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+
+    @property
+    def required_task_info_properties(self):
+        return [
+            "export_info", "volumes_info", "source_environment",
+            "source_resources",
+            "source_resources_connection_info",
+            "target_resources_connection_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
@@ -112,7 +144,7 @@ class ReplicateDisksTask(base.TaskRunner):
             {"volumes_info": volumes_info},
             {"volumes_info": volumes_info},
             schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA)
             schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA)
 
 
-        migr_source_conn_info = task_info["migr_source_connection_info"]
+        migr_source_conn_info = task_info["source_resources_connection_info"]
         if migr_source_conn_info:
         if migr_source_conn_info:
             schemas.validate_value(
             schemas.validate_value(
                 migr_source_conn_info,
                 migr_source_conn_info,
@@ -120,11 +152,17 @@ class ReplicateDisksTask(base.TaskRunner):
         migr_source_conn_info = base.unmarshal_migr_conn_info(
         migr_source_conn_info = base.unmarshal_migr_conn_info(
             migr_source_conn_info)
             migr_source_conn_info)
 
 
-        migr_target_conn_info = task_info["migr_target_connection_info"]
+        migr_target_conn_info = task_info["target_resources_connection_info"]
         incremental = task_info.get("incremental", True)
         incremental = task_info.get("incremental", True)
 
 
-        source_environment = origin.get('source_environment') or {}
+        source_environment = task_info['source_environment']
 
 
+        # TODO(aznashwan): in order to facilitate parallelized setups,
+        # we should modify the replicate_disks provider method to allow for the
+        # passing in of source_resources info as well.
+        # This could be used to for example pass in the ID/info of a
+        # pre-created source worker VM which can then be (re)used by the
+        # Replicate disks task during PMR.
         volumes_info = provider.replicate_disks(
         volumes_info = provider.replicate_disks(
             ctxt, connection_info, source_environment, instance,
             ctxt, connection_info, source_environment, instance,
             migr_source_conn_info, migr_target_conn_info, volumes_info,
             migr_source_conn_info, migr_target_conn_info, volumes_info,
@@ -135,15 +173,24 @@ class ReplicateDisksTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
             export_info, volumes_info)
 
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            'volumes_info': volumes_info}
 
 
 
 
 class DeployReplicaDisksTask(base.TaskRunner):
 class DeployReplicaDisksTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+
+    @property
+    def required_task_info_properties(self):
+        return [
+            "export_info", "volumes_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
             event_handler):
-        target_environment = destination.get("target_environment") or {}
+        target_environment = task_info['target_environment']
         export_info = task_info["export_info"]
         export_info = task_info["export_info"]
 
 
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
@@ -152,10 +199,6 @@ class DeployReplicaDisksTask(base.TaskRunner):
         connection_info = base.get_connection_info(ctxt, destination)
         connection_info = base.get_connection_info(ctxt, destination)
 
 
         volumes_info = task_info.get("volumes_info", [])
         volumes_info = task_info.get("volumes_info", [])
-        if volumes_info is None:
-            # In case Replica disks were deleted:
-            volumes_info = []
-
         volumes_info = provider.deploy_replica_disks(
         volumes_info = provider.deploy_replica_disks(
             ctxt, connection_info, target_environment, instance, export_info,
             ctxt, connection_info, target_environment, instance, export_info,
             volumes_info)
             volumes_info)
@@ -165,18 +208,27 @@ class DeployReplicaDisksTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
             export_info, volumes_info)
 
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            'volumes_info': volumes_info}
 
 
 
 
 class DeleteReplicaDisksTask(base.TaskRunner):
 class DeleteReplicaDisksTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+
+    @property
+    def required_task_info_properties(self):
+        return [
+            "volumes_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
             event_handler):
         if not task_info.get("volumes_info"):
         if not task_info.get("volumes_info"):
             LOG.debug(
             LOG.debug(
                 "No volumes_info present. Skipping disk deletion.")
                 "No volumes_info present. Skipping disk deletion.")
-            return task_info
+            return {'volumes_info': []}
 
 
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
@@ -185,74 +237,123 @@ class DeleteReplicaDisksTask(base.TaskRunner):
 
 
         volumes_info = _get_volumes_info(task_info)
         volumes_info = _get_volumes_info(task_info)
 
 
+        # TODO (aznashwan): add target_env options to `delete_replica_disks`:
         volumes_info = provider.delete_replica_disks(
         volumes_info = provider.delete_replica_disks(
             ctxt, connection_info, volumes_info)
             ctxt, connection_info, volumes_info)
         if volumes_info:
         if volumes_info:
             LOG.warn(
             LOG.warn(
                 "'volumes_info' should have been void after disk "
                 "'volumes_info' should have been void after disk "
-                "deletion: %s" % volumes_info)
-
-        task_info["volumes_info"] = []
+                "deletion task but it is: %s" % volumes_info)
+        elif volumes_info is None:
+            volumes_info = []
 
 
-        return task_info
+        return {
+            'volumes_info': volumes_info}
 
 
 
 
 class DeployReplicaSourceResourcesTask(base.TaskRunner):
 class DeployReplicaSourceResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["source_resources", "source_resources_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
         connection_info = base.get_connection_info(ctxt, origin)
 
 
-        source_environment = origin.get('source_environment') or {}
+        source_environment = task_info['source_environment'] or {}
         replica_resources_info = provider.deploy_replica_source_resources(
         replica_resources_info = provider.deploy_replica_source_resources(
             ctxt, connection_info, source_environment)
             ctxt, connection_info, source_environment)
 
 
-        task_info["migr_source_resources"] = replica_resources_info[
-            "migr_resources"]
-        migr_connection_info = replica_resources_info.get("connection_info")
-        if migr_connection_info:
-            migr_connection_info = base.marshal_migr_conn_info(
-                migr_connection_info)
-            schemas.validate_value(
-                migr_connection_info,
-                schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
-                # NOTE: we avoid raising so that the cleanup task
-                # can [try] to deal with the temporary resources.
-                raise_on_error=False)
-
-        task_info["migr_source_connection_info"] = migr_connection_info
+        migr_connection_info = replica_resources_info.get(
+            "connection_info", {})
+        if 'connection_info' not in replica_resources_info:
+            LOG.warn(
+                "Replica source provider for '%s' did NOT return any "
+                "'connection_info'. Defaulting to '%s'",
+                origin["type"], migr_connection_info)
+        else:
+            migr_connection_info = replica_resources_info['connection_info']
+            if migr_connection_info:
+                migr_connection_info = base.marshal_migr_conn_info(
+                    migr_connection_info)
+                schemas.validate_value(
+                    migr_connection_info,
+                    schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
+                    # NOTE: we avoid raising so that the cleanup task
+                    # can [try] to deal with the temporary resources.
+                    raise_on_error=False)
+            else:
+                LOG.warn(
+                    "Replica source provider for '%s' returned empty "
+                    "'connection_info' in source resources deployment: %s",
+                    origin["type"], migr_connection_info)
+
+        migr_resources = {}
+        if 'migr_resources' not in replica_resources_info:
+            LOG.warn(
+                "Replica source provider for '%s' did NOT return any "
+                "'migr_resources'. Defaulting to %s",
+                origin["type"], migr_resources)
+        else:
+            migr_resources = replica_resources_info['migr_resources']
 
 
-        return task_info
+        return {
+            "source_resources": migr_resources,
+            "source_resources_connection_info": migr_connection_info}
 
 
 
 
 class DeleteReplicaSourceResourcesTask(base.TaskRunner):
 class DeleteReplicaSourceResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment", "source_resources"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["source_resources", "source_resources_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
         connection_info = base.get_connection_info(ctxt, origin)
 
 
-        migr_resources = task_info.get("migr_source_resources")
-
-        source_environment = origin.get("source_environment", {})
+        migr_resources = task_info["source_resources"]
+        source_environment = origin["source_environment"]
 
 
         if migr_resources:
         if migr_resources:
             provider.delete_replica_source_resources(
             provider.delete_replica_source_resources(
                 ctxt, connection_info, source_environment, migr_resources)
                 ctxt, connection_info, source_environment, migr_resources)
 
 
-        task_info["migr_source_resources"] = None
-        task_info["migr_source_connection_info"] = None
-
-        return task_info
+        return {
+            "source_resources": None,
+            "source_resources_connection_info": None}
 
 
 
 
 class DeployReplicaTargetResourcesTask(base.TaskRunner):
 class DeployReplicaTargetResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
-        target_environment = destination.get("target_environment") or {}
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return [
+            "volumes_info", "target_resources",
+            "target_resources_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
+        target_environment = task_info["target_environment"]
         export_info = task_info['export_info']
         export_info = task_info['export_info']
 
 
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
@@ -271,50 +372,89 @@ class DeployReplicaTargetResourcesTask(base.TaskRunner):
             # can [try] to deal with the temporary resources.
             # can [try] to deal with the temporary resources.
             raise_on_error=False)
             raise_on_error=False)
 
 
-        volumes_info = _check_ensure_volumes_info_ordering(
-            export_info, replica_resources_info["volumes_info"])
+        if "volumes_info" in replica_resources_info:
+            volumes_info = replica_resources_info["volumes_info"]
+            volumes_info = _check_ensure_volumes_info_ordering(
+                export_info, volumes_info)
+        else:
+            LOG.warn(
+                "Replica target provider for '%s' did not return any "
+                "'volumes_info'. Using the previous value of it.")
+
+        migr_connection_info = {}
+        if 'connection_info' in replica_resources_info:
+            migr_connection_info = replica_resources_info['connection_info']
+            try:
+                backup_writers.BackupWritersFactory(
+                    migr_connection_info, None).get_writer()
+            except Exception as err:
+                LOG.warn(
+                    "Seemingly invalid connection info. Replica will likely "
+                    "fail during disk Replication. Error is: %s" % str(err))
+        else:
+            LOG.warn(
+                "Replica target provider for '%s' did NOT return any "
+                "'connection_info'. Defaulting to %s",
+                destination["type"], migr_connection_info)
 
 
-        task_info["volumes_info"] = volumes_info
-        task_info["migr_target_resources"] = replica_resources_info[
-            "migr_resources"]
+        target_resources = {}
+        if 'migr_resources' not in replica_resources_info:
+            LOG.warn(
+                "Replica target provider for '%s' did NOT return any "
+                "'migr_resources'. Defaulting to %s",
+                destination["type"], target_resources)
+        else:
+            target_resources = replica_resources_info["migr_resources"]
 
 
-        migr_connection_info = replica_resources_info["connection_info"]
-        try:
-            backup_writers.BackupWritersFactory(
-                migr_connection_info, None).get_writer()
-        except BaseException as err:
-            LOG.exception(
-                "Invalid connection info: %s" % err)
+        return {
+            "volumes_info": volumes_info,
+            "target_resources": target_resources,
+            "target_resources_connection_info": migr_connection_info}
 
 
-        task_info["migr_target_connection_info"] = migr_connection_info
 
 
-        return task_info
+class DeleteReplicaTargetResourcesTask(base.TaskRunner):
 
 
+    @property
+    def required_task_info_properties(self):
+        return ["target_resources", "target_environment"]
 
 
-class DeleteReplicaTargetResourcesTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+    @property
+    def returned_task_info_properties(self):
+        return [
+            "target_resources", "target_resources_connection_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
         connection_info = base.get_connection_info(ctxt, destination)
 
 
-        migr_resources = task_info.get("migr_target_resources")
+        migr_resources = task_info.get("target_resources")
 
 
         if migr_resources:
         if migr_resources:
+            # TODO (aznashwan): add 'target_env' param to call:
             provider.delete_replica_target_resources(
             provider.delete_replica_target_resources(
                 ctxt, connection_info, migr_resources)
                 ctxt, connection_info, migr_resources)
 
 
-        task_info["migr_target_resources"] = None
-        task_info["migr_target_connection_info"] = None
-
-        return task_info
+        return {
+            "target_resources": None,
+            "target_resources_connection_info": None}
 
 
 
 
 class DeployReplicaInstanceTask(base.TaskRunner):
 class DeployReplicaInstanceTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
-        target_environment = destination.get("target_environment") or {}
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "target_environment", "clone_disks"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
+        target_environment = task_info["target_environment"]
         export_info = task_info["export_info"]
         export_info = task_info["export_info"]
 
 
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
@@ -330,23 +470,23 @@ class DeployReplicaInstanceTask(base.TaskRunner):
             ctxt, connection_info, target_environment, instance,
             ctxt, connection_info, target_environment, instance,
             export_info, volumes_info, clone_disks)
             export_info, volumes_info, clone_disks)
 
 
-        if task_info.get("instance_deployment_info") is None:
-            task_info["instance_deployment_info"] = {}
-        task_info["instance_deployment_info"].update(import_info[
-            "instance_deployment_info"])
+        return {
+            "instance_deployment_info": import_info[
+                'instance_deployment_info']}
 
 
-        task_info[
-            "origin_provider_type"] = constants.PROVIDER_TYPE_REPLICA_EXPORT
-        task_info[
-            "destination_provider_type"
-        ] = constants.PROVIDER_TYPE_REPLICA_IMPORT
 
 
-        return task_info
+class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
 
 
+    @property
+    def required_task_info_properties(self):
+        return ["instance_deployment_info"]
 
 
-class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+    @property
+    def returned_task_info_properties(self):
+        return ["transfer_result"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
             event_handler)
@@ -355,19 +495,27 @@ class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
 
 
         result = provider.finalize_replica_instance_deployment(
         result = provider.finalize_replica_instance_deployment(
             ctxt, connection_info, instance_deployment_info)
             ctxt, connection_info, instance_deployment_info)
-        if result is not None:
-            task_info["transfer_result"] = result
-        else:
+        if result is None:
             LOG.warn(
             LOG.warn(
                 "'None' was returned as result for Finalize Replica Instance "
                 "'None' was returned as result for Finalize Replica Instance "
                 "deployment task '%s'.", task_info)
                 "deployment task '%s'.", task_info)
 
 
-        return task_info
+        return {
+            "transfer_result": result}
 
 
 
 
 class CleanupFailedReplicaInstanceDeploymentTask(base.TaskRunner):
 class CleanupFailedReplicaInstanceDeploymentTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["instance_deployment_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
             event_handler)
@@ -378,12 +526,22 @@ class CleanupFailedReplicaInstanceDeploymentTask(base.TaskRunner):
         provider.cleanup_failed_replica_instance_deployment(
         provider.cleanup_failed_replica_instance_deployment(
             ctxt, connection_info, instance_deployment_info)
             ctxt, connection_info, instance_deployment_info)
 
 
-        return task_info
+        return {
+            "instance_deployment_info": None}
 
 
 
 
 class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
 class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
             event_handler)
@@ -400,14 +558,22 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
             export_info, volumes_info)
 
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            "volumes_info": volumes_info}
 
 
 
 
 class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
 class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         export_info = task_info['export_info']
         export_info = task_info['export_info']
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
@@ -424,14 +590,22 @@ class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
             export_info, volumes_info)
 
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            "volumes_info": volumes_info}
 
 
 
 
 class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
 class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
             event_handler)
@@ -448,14 +622,22 @@ class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
         volumes_info = _check_ensure_volumes_info_ordering(
         volumes_info = _check_ensure_volumes_info_ordering(
             export_info, volumes_info)
             export_info, volumes_info)
 
 
-        task_info["volumes_info"] = volumes_info
-
-        return task_info
+        return {
+            "volumes_info": volumes_info}
 
 
 
 
 class ValidateReplicaExecutionSourceInputsTask(base.TaskRunner):
 class ValidateReplicaExecutionSourceInputsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         event_manager = events.EventManager(event_handler)
         event_manager = events.EventManager(event_handler)
         origin_type = origin["type"]
         origin_type = origin["type"]
         source_provider = providers_factory.get_provider(
         source_provider = providers_factory.get_provider(
@@ -469,12 +651,21 @@ class ValidateReplicaExecutionSourceInputsTask(base.TaskRunner):
         else:
         else:
             source_provider.validate_replica_export_input(
             source_provider.validate_replica_export_input(
                 ctxt, origin_connection_info, instance,
                 ctxt, origin_connection_info, instance,
-                source_environment=origin.get("source_environment", {}))
+                source_environment=task_info["source_environment"])
 
 
-        return task_info
+        return {}
 
 
 
 
 class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
 class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
     def _validate_provider_replica_import_input(
     def _validate_provider_replica_import_input(
             self, provider, ctxt, conn_info, target_environment, export_info):
             self, provider, ctxt, conn_info, target_environment, export_info):
         provider.validate_replica_import_input(
         provider.validate_replica_import_input(
@@ -482,8 +673,8 @@ class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
             check_os_morphing_resources=False,
             check_os_morphing_resources=False,
             check_final_vm_params=False)
             check_final_vm_params=False)
 
 
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         event_manager = events.EventManager(event_handler)
         event_manager = events.EventManager(event_handler)
         destination_type = destination["type"]
         destination_type = destination["type"]
 
 
@@ -497,28 +688,35 @@ class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
             event_manager.progress_update(
             event_manager.progress_update(
                 "Replica Import Provider for platform '%s' does not support "
                 "Replica Import Provider for platform '%s' does not support "
                 "Replica input validation" % destination_type)
                 "Replica input validation" % destination_type)
-            return task_info
+            return {}
 
 
         export_info = task_info.get("export_info")
         export_info = task_info.get("export_info")
         if not export_info:
         if not export_info:
-            raise exception.CoriolisException(
+            raise exception.InvalidActionTasksExecutionState(
                 "Instance export info is not set. Cannot perform "
                 "Instance export info is not set. Cannot perform "
                 "Replica Import validation for destination platform "
                 "Replica Import validation for destination platform "
                 "'%s'" % destination_type)
                 "'%s'" % destination_type)
 
 
-        # NOTE: the target environment JSON schema should have been validated
-        # upon accepting the Replica API creation request.
-        target_environment = destination.get("target_environment", {})
+        target_environment = task_info["target_environment"]
         self._validate_provider_replica_import_input(
         self._validate_provider_replica_import_input(
             destination_provider, ctxt, destination_connection_info,
             destination_provider, ctxt, destination_connection_info,
             target_environment, export_info)
             target_environment, export_info)
 
 
-        return task_info
+        return {}
 
 
 
 
 class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
 class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return []
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         event_manager = events.EventManager(event_handler)
         event_manager = events.EventManager(event_handler)
         destination_connection_info = base.get_connection_info(
         destination_connection_info = base.get_connection_info(
             ctxt, destination)
             ctxt, destination)
@@ -538,43 +736,55 @@ class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
                 "Replica Deployment Provider for platform '%s' does not "
                 "Replica Deployment Provider for platform '%s' does not "
                 "support Replica Deployment input validation" % (
                 "support Replica Deployment input validation" % (
                     destination_type))
                     destination_type))
-            return task_info
+            return {}
 
 
         # NOTE: the target environment JSON schema should have been validated
         # NOTE: the target environment JSON schema should have been validated
         # upon accepting the Replica API creation request.
         # upon accepting the Replica API creation request.
-        target_environment = destination.get("target_environment", {})
+        target_environment = task_info['target_environment']
         destination_provider.validate_replica_deployment_input(
         destination_provider.validate_replica_deployment_input(
             ctxt, destination_connection_info, target_environment, export_info)
             ctxt, destination_connection_info, target_environment, export_info)
 
 
-        return task_info
+        return {}
 
 
 
 
 class UpdateSourceReplicaTask(base.TaskRunner):
 class UpdateSourceReplicaTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
+
+    @property
+    def required_task_info_properties(self):
+        return ["volumes_info", "source_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info", "source_environment"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
         event_manager = events.EventManager(event_handler)
         event_manager = events.EventManager(event_handler)
+
+        volumes_info = task_info.get("volumes_info", [])
         new_source_env = task_info.get('source_environment', {})
         new_source_env = task_info.get('source_environment', {})
+        # NOTE: the `source_environment` in the `origin` is the one set
+        # in the dedicated DB column of the Replica and thus stores
+        # the previous value of it:
+        old_source_env = origin.get('source_environment')
         if not new_source_env:
         if not new_source_env:
             event_manager.progress_update(
             event_manager.progress_update(
                 "No new source environment options provided")
                 "No new source environment options provided")
-            return task_info
+            return {
+                'volumes_info': volumes_info,
+                'source_environment': old_source_env}
 
 
         source_provider = providers_factory.get_provider(
         source_provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_SOURCE_REPLICA_UPDATE,
             origin["type"], constants.PROVIDER_TYPE_SOURCE_REPLICA_UPDATE,
             event_handler, raise_if_not_found=False)
             event_handler, raise_if_not_found=False)
         if not source_provider:
         if not source_provider:
-            raise exception.CoriolisException(
+            raise exception.InvalidActionTasksExecutionState(
                 "Replica source provider plugin for '%s' does not support"
                 "Replica source provider plugin for '%s' does not support"
                 " updating Replicas" % origin["type"])
                 " updating Replicas" % origin["type"])
 
 
         origin_connection_info = base.get_connection_info(ctxt, origin)
         origin_connection_info = base.get_connection_info(ctxt, origin)
-        volumes_info = task_info.get("volumes_info", [])
 
 
         LOG.info("Checking source provider environment params")
         LOG.info("Checking source provider environment params")
-        # NOTE: the `source_environment` in the `origin` is the one set
-        # in the dedicated DB column of the Replica and thus stores
-        # the previous value of it:
-        old_source_env = origin.get('source_environment', {})
         volumes_info = (
         volumes_info = (
             source_provider.check_update_source_environment_params(
             source_provider.check_update_source_environment_params(
                 ctxt, origin_connection_info, instance, volumes_info,
                 ctxt, origin_connection_info, instance, volumes_info,
@@ -582,41 +792,59 @@ class UpdateSourceReplicaTask(base.TaskRunner):
         if volumes_info:
         if volumes_info:
             schemas.validate_value(
             schemas.validate_value(
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+        else:
+            LOG.warn(
+                "Source update method for '%s' source provider did NOT "
+                "return any volumes info. Defaulting to old value.",
+                origin["type"])
+            volumes_info = task_info.get("volumes_info", [])
 
 
-        task_info['volumes_info'] = volumes_info
-
-        return task_info
+        return {
+            "volumes_info": volumes_info,
+            "source_environment": new_source_env}
 
 
 
 
 class UpdateDestinationReplicaTask(base.TaskRunner):
 class UpdateDestinationReplicaTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
+
+    @property
+    def required_task_info_properties(self):
+        return ["export_info", "volumes_info", "target_environment"]
+
+    @property
+    def returned_task_info_properties(self):
+        return ["volumes_info", "target_environment"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
             event_handler):
         event_manager = events.EventManager(event_handler)
         event_manager = events.EventManager(event_handler)
-        new_destination_env = task_info.get('destination_environment', {})
+
+        volumes_info = task_info.get("volumes_info", [])
+        new_destination_env = task_info.get('target_environment', {})
+        # NOTE: the `target_environment` in the `destination` is the one
+        # set in the dedicated DB column of the Replica and thus stores
+        # the previous value of it:
+        old_destination_env = destination.get('target_environment', {})
         if not new_destination_env:
         if not new_destination_env:
             event_manager.progress_update(
             event_manager.progress_update(
                 "No new destination environment options provided")
                 "No new destination environment options provided")
-            return task_info
+            return {
+                "target_environment": old_destination_env,
+                "volumes_info": volumes_info}
 
 
         destination_provider = providers_factory.get_provider(
         destination_provider = providers_factory.get_provider(
             destination["type"],
             destination["type"],
             constants.PROVIDER_TYPE_DESTINATION_REPLICA_UPDATE,
             constants.PROVIDER_TYPE_DESTINATION_REPLICA_UPDATE,
             event_handler, raise_if_not_found=False)
             event_handler, raise_if_not_found=False)
         if not destination_provider:
         if not destination_provider:
-            raise exception.CoriolisException(
+            raise exception.InvalidActionTasksExecutionState(
                 "Replica destination provider plugin for '%s' does not "
                 "Replica destination provider plugin for '%s' does not "
                 "support updating Replicas" % destination["type"])
                 "support updating Replicas" % destination["type"])
 
 
         destination_connection_info = base.get_connection_info(
         destination_connection_info = base.get_connection_info(
             ctxt, destination)
             ctxt, destination)
         export_info = task_info.get("export_info", {})
         export_info = task_info.get("export_info", {})
-        volumes_info = task_info.get("volumes_info", [])
 
 
         LOG.info("Checking destination provider environment params")
         LOG.info("Checking destination provider environment params")
-        # NOTE: the `target_environment` in the `destination` is the one
-        # set in the dedicated DB column of the Replica and thus stores
-        # the previous value of it:
-        old_destination_env = destination.get('target_environment', {})
         volumes_info = (
         volumes_info = (
             destination_provider.check_update_destination_environment_params(
             destination_provider.check_update_destination_environment_params(
                 ctxt, destination_connection_info, export_info, volumes_info,
                 ctxt, destination_connection_info, export_info, volumes_info,
@@ -627,7 +855,13 @@ class UpdateDestinationReplicaTask(base.TaskRunner):
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
             volumes_info = _check_ensure_volumes_info_ordering(
             volumes_info = _check_ensure_volumes_info_ordering(
                 export_info, volumes_info)
                 export_info, volumes_info)
-
-        task_info['volumes_info'] = volumes_info
-
-        return task_info
+        else:
+            LOG.warn(
+                "Destination update method for '%s' dest provider did NOT "
+                "return any volumes info. Defaulting to old value.",
+                destination["type"])
+            volumes_info = task_info.get("volumes_info", [])
+
+        return {
+            "volumes_info": volumes_info,
+            "target_environment": new_destination_env}

+ 25 - 9
coriolis/utils.py

@@ -580,15 +580,31 @@ def filter_chunking_info_for_task(task_info):
     """ Returns a copy of the given task info with any chunking
     """ Returns a copy of the given task info with any chunking
     info on volumes removed.
     info on volumes removed.
     """
     """
-    cpy = copy.deepcopy(task_info)
-    if not cpy.get("volumes_info"):
-        return cpy
-
-    for vol in cpy['volumes_info']:
-        if vol.get("replica_state", {}).get("chunks"):
-            vol["replica_state"]["chunks"] = "<redacted>"
-
-    return cpy
+    new = {}
+    for key in task_info.keys():
+        if key != "volumes_info":
+            new[key] = copy.deepcopy(task_info[key])
+
+    if 'volumes_info' in task_info:
+        new['volumes_info'] = []
+        for vol in task_info['volumes_info']:
+            vol_cpy = {}
+            for key in vol:
+                if key != "replica_state":
+                    vol_cpy[key] = copy.deepcopy(vol[key])
+                else:
+                    vol_cpy['replica_state'] = {}
+                    for statekey in vol['replica_state']:
+                        if statekey != "chunks":
+                            vol_cpy['replica_state'][statekey] = (
+                                copy.deepcopy(
+                                    vol['replica_state'][statekey]))
+                        else:
+                            vol_cpy['replica_state']["chunks"] = (
+                                ["<redacted>"])
+            new['volumes_info'].append(vol_cpy)
+
+    return new
 
 
 
 
 def _write_systemd(ssh, cmdline, svcname, run_as=None, start=True):
 def _write_systemd(ssh, cmdline, svcname, run_as=None, start=True):

+ 43 - 52
coriolis/worker/rpc/server.py

@@ -25,14 +25,8 @@ from coriolis.tasks import factory as task_runners_factory
 from coriolis import utils
 from coriolis import utils
 
 
 
 
-worker_opts = [
-    cfg.StrOpt('export_base_path',
-               default='/tmp',
-               help='The path used for hosting exported disks.'),
-]
-
 CONF = cfg.CONF
 CONF = cfg.CONF
-CONF.register_opts(worker_opts, 'worker')
+CONF.register_opts([], 'worker')
 
 
 LOG = logging.getLogger(__name__)
 LOG = logging.getLogger(__name__)
 
 
@@ -95,9 +89,14 @@ class WorkerServerEndpoint(object):
                 LOG.info("Sending SIGINT to process: %s", process_id)
                 LOG.info("Sending SIGINT to process: %s", process_id)
                 p.send_signal(signal.SIGINT)
                 p.send_signal(signal.SIGINT)
         except psutil.NoSuchProcess:
         except psutil.NoSuchProcess:
-            err_msg = "Task process not found: %s" % process_id
-            LOG.info(err_msg)
-            self._rpc_conductor_client.set_task_error(ctxt, task_id, err_msg)
+            msg = (
+                "Unable to find process '%s' for task '%s' for cancellation. "
+                "Presuming it was already canceled or had "
+                "completed/error'd." % (
+                    process_id, task_id))
+            LOG.error(msg)
+            self._rpc_conductor_client.confirm_task_cancellation(
+                ctxt, task_id, msg)
 
 
     def _handle_mp_log_events(self, p, mp_log_q):
     def _handle_mp_log_events(self, p, mp_log_q):
         while True:
         while True:
@@ -146,7 +145,8 @@ class WorkerServerEndpoint(object):
         libraries needed by the source/destination providers.
         libraries needed by the source/destination providers.
         """
         """
         event_handler = _ConductorProviderEventHandler(ctxt, task_id)
         event_handler = _ConductorProviderEventHandler(ctxt, task_id)
-        task_runner = task_runners_factory.get_task_runner(task_type)
+        task_runner = task_runners_factory.get_task_runner_class(
+            task_type)()
 
 
         return task_runner.get_shared_libs_for_providers(
         return task_runner.get_shared_libs_for_providers(
             ctxt, origin, destination, event_handler)
             ctxt, origin, destination, event_handler)
@@ -184,8 +184,18 @@ class WorkerServerEndpoint(object):
 
 
         self._start_process_with_custom_library_paths(p, extra_library_paths)
         self._start_process_with_custom_library_paths(p, extra_library_paths)
         LOG.info("Task process started: %s", task_id)
         LOG.info("Task process started: %s", task_id)
-        self._rpc_conductor_client.set_task_host(
-            ctxt, task_id, self._server, p.pid)
+        try:
+            self._rpc_conductor_client.set_task_host(
+                ctxt, task_id, self._server, p.pid)
+        except (Exception, KeyboardInterrupt) as ex:
+            # NOTE: because the task error classes are wrapped,
+            # it's easiest to just check that the messages align:
+            cancelling_msg = exception.TASK_ALREADY_CANCELLING_EXCEPTION_FMT % {
+                "task_id": task_id}
+            if cancelling_msg in str(ex):
+                raise exception.TaskIsCancelling(
+                    "Task '%s' was already in cancelling status." % task_id)
+            raise
 
 
         evt = eventlet.spawn(self._wait_for_process, p, mp_q)
         evt = eventlet.spawn(self._wait_for_process, p, mp_q)
         eventlet.spawn(self._handle_mp_log_events, p, mp_log_q)
         eventlet.spawn(self._handle_mp_log_events, p, mp_log_q)
@@ -193,8 +203,9 @@ class WorkerServerEndpoint(object):
         result = evt.wait()
         result = evt.wait()
         p.join()
         p.join()
 
 
-        if not result:
-            raise exception.CoriolisException("Task canceled")
+        if result is None:
+            raise exception.TaskProcessCanceledException(
+                "Task was canceled.")
 
 
         if isinstance(result, str):
         if isinstance(result, str):
             raise exception.TaskProcessException(result)
             raise exception.TaskProcessException(result)
@@ -202,39 +213,25 @@ class WorkerServerEndpoint(object):
 
 
     def exec_task(self, ctxt, task_id, task_type, origin, destination,
     def exec_task(self, ctxt, task_id, task_type, origin, destination,
                   instance, task_info):
                   instance, task_info):
-        export_path = task_info.get("export_path")
-        if not export_path:
-            export_path = _get_task_export_path(task_id, create=True)
-            task_info["export_path"] = export_path
-        retain_export_path = False
-        task_info["retain_export_path"] = retain_export_path
-
         try:
         try:
-            new_task_info = self._exec_task_process(
+            task_result = self._exec_task_process(
                 ctxt, task_id, task_type, origin, destination,
                 ctxt, task_id, task_type, origin, destination,
                 instance, task_info)
                 instance, task_info)
 
 
-            if new_task_info:
-                LOG.info(
-                    "Task info: %s",
-                    utils.filter_chunking_info_for_task(new_task_info))
+            LOG.info(
+                "Output of completed %s task with ID %s: %s",
+                task_type, task_type,
+                utils.filter_chunking_info_for_task(task_result))
 
 
-            # TODO(alexpilotti): replace the temp storage with a host
-            # independent option
-            retain_export_path = new_task_info.get("retain_export_path", False)
-            if not retain_export_path:
-                del new_task_info["export_path"]
-
-            LOG.info("Task completed: %s", task_id)
-            self._rpc_conductor_client.task_completed(ctxt, task_id,
-                                                      new_task_info)
+            self._rpc_conductor_client.task_completed(
+                ctxt, task_id, task_result)
+        except exception.TaskProcessCanceledException as ex:
+            LOG.exception(ex)
+            self._rpc_conductor_client.confirm_task_cancellation(
+                ctxt, task_id, str(ex))
         except Exception as ex:
         except Exception as ex:
             LOG.exception(ex)
             LOG.exception(ex)
-            self._check_remove_dir(export_path)
             self._rpc_conductor_client.set_task_error(ctxt, task_id, str(ex))
             self._rpc_conductor_client.set_task_error(ctxt, task_id, str(ex))
-        finally:
-            if not retain_export_path:
-                self._check_remove_dir(export_path)
 
 
     def get_endpoint_instances(self, ctxt, platform_name, connection_info,
     def get_endpoint_instances(self, ctxt, platform_name, connection_info,
                                source_environment, marker, limit,
                                source_environment, marker, limit,
@@ -429,18 +426,11 @@ class WorkerServerEndpoint(object):
             schemas["source_environment_schema"] = schema
             schemas["source_environment_schema"] = schema
 
 
         return schemas
         return schemas
-    
+
     def get_diagnostics(self, ctxt):
     def get_diagnostics(self, ctxt):
         return utils.get_diagnostics_info()
         return utils.get_diagnostics_info()
 
 
 
 
-def _get_task_export_path(task_id, create=False):
-    export_path = os.path.join(CONF.worker.export_base_path, task_id)
-    if create and not os.path.exists(export_path):
-        os.makedirs(export_path)
-    return export_path
-
-
 def _setup_task_process(mp_log_q):
 def _setup_task_process(mp_log_q):
     # Setting up logging and cfg, needed since this is a new process
     # Setting up logging and cfg, needed since this is a new process
     cfg.CONF(sys.argv[1:], project='coriolis', version="1.0.0")
     cfg.CONF(sys.argv[1:], project='coriolis', version="1.0.0")
@@ -458,7 +448,8 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
     try:
     try:
         _setup_task_process(mp_log_q)
         _setup_task_process(mp_log_q)
 
 
-        task_runner = task_runners_factory.get_task_runner(task_type)
+        task_runner = task_runners_factory.get_task_runner_class(
+            task_type)()
         event_handler = _ConductorProviderEventHandler(ctxt, task_id)
         event_handler = _ConductorProviderEventHandler(ctxt, task_id)
 
 
         LOG.debug("Executing task: %(task_id)s, type: %(task_type)s, "
         LOG.debug("Executing task: %(task_id)s, type: %(task_type)s, "
@@ -470,11 +461,11 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
                    "task_info": utils.filter_chunking_info_for_task(
                    "task_info": utils.filter_chunking_info_for_task(
                        task_info)})
                        task_info)})
 
 
-        new_task_info = task_runner.run(
+        task_result = task_runner.run(
             ctxt, instance, origin, destination, task_info, event_handler)
             ctxt, instance, origin, destination, task_info, event_handler)
         # mq_p.put() doesn't raise if new_task_info is not serializable
         # mq_p.put() doesn't raise if new_task_info is not serializable
-        utils.is_serializable(new_task_info)
-        mp_q.put(new_task_info)
+        utils.is_serializable(task_result)
+        mp_q.put(task_result)
     except Exception as ex:
     except Exception as ex:
         mp_q.put(str(ex))
         mp_q.put(str(ex))
         LOG.exception(ex)
         LOG.exception(ex)

Bu fark içinde çok fazla dosya değişikliği olduğu için bazı dosyalar gösterilmiyor