Просмотр исходного кода

Fixes task cancellation sync issue

Alessandro Pilotti 10 лет назад
Родитель
Сommit
86faef6d02
1 измененных файлов с 13 добавлено и 8 удалено
  1. 13 8
      coriolis/worker/rpc/server.py

+ 13 - 8
coriolis/worker/rpc/server.py

@@ -1,6 +1,7 @@
 from logging import handlers
-import os
 import multiprocessing
+import os
+import queue
 import shutil
 import sys
 
@@ -93,13 +94,17 @@ class WorkerServerEndpoint(object):
         except psutil.NoSuchProcess:
             LOG.info("Task process not found: %s", process_id)
 
-    def _handle_mp_log_events(self, mp_log_q):
+    def _handle_mp_log_events(self, p, mp_log_q):
         while True:
-            record = mp_log_q.get()
-            if record is None:
-                break
-            logger = logging.getLogger(record.name).logger
-            logger.handle(record)
+            try:
+                record = mp_log_q.get(timeout=1)
+                if record is None:
+                    break
+                logger = logging.getLogger(record.name).logger
+                logger.handle(record)
+            except queue.Empty:
+                if not p.is_alive():
+                    break
 
     def _exec_task_process(self, ctxt, task_id, task_type, origin, destination,
                            instance, task_info):
@@ -116,7 +121,7 @@ class WorkerServerEndpoint(object):
         self._rpc_conductor_client.set_task_host(
             ctxt, task_id, self._server, p.pid)
 
-        self._handle_mp_log_events(mp_log_q)
+        self._handle_mp_log_events(p, mp_log_q)
         p.join()
 
         if mp_q.empty():