Browse Source

Task RPC refactoring

Alessandro Pilotti 10 years ago
parent
commit
dd8e01708d

+ 2 - 6
coriolis/conductor/rpc/client.py

@@ -31,13 +31,9 @@ class ConductorClient(object):
             ctxt, 'set_task_host', task_id=task_id, host=host,
             ctxt, 'set_task_host', task_id=task_id, host=host,
             process_id=process_id)
             process_id=process_id)
 
 
-    def export_completed(self, ctxt, task_id, export_info):
+    def task_completed(self, ctxt, task_id, task_info):
         self._client.call(
         self._client.call(
-            ctxt, 'export_completed', task_id=task_id,
-            export_info=export_info)
-
-    def import_completed(self, ctxt, task_id):
-        self._client.call(ctxt, 'import_completed', task_id=task_id)
+            ctxt, 'task_completed', task_id=task_id, task_info=task_info)
 
 
     def set_task_error(self, ctxt, task_id, exception_details):
     def set_task_error(self, ctxt, task_id, exception_details):
         self._client.call(ctxt, 'set_task_error', task_id=task_id,
         self._client.call(ctxt, 'set_task_error', task_id=task_id,

+ 24 - 18
coriolis/conductor/rpc/server.py

@@ -35,14 +35,14 @@ class ConductorServerEndpoint(object):
             task_export.migration = migration
             task_export.migration = migration
             task_export.instance = instance
             task_export.instance = instance
             task_export.status = constants.TASK_STATUS_WAITING
             task_export.status = constants.TASK_STATUS_WAITING
-            task_export.task_type = constants.TASK_TYPE_EXPORT
+            task_export.task_type = constants.TASK_TYPE_EXPORT_INSTANCE
 
 
             task_import = models.Task()
             task_import = models.Task()
             task_import.id = str(uuid.uuid4())
             task_import.id = str(uuid.uuid4())
             task_import.migration = migration
             task_import.migration = migration
             task_import.instance = instance
             task_import.instance = instance
             task_import.status = constants.TASK_STATUS_WAITING
             task_import.status = constants.TASK_STATUS_WAITING
-            task_import.task_type = constants.TASK_TYPE_IMPORT
+            task_import.task_type = constants.TASK_TYPE_IMPORT_INSTANCE
             task_import.depends_on = [task_export.id]
             task_import.depends_on = [task_export.id]
 
 
         db_api.add_migration(ctxt, migration)
         db_api.add_migration(ctxt, migration)
@@ -50,8 +50,14 @@ class ConductorServerEndpoint(object):
 
 
         for task in migration.tasks:
         for task in migration.tasks:
             if not task.depends_on:
             if not task.depends_on:
-                self._rpc_worker_client.begin_export_instance(
-                    ctxt, task.id, origin, instance)
+                self._rpc_worker_client.begin_task(
+                    ctxt, server=None,
+                    task_id=task.id,
+                    task_type=task.task_type,
+                    origin=migration.origin,
+                    destination=migration.destination,
+                    instance=task.instance,
+                    task_info=None)
 
 
         return self.get_migration(ctxt, migration.id)
         return self.get_migration(ctxt, migration.id)
 
 
@@ -73,19 +79,22 @@ class ConductorServerEndpoint(object):
             if (task.depends_on and parent_task.id in task.depends_on and
             if (task.depends_on and parent_task.id in task.depends_on and
                     task.status == constants.TASK_STATUS_WAITING):
                     task.status == constants.TASK_STATUS_WAITING):
                 has_pending_tasks = True
                 has_pending_tasks = True
-                if task.task_type == constants.TASK_TYPE_IMPORT:
-                    # Needs to be executed on the same host
-                    self._rpc_worker_client.begin_import_instance(
-                        ctxt, parent_task.host, task.id,
-                        migration.destination,
-                        task.instance,
-                        task_info)
+                # instance imports needs to be executed on the same host
+                server = None
+                if task.task_type == constants.TASK_TYPE_IMPORT_INSTANCE:
+                    server = parent_task.host
+
+                self._rpc_worker_client.begin_task(
+                    ctxt, server=server,
+                    task_id=task.id,
+                    task_type=task.task_type,
+                    origin=migration.origin,
+                    destination=migration.destination,
+                    instance=task.instance,
+                    task_info=task_info)
         return has_pending_tasks
         return has_pending_tasks
 
 
-    def export_completed(self, ctxt, task_id, export_info):
-        self._task_completed(ctxt, task_id, export_info)
-
-    def _task_completed(self, ctxt, task_id, task_info):
+    def task_completed(self, ctxt, task_id, task_info):
         LOG.info("Task completed: %s", task_id)
         LOG.info("Task completed: %s", task_id)
 
 
         db_api.set_task_status(
         db_api.set_task_status(
@@ -103,9 +112,6 @@ class ConductorServerEndpoint(object):
             db_api.set_migration_status(
             db_api.set_migration_status(
                 ctxt, migration.id, constants.MIGRATION_STATUS_COMPLETED)
                 ctxt, migration.id, constants.MIGRATION_STATUS_COMPLETED)
 
 
-    def import_completed(self, ctxt, task_id):
-        self._task_completed(ctxt, task_id, None)
-
     def set_task_error(self, ctxt, task_id, exception_details):
     def set_task_error(self, ctxt, task_id, exception_details):
         LOG.error("Task error: %(task_id)s - %(ex)s",
         LOG.error("Task error: %(task_id)s - %(ex)s",
                   {"task_id": task_id, "ex": exception_details})
                   {"task_id": task_id, "ex": exception_details})

+ 2 - 2
coriolis/constants.py

@@ -8,8 +8,8 @@ TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_CANCELED = "CANCELED"
 TASK_STATUS_CANCELED = "CANCELED"
 
 
-TASK_TYPE_EXPORT = "EXPORT"
-TASK_TYPE_IMPORT = "IMPORT"
+TASK_TYPE_EXPORT_INSTANCE = "EXPORT_INSTANCE"
+TASK_TYPE_IMPORT_INSTANCE = "IMPORT_INSTANCE"
 
 
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_EXPORT = 2
 PROVIDER_TYPE_EXPORT = 2

+ 7 - 13
coriolis/worker/rpc/client.py

@@ -10,24 +10,18 @@ class WorkerClient(object):
         target = messaging.Target(topic='coriolis_worker', version=VERSION)
         target = messaging.Target(topic='coriolis_worker', version=VERSION)
         self._client = rpc.get_client(target)
         self._client = rpc.get_client(target)
 
 
-    def begin_export_instance(self, ctxt, task_id, origin, instance):
-        self._client.cast(
-            ctxt, 'export_instance', task_id=task_id, origin=origin,
-            instance=instance)
+    def begin_task(self, ctxt, server, task_id, task_type, origin, destination,
+                   instance, task_info):
+        cctxt = self._client.prepare(server=server)
+        cctxt.cast(
+            ctxt, 'exec_task', task_id=task_id, task_type=task_type,
+            origin=origin, destination=destination, instance=instance,
+            task_info=task_info)
 
 
     def stop_task(self, ctxt, server, process_id):
     def stop_task(self, ctxt, server, process_id):
         # Needs to be executed on the same server
         # Needs to be executed on the same server
         cctxt = self._client.prepare(server=server)
         cctxt = self._client.prepare(server=server)
         cctxt.call(ctxt, 'stop_task', process_id=process_id)
         cctxt.call(ctxt, 'stop_task', process_id=process_id)
 
 
-    def begin_import_instance(self, ctxt, server, task_id, destination,
-                              instance, export_info):
-        # Needs to be executed on the same server
-        cctxt = self._client.prepare(server=server)
-        cctxt.cast(
-            ctxt, 'import_instance', task_id=task_id,
-            destination=destination, instance=instance,
-            export_info=export_info)
-
     def update_migration_status(self, ctxt, task_id, status):
     def update_migration_status(self, ctxt, task_id, status):
         self._client.call(ctxt, "update_migration_status", status=status)
         self._client.call(ctxt, "update_migration_status", status=status)

+ 33 - 39
coriolis/worker/rpc/server.py

@@ -1,6 +1,5 @@
 import os
 import os
 import multiprocessing
 import multiprocessing
-import queue
 import shutil
 import shutil
 
 
 from oslo_config import cfg
 from oslo_config import cfg
@@ -71,47 +70,42 @@ class WorkerServerEndpoint(object):
             raise exception.TaskProcessException(result)
             raise exception.TaskProcessException(result)
         return result
         return result
 
 
-    def export_instance(self, ctxt, task_id, origin, instance):
+    def exec_task(self, ctxt, task_id, task_type, origin, destination,
+                  instance, task_info):
         try:
         try:
-            export_provider = factory.get_provider(
-                origin["type"], constants.PROVIDER_TYPE_EXPORT)
-            export_path = self._get_task_export_path(task_id)
-            if not os.path.exists(export_path):
-                os.makedirs(export_path)
-
-            vm_info = self._exec_task_process(
-                ctxt, task_id, _export_instance,
-                (export_provider, origin["connection_info"],
-                 instance, export_path))
-
-            LOG.info("Exported VM: %s" % vm_info)
-            self._rpc_conductor_client.export_completed(
-                ctxt, task_id, vm_info)
-        except Exception as ex:
-            LOG.exception(ex)
-            if isinstance(ex, exception.TaskProcessException):
-                stack_trace = ex.message
+            task_info = None
+
+            if task_type == constants.TASK_TYPE_EXPORT_INSTANCE:
+                provider = factory.get_provider(
+                    origin["type"], constants.PROVIDER_TYPE_EXPORT)
+                export_path = self._get_task_export_path(task_id)
+                if not os.path.exists(export_path):
+                    os.makedirs(export_path)
+
+                task_info = self._exec_task_process(
+                    ctxt, task_id, _export_instance,
+                    (provider, origin["connection_info"],
+                     instance, export_path))
+
+            elif task_type == constants.TASK_TYPE_IMPORT_INSTANCE:
+                provider = factory.get_provider(
+                    destination["type"], constants.PROVIDER_TYPE_IMPORT)
+
+                self._exec_task_process(
+                    ctxt, task_id, _import_instance,
+                    (provider, destination["connection_info"],
+                     destination["target_environment"],
+                     instance, task_info))
             else:
             else:
-                stack_trace = utils.get_exception_details()
+                raise Exception("Unknown task type: %s" % task_type)
 
 
-            self._cleanup_task_resources(task_id)
-            self._rpc_conductor_client.set_task_error(
-                ctxt, task_id, stack_trace)
+            LOG.info("Task completed: %s", task_id)
+            LOG.info("Task info: %s", task_info)
+            self._rpc_conductor_client.task_completed(ctxt, task_id, task_info)
 
 
-    def import_instance(self, ctxt, task_id, destination, instance,
-                        export_info):
-        try:
-            import_provider = factory.get_provider(
-                destination["type"], constants.PROVIDER_TYPE_IMPORT)
-
-            self._exec_task_process(
-                ctxt, task_id, _import_instance,
-                (import_provider, destination["connection_info"],
-                 destination["target_environment"],
-                 instance, export_info))
-
-            LOG.info("Import completed")
-            self._rpc_conductor_client.import_completed(ctxt, task_id)
+            # Resources are needed by dependent import tasks
+            if task_type != constants.TASK_TYPE_EXPORT_INSTANCE:
+                self._cleanup_task_resources(task_id)
         except Exception as ex:
         except Exception as ex:
             LOG.exception(ex)
             LOG.exception(ex)
             if isinstance(ex, exception.TaskProcessException):
             if isinstance(ex, exception.TaskProcessException):
@@ -121,7 +115,7 @@ class WorkerServerEndpoint(object):
 
 
             self._rpc_conductor_client.set_task_error(
             self._rpc_conductor_client.set_task_error(
                 ctxt, task_id, stack_trace)
                 ctxt, task_id, stack_trace)
-        finally:
+
             self._cleanup_task_resources(task_id)
             self._cleanup_task_resources(task_id)