rpc.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # Copyright 2016 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. import contextlib
  4. import oslo_messaging as messaging
  5. from oslo_config import cfg
  6. from oslo_log import log as logging
  7. import coriolis.exception
  8. from coriolis import context
  9. from coriolis import utils
  # Options controlling how this module talks to the message bus.
  rpc_opts = [
      cfg.StrOpt(
          'messaging_transport_url',
          default="rabbit://guest:guest@127.0.0.1:5672/",
          help='Messaging transport url',
      ),
      cfg.IntOpt(
          'default_messaging_timeout',
          default=60,
          help='Number of seconds for messaging timeouts.',
      ),
  ]

  CONF = cfg.CONF
  CONF.register_opts(rpc_opts)

  LOG = logging.getLogger(__name__)

  # Handed to the transport as its 'allowed_remote_exmods' argument.
  ALLOWED_EXMODS = [coriolis.exception.__name__]

  # Module-wide transport; stays None until init() assigns it.
  _TRANSPORT = None
  class RequestContextSerializer(messaging.Serializer):
      """Serializer that turns request contexts into dicts and back.

      Entity (de)serialization is delegated to the optional ``base``
      serializer; when ``base`` is falsy, entities pass through unchanged.
      """

      def __init__(self, base):
          self._base = base

      def serialize_entity(self, ctxt, entity):
          """Return ``entity`` as serialized by the base serializer, if any."""
          if self._base:
              return self._base.serialize_entity(ctxt, entity)
          return entity

      def deserialize_entity(self, ctxt, entity):
          """Return ``entity`` as deserialized by the base serializer, if any."""
          if self._base:
              return self._base.deserialize_entity(ctxt, entity)
          return entity

      def serialize_context(self, ctxt):
          """Return the dict form of ``ctxt`` via its ``to_dict()`` method."""
          return ctxt.to_dict()

      def deserialize_context(self, ctxt):
          """Build a ``context.RequestContext`` from the dict ``ctxt``."""
          return context.RequestContext.from_dict(ctxt)
  def _get_transport():
      """Return a messaging transport for the configured transport URL.

      The transport is created with ALLOWED_EXMODS as its
      'allowed_remote_exmods' argument.
      """
      transport_url = CONF.messaging_transport_url
      return messaging.get_transport(
          cfg.CONF, transport_url, allowed_remote_exmods=ALLOWED_EXMODS)
  def get_server(target, endpoints, serializer=None):
      """Return an RPC server for ``target`` that dispatches to ``endpoints``.

      ``serializer`` (optional) is wrapped in a RequestContextSerializer.
      The server uses the 'eventlet' executor and a transport obtained
      from _get_transport().
      """
      wrapped_serializer = RequestContextSerializer(serializer)
      transport = _get_transport()
      return messaging.get_rpc_server(
          transport, target, endpoints,
          executor='eventlet', serializer=wrapped_serializer)
  def init():
      """Return the module-wide transport, creating it on the first call."""
      global _TRANSPORT
      if _TRANSPORT is not None:
          return _TRANSPORT
      _TRANSPORT = _get_transport()
      return _TRANSPORT
  class BaseRPCClient(object):
      """Thin wrapper around 'oslo_messaging.RPCClient'.

      A fresh RPCClient is built for every call or cast. The module-wide
      transport is used when it has been set; otherwise a transport is
      created on first use and kept on the instance.
      """

      def __init__(self, target, timeout=None, serializer=None):
          self._target = target
          # Fall back to the configured default when no timeout is given.
          self._timeout = (
              CONF.default_messaging_timeout if timeout is None else timeout)
          self._serializer = RequestContextSerializer(serializer)
          self._transport_conn = None

      def __repr__(self):
          details = (self._target, self._timeout)
          return "<RPCClient(target=%s, timeout=%s)>" % details

      @property
      def _transport(self):
          """Transport to use: the module-wide one, else a per-instance one."""
          if _TRANSPORT is not None:
              return _TRANSPORT
          if self._transport_conn is None:
              self._transport_conn = _get_transport()
          return self._transport_conn

      def _rpc_client(self):
          """Build a new RPCClient bound to this client's target."""
          return messaging.RPCClient(
              self._transport,
              self._target,
              serializer=self._serializer,
              timeout=self._timeout)

      def _call(self, ctxt, method, **kwargs):
          """Invoke ``method`` via ``call`` and return its result."""
          return self._rpc_client().call(ctxt, method, **kwargs)

      def _call_on_host(self, host, ctxt, method, **kwargs):
          """Like ``_call``, on a client prepared with ``server=host``."""
          prepared = self._rpc_client().prepare(server=host)
          return prepared.call(ctxt, method, **kwargs)

      def _cast(self, ctxt, method, **kwargs):
          """Invoke ``method`` via ``cast``; nothing is returned."""
          self._rpc_client().cast(ctxt, method, **kwargs)

      def _cast_for_host(self, host, ctxt, method, **kwargs):
          """Like ``_cast``, on a client prepared with ``server=host``."""
          prepared = self._rpc_client().prepare(server=host)
          prepared.cast(ctxt, method, **kwargs)