service.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
  3. import os
  4. import platform
  5. from oslo_concurrency import processutils
  6. from oslo_config import cfg
  7. from oslo_log import log as logging
  8. import oslo_messaging as messaging
  9. from oslo_service import service
  10. from oslo_service import wsgi
  11. from coriolis import rpc
  12. from coriolis import utils
  13. service_opts = [
  14. cfg.StrOpt('api_migration_listen',
  15. default="0.0.0.0",
  16. help='IP address on which the Migration API listens'),
  17. cfg.PortOpt('api_migration_listen_port',
  18. default=7667,
  19. help='Port on which the Migration API listens'),
  20. cfg.IntOpt('api_migration_workers',
  21. help='Number of workers for the Migration API service. '
  22. 'The default is equal to the number of CPUs available.'),
  23. cfg.IntOpt('messaging_workers',
  24. help='Number of workers for the messaging service. '
  25. 'The default is equal to the number of CPUs available.'),
  26. ]
  27. CONF = cfg.CONF
  28. CONF.register_opts(service_opts)
  29. LOG = logging.getLogger(__name__)
  30. def check_locks_dir_empty():
  31. """ Checks whether the locks dir is empty and warns otherwise.
  32. NOTE: external oslo_concurrency locks work based on listing open file
  33. descriptors so this check is not necessarily conclusive, though all freshly
  34. started/restarted conductor services should ideally be given a clean slate.
  35. """
  36. oslo_concurrency_group = getattr(CONF, 'oslo_concurrency', {})
  37. if not oslo_concurrency_group:
  38. LOG.warn("No 'oslo_concurrency' group defined in config file!")
  39. return
  40. locks_dir = oslo_concurrency_group.get('lock_path', "")
  41. if not locks_dir:
  42. LOG.warn("No locks directory path was configured!")
  43. return
  44. if not os.path.exists(locks_dir):
  45. LOG.warn(
  46. "Configured 'lock_path' directory '%s' does NOT exist!", locks_dir)
  47. return
  48. if not os.path.isdir(locks_dir):
  49. LOG.warn(
  50. "Configured 'lock_path' directory '%s' is NOT a directory!",
  51. locks_dir)
  52. return
  53. locks_dir_contents = os.listdir(locks_dir)
  54. if locks_dir_contents:
  55. LOG.warn(
  56. "Configured 'lock_path' directory '%s' is NOT empty: %s",
  57. locks_dir, locks_dir_contents)
  58. return
  59. LOG.info(
  60. "Successfully checked 'lock_path' directory '%s' exists and is empty.",
  61. locks_dir)
  62. class WSGIService(service.ServiceBase):
  63. def __init__(self, name):
  64. self._host = CONF.api_migration_listen
  65. self._port = CONF.api_migration_listen_port
  66. if platform.system() == "Windows":
  67. self._workers = 1
  68. else:
  69. self._workers = (
  70. CONF.api_migration_workers or processutils.get_worker_count())
  71. self._loader = wsgi.Loader(CONF)
  72. self._app = self._loader.load_app(name)
  73. self._server = wsgi.Server(CONF,
  74. name,
  75. self._app,
  76. host=self._host,
  77. port=self._port)
  78. def get_workers_count(self):
  79. return self._workers
  80. def start(self):
  81. self._server.start()
  82. def stop(self):
  83. self._server.stop()
  84. def wait(self):
  85. self._server.wait()
  86. def reset(self):
  87. self._server.reset()
  88. class MessagingService(service.ServiceBase):
  89. def __init__(self, topic, endpoints, version, worker_count=None):
  90. target = messaging.Target(topic=topic,
  91. server=utils.get_hostname(),
  92. version=version)
  93. self._server = rpc.get_server(target, endpoints)
  94. self._workers = (worker_count or CONF.messaging_workers or
  95. processutils.get_worker_count())
  96. def get_workers_count(self):
  97. return self._workers
  98. def start(self):
  99. self._server.start()
  100. def stop(self):
  101. self._server.stop()
  102. def wait(self):
  103. pass
  104. def reset(self):
  105. self._server.reset()