Explorar o código

Adds RequestContext serialization

Alessandro Pilotti %!s(int64=10) %!d(string=hai) anos
pai
achega
a2ad6cf2f2

+ 2 - 6
coriolis/conductor/rpc/client.py

@@ -1,8 +1,6 @@
-from oslo_config import cfg
 import oslo_messaging as messaging
 
-CONF = cfg.CONF
-CONF.import_opt("messaging_transport_url", "coriolis.service")
+from coriolis import rpc
 
 VERSION = "1.0"
 
@@ -10,9 +8,7 @@ VERSION = "1.0"
 class ConductorClient(object):
     def __init__(self):
         target = messaging.Target(topic='coriolis_conductor', version=VERSION)
-        transport = messaging.get_transport(
-            cfg.CONF, CONF.messaging_transport_url)
-        self._client = messaging.RPCClient(transport, target)
+        self._client = rpc.get_client(target)
 
     def get_migrations(self, ctxt):
         return self._client.call(ctxt, 'get_migrations')

+ 3 - 4
coriolis/conductor/rpc/server.py

@@ -3,7 +3,6 @@ import uuid
 import json
 
 from oslo_log import log as logging
-import oslo_messaging as messaging
 
 from coriolis import constants
 from coriolis.db import api as db_api
@@ -45,14 +44,14 @@ class ConductorServerEndpoint(object):
 
         for task in migration.tasks:
             self._rpc_worker_client.begin_export_instance(
-                ctxt.to_dict(), task.id, origin, instance)
+                ctxt, task.id, origin, instance)
 
     def stop_instances_migration(self, ctxt, migration_id):
         migration = db_api.get_migration(ctxt, migration_id)
         for task in migration.tasks:
             if task.status == constants.TASK_STATUS_STARTED:
                 self._rpc_worker_client.stop_task(
-                    ctxt.to_dict(), task.host, task.process_id)
+                    ctxt, task.host, task.process_id)
 
     def set_task_host(self, ctxt, task_id, host, process_id):
         db_api.set_task_host(ctxt, task_id, host, process_id)
@@ -72,7 +71,7 @@ class ConductorServerEndpoint(object):
         db_api.add(ctxt, op_import)
 
         self._rpc_worker_client.begin_import_instance(
-            ctxt.to_dict(), op_export.host, op_import.id,
+            ctxt, op_export.host, op_import.id,
             json.loads(op_import.migration.destination),
             op_import.instance,
             export_info)

+ 5 - 6
coriolis/migrations/api.py

@@ -6,15 +6,14 @@ class API(object):
         self._rpc_client = rpc_client.ConductorClient()
 
     def start(self, ctxt, origin, destination, instances):
-        self._rpc_client.begin_migrate_instances(
-            ctxt.to_dict(), origin, destination, instances)
+        self._rpc_client.begin_migrate_instances(ctxt, origin, destination,
+                                                 instances)
 
     def stop(ctxt, self, migration_id):
-        self._rpc_client.stop_instances_migration(
-            ctxt.to_dict(), migration_id)
+        self._rpc_client.stop_instances_migration(ctxt, migration_id)
 
     def get_migrations(self, ctxt):
-        return self._rpc_client.get_migrations(ctxt.to_dict())
+        return self._rpc_client.get_migrations(ctxt)
 
     def get_migration(self, ctxt, migration_id):
-        return self._rpc_client.get_migration(ctxt.to_dict(), migration_id)
+        return self._rpc_client.get_migration(ctxt, migration_id)

+ 51 - 0
coriolis/rpc.py

@@ -0,0 +1,51 @@
+from oslo_config import cfg
+import oslo_messaging as messaging
+
+from coriolis import context
+
+rpc_opts = [
+    cfg.StrOpt('messaging_transport_url',
+               default="rabbit://guest:guest@127.0.0.1:5672/",
+               help='Messaging transport url'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(rpc_opts)
+
+
+class RequestContextSerializer(messaging.Serializer):
+
+    def __init__(self, base):
+        self._base = base
+
+    def serialize_entity(self, ctxt, entity):
+        if not self._base:
+            return entity
+        return self._base.serialize_entity(ctxt, entity)
+
+    def deserialize_entity(self, ctxt, entity):
+        if not self._base:
+            return entity
+        return self._base.deserialize_entity(ctxt, entity)
+
+    def serialize_context(self, ctxt):
+        return ctxt.to_dict()
+
+    def deserialize_context(self, ctxt):
+        return context.RequestContext.from_dict(ctxt)
+
+
+def get_client(target, serializer=None):
+    transport = messaging.get_transport(cfg.CONF, CONF.messaging_transport_url)
+    serializer = RequestContextSerializer(serializer)
+
+    return messaging.RPCClient(transport, target, serializer=serializer)
+
+
+def get_server(target, endpoints, serializer=None):
+    transport = messaging.get_transport(cfg.CONF, CONF.messaging_transport_url)
+    serializer = RequestContextSerializer(serializer)
+
+    return messaging.get_rpc_server(transport, target, endpoints,
+                                    executor='eventlet',
+                                    serializer=serializer)

+ 5 - 9
coriolis/service.py

@@ -7,6 +7,7 @@ import oslo_messaging as messaging
 from oslo_service import service
 from oslo_service import wsgi
 
+from coriolis import rpc
 from coriolis import utils
 
 
@@ -20,9 +21,6 @@ service_opts = [
     cfg.IntOpt('api_migration_workers',
                help='Number of workers for the Migration API service. '
                     'The default is equal to the number of CPUs available.'),
-    cfg.StrOpt('messaging_transport_url',
-               default="rabbit://guest:guest@127.0.0.1:5672/",
-               help='Messaging transport url'),
     cfg.IntOpt('messaging_workers',
                help='Number of workers for the messaging service. '
                     'The default is equal to the number of CPUs available.'),
@@ -67,12 +65,10 @@ class WSGIService(service.ServiceBase):
 
 class MessagingService(service.ServiceBase):
     def __init__(self, topic, endpoints, version):
-        target = messaging.Target(
-            topic=topic, server=utils.get_hostname(), version=version)
-        transport = messaging.get_transport(CONF, CONF.messaging_transport_url)
-
-        self._server = messaging.get_rpc_server(
-            transport, target, endpoints, executor='eventlet')
+        target = messaging.Target(topic=topic,
+                                  server=utils.get_hostname(),
+                                  version=version)
+        self._server = rpc.get_server(target, endpoints)
 
         self._workers = (CONF.messaging_workers or
                          processutils.get_worker_count())

+ 2 - 6
coriolis/worker/rpc/client.py

@@ -1,8 +1,6 @@
-from oslo_config import cfg
 import oslo_messaging as messaging
 
-CONF = cfg.CONF
-CONF.import_opt("messaging_transport_url", "coriolis.service")
+from coriolis import rpc
 
 VERSION = "1.0"
 
@@ -10,9 +8,7 @@ VERSION = "1.0"
 class WorkerClient(object):
     def __init__(self):
         target = messaging.Target(topic='coriolis_worker', version=VERSION)
-        transport = messaging.get_transport(
-            cfg.CONF, CONF.messaging_transport_url)
-        self._client = messaging.RPCClient(transport, target)
+        self._client = rpc.get_client(target)
 
     def begin_export_instance(self, ctxt, task_id, origin, instance):
         self._client.cast(