rpc.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. # Copyright 2016 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. import coriolis.exception
  4. import oslo_messaging as messaging
  5. from oslo_config import cfg
  6. from oslo_log import log as logging
  7. from coriolis import context
  8. rpc_opts = [
  9. cfg.StrOpt('messaging_transport_url',
  10. default="rabbit://guest:guest@127.0.0.1:5672/",
  11. help='Messaging transport url'),
  12. cfg.IntOpt('default_messaging_timeout',
  13. default=60,
  14. help='Number of seconds for messaging timeouts.')
  15. ]
  16. CONF = cfg.CONF
  17. CONF.register_opts(rpc_opts)
  18. LOG = logging.getLogger(__name__)
  19. ALLOWED_EXMODS = [
  20. coriolis.exception.__name__]
  21. _TRANSPORT = None
  22. class RequestContextSerializer(messaging.Serializer):
  23. def __init__(self, base):
  24. self._base = base
  25. def serialize_entity(self, ctxt, entity):
  26. if not self._base:
  27. return entity
  28. return self._base.serialize_entity(ctxt, entity)
  29. def deserialize_entity(self, ctxt, entity):
  30. if not self._base:
  31. return entity
  32. return self._base.deserialize_entity(ctxt, entity)
  33. def serialize_context(self, ctxt):
  34. return ctxt.to_dict()
  35. def deserialize_context(self, ctxt):
  36. return context.RequestContext.from_dict(ctxt)
  37. def _get_transport():
  38. return messaging.get_transport(
  39. cfg.CONF, CONF.messaging_transport_url,
  40. allowed_remote_exmods=ALLOWED_EXMODS)
  41. def get_server(target, endpoints, serializer=None):
  42. serializer = RequestContextSerializer(serializer)
  43. return messaging.get_rpc_server(_get_transport(), target, endpoints,
  44. executor='eventlet',
  45. serializer=serializer)
  46. def init():
  47. global _TRANSPORT
  48. if _TRANSPORT is None:
  49. _TRANSPORT = _get_transport()
  50. return _TRANSPORT
  51. class BaseRPCClient(object):
  52. """ Wrapper for 'oslo_messaging.RPCClient' which automatically
  53. instantiates and cleans up transports for each call.
  54. """
  55. def __init__(self, target, timeout=None, serializer=None):
  56. self._target = target
  57. self._timeout = timeout
  58. if self._timeout is None:
  59. self._timeout = CONF.default_messaging_timeout
  60. self._serializer = RequestContextSerializer(serializer)
  61. self._transport_conn = None
  62. def __repr__(self):
  63. return "<RPCClient(target=%s, timeout=%s)>" % (
  64. self._target, self._timeout)
  65. @property
  66. def _transport(self):
  67. global _TRANSPORT
  68. if _TRANSPORT is None:
  69. if self._transport_conn is None:
  70. self._transport_conn = _get_transport()
  71. return self._transport_conn
  72. else:
  73. return _TRANSPORT
  74. def _rpc_client(self):
  75. return messaging.RPCClient(
  76. self._transport, self._target,
  77. serializer=self._serializer,
  78. timeout=self._timeout)
  79. def _call(self, ctxt, method, **kwargs):
  80. client = self._rpc_client()
  81. return client.call(ctxt, method, **kwargs)
  82. def _call_on_host(self, host, ctxt, method, **kwargs):
  83. client = self._rpc_client()
  84. cctxt = client.prepare(server=host)
  85. return cctxt.call(ctxt, method, **kwargs)
  86. def _cast(self, ctxt, method, **kwargs):
  87. client = self._rpc_client()
  88. client.cast(ctxt, method, **kwargs)
  89. def _cast_for_host(self, host, ctxt, method, **kwargs):
  90. client = self._rpc_client()
  91. cctxt = client.prepare(server=host)
  92. cctxt.cast(ctxt, method, **kwargs)