Просмотр исходного кода

Merge pull request #208 from aznashwan/per-provider-extra-lib-loads

Only load libraries of required plugin during task execution.
Nashwan Azhari 4 лет назад
Родитель
Сommit
8968dd4770
2 измененных файлов с 31 добавлено и 23 удалено
  1. 24 17
      coriolis/tasks/base.py
  2. 7 6
      coriolis/worker/rpc/server.py

+ 24 - 17
coriolis/tasks/base.py

@@ -32,23 +32,30 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
         for both the source and destination providers. """
         required_libs = []
 
-        origin_provider = providers_factory.get_provider(
-            origin["type"], constants.PROVIDER_TYPE_SETUP_LIBS, event_handler,
-            raise_if_not_found=False)
-        if origin_provider:
-            conn_info = get_connection_info(ctxt, origin)
-            required_libs.extend(
-                origin_provider.get_shared_library_directories(
-                    ctxt, conn_info))
-
-        destination_provider = providers_factory.get_provider(
-            destination["type"], constants.PROVIDER_TYPE_SETUP_LIBS,
-            event_handler, raise_if_not_found=False)
-        if destination_provider:
-            conn_info = get_connection_info(ctxt, destination)
-            required_libs.extend(
-                destination_provider.get_shared_library_directories(
-                    ctxt, conn_info))
+        platform = self.get_required_platform()
+        if platform in [
+                constants.TASK_PLATFORM_SOURCE,
+                constants.TASK_PLATFORM_BILATERAL]:
+            origin_provider = providers_factory.get_provider(
+                origin["type"], constants.PROVIDER_TYPE_SETUP_LIBS,
+                event_handler, raise_if_not_found=False)
+            if origin_provider:
+                conn_info = get_connection_info(ctxt, origin)
+                required_libs.extend(
+                    origin_provider.get_shared_library_directories(
+                        ctxt, conn_info))
+
+        if platform in [
+                constants.TASK_PLATFORM_DESTINATION,
+                constants.TASK_PLATFORM_BILATERAL]:
+            destination_provider = providers_factory.get_provider(
+                destination["type"], constants.PROVIDER_TYPE_SETUP_LIBS,
+                event_handler, raise_if_not_found=False)
+            if destination_provider:
+                conn_info = get_connection_info(ctxt, destination)
+                required_libs.extend(
+                    destination_provider.get_shared_library_directories(
+                        ctxt, conn_info))
 
         return required_libs
 

+ 7 - 6
coriolis/worker/rpc/server.py

@@ -19,7 +19,6 @@ from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis.conductor.rpc import utils as conductor_rpc_utils
 from coriolis import constants
 from coriolis import context
-from coriolis import events
 from coriolis import exception
 from coriolis.minion_manager.rpc import client as rpc_minion_manager_client
 from coriolis.providers import factory as providers_factory
@@ -234,7 +233,8 @@ class WorkerServerEndpoint(object):
             LOG.info("Task process started: %s", task_id)
             if report_to_conductor:
                 LOG.debug(
-                    "Attempting to set task process on Conductor for task '%s'.",
+                    "Attempting to set task process on Conductor "
+                    "for task '%s'.",
                     task_id)
                 self._rpc_conductor_client.set_task_process(
                     ctxt, task_id, p.pid)
@@ -247,8 +247,9 @@ class WorkerServerEndpoint(object):
                 "was: %s", task_id, utils.get_exception_details())
             # NOTE: because the task error classes are wrapped,
             # it's easiest to just check that the messages align:
-            cancelling_msg = exception.TASK_ALREADY_CANCELLING_EXCEPTION_FMT % {
-                "task_id": task_id}
+            cancelling_msg = (
+                exception.TASK_ALREADY_CANCELLING_EXCEPTION_FMT % {
+                    "task_id": task_id})
             if cancelling_msg in str(ex):
                 raise exception.TaskIsCancelling(
                     "Task '%s' was already in cancelling status." % task_id)
@@ -302,7 +303,7 @@ class WorkerServerEndpoint(object):
                     ctxt, task_id, str(ex))
             else:
                 raise
-        except exception.NoSuitableWorkerServiceError as ex:
+        except exception.NoSuitableWorkerServiceError:
             if report_to_conductor:
                 LOG.warn(
                     "A conductor-side scheduling error has occurred following "
@@ -318,7 +319,7 @@ class WorkerServerEndpoint(object):
                     task_id, utils.get_exception_details())
                 LOG.exception(ex)
                 self._rpc_conductor_client.set_task_error(
-                        ctxt, task_id, str(ex))
+                    ctxt, task_id, str(ex))
             else:
                 raise