2
0
Эх сурвалжийг харах

Fixes multiprocessing logging

Alessandro Pilotti 10 жил өмнө
parent
commit
0d106b4d8a

+ 25 - 5
coriolis/worker/rpc/server.py

@@ -1,3 +1,4 @@
+from logging import handlers
 import os
 import multiprocessing
 import shutil
@@ -92,20 +93,30 @@ class WorkerServerEndpoint(object):
         except psutil.NoSuchProcess:
             LOG.info("Task process not found: %s", process_id)
 
+    def _handle_mp_log_events(self, mp_log_q):
+        while True:
+            record = mp_log_q.get()
+            if record is None:
+                break
+            logger = logging.getLogger(record.name).logger
+            logger.handle(record)
+
     def _exec_task_process(self, ctxt, task_id, task_type, origin, destination,
                            instance, task_info):
         mp_ctx = multiprocessing.get_context('spawn')
         mp_q = mp_ctx.Queue()
+        mp_log_q = mp_ctx.Queue()
         p = mp_ctx.Process(
             target=_task_process,
             args=(ctxt, task_id, task_type, origin, destination, instance,
-                  task_info, mp_q))
+                  task_info, mp_q, mp_log_q))
 
         p.start()
         LOG.info("Task process started: %s", task_id)
         self._rpc_conductor_client.set_task_host(
             ctxt, task_id, self._server, p.pid)
 
+        self._handle_mp_log_events(mp_log_q)
         p.join()
 
         if mp_q.empty():
@@ -147,16 +158,22 @@ def _get_task_export_path(task_id, create=False):
     return export_path
 
 
-def _setup_task_process():
+def _setup_task_process(mp_log_q):
     # Setting up logging and cfg, needed since this is a new process
-    utils.setup_logging()
     cfg.CONF(sys.argv[1:], project='coriolis', version="1.0.0")
+    utils.setup_logging()
+
+    # Log events need to be handled in the parent process
+    log_root = logging.getLogger(None).logger
+    for handler in log_root.handlers:
+        log_root.removeHandler(handler)
+    log_root.addHandler(handlers.QueueHandler(mp_log_q))
 
 
 def _task_process(ctxt, task_id, task_type, origin, destination, instance,
-                  task_info, mp_q):
+                  task_info, mp_q, mp_log_q):
     try:
-        _setup_task_process()
+        _setup_task_process(mp_log_q)
 
         if task_type == constants.TASK_TYPE_EXPORT_INSTANCE:
             provider_type = constants.PROVIDER_TYPE_EXPORT
@@ -194,3 +211,6 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
     except Exception as ex:
         mp_q.put(str(ex))
         LOG.exception(ex)
+    finally:
+        # Signal the log event handler that there are no more events
+        mp_log_q.put(None)