# harness.py
  1. # Copyright 2026 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. """
  4. Base test harness for Coriolis integration tests.
Starts conductor, scheduler, and worker services in-process using
oslo.messaging's fake:// transport and a disposable MariaDB Docker container
for the database. Serves the Coriolis REST API via cheroot on a random local
port. No RabbitMQ, Keystone, or Barbican are required (Docker is).
  9. Tasks are executed in-process as greenlets rather than subprocesses. The
  10. fake:// oslo.messaging transport is in-memory and process-local; subprocess
  11. tasks would initialise their own isolated transport with no conductor listener,
  12. causing every event-handler RPC call from the task to block indefinitely.
  13. Must be run as root (scsi_debug block device setup requires it).
  14. """
  15. import atexit
  16. import importlib
  17. import os
  18. import queue
  19. import shutil
  20. import socket
  21. import subprocess
  22. import tempfile
  23. from unittest import mock
  24. import uuid
  25. from cheroot.workers import threadpool as cheroot_threadpool
  26. from cheroot import wsgi as cheroot_wsgi
  27. from oslo_config import cfg
  28. from oslo_log import log as logging
  29. from oslo_middleware import request_id as request_id_middleware
  30. from oslo_service import wsgi as base_wsgi
  31. import webob.dec
  32. from coriolis import api as api_module
  33. from coriolis.api.middleware import fault as fault_middleware
  34. from coriolis.api.v1 import router as api_v1_router
  35. from coriolis.api import wsgi as api_wsgi
  36. from coriolis.conductor.rpc import server as conductor_rpc_server
  37. from coriolis import conf as coriolis_conf
  38. from coriolis import constants
  39. from coriolis import context
  40. from coriolis.db import api as db_api
  41. from coriolis.db.sqlalchemy import api as sqlalchemy_api
  42. from coriolis.db.sqlalchemy import migration as db_migration
  43. from coriolis.deployer_manager.rpc import server as deployer_manager_rpc_server
  44. from coriolis import exception
  45. from coriolis import policy as policy_module
  46. from coriolis import rpc as rpc_module
  47. from coriolis.scheduler.rpc import server as scheduler_rpc_server
  48. from coriolis import service
  49. from coriolis.tasks import factory as task_runners_factory
  50. from coriolis.tests.integration import utils as test_utils
  51. from coriolis.transfer_cron.rpc import server as transfer_cron_rpc_server
  52. from coriolis import utils as coriolis_utils
  53. from coriolis.worker.rpc import server as worker_rpc_server
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

# Dotted paths to the export (source) and import (destination) provider
# classes; registered via the 'providers' CONF override in the harness.
_TEST_EXPORT_PROVIDER = (
    "coriolis.tests.integration.providers.test_provider.exp.TestExportProvider"
)
_TEST_IMPORT_PROVIDER = (
    "coriolis.tests.integration.providers.test_provider.imp.TestImportProvider"
)

# Fixed project used for all test requests (injected by _NoAuthMiddleware).
_TEST_PROJECT_ID = 'integration-project'
  66. def _provider_platform(dotted_path):
  67. """Return the ``platform`` attribute of the class at *dotted_path*."""
  68. module_path, class_name = dotted_path.rsplit('.', 1)
  69. cls = getattr(importlib.import_module(module_path), class_name)
  70. return cls.platform
  71. class DaemonCherootWorker(cheroot_threadpool.WorkerThread):
  72. def __init__(self, *args, **kwargs):
  73. super().__init__(*args, **kwargs)
  74. self.daemon = True
  75. # Mark the cheroot threads as daemons so that the main thread won't
  76. # wait for them when closing. The WSGI will be stopped using "atexit",
  77. # which runs a bit later.
  78. #
  79. # Possible alternatives if this becomes a problem:
  80. # * Use threading._register_atexit instead of atexit
  81. # * may become public:
  82. # https://github.com/python/cpython/issues/86128
  83. # * Cleanup the harness in tearDownClass
  84. # * we tried to avoid spinning up the services for every test class
  85. # * Move the api services to a separate process
  86. # * we're currently relying on the "fake" messaging backend, which
  87. # doesn't work with separate processes.
  88. cheroot_threadpool.WorkerThread = DaemonCherootWorker
  89. class _NoAuthMiddleware(api_wsgi.Middleware):
  90. """Injects a fixed admin RequestContext; replaces keystonecontext."""
  91. @webob.dec.wsgify(RequestClass=api_wsgi.Request)
  92. def __call__(self, req):
  93. req.environ['coriolis.context'] = context.RequestContext(
  94. user='integration-test',
  95. project_id=_TEST_PROJECT_ID,
  96. is_admin=True,
  97. # Skip Keystone trust creation / deletion.
  98. trust_id='integration-dummy-trust',
  99. )
  100. return self.application
  101. class _TestAPIRouter(api_v1_router.APIRouter):
  102. """V1 API router using APIMapper (no /{project_id}/ path prefix).
  103. The production router uses ProjectMapper which adds /{project_id}/ to
  104. every route. For tests the coriolisclient sends paths without a
  105. project_id segment, so we use the plain APIMapper instead.
  106. """
  107. def __init__(self):
  108. ext_mgr = self.ExtensionManager()
  109. mapper = api_module.APIMapper()
  110. self.resources = {}
  111. self._setup_routes(mapper, ext_mgr)
  112. self._setup_ext_routes(mapper, ext_mgr)
  113. self._setup_extensions(ext_mgr)
  114. base_wsgi.Router.__init__(self, mapper)
  115. def _setup_routes(self, mapper, ext_mgr):
  116. super()._setup_routes(mapper, ext_mgr)
  117. # super()._setup_routes registers action routes with a hardcoded
  118. # /{project_id}/ prefix via mapper.connect(). The test client sends
  119. # paths without that prefix, so add a matching prefix-less route here.
  120. action_url_pairs = [
  121. ('minion_pool_actions', '/minion_pools/{id}/actions'),
  122. ('endpoint_actions', '/endpoints/{id}/actions'),
  123. ('deployment_actions', '/deployments/{id}/actions'),
  124. ('transfer_actions', '/transfers/{id}/actions'),
  125. ('transfer_tasks_execution_actions',
  126. '/transfers/{transfer_id}/executions/{id}/actions'),
  127. ]
  128. for action, url in action_url_pairs:
  129. mapper.connect(
  130. action,
  131. url,
  132. controller=self.resources[action],
  133. action='action',
  134. conditions={'method': 'POST'},
  135. )
  136. class _InProcessWorkerServerEndpoint(worker_rpc_server.WorkerServerEndpoint):
  137. """Worker endpoint that runs tasks as greenlets instead of subprocesses.
  138. The fake:// transport is in-memory and process-local. A subprocess would
  139. initialise its own isolated fake:// instance with no conductor listener, so
  140. every RPC call made by the task's event handler would block indefinitely.
  141. """
  142. def _exec_task_process(
  143. self, ctxt, task_id, task_type, origin, destination, instance,
  144. task_info, report_to_conductor=True):
  145. result_q = queue.Queue()
  146. if report_to_conductor:
  147. self._rpc_conductor_client.set_task_host(
  148. ctxt, task_id, self._server)
  149. self._rpc_conductor_client.set_task_process(
  150. ctxt, task_id, os.getpid())
  151. def _run():
  152. try:
  153. task_runner = task_runners_factory.get_task_runner_class(
  154. task_type)()
  155. event_handler = (
  156. worker_rpc_server._get_event_handler_for_task_type(
  157. task_type, ctxt, task_id))
  158. task_result = task_runner.run(
  159. ctxt, instance, origin, destination, task_info,
  160. event_handler)
  161. coriolis_utils.is_serializable(task_result)
  162. result_q.put(task_result)
  163. except Exception as ex:
  164. LOG.exception(ex)
  165. result_q.put(str(ex))
  166. thread = coriolis_utils.start_thread(_run)
  167. thread.join()
  168. result = result_q.get_nowait()
  169. if isinstance(result, str):
  170. raise exception.TaskProcessException(result)
  171. return result
class _IntegrationHarness:
    """Shared Integration tests infrastructure; created once per process.

    The first call to ``_IntegrationHarness.get()`` performs the full setup:
    temp workspace, CONF overrides, DB sync, and service startup. Subsequent
    calls return the same instance. Teardown is registered with ``atexit`` so
    it runs after all test classes have finished, not after the first one.
    """

    # Process-wide singleton instance, managed by get().
    _instance = None

    @classmethod
    def get(cls):
        # Lazily build the singleton on first use.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        # Scratch directory for everything the harness generates (SSH key,
        # oslo.concurrency lock files); removed in _teardown().
        self.workdir = tempfile.mkdtemp(prefix="coriolis-integration-")
        self.lock_path = os.path.join(self.workdir, "locks")
        os.makedirs(self.lock_path)
        # Unique container name so stale/concurrent runs don't collide.
        self._mysql_container_name = "coriolis-test-mysql-%s" % str(
            uuid.uuid4()).split("-")[0]
        self._mysql_username = "root"
        self._mysql_password = "coriolis"
        self._mysql_database = "coriolis"
        # Throwaway RSA keypair generated with no passphrase (-N "").
        self.ssh_key_path = os.path.join(self.workdir, "id_rsa")
        subprocess.run(
            ["ssh-keygen", "-t", "rsa", "-b", "2048",
             "-f", self.ssh_key_path, "-N", ""],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        coriolis_conf.init_common_opts()
        # Parse an empty config so every option exists with its default,
        # then override only what the tests need.
        cfg.CONF([], project='coriolis', version='1.0.0',
                 default_config_files=[], default_config_dirs=[])
        # In-memory, process-local messaging; no RabbitMQ required.
        cfg.CONF.set_override('messaging_transport_url', 'fake://')
        cfg.CONF.set_override(
            'providers', [_TEST_EXPORT_PROVIDER, _TEST_IMPORT_PROVIDER])
        # Points at the MariaDB container started by _start_db_container(),
        # which publishes its 3306 on host port 13306.
        db_url = ('mysql+pymysql://%(user)s:%(password)s'
                  '@localhost:13306/%(database)s') % {
            "user": self._mysql_username,
            "password": self._mysql_password,
            "database": self._mysql_database,
        }
        cfg.CONF.set_override(
            'connection', db_url, group='database')
        # Fast DB connection retries while the container is still booting.
        cfg.CONF.set_override(
            'retry_interval', 1, group='database')
        cfg.CONF.set_override(
            'lock_path', self.lock_path, group='oslo_concurrency')
        coriolis_utils.setup_logging()
        # Requires root: sets up scsi_debug block devices for disk tests.
        test_utils.init_scsi_debug()
        # Policy enforcer: reset so it re-reads the new CONF (no policy file).
        policy_module.reset()
        self.exp_provider_platform = _provider_platform(_TEST_EXPORT_PROVIDER)
        self.imp_provider_platform = _provider_platform(_TEST_IMPORT_PROVIDER)
        # Service handles; populated by _start_coriolis_services() and
        # stopped (in reverse start order) by _teardown().
        self._wsgi_server = None
        self._wsgi_server_thread = None
        self.api_port = None
        self._conductor_svc = None
        self._scheduler_svc = None
        self._transfer_cron_svc = None
        self._deployer_manager_svc = None
        self._worker_svc = None
        self._worker_host_svc = None
        # SQLAlchemy facade and RPC transport are module-level singletons;
        # reset them so they are re-created from the new CONF values.
        sqlalchemy_api._facade = None
        rpc_module._TRANSPORT = None
        # Register teardown before starting anything so partially-started
        # resources (e.g. the DB container) still get cleaned up.
        atexit.register(self._teardown)
        self._start_db_container()
        engine = db_api.get_engine()
        db_migration.db_sync(engine)
        self._start_coriolis_services()

    def _start_db_container(self):
        """Start a disposable MariaDB container for the Coriolis DB."""
        coriolis_utils.exec_process(
            [
                "docker",
                "run",
                "-d",
                "--name",
                self._mysql_container_name,
                "-e",
                f"MYSQL_ROOT_PASSWORD={self._mysql_password}",
                "-e", f"MYSQL_DATABASE={self._mysql_database}",
                # Host port 13306 matches the db_url built in __init__.
                "-p", "13306:3306",
                "mariadb:10-jammy",
            ])

    def _start_coriolis_services(self):
        """Start conductor, scheduler, worker, and API in-process."""
        rpc_module.init()
        # Conductor: must start first so the worker can register with it.
        conductor_endpoint = conductor_rpc_server.ConductorServerEndpoint()
        # No licensing server in tests; minion-manager calls are mocked out.
        conductor_endpoint._licensing_client = None
        conductor_endpoint._minion_manager_client_instance = mock.MagicMock()
        self._conductor_svc = service.MessagingService(
            constants.CONDUCTOR_MAIN_MESSAGING_TOPIC,
            [conductor_endpoint],
            conductor_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._conductor_svc.start()
        self._scheduler_svc = service.MessagingService(
            constants.SCHEDULER_MAIN_MESSAGING_TOPIC,
            [scheduler_rpc_server.SchedulerServerEndpoint()],
            scheduler_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._scheduler_svc.start()
        # Transfer-cron: constructor makes an RPC call to the conductor to
        # load existing schedules, so the conductor must be running first.
        self._transfer_cron_svc = service.MessagingService(
            constants.TRANSFER_CRON_MAIN_MESSAGING_TOPIC,
            [transfer_cron_rpc_server.TransferCronServerEndpoint()],
            transfer_cron_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._transfer_cron_svc.start()
        self._deployer_manager_svc = service.MessagingService(
            constants.DEPLOYER_MANAGER_MAIN_MESSAGING_TOPIC,
            [deployer_manager_rpc_server.DeployerManagerServerEndpoint()],
            deployer_manager_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._deployer_manager_svc.start()
        # Worker: constructor calls _register_worker_service() which makes a
        # blocking RPC call to the conductor, so the conductor must already
        # be listening.
        #
        # We reuse the same endpoint instance for both the main topic and the
        # host-specific topic (coriolis_worker.{hostname}) to avoid a double
        # service registration. The fake:// transport uses literal string
        # matching instead of AMQP topic routing, so the host-specific topic
        # must be served explicitly; otherwise the conductor's WorkerClient
        # (which routes via SERVICE_MESSAGING_TOPIC_FORMAT) would send to a
        # queue that nobody reads.
        _worker_endpoint = _InProcessWorkerServerEndpoint()
        self._worker_svc = service.MessagingService(
            constants.WORKER_MAIN_MESSAGING_TOPIC,
            [_worker_endpoint],
            worker_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._worker_svc.start()
        _worker_host_topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % {
            "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
            "host": coriolis_utils.get_hostname(),
        }
        self._worker_host_svc = service.MessagingService(
            _worker_host_topic,
            [_worker_endpoint],
            worker_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._worker_host_svc.start()
        # API: build the WSGI stack without keystonemiddleware and serve it
        # on a random local port.
        wsgi_app = _TestAPIRouter()
        wsgi_app = _NoAuthMiddleware(wsgi_app)
        wsgi_app = fault_middleware.FaultWrapper(wsgi_app)
        wsgi_app = request_id_middleware.RequestId(wsgi_app)
        # NOTE(review): the probe socket is closed before cheroot binds, so
        # another process could grab the port in between; acceptable for
        # local test runs but inherently racy.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('127.0.0.1', 0))
            s.listen(1)
            # Pick an available port.
            self.api_port = s.getsockname()[1]
        self._wsgi_server = cheroot_wsgi.Server(
            bind_addr=("127.0.0.1", self.api_port),
            wsgi_app=wsgi_app,
            server_name="coriolis-api",
        )
        self._wsgi_server.prepare()
        self._wsgi_server_thread = coriolis_utils.start_thread(
            self._wsgi_server.serve,
            daemon=True,
        )

    def _teardown(self):
        """Best-effort cleanup: DB container, services, WSGI, workdir."""
        LOG.info("Teardown initiated.")
        try:
            coriolis_utils.exec_process(
                [
                    "docker",
                    "stop",
                    self._mysql_container_name
                ])
            coriolis_utils.exec_process(
                [
                    "docker",
                    "rm",
                    self._mysql_container_name
                ])
        except Exception:
            # The container may never have started; teardown must not fail.
            pass
        # Stop services in reverse start order (workers first, conductor
        # last) so nothing RPCs an already-stopped listener.
        for svc in [self._worker_host_svc, self._worker_svc,
                    self._deployer_manager_svc, self._transfer_cron_svc,
                    self._scheduler_svc, self._conductor_svc]:
            if not svc:
                continue
            try:
                svc.stop()
            except Exception:
                pass
        if self._wsgi_server:
            try:
                self._wsgi_server.stop()
                self._wsgi_server_thread.join()
            except Exception:
                pass
        # Second arg is ignore_errors=True: never fail on a stubborn file.
        shutil.rmtree(self.workdir, True)
        try:
            test_utils.destroy_scsi_debug()
        except Exception:
            pass