Browse Source

Add Deployer Manager service

Adds executable service entry for the deployer manager, which will
manage the lifecycle and automation of the deployments launched
by deployer executions, using the `auto_deploy` transfer execution
option.
Daniel Vincze 1 năm trước cách đây
mục cha
commit
e752eabcdd

+ 35 - 0
coriolis/cmd/deployer_manager.py

@@ -0,0 +1,35 @@
+# Copyright 2025 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import sys
+
+from oslo_config import cfg
+
+from coriolis import constants
+from coriolis.deployer_manager.rpc import server as rpc_server
+from coriolis import service
+from coriolis import utils
+
+deployer_manager_opts = [
+    cfg.IntOpt(
+        'worker_count', min=1, default=1,
+        help="Number of processes in which the service will be running")]
+CONF = cfg.CONF
+CONF.register_opts(deployer_manager_opts, 'deployer_manager')
+
+
+def main():
+    CONF(sys.argv[1:], project='coriolis', version='1.0.0')
+    utils.setup_logging()
+
+    server = service.MessagingService(
+        constants.DEPLOYER_MANAGER_MAIN_MESSAGING_TOPIC,
+        [rpc_server.DeployerManagerServerEndpoint()],
+        rpc_server.VERSION, worker_count=CONF.deployer_manager.worker_count)
+    launcher = service.service.launch(
+        CONF, server, workers=server.get_workers_count())
+    launcher.wait()
+
+
+if __name__ == '__main__':
+    main()

+ 4 - 6
coriolis/conductor/rpc/client.py

@@ -220,12 +220,10 @@ class ConductorClient(rpc.BaseRPCClient):
             include_task_info=include_task_info)
 
     def confirm_deployer_completed(
-            self, ctxt, deployment_id, skip_os_morphing=False, force=False,
-            clone_disks=True, user_scripts=None):
+            self, ctxt, deployment_id, force=False):
         return self._cast(
             ctxt, 'confirm_deployer_completed', deployment_id=deployment_id,
-            skip_os_morphing=skip_os_morphing, force=force,
-            clone_disks=clone_disks, user_scripts=user_scripts)
+            force=force)
 
     def report_deployer_failure(
             self, ctxt, deployemnt_id, deployer_error_details):
@@ -236,7 +234,7 @@ class ConductorClient(rpc.BaseRPCClient):
     def deploy_transfer_instances(
             self, ctxt, transfer_id, force, wait_for_execution=None,
             instance_osmorphing_minion_pool_mappings=None, clone_disks=False,
-            skip_os_morphing=False, user_scripts=None):
+            skip_os_morphing=False, user_scripts=None, trust_id=None):
         return self._call(
             ctxt, 'deploy_transfer_instances', transfer_id=transfer_id,
             wait_for_execution=wait_for_execution,
@@ -244,7 +242,7 @@ class ConductorClient(rpc.BaseRPCClient):
                 instance_osmorphing_minion_pool_mappings),
             clone_disks=clone_disks, force=force,
             skip_os_morphing=skip_os_morphing,
-            user_scripts=user_scripts)
+            user_scripts=user_scripts, trust_id=trust_id)
 
     def delete_deployment(self, ctxt, deployment_id):
         self._call(

+ 31 - 20
coriolis/conductor/rpc/server.py

@@ -14,6 +14,7 @@ from coriolis import constants
 from coriolis import context
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
+from coriolis.deployer_manager.rpc import client as rpc_deployer_manager_client
 from coriolis import exception
 from coriolis import keystone
 from coriolis.licensing import client as licensing_client
@@ -173,6 +174,7 @@ class ConductorServerEndpoint(object):
         self._scheduler_client_instance = None
         self._transfer_cron_client_instance = None
         self._minion_manager_client_instance = None
+        self._deployer_manager_client_instance = None
 
     # NOTE(aznashwan): it is unsafe to fork processes with pre-instantiated
     # oslo_messaging clients as the underlying eventlet thread queues will
@@ -207,6 +209,13 @@ class ConductorServerEndpoint(object):
                 rpc_minion_manager_client.MinionManagerClient())
         return self._minion_manager_client_instance
 
+    @property
+    def _deployer_manager_client(self):
+        if not self._deployer_manager_client_instance:
+            self._deployer_manager_client_instance = (
+                rpc_deployer_manager_client.DeployerManagerClient())
+        return self._deployer_manager_client_instance
+
     def get_all_diagnostics(self, ctxt):
         client_objects = {
             "conductor": self,
@@ -697,11 +706,12 @@ class ConductorServerEndpoint(object):
 
     def _begin_tasks(
             self, ctxt, action, execution, task_info_override=None,
-            scheduling_retry_count=5, scheduling_retry_period=2):
+            scheduling_retry_count=5, scheduling_retry_period=2,
+            delete_trust_id=True):
         """ Starts all non-error-only tasks which have no depencies. """
         if not ctxt.trust_id:
             keystone.create_trust(ctxt)
-            ctxt.delete_trust_id = True
+            ctxt.delete_trust_id = delete_trust_id
 
         task_info = action.info
         if task_info_override is not None:
@@ -1106,7 +1116,8 @@ class ConductorServerEndpoint(object):
                 ctxt, execution,
                 constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
         else:
-            self._begin_tasks(ctxt, transfer, execution)
+            self._begin_tasks(
+                ctxt, transfer, execution, delete_trust_id=not auto_deploy)
 
         if auto_deploy:
             deployment_options = {
@@ -1115,7 +1126,7 @@ class ConductorServerEndpoint(object):
                 "user_scripts": transfer.user_scripts,
                 "force": False,
             }
-            self._worker_client.execute_auto_deployment(
+            self._deployer_manager_client.execute_auto_deployment(
                 ctxt, transfer.id, execution.id, **deployment_options)
 
         return self.get_transfer_tasks_execution(
@@ -1387,11 +1398,13 @@ class ConductorServerEndpoint(object):
                 "No provider found for: %s" % endpoint.type)
         return provider_types["types"]
 
-    def _execute_deployment(
-            self, ctxt, deployment, skip_os_morphing, force, clone_disks,
-            user_scripts):
+    def _execute_deployment(self, ctxt, deployment, force):
         transfer = self._get_transfer(
             ctxt, deployment.transfer_id, include_task_info=True)
+        skip_os_morphing = deployment.skip_os_morphing
+        clone_disks = deployment.clone_disks
+        user_scripts = deployment.user_scripts
+
         self._check_transfer_running_executions(ctxt, transfer)
         self._check_valid_transfer_tasks_execution(transfer, force)
         for instance, info in transfer.info.items():
@@ -1601,16 +1614,11 @@ class ConductorServerEndpoint(object):
 
     @deployment_synchronized
     def confirm_deployer_completed(
-            self, ctxt, deployment_id, skip_os_morphing=False, force=False,
-            clone_disks=True, user_scripts=None):
-        if user_scripts is None:
-            user_scripts = {}
+            self, ctxt, deployment_id, force=False):
         try:
             deployment = self._get_deployment(
                 ctxt, deployment_id, include_task_info=True)
-            self._execute_deployment(
-                ctxt, deployment, skip_os_morphing, force, clone_disks,
-                user_scripts)
+            self._execute_deployment(ctxt, deployment, force)
         except BaseException:
             LOG.error(
                 f"Something went wrong while attempting to launch pending "
@@ -1650,11 +1658,13 @@ class ConductorServerEndpoint(object):
     @transfer_synchronized
     def deploy_transfer_instances(
             self, ctxt, transfer_id, force=False, wait_for_execution=None,
-            clone_disks=True, instance_osmorphing_minion_pool_mappings=None,
-            skip_os_morphing=False, user_scripts=None):
+            clone_disks=None, instance_osmorphing_minion_pool_mappings=None,
+            skip_os_morphing=False, user_scripts=None, trust_id=None):
         transfer = self._get_transfer(
             ctxt, transfer_id, include_task_info=True)
         user_scripts = user_scripts or transfer.user_scripts
+        clone_disks = clone_disks or transfer.clone_disks
+        skip_os_morphing = skip_os_morphing or transfer.skip_os_morphing
 
         instances = transfer.instances
 
@@ -1677,6 +1687,10 @@ class ConductorServerEndpoint(object):
         deployment.info = {}
         deployment.notes = transfer.notes
         deployment.user_scripts = user_scripts
+        deployment.clone_disks = clone_disks
+        deployment.skip_os_morphing = skip_os_morphing
+        deployment.deployer_id = wait_for_execution
+        deployment.trust_id = trust_id
         deployment.last_execution_status = constants.EXECUTION_STATUS_PENDING
         # NOTE: Deployments have no use for the source/target
         # pools of the parent Transfer so these can be omitted:
@@ -1688,14 +1702,11 @@ class ConductorServerEndpoint(object):
             deployment.instance_osmorphing_minion_pool_mappings.update(
                 instance_osmorphing_minion_pool_mappings)
 
-
         db_api.add_deployment(ctxt, deployment)
         LOG.info("Deployment created: %s", deployment.id)
 
         if not wait_for_execution:
-            self._execute_deployment(
-                ctxt, deployment, skip_os_morphing, force, clone_disks,
-                user_scripts)
+            self._execute_deployment(ctxt, deployment, force)
 
         return self.get_deployment(ctxt, deployment.id)
 

+ 1 - 0
coriolis/constants.py

@@ -316,6 +316,7 @@ WORKER_MAIN_MESSAGING_TOPIC = "coriolis_worker"
 SCHEDULER_MAIN_MESSAGING_TOPIC = "coriolis_scheduler"
 TRANSFER_CRON_MAIN_MESSAGING_TOPIC = "coriolis_transfer_cron_worker"
 MINION_MANAGER_MAIN_MESSAGING_TOPIC = "coriolis_minion_manager"
+DEPLOYER_MANAGER_MAIN_MESSAGING_TOPIC = "coriolis_deployer_manager"
 
 MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE = "delete"
 MINION_POOL_MACHINE_RETENTION_STRATEGY_POWEROFF = "poweroff"

+ 22 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/023_add_deployer_id.py

@@ -0,0 +1,22 @@
+import sqlalchemy
+
+
+def upgrade(migrate_engine):
+    meta = sqlalchemy.MetaData()
+    meta.bind = migrate_engine
+
+    deployment = sqlalchemy.Table('deployment', meta, autoload=True)
+    deployer_id = sqlalchemy.Column(
+        'deployer_id', sqlalchemy.String(36), nullable=True)
+    trust_id = sqlalchemy.Column(
+        'trust_id', sqlalchemy.String(255), nullable=True)
+    created_columns = []
+    try:
+        deployment.create_column(deployer_id)
+        created_columns.append(deployer_id)
+        deployment.create_column(trust_id)
+        created_columns.append(trust_id)
+    except Exception:
+        for c in created_columns:
+            deployment.drop_column(c)
+        raise

+ 4 - 0
coriolis/db/sqlalchemy/models.py

@@ -367,6 +367,8 @@ class Deployment(BaseTransferAction):
     transfer = orm.relationship(
         Transfer, backref=orm.backref("deployments"),
         foreign_keys=[transfer_id])
+    deployer_id = sqlalchemy.Column(sqlalchemy.String(36), nullable=True)
+    trust_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
 
     __mapper_args__ = {
         'polymorphic_identity': 'deployment',
@@ -381,6 +383,8 @@ class Deployment(BaseTransferAction):
             "id": self.id,
             "transfer_id": self.transfer_id,
             "transfer_scenario_type": self.transfer.scenario,
+            "deployer_id": self.deployer_id,
+            "trust_id": self.trust_id,
         })
         return base
 

+ 0 - 0
coriolis/deployer_manager/__init__.py


+ 0 - 0
coriolis/deployer_manager/rpc/__init__.py


+ 36 - 0
coriolis/deployer_manager/rpc/client.py

@@ -0,0 +1,36 @@
+# Copyright 2025 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_config import cfg
+import oslo_messaging as messaging
+
+from coriolis import constants
+from coriolis import rpc
+
+VERSION = "1.0"
+
+deployer_manager_opts = [
+    cfg.IntOpt(
+        'deployer_manager_rpc_timeout',
+        help="Number of seconds until RPC calls to the deployer manager "
+             "timeout.")]
+CONF = cfg.CONF
+CONF.register_opts(deployer_manager_opts, 'deployer_manager')
+
+
+class DeployerManagerClient(rpc.BaseRPCClient):
+
+    def __init__(
+            self, timeout=None):
+        target = messaging.Target(
+            topic=constants.DEPLOYER_MANAGER_MAIN_MESSAGING_TOPIC,
+            version=VERSION)
+        if timeout is None:
+            timeout = CONF.deployer_manager.deployer_manager_rpc_timeout
+        super(DeployerManagerClient, self).__init__(target, timeout=timeout)
+
+    def execute_auto_deployment(
+            self, ctxt, transfer_id, deployer_id, **kwargs):
+        self._cast(
+            ctxt, 'execute_auto_deployment', transfer_id=transfer_id,
+            deployer_id=deployer_id, **kwargs)

+ 118 - 0
coriolis/deployer_manager/rpc/server.py

@@ -0,0 +1,118 @@
+# Copyright 2025 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import time
+
+import eventlet
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from coriolis.conductor.rpc import client as rpc_conductor_client
+from coriolis import constants
+from coriolis import context
+from coriolis import exception
+from coriolis import keystone
+from coriolis import utils
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+PENDING_STATUS = constants.EXECUTION_STATUS_PENDING
+VERSION = "1.0"
+
+
+class DeployerManagerServerEndpoint:
+
+    def __init__(self):
+        self._admin_ctx = context.get_admin_context()
+        self._conductor_client_instance = None
+        self._init_loop()
+
+    @property
+    def _rpc_conductor_client(self):
+        if not getattr(self, '_conductor_client_instance', None):
+            self._conductor_client_instance = (
+                rpc_conductor_client.ConductorClient())
+        return self._conductor_client_instance
+
+    def _check_deployer_status(self, deployment_id):
+        active_statuses = [
+            constants.EXECUTION_STATUS_UNEXECUTED,
+            constants.EXECUTION_STATUS_RUNNING,
+        ]
+        error_statuses = [
+            constants.EXECUTION_STATUS_ERROR,
+            constants.EXECUTION_STATUS_DEADLOCKED,
+            constants.EXECUTION_STATUS_CANCELED,
+            constants.EXECUTION_STATUS_CANCELLING,
+            constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING,
+            constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS,
+        ]
+        try:
+            deployment = self._rpc_conductor_client.get_deployment(
+                self._admin_ctx, deployment_id)
+            deployer_id = deployment.get('deployer_id')
+            transfer_id = deployment.get('transfer_id')
+            if not deployer_id:
+                raise exception.InvalidDeploymentState(
+                    f"Deployment '{deployment['id']}' is in {PENDING_STATUS} "
+                    f"status, without any deployer execution registered.")
+            deployer_execution = (
+                self._rpc_conductor_client.get_transfer_tasks_execution(
+                    self._admin_ctx, transfer_id, deployer_id))
+            LOG.debug(
+                f"Waiting for deployer '{deployer_id}' to complete.")
+            ex_status = deployer_execution['status']
+            LOG.debug(f"Deployer '{deployer_id}' status is {ex_status}")
+            if ex_status in active_statuses:
+                return
+            elif ex_status == constants.EXECUTION_STATUS_COMPLETED:
+                LOG.debug(
+                    f"Confirming deployer '{deployer_id}' completed.")
+                admin_ctx = context.get_admin_context(
+                    trust_id=deployment['trust_id'])
+                admin_ctx.delete_trust_id = True
+                return self._rpc_conductor_client.confirm_deployer_completed(
+                    admin_ctx, deployment['id'], force=False)
+            else:
+                if ex_status in error_statuses:
+                    raise exception.InvalidTransferState(
+                        f"Got status '{ex_status}' for execution with ID "
+                        f"'{deployer_id}'. Deployment cannot occur.")
+                else:
+                    raise exception.InvalidTransferState(
+                        f"Deployer with ID '{deployer_id}' is in invalid "
+                        f"state '{ex_status}'. Deployment cannot occur.")
+        except BaseException as ex:
+            LOG.debug(
+                f"Reporting deployer failure for deployment "
+                f"'{deployment_id}'.")
+            self._rpc_conductor_client.report_deployer_failure(
+                self._admin_ctx, deployment_id, str(ex))
+
+    def _loop(self):
+        while True:
+            try:
+                deployments = self._rpc_conductor_client.get_deployments(
+                    self._admin_ctx, include_tasks=False,
+                    include_task_info=False)
+                for d in deployments:
+                    if d['last_execution_status'] == PENDING_STATUS:
+                        eventlet.spawn(self._check_deployer_status, d['id'])
+            except Exception:
+                LOG.warning(
+                    f"Deployer manager failed to list pending deployments. "
+                    f"Error was: {utils.get_exception_details()}")
+            time.sleep(10)
+
+    def _init_loop(self):
+        eventlet.spawn(self._loop)
+
+    def execute_auto_deployment(
+            self, ctxt, transfer_id, deployer_id, **kwargs):
+        LOG.debug(
+            f"Creating deployment for deployer ID '{deployer_id}' of transfer "
+            f"'{transfer_id}'")
+        keystone.create_trust(ctxt)
+        self._rpc_conductor_client.deploy_transfer_instances(
+            ctxt, transfer_id, wait_for_execution=deployer_id,
+            trust_id=ctxt.trust_id, **kwargs)

+ 1 - 0
setup.cfg

@@ -32,6 +32,7 @@ console_scripts =
     coriolis-scheduler= coriolis.cmd.scheduler:main
     coriolis-minion-manager= coriolis.cmd.minion_manager:main
     coriolis-dbsync = coriolis.cmd.db_sync:main
+    coriolis-deployer-manager = coriolis.cmd.deployer_manager:main
 
 [wheel]
 universal = 1