Просмотр исходного кода

Fix worker RPC client instantiation for task cancellation.

Nashwan Azhari 5 лет назад
Родитель
Сommit
25df888723
2 измененных файлов с 15 добавлено и 22 удалено
  1. 14 21
      coriolis/conductor/rpc/server.py
  2. 1 1
      coriolis/worker/rpc/server.py

+ 14 - 21
coriolis/conductor/rpc/server.py

@@ -49,13 +49,6 @@ TASK_DEADLOCK_ERROR_MESSAGE = (
     "A fatal deadlock has occurred. Further debugging is required. "
     "Please review the Conductor logs and contact support for assistance.")
 
-RPC_TOPIC_TO_CLIENT_CLASS_MAP = {
-    constants.WORKER_MAIN_MESSAGING_TOPIC: rpc_worker_client.WorkerClient,
-    constants.SCHEDULER_MAIN_MESSAGING_TOPIC: (
-        rpc_scheduler_client.SchedulerClient),
-    constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC: (
-        rpc_cron_client.ReplicaCronClient)}
-
 
 def endpoint_synchronized(func):
     @functools.wraps(func)
@@ -246,19 +239,18 @@ class ConductorServerEndpoint(object):
         worker_diagnostics = []
         for worker_service in self._scheduler_client.get_workers_for_specs(
                 ctxt):
-            worker_rpc = self._get_worker_rpc_for_host(worker_service['host'])
-            diagnostics.append(worker_rpc.get_diagnostics(ctxt))
+            worker_rpc = rpc_worker_client.WorkerClient.from_service_definition(
+                worker_service)
+            try:
+                diagnostics.append(worker_rpc.get_diagnostics(ctxt))
+            except Exception as ex:
+                LOG.warn(
+                    "Exception occurred while fetching diagnostics for "
+                    "worker service '%s'. Error was: %s",
+                    worker_service['host'], utils.get_exception_details())
 
         return diagnostics
 
-    def _get_worker_rpc_for_host(self, host, *client_args, **client_kwargs):
-        rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP[
-            constants.WORKER_MAIN_MESSAGING_TOPIC]
-        topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
-            "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
-            "host": host})
-        return rpc_client_class(*client_args, topic=topic, **client_kwargs)
-
     def _get_worker_service_rpc_for_specs(
             self, ctxt, provider_requirements=None, region_sets=None,
             enabled=True, random_choice=False, raise_on_no_matches=True):
@@ -2232,11 +2224,11 @@ class ConductorServerEndpoint(object):
                     db_api.set_task_status(
                         ctxt, task.id, constants.TASK_STATUS_CANCELLING)
                     try:
-                        worker_rpc = self._get_worker_rpc_for_host(
+                        worker_rpc = rpc_worker_client.WorkerClient(
                             # NOTE: we intetionally lowball the timeout for the
                             # cancellation call to prevent the conductor from
                             # hanging an excessive amount of time:
-                            task.host, timeout=10)
+                            host=task.host, timeout=10)
                         worker_rpc.cancel_task(
                             ctxt, task.id, task.process_id, force)
                     except (Exception, KeyboardInterrupt):
@@ -3619,7 +3611,8 @@ class ConductorServerEndpoint(object):
         service.status = constants.SERVICE_STATUS_UP
 
         if None in (providers, specs):
-            worker_rpc = self._get_worker_rpc_for_host(service['host'])
+            worker_rpc = (
+                rpc_worker_client.WorkerClient(host=service.host))
             status = worker_rpc.get_service_status(ctxt)
 
             service.providers = status["providers"]
@@ -3668,7 +3661,7 @@ class ConductorServerEndpoint(object):
     def refresh_service_status(self, ctxt, service_id):
         LOG.debug("Updating registration for worker service '%s'", service_id)
         service = db_api.get_service(ctxt, service_id)
-        worker_rpc = self._get_worker_rpc_for_host(service['host'])
+        worker_rpc = rpc_worker_client.WorkerClient(host=service.host)
         status = worker_rpc.get_service_status(ctxt)
         updated_values = {
             "providers": status["providers"],

+ 1 - 1
coriolis/worker/rpc/server.py

@@ -306,7 +306,7 @@ class WorkerServerEndpoint(object):
                     ctxt, task_id, p.pid)
             LOG.debug(
                 "Successfully started and reported task process for task "
-                "with ID '%s'.", task_id)
+                "with ID '%s' (PID %d)", task_id, p.pid)
         except (Exception, KeyboardInterrupt) as ex:
             LOG.debug(
                 "Exception occurred whilst setting host for task '%s'. Error "