| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455 |
- # Copyright 2026 Cloudbase Solutions Srl
- # All Rights Reserved.
- """
- Base test harness for Coriolis integration tests.
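- Starts conductor, scheduler, transfer-cron, deployer-manager, and worker
- services in-process using oslo.messaging's fake:// transport, backed by a
- throwaway MariaDB container started through Docker (published on local
- port 13306). Serves the Coriolis REST API via cheroot on a random local
- port. No RabbitMQ, Keystone, or Barbican are required, but a working
- ``docker`` CLI is.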
- Tasks are executed in-process as greenlets rather than subprocesses. The
- fake:// oslo.messaging transport is in-memory and process-local; subprocess
- tasks would initialise their own isolated transport with no conductor listener,
- causing every event-handler RPC call from the task to block indefinitely.
- Must be run as root (scsi_debug block device setup requires it).
- """
- import atexit
- import importlib
- import os
- import queue
- import shutil
- import socket
- import subprocess
- import tempfile
- from unittest import mock
- import uuid
- from cheroot.workers import threadpool as cheroot_threadpool
- from cheroot import wsgi as cheroot_wsgi
- from oslo_config import cfg
- from oslo_log import log as logging
- from oslo_middleware import request_id as request_id_middleware
- from oslo_service import wsgi as base_wsgi
- import webob.dec
- from coriolis import api as api_module
- from coriolis.api.middleware import fault as fault_middleware
- from coriolis.api.v1 import router as api_v1_router
- from coriolis.api import wsgi as api_wsgi
- from coriolis.conductor.rpc import server as conductor_rpc_server
- from coriolis import conf as coriolis_conf
- from coriolis import constants
- from coriolis import context
- from coriolis.db import api as db_api
- from coriolis.db.sqlalchemy import api as sqlalchemy_api
- from coriolis.db.sqlalchemy import migration as db_migration
- from coriolis.deployer_manager.rpc import server as deployer_manager_rpc_server
- from coriolis import exception
- from coriolis import policy as policy_module
- from coriolis import rpc as rpc_module
- from coriolis.scheduler.rpc import server as scheduler_rpc_server
- from coriolis import service
- from coriolis.tasks import factory as task_runners_factory
- from coriolis.tests.integration import utils as test_utils
- from coriolis.transfer_cron.rpc import server as transfer_cron_rpc_server
- from coriolis import utils as coriolis_utils
- from coriolis.worker.rpc import server as worker_rpc_server
# Global oslo.config registry and the logger used throughout this module.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)

# Import paths of the test provider classes: the export provider plays the
# role of the transfer source, the import provider that of the destination.
_TEST_EXPORT_PROVIDER = (
    "coriolis.tests.integration.providers.test_provider.exp.TestExportProvider"
)
_TEST_IMPORT_PROVIDER = (
    "coriolis.tests.integration.providers.test_provider.imp.TestImportProvider"
)

# Every test request is issued on behalf of this single, fixed project.
_TEST_PROJECT_ID = "integration-project"
def _provider_platform(dotted_path):
    """Resolve *dotted_path* to an object and return its ``platform``.

    The path is split at its last dot: the left part is imported as a
    module and the right part is fetched from that module as an attribute
    (normally a provider class). A path without any dot fails with
    ``ValueError`` when the two parts are unpacked.
    """
    pieces = dotted_path.rsplit('.', 1)
    module_path, class_name = pieces
    provider_module = importlib.import_module(module_path)
    provider_cls = getattr(provider_module, class_name)
    return provider_cls.platform
class DaemonCherootWorker(cheroot_threadpool.WorkerThread):
    """cheroot worker thread whose ``daemon`` flag is always set."""

    def __init__(self, *args, **kwargs):
        # NOTE: keep the zero-argument super() call. Once the module-level
        # patch below is applied, ``cheroot_threadpool.WorkerThread`` refers
        # to this very class, so naming it explicitly here would recurse.
        super().__init__(*args, **kwargs)
        self.daemon = True


# The cheroot threads are marked as daemons so that the main thread does not
# wait for them on shutdown. The WSGI server itself is stopped through
# "atexit", which runs a bit later.
#
# Alternatives to consider, should this ever become a problem:
#   * Use threading._register_atexit instead of atexit
#       * it may become public:
#         https://github.com/python/cpython/issues/86128
#   * Clean up the harness in tearDownClass
#       * we tried to avoid spinning up the services for every test class
#   * Move the API services to a separate process
#       * we currently rely on the "fake" messaging backend, which does not
#         work with separate processes.
cheroot_threadpool.WorkerThread = DaemonCherootWorker
class _NoAuthMiddleware(api_wsgi.Middleware):
    """Stand-in for keystonecontext: attaches a fixed admin RequestContext."""

    @webob.dec.wsgify(RequestClass=api_wsgi.Request)
    def __call__(self, req):
        """Store the admin context in the WSGI environ and pass the call on.

        Returns the wrapped application so that the request continues down
        the middleware stack.
        """
        admin_context = context.RequestContext(
            user='integration-test',
            project_id=_TEST_PROJECT_ID,
            is_admin=True,
            # A preset trust id skips Keystone trust creation / deletion.
            trust_id='integration-dummy-trust',
        )
        req.environ['coriolis.context'] = admin_context
        return self.application
class _TestAPIRouter(api_v1_router.APIRouter):
    """V1 API router built on APIMapper (no /{project_id}/ path prefix).

    The production router relies on ProjectMapper, which prepends
    /{project_id}/ to every route. In the tests the coriolisclient sends
    paths without a project_id segment, hence the plain APIMapper.
    """

    def __init__(self):
        """Wire up routes and extensions on a prefix-less mapper."""
        extension_manager = self.ExtensionManager()
        route_mapper = api_module.APIMapper()
        self.resources = {}

        self._setup_routes(route_mapper, extension_manager)
        self._setup_ext_routes(route_mapper, extension_manager)
        self._setup_extensions(extension_manager)

        base_wsgi.Router.__init__(self, route_mapper)

    def _setup_routes(self, mapper, ext_mgr):
        """Register the stock routes plus prefix-less action routes."""
        super()._setup_routes(mapper, ext_mgr)

        # The parent implementation connects the action routes with a
        # hardcoded /{project_id}/ prefix. The test client omits that prefix,
        # so an equivalent prefix-less POST route is added per action
        # resource. The dict keeps insertion order, so routes are connected
        # in the order listed here.
        prefixless_action_urls = {
            'minion_pool_actions': '/minion_pools/{id}/actions',
            'endpoint_actions': '/endpoints/{id}/actions',
            'deployment_actions': '/deployments/{id}/actions',
            'transfer_actions': '/transfers/{id}/actions',
            'transfer_tasks_execution_actions':
                '/transfers/{transfer_id}/executions/{id}/actions',
        }
        for resource_name, route_url in prefixless_action_urls.items():
            mapper.connect(
                resource_name,
                route_url,
                controller=self.resources[resource_name],
                action='action',
                conditions={'method': 'POST'},
            )
class _InProcessWorkerServerEndpoint(worker_rpc_server.WorkerServerEndpoint):
    """Worker endpoint that runs tasks in-process instead of in subprocesses.

    The fake:// transport lives in memory and is local to one process. A
    subprocess would set up its own isolated fake:// instance without any
    conductor listener, so each RPC call issued by the task's event handler
    would block indefinitely.
    """

    def _exec_task_process(
            self, ctxt, task_id, task_type, origin, destination, instance,
            task_info, report_to_conductor=True):
        """Run one task via ``coriolis_utils.start_thread`` and wait for it.

        When *report_to_conductor* is true, the conductor is first told
        which host runs the task and which process id it has (the pid of
        the current process, since nothing is spawned).

        Returns the value produced by the task runner.

        Raises ``exception.TaskProcessException`` when the task fails. A
        failure is signalled internally by queueing the error text, so any
        ``str`` outcome is treated as a failure -- including a task that
        returns a plain string.
        """
        outcomes = queue.Queue()

        if report_to_conductor:
            self._rpc_conductor_client.set_task_host(
                ctxt, task_id, self._server)
            self._rpc_conductor_client.set_task_process(
                ctxt, task_id, os.getpid())

        def _run_task():
            # Exactly one item is queued: the task result on success, or
            # the error message (a str) on failure.
            try:
                runner_cls = task_runners_factory.get_task_runner_class(
                    task_type)
                runner = runner_cls()
                handler = worker_rpc_server._get_event_handler_for_task_type(
                    task_type, ctxt, task_id)
                outcome = runner.run(
                    ctxt, instance, origin, destination, task_info, handler)
                # Presumably raises for non-serializable results; the
                # return value is not used.
                coriolis_utils.is_serializable(outcome)
            except Exception as ex:
                LOG.exception(ex)
                outcomes.put(str(ex))
            else:
                outcomes.put(outcome)

        task_thread = coriolis_utils.start_thread(_run_task)
        task_thread.join()

        outcome = outcomes.get_nowait()
        if not isinstance(outcome, str):
            return outcome
        raise exception.TaskProcessException(outcome)
class _IntegrationHarness:
    """Shared Integration tests infrastructure; created once per process.

    The first call to ``_IntegrationHarness.get()`` performs the full setup:
    temp workspace, CONF overrides, database container startup, DB sync, and
    service startup. Subsequent calls return the same instance. Teardown is
    registered with ``atexit`` so it runs after all test classes have
    finished, not after the first one.
    """

    _instance = None

    @classmethod
    def get(cls):
        """Return the process-wide harness, building it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        """Prepare the environment and start the database and services.

        Creates the temporary workspace and an SSH key pair, overrides the
        relevant CONF options, starts the database container, syncs the DB
        schema and finally starts the Coriolis services and the API server.
        """
        self.workdir = tempfile.mkdtemp(prefix="coriolis-integration-")
        self.lock_path = os.path.join(self.workdir, "locks")
        os.makedirs(self.lock_path)

        # A random suffix keeps the container name unique per harness.
        self._mysql_container_name = "coriolis-test-mysql-%s" % str(
            uuid.uuid4()).split("-")[0]
        self._mysql_username = "root"
        self._mysql_password = "coriolis"
        self._mysql_database = "coriolis"

        # Passphrase-less RSA key pair stored inside the workspace.
        self.ssh_key_path = os.path.join(self.workdir, "id_rsa")
        subprocess.run(
            ["ssh-keygen", "-t", "rsa", "-b", "2048",
             "-f", self.ssh_key_path, "-N", ""],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        coriolis_conf.init_common_opts()
        cfg.CONF([], project='coriolis', version='1.0.0',
                 default_config_files=[], default_config_dirs=[])
        cfg.CONF.set_override('messaging_transport_url', 'fake://')
        cfg.CONF.set_override(
            'providers', [_TEST_EXPORT_PROVIDER, _TEST_IMPORT_PROVIDER])
        # Port 13306 is the host port published by _start_db_container().
        db_url = ('mysql+pymysql://%(user)s:%(password)s'
                  '@localhost:13306/%(database)s') % {
            "user": self._mysql_username,
            "password": self._mysql_password,
            "database": self._mysql_database,
        }
        cfg.CONF.set_override(
            'connection', db_url, group='database')
        cfg.CONF.set_override(
            'retry_interval', 1, group='database')
        cfg.CONF.set_override(
            'lock_path', self.lock_path, group='oslo_concurrency')
        coriolis_utils.setup_logging()

        test_utils.init_scsi_debug()

        # Policy enforcer: reset so it re-reads the new CONF (no policy file).
        policy_module.reset()

        self.exp_provider_platform = _provider_platform(_TEST_EXPORT_PROVIDER)
        self.imp_provider_platform = _provider_platform(_TEST_IMPORT_PROVIDER)

        # Every handle that _teardown() inspects is initialised before the
        # atexit hook is registered, so teardown is safe to run even when
        # startup fails half-way through.
        self._wsgi_server = None
        self._wsgi_server_thread = None
        self.api_port = None
        self._conductor_svc = None
        self._scheduler_svc = None
        self._transfer_cron_svc = None
        self._deployer_manager_svc = None
        self._worker_svc = None
        self._worker_host_svc = None

        # SQLAlchemy facade and RPC transport are module-level singletons;
        # reset them so they are re-created from the new CONF values.
        sqlalchemy_api._facade = None
        rpc_module._TRANSPORT = None

        atexit.register(self._teardown)

        self._start_db_container()
        engine = db_api.get_engine()
        db_migration.db_sync(engine)
        self._start_coriolis_services()

    def _start_db_container(self):
        """Start a detached MariaDB container published on host port 13306."""
        coriolis_utils.exec_process(
            [
                "docker",
                "run",
                "-d",
                "--name",
                self._mysql_container_name,
                "-e",
                f"MYSQL_ROOT_PASSWORD={self._mysql_password}",
                "-e", f"MYSQL_DATABASE={self._mysql_database}",
                "-p", "13306:3306",
                "mariadb:10-jammy",
            ])

    def _start_coriolis_services(self):
        """Start the Coriolis RPC services and the REST API in-process."""
        rpc_module.init()

        # Conductor: must start first so the worker can register with it.
        conductor_endpoint = conductor_rpc_server.ConductorServerEndpoint()
        conductor_endpoint._licensing_client = None
        conductor_endpoint._minion_manager_client_instance = mock.MagicMock()
        self._conductor_svc = service.MessagingService(
            constants.CONDUCTOR_MAIN_MESSAGING_TOPIC,
            [conductor_endpoint],
            conductor_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._conductor_svc.start()

        self._scheduler_svc = service.MessagingService(
            constants.SCHEDULER_MAIN_MESSAGING_TOPIC,
            [scheduler_rpc_server.SchedulerServerEndpoint()],
            scheduler_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._scheduler_svc.start()

        # Transfer-cron: constructor makes an RPC call to the conductor to
        # load existing schedules, so the conductor must be running first.
        self._transfer_cron_svc = service.MessagingService(
            constants.TRANSFER_CRON_MAIN_MESSAGING_TOPIC,
            [transfer_cron_rpc_server.TransferCronServerEndpoint()],
            transfer_cron_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._transfer_cron_svc.start()

        self._deployer_manager_svc = service.MessagingService(
            constants.DEPLOYER_MANAGER_MAIN_MESSAGING_TOPIC,
            [deployer_manager_rpc_server.DeployerManagerServerEndpoint()],
            deployer_manager_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._deployer_manager_svc.start()

        # Worker: constructor calls _register_worker_service() which makes a
        # blocking RPC call to the conductor, so the conductor must already be
        # listening.
        #
        # We reuse the same endpoint instance for both the main topic and the
        # host-specific topic (coriolis_worker.{hostname}) to avoid a double
        # service registration. The fake:// transport uses literal string
        # matching instead of AMQP topic routing, so the host-specific topic
        # must be served explicitly; otherwise the conductor's WorkerClient
        # (which routes via SERVICE_MESSAGING_TOPIC_FORMAT) would send to a
        # queue that nobody reads.
        _worker_endpoint = _InProcessWorkerServerEndpoint()
        self._worker_svc = service.MessagingService(
            constants.WORKER_MAIN_MESSAGING_TOPIC,
            [_worker_endpoint],
            worker_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._worker_svc.start()

        _worker_host_topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % {
            "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
            "host": coriolis_utils.get_hostname(),
        }
        self._worker_host_svc = service.MessagingService(
            _worker_host_topic,
            [_worker_endpoint],
            worker_rpc_server.VERSION,
            worker_count=1,
            init_rpc=False,
        )
        self._worker_host_svc.start()

        # API: build the WSGI stack without keystonemiddleware and serve it
        # on a random local port.
        wsgi_app = _TestAPIRouter()
        wsgi_app = _NoAuthMiddleware(wsgi_app)
        wsgi_app = fault_middleware.FaultWrapper(wsgi_app)
        wsgi_app = request_id_middleware.RequestId(wsgi_app)

        # NOTE: the probe socket is closed before cheroot binds the port, so
        # another process could in principle claim it in between.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('127.0.0.1', 0))
            s.listen(1)
            # Pick an available port.
            self.api_port = s.getsockname()[1]

        self._wsgi_server = cheroot_wsgi.Server(
            bind_addr=("127.0.0.1", self.api_port),
            wsgi_app=wsgi_app,
            server_name="coriolis-api",
        )
        self._wsgi_server.prepare()
        self._wsgi_server_thread = coriolis_utils.start_thread(
            self._wsgi_server.serve,
            daemon=True,
        )

    def _remove_db_container(self):
        """Stop and delete the database container; never raises.

        The two docker commands are attempted independently, so a failing
        ``docker stop`` does not prevent the container from being removed.
        """
        try:
            # Give the database a chance to shut down gracefully first.
            coriolis_utils.exec_process(
                ["docker", "stop", self._mysql_container_name])
        except Exception:
            LOG.warning(
                "Failed to stop database container %s.",
                self._mysql_container_name, exc_info=True)

        try:
            # --force removes the container even if it is still running;
            # --volumes also drops any anonymous volumes Docker created for
            # it, so repeated runs do not accumulate orphaned volumes.
            coriolis_utils.exec_process(
                ["docker", "rm", "--force", "--volumes",
                 self._mysql_container_name])
        except Exception:
            LOG.warning(
                "Failed to remove database container %s.",
                self._mysql_container_name, exc_info=True)

    def _teardown(self):
        """Best-effort cleanup of everything the harness created.

        Registered with ``atexit``. Each step is attempted regardless of
        earlier failures; failures are logged instead of being raised so
        that the remaining steps still run.
        """
        LOG.info("Teardown initiated.")

        self._remove_db_container()

        # Services are stopped in the reverse of their start order.
        for svc in [self._worker_host_svc, self._worker_svc,
                    self._deployer_manager_svc, self._transfer_cron_svc,
                    self._scheduler_svc, self._conductor_svc]:
            if not svc:
                # Never started (startup failed before reaching it).
                continue
            try:
                svc.stop()
            except Exception:
                LOG.warning(
                    "Failed to stop service %r.", svc, exc_info=True)

        if self._wsgi_server:
            try:
                self._wsgi_server.stop()
                self._wsgi_server_thread.join()
            except Exception:
                LOG.warning("Failed to stop the API server.", exc_info=True)

        # Second positional argument is ignore_errors.
        shutil.rmtree(self.workdir, True)

        try:
            test_utils.destroy_scsi_debug()
        except Exception:
            LOG.warning(
                "Failed to destroy the scsi_debug device.", exc_info=True)
|