Просмотр исходного кода

Adapt minion manager client for RPC cleanup.

Nashwan Azhari 5 лет назад
Родитель
Сommit
cf56851b6f

+ 4 - 4
coriolis/conductor/rpc/client.py

@@ -405,26 +405,26 @@ class ConductorClient(rpc.BaseRPCClient):
 
     def confirm_replica_minions_allocation(
             self, ctxt, replica_id, minion_machine_allocations):
-        self._client.call(
+        self._call(
             ctxt, 'confirm_replica_minions_allocation', replica_id=replica_id,
             minion_machine_allocations=minion_machine_allocations)
 
     def report_replica_minions_allocation_error(
             self, ctxt, replica_id, minion_allocation_error_details):
-        self._client.call(
+        self._call(
             ctxt, 'report_replica_minions_allocation_error', replica_id=replica_id,
             minion_allocation_error_details=minion_allocation_error_details)
 
     def confirm_migration_minions_allocation(
             self, ctxt, migration_id, minion_machine_allocations):
-        self._client.call(
+        self._call(
             ctxt, 'confirm_migration_minions_allocation',
             migration_id=migration_id,
             minion_machine_allocations=minion_machine_allocations)
 
     def report_migration_minions_allocation_error(
             self, ctxt, migration_id, minion_allocation_error_details):
-        self._client.call(
+        self._call(
             ctxt, 'report_migration_minions_allocation_error',
             migration_id=migration_id,
             minion_allocation_error_details=minion_allocation_error_details)

+ 9 - 24
coriolis/conductor/rpc/server.py

@@ -223,8 +223,9 @@ class ConductorServerEndpoint(object):
             worker_services = self._scheduler_client.get_workers_for_specs(
                 ctxt)
             client_objects.update({
-                "worker_%s" % wrk['host']: self._get_rpc_client_for_service(
-                    wrk)
+                "worker_%s" % wrk['host']: (
+                    rpc_worker_client.WorkerClient.from_service_definition(
+                        wrk, timeout=10))
                 for wrk in worker_services})
         except Exception as ex:
             LOG.warn(
@@ -250,24 +251,6 @@ class ConductorServerEndpoint(object):
 
         return diagnostics
 
-    def _get_rpc_client_for_service(self, service, *client_args, **client_kwargs):
-        rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP.get(service['topic'])
-        if not rpc_client_class:
-            raise exception.NotFound(
-                "No RPC client class for service with topic '%s'." % (
-                    service.topic))
-
-        topic = service['topic']
-        if service['topic'] == constants.WORKER_MAIN_MESSAGING_TOPIC:
-            # NOTE: coriolis.service.MessagingService-type services (such
-            # as the worker), always have a dedicated per-host queue
-            # which can be used to target the service:
-            topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
-                "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
-                "host": service['host']})
-
-        return rpc_client_class(*client_args, topic=topic, **client_kwargs)
-
     def _get_worker_rpc_for_host(self, host, *client_args, **client_kwargs):
         rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP[
             constants.WORKER_MAIN_MESSAGING_TOPIC]
@@ -285,7 +268,8 @@ class ConductorServerEndpoint(object):
             random_choice=random_choice,
             raise_on_no_matches=raise_on_no_matches)
         service = db_api.get_service(ctxt, selected_service["id"])
-        return self._get_rpc_client_for_service(service)
+        return rpc_worker_client.WorkerClient.from_service_definition(
+            service)
 
     def _check_delete_reservation_for_transfer(self, transfer_action):
         action_id = transfer_action.base_id
@@ -542,13 +526,13 @@ class ConductorServerEndpoint(object):
             ctxt, endpoint.type, source_env)
 
     def get_available_providers(self, ctxt):
-        worker_rpc = self._get_rpc_client_for_service(
+        worker_rpc = rpc_worker_client.WorkerClient.from_service_definition(
             self._scheduler_client.get_any_worker_service(ctxt))
         return worker_rpc.get_available_providers(ctxt)
 
     def get_provider_schemas(self, ctxt, platform_name, provider_type):
         # TODO(aznashwan): merge or version/namespace schemas for each worker?
-        worker_rpc = self._get_rpc_client_for_service(
+        worker_rpc = rpc_worker_client.WorkerClient.from_service_definition(
             self._scheduler_client.get_any_worker_service(ctxt))
         return worker_rpc.get_provider_schemas(
             ctxt, platform_name, provider_type)
@@ -631,7 +615,8 @@ class ConductorServerEndpoint(object):
                 exception_details=str(ex))
             raise
 
-        return self._get_rpc_client_for_service(worker_service)
+        return rpc_worker_client.WorkerClient.from_service_definition(
+            worker_service)
 
     def _begin_tasks(
             self, ctxt, action, execution, task_info_override=None,

+ 32 - 30
coriolis/minion_manager/rpc/client.py

@@ -18,66 +18,68 @@ CONF = cfg.CONF
 CONF.register_opts(scheduler_opts, 'minion_manager')
 
 
-class MinionManagerClient(object):
+class MinionManagerClient(rpc.BaseRPCClient):
+
     def __init__(self, timeout=None):
         target = messaging.Target(topic='coriolis_minion_manager', version=VERSION)
         if timeout is None:
             timeout = CONF.minion_manager.minion_mananger_rpc_timeout
-        self._client = rpc.get_client(target, timeout=timeout)
+        super(MinionManagerClient, self).__init__(
+            target, timeout=timeout)
 
     def add_minion_pool_progress_update(
             self, ctxt, minion_pool_id, total_steps, message):
-        return self._client.call(
+        return self._cast(
             ctxt, 'add_minion_pool_progress_update',
             minion_pool_id=minion_pool_id,
             total_steps=total_steps, message=message)
 
     def update_minion_pool_progress_update(
             self, ctxt, minion_pool_id, step, total_steps, message):
-        return self._client.call(
+        return self._cast(
             ctxt, 'update_minion_pool_progress_update',
             minion_pool_id=minion_pool_id,
             step=step, total_steps=total_steps, message=message)
 
     def get_minion_pool_progress_step(self, ctxt, minion_pool_id):
-        return self._client.call(
+        return self._cast(
             ctxt, 'get_minion_pool_progress_step',
             minion_pool_id=minion_pool_id)
 
     def add_minion_pool_event(self, ctxt, minion_pool_id, level, message):
-        return self._client.call(
+        return self._cast(
             ctxt, 'add_minion_pool_event', minion_pool_id=minion_pool_id,
             level=level, message=message)
 
     def get_diagnostics(self, ctxt):
-        return self._client.call(ctxt, 'get_diagnostics')
+        return self._call(ctxt, 'get_diagnostics')
 
     def validate_minion_pool_selections_for_action(self, ctxt, action):
-        return self._client.call(
+        return self._call(
             ctxt, 'validate_minion_pool_selections_for_action',
             action=action)
 
     def allocate_minion_machines_for_replica(
             self, ctxt, replica):
-        return self._client.cast(
+        return self._cast(
             ctxt, 'allocate_minion_machines_for_replica', replica=replica)
 
     def allocate_minion_machines_for_migration(
             self, ctxt, migration, include_transfer_minions=True,
             include_osmorphing_minions=True):
-        return self._client.cast(
+        return self._cast(
             ctxt, 'allocate_minion_machines_for_migration',
             migration=migration,
             include_transfer_minions=include_transfer_minions,
             include_osmorphing_minions=include_osmorphing_minions)
 
     def deallocate_minion_machine(self, ctxt, minion_machine_id):
-         return self._client.cast(
+         return self._cast(
             ctxt, 'deallocate_minion_machine',
             minion_machine_id=minion_machine_id)
 
     def deallocate_minion_machines_for_action(self, ctxt, action_id):
-        return self._client.cast(
+        return self._cast(
             ctxt, 'deallocate_minion_machines_for_action', action_id=action_id)
 
     def create_minion_pool(
@@ -85,7 +87,7 @@ class MinionManagerClient(object):
             environment_options, minimum_minions, maximum_minions,
             minion_max_idle_time, minion_retention_strategy, notes=None,
             skip_allocation=False):
-        return self._client.call(
+        return self._call(
             ctxt, 'create_minion_pool', name=name, endpoint_id=endpoint_id,
             pool_platform=pool_platform, pool_os_type=pool_os_type,
             environment_options=environment_options,
@@ -96,94 +98,94 @@ class MinionManagerClient(object):
             notes=notes, skip_allocation=skip_allocation)
 
     def set_up_shared_minion_pool_resources(self, ctxt, minion_pool_id):
-        return self._client.call(
+        return self._call(
             ctxt, "set_up_shared_minion_pool_resources",
             minion_pool_id=minion_pool_id)
 
     def tear_down_shared_minion_pool_resources(
             self, ctxt, minion_pool_id, force=False):
-        return self._client.call(
+        return self._call(
             ctxt, "tear_down_shared_minion_pool_resources",
             minion_pool_id=minion_pool_id, force=force)
 
     def allocate_minion_pool(self, ctxt, minion_pool_id):
-        return self._client.call(
+        return self._call(
             ctxt, "allocate_minion_pool",
             minion_pool_id=minion_pool_id)
 
     def refresh_minion_pool(self, ctxt, minion_pool_id):
-        return self._client.call(
+        return self._call(
             ctxt, "refresh_minion_pool",
             minion_pool_id=minion_pool_id)
 
     def deallocate_minion_pool(
             self, ctxt, minion_pool_id, force=False):
-        return self._client.call(
+        return self._call(
             ctxt, "deallocate_minion_pool",
             minion_pool_id=minion_pool_id,
             force=force)
 
     def get_minion_pools(self, ctxt):
-        return self._client.call(ctxt, 'get_minion_pools')
+        return self._call(ctxt, 'get_minion_pools')
 
     def get_minion_pool(self, ctxt, minion_pool_id):
-        return self._client.call(
+        return self._call(
             ctxt, 'get_minion_pool', minion_pool_id=minion_pool_id)
 
     def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
-        return self._client.call(
+        return self._call(
             ctxt, 'update_minion_pool',
             minion_pool_id=minion_pool_id, updated_values=updated_values)
 
     def delete_minion_pool(self, ctxt, minion_pool_id):
-        return self._client.call(
+        return self._call(
             ctxt, 'delete_minion_pool', minion_pool_id=minion_pool_id)
 
     def get_minion_pool_lifecycle_executions(
             self, ctxt, minion_pool_id, include_tasks=False):
-        return self._client.call(
+        return self._call(
             ctxt, 'get_minion_pool_lifecycle_executions',
             minion_pool_id=minion_pool_id, include_tasks=include_tasks)
 
     def get_minion_pool_lifecycle_execution(
             self, ctxt, minion_pool_id, execution_id):
-        return self._client.call(
+        return self._call(
             ctxt, 'get_minion_pool_lifecycle_execution',
             minion_pool_id=minion_pool_id, execution_id=execution_id)
 
     def delete_minion_pool_lifecycle_execution(
             self, ctxt, minion_pool_id, execution_id):
-        return self._client.call(
+        return self._call(
             ctxt, 'delete_minion_pool_lifecycle_execution',
             minion_pool_id=minion_pool_id, execution_id=execution_id)
 
     def cancel_minion_pool_lifecycle_execution(
             self, ctxt, minion_pool_id, execution_id, force):
-        return self._client.call(
+        return self._call(
             ctxt, 'cancel_minion_pool_lifecycle_execution',
             minion_pool_id=minion_pool_id, execution_id=execution_id,
             force=force)
 
     def get_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, env, option_names):
-        return self._client.call(
+        return self._call(
             ctxt, 'get_endpoint_source_minion_pool_options',
             endpoint_id=endpoint_id, env=env, option_names=option_names)
 
     def get_endpoint_destination_minion_pool_options(
             self, ctxt, endpoint_id, env, option_names):
-        return self._client.call(
+        return self._call(
             ctxt, 'get_endpoint_destination_minion_pool_options',
             endpoint_id=endpoint_id, env=env, option_names=option_names)
 
     def validate_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, pool_environment):
-        return self._client.call(
+        return self._call(
             ctxt, 'validate_endpoint_source_minion_pool_options',
             endpoint_id=endpoint_id, pool_environment=pool_environment)
 
     def validate_endpoint_destination_minion_pool_options(
             self, ctxt, endpoint_id, pool_environment):
-        return self._client.call(
+        return self._call(
             ctxt, 'validate_endpoint_destination_minion_pool_options',
             endpoint_id=endpoint_id, pool_environment=pool_environment)

+ 1 - 1
coriolis/taskflow/base.py

@@ -209,7 +209,7 @@ class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
                 "on worker service." % (
                     self._task_id, self._task_name, task_type))
             res = worker_rpc.run_task(
-                ctxt, None, self._task_id, task_type, origin, destination,
+                ctxt, self._task_id, task_type, origin, destination,
                 self._task_instance, task_info)
             LOG.debug(
                 "[Task '%s'] Taskflow worker task '%s' (type %s) has "