|
|
@@ -109,6 +109,46 @@ class WorkerServerEndpoint(object):
|
|
|
if not p.is_alive():
|
|
|
break
|
|
|
|
|
|
+ def _start_process_with_custom_library_paths(
|
|
|
+ self, process, extra_library_paths):
|
|
|
+ """ Given a process instance, this method will add any shared libs
|
|
|
+ needed by the origin/destination provider plugins to the
|
|
|
+ 'LD_LIBRARY_PATH' env variable and start the process.
|
|
|
+ This method will always restore the 'LD_LIBRARY_PATH' to the
|
|
|
+ original value for the parent process.
|
|
|
+ param process: multiprocessing.Process: Process instance to run
|
|
|
+ with the modified 'LD_LIBRARY_PATH'
|
|
|
+ param extra_library_paths: list(str): list of paths with extra
|
|
|
+ libraries which should be available to the worker process.
|
|
|
+ """
|
|
|
+ original_ld_path = os.environ.get('LD_LIBRARY_PATH', "")
|
|
|
+ new_ld_path = None
|
|
|
+ extra_libdirs = ":".join(extra_library_paths)
|
|
|
+ if not original_ld_path:
|
|
|
+ new_ld_path = extra_libdirs
|
|
|
+ else:
|
|
|
+ new_ld_path = "%s:%s" % (original_ld_path, extra_libdirs)
|
|
|
+
|
|
|
+ LOG.debug(
|
|
|
+ "Starting new worker process with extra libraries: '%s'",
|
|
|
+ extra_library_paths)
|
|
|
+ try:
|
|
|
+ os.environ['LD_LIBRARY_PATH'] = new_ld_path
|
|
|
+ process.start()
|
|
|
+ finally:
|
|
|
+ os.environ['LD_LIBRARY_PATH'] = original_ld_path
|
|
|
+
|
|
|
+ def _get_extra_library_paths_for_providers(
|
|
|
+ self, ctxt, task_id, task_type, origin, destination):
|
|
|
+ """ Returns a list of strings with paths on the worker with shared
|
|
|
+ libraries needed by the source/destination providers.
|
|
|
+ """
|
|
|
+ event_handler = _ConductorProviderEventHandler(ctxt, task_id)
|
|
|
+ task_runner = task_runners_factory.get_task_runner(task_type)
|
|
|
+
|
|
|
+ return task_runner.get_shared_libs_for_providers(
|
|
|
+ ctxt, origin, destination, event_handler)
|
|
|
+
|
|
|
def _exec_task_process(self, ctxt, task_id, task_type, origin, destination,
|
|
|
instance, task_info):
|
|
|
mp_ctx = multiprocessing.get_context('spawn')
|
|
|
@@ -119,7 +159,10 @@ class WorkerServerEndpoint(object):
|
|
|
args=(ctxt, task_id, task_type, origin, destination, instance,
|
|
|
task_info, mp_q, mp_log_q))
|
|
|
|
|
|
- p.start()
|
|
|
+ extra_library_paths = self._get_extra_library_paths_for_providers(
|
|
|
+ ctxt, task_id, task_type, origin, destination)
|
|
|
+
|
|
|
+ self._start_process_with_custom_library_paths(p, extra_library_paths)
|
|
|
LOG.info("Task process started: %s", task_id)
|
|
|
self._rpc_conductor_client.set_task_host(
|
|
|
ctxt, task_id, self._server, p.pid)
|