Browse Source

Fixes parallel task execution

Alessandro Pilotti 9 years ago
parent
commit
300e9efe9c

+ 67 - 38
coriolis/conductor/rpc/server.py

@@ -111,16 +111,30 @@ class ConductorServerEndpoint(object):
                     instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
                     execution, depends_on=[get_instance_info_task.id])
 
+                deploy_replica_export_resources_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES,
+                    execution, depends_on=[deploy_replica_disks_task.id])
+
                 deploy_replica_resources_task = self._create_task(
-                    instance, constants.TASK_TYPE_DEPLOY_REPLICA_RESOURCES,
+                    instance,
+                    constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES,
                     execution, depends_on=[deploy_replica_disks_task.id])
 
                 replicate_disks_task = self._create_task(
                     instance, constants.TASK_TYPE_REPLICATE_DISKS,
-                    execution, depends_on=[deploy_replica_resources_task.id])
+                    execution, depends_on=[
+                        deploy_replica_export_resources_task.id,
+                        deploy_replica_resources_task.id])
+
+                self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
+                    execution, depends_on=[replicate_disks_task.id])
 
                 self._create_task(
-                    instance, constants.TASK_TYPE_DELETE_REPLICA_RESOURCES,
+                    instance,
+                    constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
                     execution, depends_on=[replicate_disks_task.id])
 
         db_api.add_replica_tasks_execution(ctxt, execution)
@@ -375,23 +389,33 @@ class ConductorServerEndpoint(object):
     def _start_pending_tasks(self, ctxt, execution, parent_task, task_info):
         has_pending_tasks = False
         for task in execution.tasks:
-            if (task.depends_on and parent_task.id in task.depends_on and
-                    task.status == constants.TASK_STATUS_PENDING):
+            if task.status == constants.TASK_STATUS_PENDING:
                 has_pending_tasks = True
-                # instance imports need to be executed on the same host
-                server = None
-                if task.task_type == constants.TASK_TYPE_IMPORT_INSTANCE:
-                    server = parent_task.host
-
-                action = execution.action
-                self._rpc_worker_client.begin_task(
-                    ctxt, server=server,
-                    task_id=task.id,
-                    task_type=task.task_type,
-                    origin=action.origin,
-                    destination=action.destination,
-                    instance=task.instance,
-                    task_info=task_info)
+                if task.depends_on and parent_task.id in task.depends_on:
+                    start_task = True
+                    for depend_task_id in task.depends_on:
+                        if depend_task_id != parent_task.id:
+                            depend_task = db_api.get_task(ctxt, depend_task_id)
+                            if (depend_task.status !=
+                                    constants.TASK_STATUS_COMPLETED):
+                                start_task = False
+                                break
+                    if start_task:
+                        # instance imports need to be executed on the same host
+                        server = None
+                        if (task.task_type ==
+                                constants.TASK_TYPE_IMPORT_INSTANCE):
+                            server = parent_task.host
+
+                        action = execution.action
+                        self._rpc_worker_client.begin_task(
+                            ctxt, server=server,
+                            task_id=task.id,
+                            task_type=task.task_type,
+                            origin=action.origin,
+                            destination=action.destination,
+                            instance=task.instance,
+                            task_info=task_info)
         return has_pending_tasks
 
     @task_synchronized
@@ -405,18 +429,22 @@ class ConductorServerEndpoint(object):
             ctxt, task_id, include_execution_tasks=True)
 
         execution = task.execution
-        has_pending_tasks = self._start_pending_tasks(ctxt, execution, task,
-                                                      task_info)
-
-        LOG.info("Setting instance %(instance)s action info: %(task_info)s",
-                 {"instance": task.instance, "task_info": task_info})
-        db_api.set_transfer_action_info(
-            ctxt, execution.action_id, task.instance, task_info)
-
-        if not has_pending_tasks:
-            LOG.info("Tasks execution completed: %s", execution.id)
-            db_api.set_execution_status(
-                ctxt, execution.id, constants.EXECUTION_STATUS_COMPLETED)
+        with lockutils.lock(execution.action_id):
+            LOG.info("Setting instance %(instance)s "
+                     "action info: %(task_info)s",
+                     {"instance": task.instance, "task_info": task_info})
+            updated_task_info = db_api.set_transfer_action_info(
+                ctxt, execution.action_id, task.instance, task_info)
+
+            if execution.status == constants.EXECUTION_STATUS_RUNNING:
+                has_pending_tasks = self._start_pending_tasks(
+                    ctxt, execution, task, updated_task_info)
+
+                if not has_pending_tasks:
+                    LOG.info("Tasks execution completed: %s", execution.id)
+                    db_api.set_execution_status(
+                        ctxt, execution.id,
+                        constants.EXECUTION_STATUS_COMPLETED)
 
     @task_synchronized
     def set_task_error(self, ctxt, task_id, exception_details):
@@ -430,14 +458,15 @@ class ConductorServerEndpoint(object):
             ctxt, task_id, include_execution_tasks=True)
         execution = task.execution
 
-        for task in execution.tasks:
-            if task.status == constants.TASK_STATUS_PENDING:
-                db_api.set_task_status(
-                    ctxt, task.id, constants.TASK_STATUS_CANCELED)
+        with lockutils.lock(execution.action_id):
+            for task in execution.tasks:
+                if task.status == constants.TASK_STATUS_PENDING:
+                    db_api.set_task_status(
+                        ctxt, task.id, constants.TASK_STATUS_CANCELED)
 
-        LOG.error("Tasks execution failed: %s", execution.id)
-        db_api.set_execution_status(
-            ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
+            LOG.error("Tasks execution failed: %s", execution.id)
+            db_api.set_execution_status(
+                ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
 
     @task_synchronized
     def task_event(self, ctxt, task_id, level, message):

+ 4 - 2
coriolis/constants.py

@@ -18,8 +18,10 @@ TASK_TYPE_GET_INSTANCE_INFO = "GET_INSTANCE_INFO"
 TASK_TYPE_DEPLOY_REPLICA_DISKS = "DEPLOY_REPLICA_DISKS"
 TASK_TYPE_DELETE_REPLICA_DISKS = "DELETE_REPLICA_DISKS"
 TASK_TYPE_REPLICATE_DISKS = "REPLICATE_DISKS"
-TASK_TYPE_DEPLOY_REPLICA_RESOURCES = "DEPLOY_REPLICA_RESOURCES"
-TASK_TYPE_DELETE_REPLICA_RESOURCES = "DELETE_REPLICA_RESOURCES"
+TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES = "DEPLOY_REPLICA_SOURCE_RESOURCES"
+TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES = "DELETE_REPLICA_SOURCE_RESOURCES"
+TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES = "DEPLOY_REPLICA_TARGET_RESOURCES"
+TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES = "DELETE_REPLICA_TARGET_RESOURCES"
 TASK_TYPE_SHUTDOWN_INSTANCE = "SHUTDOWN_INSTANCE"
 TASK_TYPE_DEPLOY_REPLICA_INSTANCE = "DEPLOY_REPLICA_INSTANCE"
 TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS = "CREATE_REPLICA_DISK_SNAPSHOTS"

+ 8 - 1
coriolis/db/api.py

@@ -223,9 +223,16 @@ def set_transfer_action_info(context, action_id, instance, instance_info):
 
     # Copy is needed, otherwise sqlalchemy won't save the changes
     action_info = action.info.copy()
-    action_info[instance] = instance_info
+    if instance in action_info:
+        instance_info_old = action_info[instance].copy()
+        instance_info_old.update(instance_info)
+        action_info[instance] = instance_info_old
+    else:
+        action_info[instance] = instance_info
     action.info = action_info
 
+    return action_info[instance]
+
 
 @enginefacade.reader
 def get_tasks_execution(context, execution_id):

+ 15 - 5
coriolis/providers/base.py

@@ -76,13 +76,13 @@ class BaseReplicaImportProvider(BaseImportProvider):
         pass
 
     @abc.abstractmethod
-    def deploy_replica_resources(self, ctxt, connection_info,
-                                 target_environment, volumes_info):
+    def deploy_replica_target_resources(self, ctxt, connection_info,
+                                        target_environment, volumes_info):
         pass
 
     @abc.abstractmethod
-    def delete_replica_resources(self, ctxt, connection_info,
-                                 migr_resources_dict):
+    def delete_replica_target_resources(self, ctxt, connection_info,
+                                        migr_resources_dict):
         pass
 
     @abc.abstractmethod
@@ -124,9 +124,19 @@ class BaseReplicaExportProvider(BaseExportProvider):
     def get_replica_instance_info(self, ctxt, connection_info, instance_name):
         pass
 
+    @abc.abstractmethod
+    def deploy_replica_source_resources(self, ctxt, connection_info):
+        pass
+
+    @abc.abstractmethod
+    def delete_replica_source_resources(self, ctxt, connection_info,
+                                        migr_resources_dict):
+        pass
+
     @abc.abstractmethod
     def replicate_disks(self, ctxt, connection_info, instance_name,
-                        target_conn_info, volumes_info, incremental):
+                        source_conn_info, target_conn_info, volumes_info,
+                        incremental):
         pass
 
     @abc.abstractmethod

+ 4 - 4
coriolis/providers/openstack/__init__.py

@@ -921,8 +921,8 @@ class ImportProvider(base.BaseReplicaImportProvider):
 
         return volumes_info
 
-    def deploy_replica_resources(self, ctxt, connection_info,
-                                 target_environment, volumes_info):
+    def deploy_replica_target_resources(self, ctxt, connection_info,
+                                        target_environment, volumes_info):
         session = keystone.create_keystone_session(ctxt, connection_info)
 
         glance_api_version = connection_info.get("image_api_version",
@@ -963,8 +963,8 @@ class ImportProvider(base.BaseReplicaImportProvider):
             migr_resources.delete()
             raise
 
-    def delete_replica_resources(self, ctxt, connection_info,
-                                 migr_resources_dict):
+    def delete_replica_target_resources(self, ctxt, connection_info,
+                                        migr_resources_dict):
         session = keystone.create_keystone_session(ctxt, connection_info)
 
         nova = nova_client.Client(NOVA_API_VERSION, session=session)

+ 9 - 1
coriolis/providers/vmware_vsphere/__init__.py

@@ -686,7 +686,8 @@ class ExportProvider(base.BaseReplicaExportProvider):
             self._shutdown_vm(vm)
 
     def replicate_disks(self, ctxt, connection_info, instance_name,
-                        target_conn_info, volumes_info, incremental):
+                        source_conn_info, target_conn_info, volumes_info,
+                        incremental):
         ip = target_conn_info["ip"]
         port = target_conn_info.get("port", 22)
         username = target_conn_info["username"]
@@ -722,3 +723,10 @@ class ExportProvider(base.BaseReplicaExportProvider):
                 volume_info["change_id"] = change_id
 
         return volumes_info
+
+    def deploy_replica_source_resources(self, ctxt, connection_info):
+        return {"migr_resources": None, "connection_info": None}
+
+    def delete_replica_source_resources(self, ctxt, connection_info,
+                                        migr_resources_dict):
+        pass

+ 8 - 4
coriolis/tasks/factory.py

@@ -21,10 +21,14 @@ _TASKS_MAP = {
         replica_tasks.DeployReplicaDisksTask,
     constants.TASK_TYPE_DELETE_REPLICA_DISKS:
         replica_tasks.DeleteReplicaDisksTask,
-    constants.TASK_TYPE_DEPLOY_REPLICA_RESOURCES:
-        replica_tasks.DeployReplicaResourcesTask,
-    constants.TASK_TYPE_DELETE_REPLICA_RESOURCES:
-        replica_tasks.DeleteReplicaResourcesTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES:
+        replica_tasks.DeployReplicaTargetResourcesTask,
+    constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES:
+        replica_tasks.DeleteReplicaTargetResourcesTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES:
+        replica_tasks.DeployReplicaSourceResourcesTask,
+    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES:
+        replica_tasks.DeleteReplicaSourceResourcesTask,
     constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE:
         replica_tasks.DeployReplicaInstanceTask,
     constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS:

+ 73 - 19
coriolis/tasks/replica_tasks.py

@@ -12,6 +12,22 @@ from oslo_log import log as logging
 LOG = logging.getLogger(__name__)
 
 
+def _marshal_migr_conn_info(migr_connection_info):
+    if migr_connection_info and "pkey" in migr_connection_info:
+        migr_connection_info = migr_connection_info.copy()
+        migr_connection_info["pkey"] = utils.serialize_key(
+            migr_connection_info["pkey"])
+    return migr_connection_info
+
+
+def _unmarshal_migr_conn_info(migr_connection_info):
+    if migr_connection_info and "pkey" in migr_connection_info:
+        migr_connection_info = migr_connection_info.copy()
+        pkey_str = migr_connection_info["pkey"]
+        migr_connection_info["pkey"] = utils.deserialize_key(pkey_str)
+    return migr_connection_info
+
+
 class GetInstanceInfoTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
@@ -51,17 +67,18 @@ class ReplicateDisksTask(base.TaskRunner):
 
         volumes_info = task_info["volumes_info"]
 
-        migr_conn_info = task_info["migr_connection_info"]
-        pkey_str = migr_conn_info["pkey"]
-        migr_conn_info["pkey"] = utils.deserialize_key(pkey_str)
+        migr_source_conn_info = _unmarshal_migr_conn_info(
+            task_info["migr_source_connection_info"])
+
+        migr_target_conn_info = _unmarshal_migr_conn_info(
+            task_info["migr_target_connection_info"])
 
         incremental = task_info.get("incremental", True)
 
         volumes_info = provider.replicate_disks(
-            ctxt, connection_info, instance, migr_conn_info,
-            volumes_info, incremental)
+            ctxt, connection_info, instance, migr_source_conn_info,
+            migr_target_conn_info, volumes_info, incremental)
 
-        task_info["migr_connection_info"] = pkey_str
         task_info["volumes_info"] = volumes_info
 
         return task_info
@@ -100,12 +117,49 @@ class DeleteReplicaDisksTask(base.TaskRunner):
         provider.delete_replica_disks(
             ctxt, connection_info, volumes_info)
 
-        del task_info["volumes_info"]
+        task_info["volumes_info"] = None
 
         return task_info
 
 
-class DeployReplicaResourcesTask(base.TaskRunner):
+class DeployReplicaSourceResourcesTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            origin["type"], constants.PROVIDER_TYPE_EXPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, origin)
+
+        replica_resources_info = provider.deploy_replica_source_resources(
+            ctxt, connection_info)
+
+        task_info["migr_source_resources"] = replica_resources_info[
+            "migr_resources"]
+        migr_connection_info = _marshal_migr_conn_info(
+            replica_resources_info["connection_info"])
+        task_info["migr_source_connection_info"] = migr_connection_info
+
+        return task_info
+
+
+class DeleteReplicaSourceResourcesTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            origin["type"], constants.PROVIDER_TYPE_EXPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, origin)
+
+        migr_resources = task_info["migr_source_resources"]
+
+        provider.delete_replica_source_resources(
+            ctxt, connection_info, migr_resources)
+
+        task_info["migr_source_resources"] = None
+        task_info["migr_source_connection_info"] = None
+
+        return task_info
+
+
+class DeployReplicaTargetResourcesTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         target_environment = destination.get("target_environment") or {}
@@ -116,34 +170,34 @@ class DeployReplicaResourcesTask(base.TaskRunner):
 
         volumes_info = task_info["volumes_info"]
 
-        replica_resources_info = provider.deploy_replica_resources(
+        replica_resources_info = provider.deploy_replica_target_resources(
             ctxt, connection_info, target_environment, volumes_info)
 
         task_info["volumes_info"] = replica_resources_info["volumes_info"]
-        task_info["migr_resources"] = replica_resources_info["migr_resources"]
+        task_info["migr_target_resources"] = replica_resources_info[
+            "migr_resources"]
 
-        migr_connection_info = replica_resources_info["connection_info"]
-        migr_connection_info["pkey"] = utils.serialize_key(
-            migr_connection_info["pkey"])
-        task_info["migr_connection_info"] = migr_connection_info
+        migr_connection_info = _marshal_migr_conn_info(
+            replica_resources_info["connection_info"])
+        task_info["migr_target_connection_info"] = migr_connection_info
 
         return task_info
 
 
-class DeleteReplicaResourcesTask(base.TaskRunner):
+class DeleteReplicaTargetResourcesTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
 
-        migr_resources = task_info["migr_resources"]
+        migr_resources = task_info["migr_target_resources"]
 
-        provider.delete_replica_resources(
+        provider.delete_replica_target_resources(
             ctxt, connection_info, migr_resources)
 
-        del task_info["migr_resources"]
-        del task_info["migr_connection_info"]
+        task_info["migr_target_resources"] = None
+        task_info["migr_target_connection_info"] = None
 
         return task_info