base.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. # Copyright 2026 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. """
  4. Base test classes for the Coriolis integration tests.
  5. Ensures the shared ``_IntegrationHarness`` is running (started once per
  6. process) and exposes its port and paths as class attributes. The heavy
  7. lifting - CONF overrides, DB sync, service startup - happens inside the
  8. harness and is not repeated for each subclass.
  9. Subclasses must be run as root.
  10. """
  11. import os
  12. import time
  13. import unittest
  14. from unittest import mock
  15. from coriolisclient import client as coriolis_client
  16. from keystoneauth1 import session as ks_session
  17. from keystoneauth1 import token_endpoint
  18. from oslo_config import cfg
  19. from oslo_log import log as logging
  20. from coriolis import constants
  21. from coriolis import context
  22. from coriolis.db import api as db_api
  23. from coriolis.tests.integration import harness
  24. from coriolis.tests.integration import utils as test_utils
  25. from coriolis.tests import test_base
  26. CONF = cfg.CONF
  27. LOG = logging.getLogger(__name__)
  28. class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
  29. """Base class for integration tests."""
  30. @classmethod
  31. def setUpClass(cls):
  32. if os.geteuid() != 0:
  33. raise unittest.SkipTest("Integration tests must run as root")
  34. super().setUpClass()
  35. cls._harness = harness._IntegrationHarness.get()
  36. cls._workdir = cls._harness.workdir
  37. cls._db_path = cls._harness.db_path
  38. cls._lock_path = cls._harness.lock_path
  39. cls._api_port = cls._harness.api_port
  40. cls._client = cls.get_client()
  41. # Helpers for subclasses
  42. @classmethod
  43. def get_client(cls):
  44. """Return a coriolisclient.Client pointed at the in-process API."""
  45. auth = token_endpoint.Token(
  46. endpoint='http://127.0.0.1:%d' % cls._api_port,
  47. token='integration-dummy-token',
  48. )
  49. return coriolis_client.Client(
  50. session=ks_session.Session(auth=auth),
  51. project_id=harness._TEST_PROJECT_ID,
  52. )
  53. def _create_endpoint(self, **kwargs):
  54. endpoint_kwargs = {
  55. "description": "",
  56. "regions": [],
  57. }
  58. endpoint_kwargs.update(kwargs)
  59. endpoint = self._client.endpoints.create(**endpoint_kwargs)
  60. self.addCleanup(self._client.endpoints.delete, endpoint.id)
  61. return endpoint
  62. def _create_transfer(self, src_id, dst_id, instances):
  63. """Create a Replica transfer object and return its ID."""
  64. transfer = self._client.transfers.create(
  65. origin_endpoint_id=src_id,
  66. destination_endpoint_id=dst_id,
  67. source_environment={},
  68. destination_environment={},
  69. instances=instances,
  70. transfer_scenario=constants.TRANSFER_SCENARIO_REPLICA,
  71. network_map={},
  72. storage_mappings={},
  73. notes="integration test replica",
  74. skip_os_morphing=True,
  75. )
  76. self.addCleanup(self._client.transfers.delete, transfer.id)
  77. return transfer
  78. class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
  79. def setUp(self):
  80. super().setUp()
  81. self._src_device = test_utils.add_scsi_debug_device()
  82. self.addCleanup(test_utils.remove_scsi_debug_device)
  83. self._dst_device = test_utils.add_scsi_debug_device()
  84. self.addCleanup(test_utils.remove_scsi_debug_device)
  85. # Write a test pattern on the src device.
  86. test_utils.write_test_pattern(self._src_device)
  87. # Create endpoints.
  88. self._src_endpoint = self._create_endpoint(
  89. name="test-src",
  90. endpoint_type="test-src",
  91. description="integration source endpoint",
  92. connection_info={
  93. "block_device_path": self._src_device,
  94. "pkey_path": "/home/ubuntu/.ssh/id_rsa",
  95. },
  96. )
  97. self._dst_endpoint = self._create_endpoint(
  98. name="test-dest",
  99. endpoint_type="test-dest",
  100. description="integration destination endpoint",
  101. connection_info={
  102. "devices": [self._dst_device],
  103. "pkey_path": "/home/ubuntu/.ssh/id_rsa",
  104. },
  105. )
  106. # Create transfer replica.
  107. self._transfer = self._create_transfer(
  108. self._src_endpoint.id,
  109. self._dst_endpoint.id,
  110. instances=[self._src_device],
  111. )
  112. # mock a few commands that are going to be ran through ssh; they won't
  113. # pass anyway.
  114. bkup = "coriolis.providers.backup_writers.HTTPBackupWriterBootstrapper"
  115. repl = "coriolis.providers.replicator.Replicator"
  116. for prop in [
  117. "coriolis.providers.backup_writers._disable_lvm2_lvmetad",
  118. f"{bkup}._add_firewalld_port",
  119. f"{bkup}._change_binary_se_context",
  120. f"{repl}._change_binary_se_context",
  121. ]:
  122. mocker = mock.patch(prop)
  123. mocker.start()
  124. self.addCleanup(mocker.stop)
  125. def _execute_and_wait(self, transfer_id, timeout=300):
  126. """Trigger one execution of *transfer_id* and wait for completion."""
  127. execution = self._client.transfer_executions.create(
  128. transfer_id, shutdown_instances=False)
  129. self.assertExecutionCompleted(execution.id, timeout=timeout)
  130. def _get_db_context(self):
  131. return context.RequestContext(
  132. user='int-test',
  133. project_id=harness._TEST_PROJECT_ID,
  134. is_admin=True,
  135. )
  136. def wait_for_execution(self, execution_id, timeout=300):
  137. """Block until *execution_id* reaches a terminal state.
  138. Polls the DB directly and yields on each iteration so in-process
  139. services can make progress.
  140. Returns the finalised TasksExecution ORM object.
  141. Raises ``AssertionError`` on timeout.
  142. """
  143. ctxt = self._get_db_context()
  144. deadline = time.monotonic() + timeout
  145. while time.monotonic() < deadline:
  146. execution = db_api.get_tasks_execution(ctxt, execution_id)
  147. if execution.status in constants.FINALIZED_EXECUTION_STATUSES:
  148. return execution
  149. time.sleep(1)
  150. self.fail(
  151. "Execution %s did not reach a terminal state within %ds "
  152. "(last status: %s)"
  153. % (execution_id, timeout, execution.status)
  154. )
  155. def assertExecutionCompleted(self, execution_id, timeout=300):
  156. """Assert that *execution_id* completes successfully."""
  157. execution = self.wait_for_execution(execution_id, timeout=timeout)
  158. self.assertEqual(
  159. constants.EXECUTION_STATUS_COMPLETED,
  160. execution.status,
  161. "Execution %s ended with status %s; task details: %s"
  162. % (
  163. execution_id,
  164. execution.status,
  165. [
  166. (t.task_type, t.status, t.exception_details)
  167. for t in execution.tasks
  168. if t.status not in (
  169. constants.TASK_STATUS_COMPLETED,
  170. constants.TASK_STATUS_CANCELED,
  171. )
  172. ],
  173. ),
  174. )
  175. def assertExecutionErrored(self, execution_id, timeout=300):
  176. """Assert that *execution_id* ends in an error state."""
  177. execution = self.wait_for_execution(execution_id, timeout=timeout)
  178. self.assertIn(
  179. execution.status,
  180. [
  181. constants.EXECUTION_STATUS_ERROR,
  182. constants.EXECUTION_STATUS_DEADLOCKED,
  183. ],
  184. "Expected an error status for execution %s, got %s"
  185. % (execution_id, execution.status),
  186. )
  187. def assertDeploymentCompleted(self, deployment_id, timeout=300):
  188. """Assert that *deployment_id* finishes with a completed status.
  189. Polls last_execution_status from the DB (the API view does not expose
  190. the execution ID directly, so DB polling is used for status tracking).
  191. """
  192. ctxt = self._get_db_context()
  193. deadline = time.monotonic() + timeout
  194. while time.monotonic() < deadline:
  195. deployment = db_api.get_deployment(ctxt, deployment_id)
  196. status = deployment.last_execution_status
  197. if status in constants.FINALIZED_EXECUTION_STATUSES:
  198. self.assertEqual(
  199. constants.EXECUTION_STATUS_COMPLETED,
  200. status,
  201. "Deployment %s ended with status %s"
  202. % (deployment_id, status),
  203. )
  204. return
  205. time.sleep(1)
  206. self.fail(
  207. "Deployment %s did not reach a terminal state within %ds"
  208. % (deployment_id, timeout)
  209. )