Explorar o código

Minion pool support: add provider stubs and start MinionManager service

- Add _InProcessTaskflowRunner that runs pool task flows in daemon threads
  instead of child processes, keeping them inside the shared fake://
  transport.
- Add _InProcessMinionManagerServerEndpoint that uses the in-thread runner.
- Start the MinionManagerServerEndpoint in the harness.
- Patch coriolis.keystone.delete_trust globally (no real Keystone in tests).
- Disable the automatic pool-refresh cron (period=0) to prevent Keystone
  calls during pool lifecycle tests.
- Add integration tests for the minion pool lifecycle. Add tests for
  allocating / deallocating minion pools.
- Add transfer, transfer execution, and deployment tests using minion
  pools.
Claudiu Belu hai 1 semana
pai
achega
2548dd406b

+ 209 - 10
coriolis/tests/integration/base.py

@@ -22,11 +22,14 @@ from coriolisclient import client as coriolis_client
 from keystoneauth1 import session as ks_session
 from keystoneauth1 import token_endpoint
 from oslo_config import cfg
+from oslo_db.sqlalchemy import models
 from oslo_log import log as logging
+import oslo_messaging as messaging
 
 from coriolis import constants
 from coriolis import context
 from coriolis.db import api as db_api
+from coriolis.providers import factory as providers_factory
 from coriolis.tests.integration import harness
 from coriolis.tests.integration import utils as test_utils
 from coriolis.tests import test_base
@@ -34,6 +37,18 @@ from coriolis.tests import test_base
 CONF = cfg.CONF
 LOG = logging.getLogger(__name__)
 
+# Statuses that represent a completed allocation attempt.
+MINION_ALLOCATED_TERMINAL = {
+    constants.MINION_POOL_STATUS_ALLOCATED,
+    constants.MINION_POOL_STATUS_ERROR,
+}
+
+# Statuses that represent a completed deallocation attempt.
+MINION_DEALLOCATED_TERMINAL = {
+    constants.MINION_POOL_STATUS_DEALLOCATED,
+    constants.MINION_POOL_STATUS_ERROR,
+}
+
 
 class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
     """Base class for integration tests."""
@@ -55,7 +70,36 @@ class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
     def setUp(self):
         super().setUp()
 
-        patcher = mock.patch("psutil.Process.send_signal")
+        to_patch = [
+            # Prevent the test runner from being killed by Coriolis sending
+            # SIGINT to in-process workers.
+            "psutil.Process.send_signal",
+            # When removing minion pools, this is also called.
+            # There is no Keystone, so it needs to be mocked.
+            "coriolis.keystone.delete_trust",
+        ]
+        for thing in to_patch:
+            patcher = mock.patch(thing)
+            patcher.start()
+            self.addCleanup(patcher.stop)
+
+        # fake:// oslo_messaging doesn't serialize objects. After calls, some
+        # actions may remain as objects, yet they are expected to be dicts.
+        self._patch_rpc_client_method("call")
+        self._patch_rpc_client_method("cast")
+
+    def _patch_rpc_client_method(self, method):
+        original_call = getattr(messaging.RPCClient, method)
+
+        def _call(self, ctxt, method, **kwargs):
+            for key, value in kwargs.items():
+                if isinstance(value, models.ModelBase):
+                    kwargs[key] = dict(value.items())
+
+            return original_call(self, ctxt, method, **kwargs)
+
+        patcher = mock.patch.object(
+            messaging.RPCClient, method, _call)
         patcher.start()
         self.addCleanup(patcher.stop)
 
@@ -83,7 +127,7 @@ class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
 
         return endpoint
 
-    def _create_transfer(self, src_id, dst_id, instances):
+    def _create_transfer(self, src_id, dst_id, instances, **kwargs):
         """Create a Replica transfer object and return its ID."""
         transfer = self._client.transfers.create(
             origin_endpoint_id=src_id,
@@ -96,11 +140,73 @@ class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
             storage_mappings={},
             notes="integration test replica",
             skip_os_morphing=True,
+            **kwargs,
         )
         self.addCleanup(self._client.transfers.delete, transfer.id)
 
         return transfer
 
+    def _create_pool(
+            self, endpoint_id, name="test-pool", skip_allocation=True):
+        pool = self._client.minion_pools.create(
+            name=name,
+            endpoint=endpoint_id,
+            platform=constants.PROVIDER_PLATFORM_DESTINATION,
+            os_type=constants.OS_TYPE_LINUX,
+            environment_options={},
+            minimum_minions=1,
+            maximum_minions=1,
+            minion_max_idle_time=3600,
+            minion_retention_strategy=(
+                constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE),
+            skip_allocation=skip_allocation,
+        )
+        self.addCleanup(self._safe_delete_pool, pool.id)
+
+        return pool
+
+    def _safe_delete_pool(self, pool_id):
+        """Delete pool, force-deallocating first if needed."""
+        try:
+            pool = self._client.minion_pools.get(pool_id)
+        except Exception:
+            LOG.info("[Cleanup]: Pool '%s' not found. Skip delete.", pool_id)
+            return
+
+        if pool.status not in MINION_DEALLOCATED_TERMINAL:
+            self._client.minion_pools.deallocate_minion_pool(
+                pool_id, force=True)
+            self._wait_for_pool(pool_id, MINION_DEALLOCATED_TERMINAL)
+
+        self._client.minion_pools.delete(pool_id)
+
+    def _wait_for_pool(self, pool_id, terminal_statuses, timeout=180):
+        """Poll the DB until *pool_id* reaches one of *terminal_statuses*.
+
+        :returns: minion pool ORM object.
+        :raises: AssertionError on timeout.
+        """
+        ctxt = self._get_db_context()
+        deadline = time.monotonic() + timeout
+        while time.monotonic() < deadline:
+            pool = db_api.get_minion_pool(ctxt, pool_id)
+            if pool and pool.status in terminal_statuses:
+                return pool
+            time.sleep(1)
+        pool = db_api.get_minion_pool(ctxt, pool_id)
+        last = pool.status if pool else "not found"
+        self.fail(
+            "Pool %s did not reach one of %r within %ds (last: %s)"
+            % (pool_id, terminal_statuses, timeout, last)
+        )
+
+    def _get_db_context(self):
+        return context.RequestContext(
+            user='int-test',
+            project_id=harness._TEST_PROJECT_ID,
+            is_admin=True,
+        )
+
     @staticmethod
     def _ignoreExc(func, ignored_exc=Exception):
         """Wrap the given function, ignoring exceptions."""
@@ -116,6 +222,7 @@ class CoriolisIntegrationTestBase(test_base.CoriolisBaseTestCase):
 class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
 
     _CREATE_SCSI_DBG_DEVS = True
+    _CREATE_MINION_POOLS = False
 
     @classmethod
     def setUpClass(cls):
@@ -138,6 +245,7 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
 
         self._src_device = None
         self._dst_device = None
+        self._pool_id = None
 
         if self._CREATE_SCSI_DBG_DEVS:
             self._src_device = test_utils.add_scsi_debug_device()
@@ -146,7 +254,10 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
             self.addCleanup(test_utils.remove_scsi_debug_device)
 
         # Write a test pattern on the src device.
-        test_utils.write_test_pattern(self._src_device)
+        # Incremental transfer tests update the second chunk (offset=4096).
+        # We need to reset any residual data left in the scsi_debug backing
+        # store from a previous test run.
+        test_utils.write_test_pattern(self._src_device, 8192)
 
         # Create endpoints.
         self._src_endpoint = self._create_endpoint(
@@ -169,11 +280,26 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
             },
         )
 
+        # Create minion pool if needed.
+        if self._CREATE_MINION_POOLS:
+            pool = self._create_pool(
+                self._dst_endpoint.id, "transfer-pool", skip_allocation=False)
+            self._pool_id = pool.id
+
+            pool_obj = self._wait_for_pool(pool.id, MINION_ALLOCATED_TERMINAL)
+
+            self.assertEqual(
+                constants.MINION_POOL_STATUS_ALLOCATED,
+                pool_obj.status,
+                "Pool did not reach ALLOCATED (got %s)" % pool_obj.status,
+            )
+
         # Create transfer replica.
         self._transfer = self._create_transfer(
             self._src_endpoint.id,
             self._dst_endpoint.id,
             instances=[self._src_device],
+            destination_minion_pool_id=self._pool_id,
         )
 
         # mock a few commands that are going to be ran through ssh; they won't
@@ -196,13 +322,6 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
             transfer_id, shutdown_instances=False)
         self.assertExecutionCompleted(execution.id, timeout=timeout)
 
-    def _get_db_context(self):
-        return context.RequestContext(
-            user='int-test',
-            project_id=harness._TEST_PROJECT_ID,
-            is_admin=True,
-        )
-
     def wait_for_execution(self, execution_id, timeout=300,
                            desired_statuses=None):
         """Block until *execution_id* reaches a terminal state.
@@ -324,3 +443,83 @@ class ReplicaIntegrationTestBase(CoriolisIntegrationTestBase):
         )
         patcher.start()
         self.addCleanup(patcher.stop)
+
+
+class MinionPoolTestBase(CoriolisIntegrationTestBase):
+    """Base class for minion pool integration tests.
+
+    Skips the entire test class when the import provider does not advertise
+    ``PROVIDER_TYPE_DESTINATION_MINION_POOL`` support.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        available = providers_factory.get_available_providers()
+        imp_types = available.get(cls._imp_platform, {}).get("types", [])
+        if constants.PROVIDER_TYPE_DESTINATION_MINION_POOL not in imp_types:
+            raise unittest.SkipTest(
+                "Import provider '%s' does not support minion pools"
+                % cls._imp_platform
+            )
+
+
+class MinionPoolReplicaTestBase(
+        MinionPoolTestBase, ReplicaIntegrationTestBase):
+    """Base class for replica integration tests using minion pools.
+
+    Extends the assertions to also verify that the minions in the pool have
+    been used, and that the minions and the pool returns to an available state.
+    """
+
+    _CREATE_MINION_POOLS = True
+
+    def _execute_and_wait(self, transfer_id, timeout=300):
+        super()._execute_and_wait(transfer_id, timeout=timeout)
+        self.assertPoolAllocated(self._pool_id)
+        self.assertMachinesAvailable(self._pool_id)
+
+    def assertExecutionCompleted(self, execution_id, timeout=300):
+        super().assertExecutionCompleted(execution_id, timeout=timeout)
+        self.assertPoolAllocated(self._pool_id)
+        self.assertMachinesAvailable(self._pool_id)
+
+    def assertDeploymentCompleted(self, deployment_id, timeout=300):
+        super().assertDeploymentCompleted(deployment_id, timeout=timeout)
+        self.assertPoolAllocated(self._pool_id)
+        self.assertMachinesAvailable(self._pool_id)
+
+    def assertPoolAllocated(self, pool_id):
+        """Assert the pool is healthy and still in ALLOCATED status."""
+        ctxt = self._get_db_context()
+        pool = db_api.get_minion_pool(ctxt, pool_id)
+        self.assertIsNotNone(pool, "Pool %s not found" % pool_id)
+        self.assertEqual(
+            constants.MINION_POOL_STATUS_ALLOCATED,
+            pool.status,
+            "Pool %s is not ALLOCATED (got %s)" % (pool_id, pool.status),
+        )
+
+    def assertMachinesAvailable(self, pool_id):
+        """Assert all machines in the pool are AVAILABLE and have been used."""
+        ctxt = self._get_db_context()
+        pool = db_api.get_minion_pool(ctxt, pool_id, include_machines=True)
+        self.assertIsNotNone(pool, "Pool %s not found" % pool_id)
+        self.assertTrue(
+            pool.minion_machines,
+            "Pool %s has no minion machines" % pool_id,
+        )
+        for machine in pool.minion_machines:
+            self.assertEqual(
+                constants.MINION_MACHINE_STATUS_AVAILABLE,
+                machine.allocation_status,
+                "Machine %s in pool %s is not AVAILABLE (got %s)"
+                % (machine.id, pool_id, machine.allocation_status),
+            )
+            self.assertIsNotNone(
+                machine.last_used_at,
+                "Machine %s in pool %s has no last_used_at; "
+                "it may not have been used by the transfer"
+                % (machine.id, pool_id),
+            )

+ 55 - 4
coriolis/tests/integration/harness.py

@@ -25,7 +25,6 @@ import shutil
 import socket
 import subprocess
 import tempfile
-from unittest import mock
 import uuid
 
 from cheroot.workers import threadpool as cheroot_threadpool
@@ -49,10 +48,12 @@ from coriolis.db.sqlalchemy import api as sqlalchemy_api
 from coriolis.db.sqlalchemy import migration as db_migration
 from coriolis.deployer_manager.rpc import server as deployer_manager_rpc_server
 from coriolis import exception
+from coriolis.minion_manager.rpc import server as minion_manager_rpc_server
 from coriolis import policy as policy_module
 from coriolis import rpc as rpc_module
 from coriolis.scheduler.rpc import server as scheduler_rpc_server
 from coriolis import service
+from coriolis.taskflow import runner as taskflow_runner
 from coriolis.tasks import factory as task_runners_factory
 from coriolis.tests.integration import utils as test_utils
 from coriolis.transfer_cron.rpc import server as transfer_cron_rpc_server
@@ -204,6 +205,36 @@ class _InProcessWorkerServerEndpoint(worker_rpc_server.WorkerServerEndpoint):
         return result
 
 
+class _InProcessTaskflowRunner(taskflow_runner.TaskFlowRunner):
+    """Runs taskflow flows in a daemon thread instead of a child process.
+
+    The default runner uses multiprocessing.spawn, which creates an isolated
+    process with its own fake:// transport instance (no registered services).
+    Running in-thread lets pool tasks reach the conductor, scheduler, and
+    worker that are already live in this process.
+    """
+
+    def run_flow_in_background(self, flow, store=None):
+        coriolis_utils.start_thread(
+            target=self._run_flow,
+            args=(flow,),
+            kwargs={"store": store},
+            daemon=True,
+        )
+
+
+class _InProcessMinionManagerServerEndpoint(
+        minion_manager_rpc_server.MinionManagerServerEndpoint):
+    """Minion manager endpoint that runs pool task flows in-thread."""
+
+    @property
+    def _taskflow_runner(self):
+        return _InProcessTaskflowRunner(
+            constants.MINION_MANAGER_MAIN_MESSAGING_TOPIC,
+            max_workers=25,
+        )
+
+
 class _IntegrationHarness:
     """Shared Integration tests infrastructure; created once per process.
 
@@ -259,6 +290,13 @@ class _IntegrationHarness:
             'retry_interval', 1, group='database')
         cfg.CONF.set_override(
             'lock_path', self.lock_path, group='oslo_concurrency')
+
+        # Disable automatic pool-refresh cron jobs (they would try to contact
+        # Keystone for trust maintenance).
+        cfg.CONF.set_override(
+            'minion_pool_default_refresh_period_minutes', 0,
+            group='minion_manager')
+
         coriolis_utils.setup_logging()
         test_utils.init_scsi_debug()
 
@@ -275,6 +313,7 @@ class _IntegrationHarness:
         self._scheduler_svc = None
         self._transfer_cron_svc = None
         self._deployer_manager_svc = None
+        self._minion_manager_svc = None
         self._worker_svc = None
         self._worker_host_svc = None
 
@@ -315,7 +354,6 @@ class _IntegrationHarness:
         # Conductor: must start first so the worker can register with it.
         conductor_endpoint = conductor_rpc_server.ConductorServerEndpoint()
         conductor_endpoint._licensing_client = None
-        conductor_endpoint._minion_manager_client_instance = mock.MagicMock()
         self._conductor_svc = service.MessagingService(
             constants.CONDUCTOR_MAIN_MESSAGING_TOPIC,
             [conductor_endpoint],
@@ -354,6 +392,18 @@ class _IntegrationHarness:
         )
         self._deployer_manager_svc.start()
 
+        # Minion manager: runs pool lifecycle task flows in-thread so that
+        # they can reach the in-process conductor, scheduler, and worker over
+        # the fake:// transport.
+        self._minion_manager_svc = service.MessagingService(
+            constants.MINION_MANAGER_MAIN_MESSAGING_TOPIC,
+            [_InProcessMinionManagerServerEndpoint()],
+            minion_manager_rpc_server.VERSION,
+            worker_count=1,
+            init_rpc=False,
+        )
+        self._minion_manager_svc.start()
+
         # Worker: constructor calls _register_worker_service() which makes a
         # blocking RPC call to the conductor, so the conductor must already be
         # listening.
@@ -432,8 +482,9 @@ class _IntegrationHarness:
             pass
 
         for svc in [self._worker_host_svc, self._worker_svc,
-                    self._deployer_manager_svc, self._transfer_cron_svc,
-                    self._scheduler_svc, self._conductor_svc]:
+                    self._minion_manager_svc, self._deployer_manager_svc,
+                    self._transfer_cron_svc, self._scheduler_svc,
+                    self._conductor_svc]:
             if not svc:
                 continue
             try:

+ 5 - 0
coriolis/tests/integration/test_deployment.py

@@ -86,3 +86,8 @@ class ReplicaDeploymentIntegrationTest(base.ReplicaIntegrationTestBase):
                 constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING,
             ],
         )
+
+
+class MinionPoolReplicaDeploymentTests(
+        base.MinionPoolReplicaTestBase, ReplicaDeploymentIntegrationTest):
+    """Replica deployment that uses a pre-allocated destination minion pool."""

+ 86 - 0
coriolis/tests/integration/test_minion_pools.py

@@ -0,0 +1,86 @@
+# Copyright 2026 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+"""Integration tests for the Minion Pool lifecycle API.
+
+Exercises minion-pool operations via the Coriolis REST API:
+- CRUD without allocation (create skip_allocation=True, list, get, update,
+  delete)
+- Full allocation lifecycle (allocate -> wait for ALLOCATED -> refresh ->
+  deallocate -> wait for DEALLOCATED -> delete)
+"""
+
+from coriolis import constants
+from coriolis.tests.integration import base
+
+
+class MinionPoolLifecycleTest(base.MinionPoolTestBase):
+
+    def setUp(self):
+        super().setUp()
+
+        self._endpoint = self._create_endpoint(
+            name="pool-dst",
+            endpoint_type=self._imp_platform,
+            connection_info={
+                "devices": [],
+                "pkey_path": self._harness.ssh_key_path,
+            },
+        )
+
+    def test_minion_pool_crud(self):
+        # Create
+        pool = self._create_pool(self._endpoint.id)
+
+        self.assertEqual("test-pool", pool.name)
+        self.assertEqual(
+            constants.MINION_POOL_STATUS_DEALLOCATED, pool.status)
+
+        # List
+        pools = self._client.minion_pools.list()
+
+        pool_ids = [p.id for p in pools]
+        self.assertIn(pool.id, pool_ids)
+
+        # Get
+        fetched = self._client.minion_pools.get(pool.id)
+
+        self.assertEqual(pool.id, fetched.id)
+        self.assertEqual("test-pool", fetched.name)
+
+        # Update
+        updated = self._client.minion_pools.update(
+            pool.id, {"notes": "updated notes"})
+
+        self.assertEqual("updated notes", updated.notes)
+
+        # Delete
+        self._client.minion_pools.delete(pool.id)
+
+        pools = self._client.minion_pools.list()
+        self.assertNotIn(pool.id, [p.id for p in pools])
+
+    def test_allocate_deallocate(self):
+        pool = self._create_pool(self._endpoint.id)
+        self.assertEqual(
+            constants.MINION_POOL_STATUS_DEALLOCATED, pool.status)
+
+        # Allocate
+        self._client.minion_pools.allocate_minion_pool(pool.id)
+
+        final = self._wait_for_pool(pool.id, base.MINION_ALLOCATED_TERMINAL)
+        self.assertEqual(
+            constants.MINION_POOL_STATUS_ALLOCATED,
+            final.status,
+            "Pool allocation ended in unexpected status '%s'" % final.status,
+        )
+
+        # Deallocate
+        self._client.minion_pools.deallocate_minion_pool(pool.id)
+
+        final = self._wait_for_pool(pool.id, base.MINION_DEALLOCATED_TERMINAL)
+        self.assertEqual(
+            constants.MINION_POOL_STATUS_DEALLOCATED,
+            final.status,
+            "Pool deallocation ended in unexpected status '%s'" % final.status,
+        )

+ 5 - 0
coriolis/tests/integration/transfers/test_executions.py

@@ -116,3 +116,8 @@ class TransferExecutionsTests(base.ReplicaIntegrationTestBase):
             "Expected a canceled/error status after cancel, got %s"
             % final.status,
         )
+
+
+class MinionPoolTransferExecutionsTests(
+        base.MinionPoolReplicaTestBase, TransferExecutionsTests):
+    """Transfer executions that use a pre-allocated destination minion pool."""

+ 10 - 0
coriolis/tests/integration/transfers/test_transfer.py

@@ -73,3 +73,13 @@ class ReplicaTransferIntegrationTest(base.ReplicaIntegrationTestBase):
             test_utils.devices_match(self._src_device, self._dst_device),
             "Destination does not match source after incremental transfer",
         )
+
+
+class MinionPoolTransferTest(
+        base.MinionPoolReplicaTestBase, ReplicaTransferIntegrationTest):
+    """Transfer execution that uses a pre-allocated destination minion pool."""
+
+    def test_transfer(self):
+        super().test_transfer()
+        self.assertPoolAllocated(self._pool_id)
+        self.assertMachinesAvailable(self._pool_id)