Explorar o código

Set extra library paths in worker processes.

The 'master worker process' will now extend the 'LD_LIBRARY_PATH' environment
variable to include any paths containing shared libraries required by
the source/destination provider classes whenever any new processes which
were spawned to run a task are created.
Nashwan Azhari %!s(int64=7) %!d(string=hai) anos
pai
achega
22904135ac
Modificáronse 1 ficheiros con 44 adicións e 1 borrados
  1. 44 1
      coriolis/worker/rpc/server.py

+ 44 - 1
coriolis/worker/rpc/server.py

@@ -109,6 +109,46 @@ class WorkerServerEndpoint(object):
                 if not p.is_alive():
                     break
 
+    def _start_process_with_custom_library_paths(
+            self, process, extra_library_paths):
+        """ Given a process instance, this method will add any shared libs
+        needed by the origin/destination provider plugins to the
+        'LD_LIBRARY_PATH' env variable and start the process.
+        This method will always restore the 'LD_LIBRARY_PATH' to the
+        original value for the parent process.
+        param process: multiprocessing.Process: Process instance to run
+        with the modified 'LD_LIBRARY_PATH'
+        param extra_library_paths: list(str): list of paths with extra
+        libraries which should be available to the worker process.
+        """
+        original_ld_path = os.environ.get('LD_LIBRARY_PATH', "")
+        new_ld_path = None
+        extra_libdirs = ":".join(extra_library_paths)
+        if not original_ld_path:
+            new_ld_path = extra_libdirs
+        else:
+            new_ld_path = "%s:%s" % (original_ld_path, extra_libdirs)
+
+        LOG.debug(
+            "Starting new worker process with extra libraries: '%s'",
+            extra_library_paths)
+        try:
+            os.environ['LD_LIBRARY_PATH'] = new_ld_path
+            process.start()
+        finally:
+            os.environ['LD_LIBRARY_PATH'] = original_ld_path
+
+    def _get_extra_library_paths_for_providers(
+            self, ctxt, task_id, task_type, origin, destination):
+        """ Returns a list of strings with paths on the worker with shared
+        libraries needed by the source/destination providers.
+        """
+        event_handler = _ConductorProviderEventHandler(ctxt, task_id)
+        task_runner = task_runners_factory.get_task_runner(task_type)
+
+        return task_runner.get_shared_libs_for_providers(
+            ctxt, origin, destination, event_handler)
+
     def _exec_task_process(self, ctxt, task_id, task_type, origin, destination,
                            instance, task_info):
         mp_ctx = multiprocessing.get_context('spawn')
@@ -119,7 +159,10 @@ class WorkerServerEndpoint(object):
             args=(ctxt, task_id, task_type, origin, destination, instance,
                   task_info, mp_q, mp_log_q))
 
-        p.start()
+        extra_library_paths = self._get_extra_library_paths_for_providers(
+            ctxt, task_id, task_type, origin, destination)
+
+        self._start_process_with_custom_library_paths(p, extra_library_paths)
         LOG.info("Task process started: %s", task_id)
         self._rpc_conductor_client.set_task_host(
             ctxt, task_id, self._server, p.pid)