Просмотр исходного кода

integration: Add BaseTestImportProvider abstraction

Introduces an `BaseTestImportProvider` ABC that contains provider-specific
logic not currently defined in the import providers, meant to be used for
testing-only purposes:
- detect leaked resources
- delete deployed replicas

Adds an implementation for it, in `TestImportProvider`.
Claudiu Belu 2 недель назад
Родитель
Сommit
3eb41d830a

+ 15 - 12
coriolis/tests/integration/base.py

@@ -13,7 +13,6 @@ Subclasses must be run as root.
 """
 
 import os
-import subprocess
 import time
 import unittest
 from unittest import mock
@@ -241,17 +240,7 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
 
     @classmethod
     def setUpClass(cls):
-        result = subprocess.run(
-            ["docker", "image", "inspect", test_utils.DATA_MINION_IMAGE],
-            stdout=subprocess.DEVNULL,
-            stderr=subprocess.DEVNULL,
-        )
-        if result.returncode != 0:
-            raise unittest.SkipTest(
-                "Docker image not found; build it with: "
-                "docker build -t %s "
-                "coriolis/tests/integration/dockerfiles/data-minion/"
-                % test_utils.DATA_MINION_IMAGE)
+        harness._IntegrationHarness.get().imp_provider.check_prerequisites()
 
         super().setUpClass()
 
@@ -471,6 +460,9 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
         that occurs when a deployment is still in-flight at cleanup time, which
         can happen with slow providers when a test fails or times out before
         the deployment completes.
+
+        Calls ``_imp_provider.delete_deployed_instance`` for every deployment
+        instance, so that finalized VMs at the destination are destroyed.
         """
         ctxt = self._get_db_context()
         deployment = db_api.get_deployment(ctxt, deployment_id)
@@ -479,6 +471,8 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
                 "Deployment '%s' not found. Skip cleanup.", deployment_id)
             return
 
+        instances = list(deployment.instances or [])
+
         if deployment.last_execution_status in (
                 constants.ACTIVE_EXECUTION_STATUSES):
             self._client.deployments.cancel(deployment_id)
@@ -486,6 +480,15 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
 
         self._client.deployments.delete(deployment_id)
 
+        for instance_name in instances:
+            try:
+                self._imp_provider.delete_deployed_instance(
+                    self._imp_conn_info, instance_name)
+            except Exception as ex:
+                LOG.warning(
+                    "Could not clean up deployed instance '%s': %s",
+                    instance_name, ex)
+
     def wait_for_deployment(self, deployment_id, timeout=300,
                             desired_statuses=None):
         """Block until *deployment_id* reaches any terminal state.

+ 10 - 0
coriolis/tests/integration/deployments/test_osmorphing.py

@@ -9,9 +9,11 @@ installation in the target OS.
 
 import os
 import re
+import unittest
 import uuid
 
 from coriolis.tests.integration import base as integration_base
+from coriolis.tests.integration import harness as integration_harness
 from coriolis.tests.integration import utils as test_utils
 
 
@@ -21,6 +23,14 @@ class OsMorphingDeploymentTest(integration_base.ReplicaIntegrationTestBase):
     # any new packages to be added during OS morphing.
     _SCSI_DEBUG_SIZE_MB = 256
 
+    @classmethod
+    def setUpClass(cls):
+        harness = integration_harness._IntegrationHarness.get()
+        if not harness.uses_core_test_import_provider():
+            raise unittest.SkipTest(
+                "OS morphing tests require local disk access")
+        super().setUpClass()
+
     def setUp(self):
         super().setUp()
         test_utils.write_os_image_to_disk(self._src_device, "ubuntu:24.04")

+ 10 - 0
coriolis/tests/integration/harness.py

@@ -57,6 +57,7 @@ from coriolis.scheduler.rpc import server as scheduler_rpc_server
 from coriolis import service
 from coriolis.taskflow import runner as taskflow_runner
 from coriolis.tasks import factory as task_runners_factory
+from coriolis.tests.integration.test_provider import imp as test_provider_imp
 from coriolis.tests.integration import utils as test_utils
 from coriolis.transfer_cron.rpc import server as transfer_cron_rpc_server
 from coriolis import utils as coriolis_utils
@@ -322,6 +323,7 @@ class _IntegrationHarness:
             "pkey_path": self.ssh_key_path,
             "role": "destination",
         }
+        self.imp_provider.initialize(self.imp_conn_info)
         self.imp_env_options = {}
 
         self._wsgi_server = None
@@ -340,6 +342,7 @@ class _IntegrationHarness:
         sqlalchemy_api._facade = None
         rpc_module._TRANSPORT = None
 
+        atexit.register(self.imp_provider.teardown, self.imp_conn_info)
         atexit.register(self._teardown)
 
         self._start_db_container()
@@ -522,3 +525,10 @@ class _IntegrationHarness:
             test_utils.destroy_scsi_debug()
         except Exception:
             pass
+
+    def uses_core_test_import_provider(self):
+        """Returns True when the test import provider is being used."""
+        return isinstance(
+            self.imp_provider,
+            test_provider_imp.TestImportProvider,
+        )

+ 49 - 0
coriolis/tests/integration/provider_test_base.py

@@ -0,0 +1,49 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""
+Abstract base class for test import providers.
+
+Based on the Base* provider convention from coriolis/providers/base.py.
+
+The BaseTestImportProvider contains provider-specific logic not currently
+defined in the import providers, meant to be used for testing-only purposes:
+    - detect leaked resources
+    - delete deployed replicas
+"""
+
+import abc
+
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class BaseTestImportProvider(abc.ABC):
+
+    def initialize(self, connection_info: dict):
+        """One-time initialization, before any tests run.
+
+        Can be used to list the current resources on the target provider,
+        which can then be used to check if any test resources leaked and
+        clean them.
+        """
+
+    def teardown(self, connection_info: dict):
+        """One-time teardown called at atexit.
+
+        Can be used to check and clean any leaked test resources.
+        """
+
+    def check_prerequisites(self):
+        """Raise ``unittest.SkipTest`` if required infrastructure is absent."""
+
+    def delete_deployed_instance(
+        self, connection_info: dict, instance_name: str,
+    ):
+        """Destroy the VM created at the destination by a completed deployment.
+
+        Called during integration test cleanup after each deployment test, so
+        that finalized VMs do not accumulate across runs and cause failures in
+        later tests (e.g. name collisions, resource exhaustion).
+        """

+ 41 - 1
coriolis/tests/integration/test_provider/imp.py

@@ -10,6 +10,7 @@ target_conn_info that BackupWritersFactory expects.
 """
 
 import os
+import unittest
 import uuid
 
 from oslo_log import log as logging
@@ -24,6 +25,7 @@ from coriolis.providers.base import BaseEndpointStorageProvider
 from coriolis.providers.base import BaseReplicaImportProvider
 from coriolis.providers.base import BaseReplicaImportValidationProvider
 from coriolis.providers.base import BaseUpdateDestinationReplicaProvider
+from coriolis.tests.integration import provider_test_base
 from coriolis.tests.integration.test_provider import osmorphing
 from coriolis.tests.integration import utils as test_utils
 from coriolis import utils as coriolis_utils
@@ -33,6 +35,13 @@ LOG = logging.getLogger(__name__)
 # Port used by the test writer binary inside the container.
 WRITER_TEST_PORT = 6677
 
+# Name prefixes used by _create_minion callers.
+_CONTAINER_PREFIXES = (
+    "coriolis-writer-",
+    "coriolis-osmorphing-",
+    "coriolis-pool-minion-",
+)
+
 
 class TestImportProvider(
         BaseEndpointProvider,
@@ -42,7 +51,8 @@ class TestImportProvider(
         BaseUpdateDestinationReplicaProvider,
         BaseReplicaImportProvider,
         BaseReplicaImportValidationProvider,
-        BaseDestinationMinionPoolProvider):
+        BaseDestinationMinionPoolProvider,
+        provider_test_base.BaseTestImportProvider):
     """Destination-side provider backed by a local `scsi_debug` block device.
 
     ``connection_info`` (the destination endpoint's connection info) has the
@@ -63,6 +73,36 @@ class TestImportProvider(
     def __init__(self, event_handler):
         self._event_handler = event_handler
 
+    # BaseTestImportProvider - test only
+
+    def initialize(self, connection_info: dict):
+        self._initial_containers = test_utils.list_containers(
+            _CONTAINER_PREFIXES
+        )
+
+    def teardown(self, connection_info: dict):
+        new_containers = test_utils.list_containers(_CONTAINER_PREFIXES)
+        leaked_containers = new_containers - self._initial_containers
+
+        if not leaked_containers:
+            return
+
+        for name in leaked_containers:
+            test_utils.remove_container(name)
+
+        raise AssertionError(
+            "Found leaked containers during teardown: %s" % leaked_containers
+        )
+
+    def check_prerequisites(self):
+        if not test_utils.container_image_exists(test_utils.DATA_MINION_IMAGE):
+            raise unittest.SkipTest(
+                "Docker image '%s' not found; build it with: "
+                "docker build -t %s "
+                "coriolis/tests/integration/dockerfiles/data-minion/"
+                % (test_utils.DATA_MINION_IMAGE, test_utils.DATA_MINION_IMAGE)
+            )
+
     # BaseProvider / BaseEndpointProvider
 
     def get_connection_info_schema(self):

+ 17 - 15
coriolis/tests/integration/transfers/test_transfer.py

@@ -41,19 +41,19 @@ class ReplicaTransferIntegrationTest(base.ReplicaIntegrationTestBase):
         - Create source / destination endpoints and a Replica transfer via the
           Coriolis REST API (using coriolisclient).
         - Execute the transfer and wait for it to complete.
-        - Assert that the destination device contains the same data as the
-          source.
         - Overwrite a single chunk on the source device.
         - Execute a second transfer run (incremental=True).
-        - Assert that the destination now matches the updated source.
+
+        The content is verified only if the test import provider is being used.
         """
         # First run: full transfer
         self._execute_and_wait(self._transfer.id)
 
-        self.assertTrue(
-            test_utils.devices_match(self._src_device, self._dst_device),
-            "Devices do not match after initial full transfer",
-        )
+        if self._harness.uses_core_test_import_provider():
+            self.assertTrue(
+                test_utils.devices_match(self._src_device, self._dst_device),
+                "Devices do not match after initial full transfer",
+            )
 
         # Mutate source: write a different pattern at the second chunk
         test_utils.write_bytes_at_offset(
@@ -61,18 +61,20 @@ class ReplicaTransferIntegrationTest(base.ReplicaIntegrationTestBase):
             offset=4096,
             data=b"\xff\xfe\xfd\xfc" * 1024,
         )
-        self.assertFalse(
-            test_utils.devices_match(self._src_device, self._dst_device),
-            "Devices should differ after mutating the source",
-        )
+        if self._harness.uses_core_test_import_provider():
+            self.assertFalse(
+                test_utils.devices_match(self._src_device, self._dst_device),
+                "Devices should differ after mutating the source",
+            )
 
         # Second run: incremental
         self._execute_and_wait(self._transfer.id)
 
-        self.assertTrue(
-            test_utils.devices_match(self._src_device, self._dst_device),
-            "Destination does not match source after incremental transfer",
-        )
+        if self._harness.uses_core_test_import_provider():
+            self.assertTrue(
+                test_utils.devices_match(self._src_device, self._dst_device),
+                "Destination does not match source after incremental transfer",
+            )
 
 
 class MinionPoolTransferTest(

+ 23 - 0
coriolis/tests/integration/utils.py

@@ -197,6 +197,29 @@ def wait_for_ssh(host, port, username, pkey_path, timeout=30):
 # Docker utils
 
 
+def list_containers(prefixes) -> set:
+    result = subprocess.run(
+        ["docker", "ps", "-a", "--format", "{{.Names}}"],
+        capture_output=True,
+        text=True,
+    )
+
+    return {
+        name for name in result.stdout.splitlines()
+        if any(name.startswith(p) for p in prefixes)
+    }
+
+
+def container_image_exists(image_name):
+    result = subprocess.run(
+        ["docker", "image", "inspect", image_name],
+        stdout=subprocess.DEVNULL,
+        stderr=subprocess.DEVNULL,
+    )
+
+    return result.returncode == 0
+
+
 def start_container(container_id):
     """Start a stopped Docker container."""
     _run(["docker", "start", container_id], check=False)