Răsfoiți Sursa

Add auto deployment feature in worker component

This commit implements the core logic for the auto deployment feature, which
allows deployer executions to launch deployments that wait for the transfer
execution to complete and then start automatically. This is done by launching
a separate greenthread for each deployer execution.
Daniel Vincze 1 an în urmă
părinte
comite
a93acb60d8

+ 17 - 2
coriolis/conductor/rpc/client.py

@@ -216,12 +216,27 @@ class ConductorClient(rpc.BaseRPCClient):
             ctxt, 'get_deployment', deployment_id=deployment_id,
             include_task_info=include_task_info)
 
+    def confirm_deployer_completed(
+            self, ctxt, deployment_id, skip_os_morphing=False, force=False,
+            clone_disks=True, user_scripts=None):
+        """Cast 'confirm_deployer_completed' for the given deployment.
+
+        Every option is forwarded unchanged as a keyword argument of the
+        cast, and whatever `self._cast` returns is handed back as-is.
+        """
+        cast_args = {
+            'deployment_id': deployment_id,
+            'skip_os_morphing': skip_os_morphing,
+            'force': force,
+            'clone_disks': clone_disks,
+            'user_scripts': user_scripts,
+        }
+        return self._cast(ctxt, 'confirm_deployer_completed', **cast_args)
+
+    def report_deployer_failure(
+            self, ctxt, deployment_id, deployer_error_details):
+        """Cast 'report_deployer_failure' for the given deployment.
+
+        :param deployment_id: ID of the deployment the failure is reported
+            for.
+        :param deployer_error_details: error details, forwarded unchanged.
+        :returns: whatever `self._cast` returns.
+        """
+        return self._cast(
+            ctxt, 'report_deployer_failure', deployment_id=deployment_id,
+            deployer_error_details=deployer_error_details)
+
     def deploy_transfer_instances(
-            self, ctxt, transfer_id,
+            self, ctxt, transfer_id, force, wait_for_execution=None,
             instance_osmorphing_minion_pool_mappings=None, clone_disks=False,
-            force=False, skip_os_morphing=False, user_scripts=None):
+            skip_os_morphing=False, user_scripts=None):
         return self._call(
             ctxt, 'deploy_transfer_instances', transfer_id=transfer_id,
+            wait_for_execution=wait_for_execution,
             instance_osmorphing_minion_pool_mappings=(
                 instance_osmorphing_minion_pool_mappings),
             clone_disks=clone_disks, force=force,

+ 103 - 53
coriolis/conductor/rpc/server.py

@@ -894,7 +894,8 @@ class ConductorServerEndpoint(object):
             execution.id, execution.type)
 
     @transfer_synchronized
-    def execute_transfer_tasks(self, ctxt, transfer_id, shutdown_instances):
+    def execute_transfer_tasks(self, ctxt, transfer_id, shutdown_instances,
+                               auto_deploy=False):
         transfer = self._get_transfer(
             ctxt, transfer_id, include_task_info=True)
         self._check_transfer_running_executions(ctxt, transfer)
@@ -1107,6 +1108,10 @@ class ConductorServerEndpoint(object):
         else:
             self._begin_tasks(ctxt, transfer, execution)
 
+        if auto_deploy:
+            self._worker_client.execute_auto_deployment(
+                ctxt, transfer.id, execution.id)
+
         return self.get_transfer_tasks_execution(
             ctxt, transfer_id, execution.id)
 
@@ -1372,70 +1377,35 @@ class ConductorServerEndpoint(object):
                 "No provider found for: %s" % endpoint.type)
         return provider_types["types"]
 
-    @transfer_synchronized
-    def deploy_transfer_instances(
-            self, ctxt, transfer_id, clone_disks, force,
-            instance_osmorphing_minion_pool_mappings=None,
-            skip_os_morphing=False, user_scripts=None):
-        transfer = self._get_transfer(
-            ctxt, transfer_id, include_task_info=True)
+    def _execute_deployment(
+            self, ctxt, deployment, skip_os_morphing, force, clone_disks,
+            user_scripts):
+        transfer = deployment.transfer
         self._check_transfer_running_executions(ctxt, transfer)
         self._check_valid_transfer_tasks_execution(transfer, force)
-        user_scripts = user_scripts or transfer.user_scripts
-
-        destination_endpoint = self.get_endpoint(
-            ctxt, transfer.destination_endpoint_id)
-        destination_provider_types = self._get_provider_types(
-            ctxt, destination_endpoint)
-
         for instance, info in transfer.info.items():
             if not info.get("volumes_info"):
                 raise exception.InvalidTransferState(
-                    "The transfer doesn't contain volumes information for "
-                    "instance: %s. If transferred disks are deleted, the "
-                    "transfer needs to be executed anew before a deployment"
-                    " can occur" % instance)
-
-        instances = transfer.instances
-
-        deployment = models.Deployment()
-        deployment.id = str(uuid.uuid4())
-        deployment.base_id = deployment.id
-        deployment.origin_endpoint_id = transfer.origin_endpoint_id
-        deployment.destination_endpoint_id = transfer.destination_endpoint_id
-        # TODO(aznashwan): have these passed separately to the relevant
-        # provider methods instead of through the dest-env:
-        dest_env = copy.deepcopy(transfer.destination_environment)
-        dest_env['network_map'] = transfer.network_map
-        dest_env['storage_mappings'] = transfer.storage_mappings
-        deployment.destination_environment = dest_env
-        deployment.source_environment = transfer.source_environment
-        deployment.network_map = transfer.network_map
-        deployment.storage_mappings = transfer.storage_mappings
-        deployment.instances = instances
-        deployment.transfer = transfer
+                    "The transfer doesn't contain volumes information "
+                    f"for instance: {instance}. If transferred disks are "
+                    "deleted, the transfer needs to be executed anew "
+                    "before a deployment can occur")
         deployment.info = transfer.info
-        deployment.notes = transfer.notes
-        deployment.user_scripts = user_scripts
-        # NOTE: Deployments have no use for the source/target
-        # pools of the parent Transfer so these can be omitted:
-        deployment.origin_minion_pool_id = None
-        deployment.destination_minion_pool_id = None
-        deployment.instance_osmorphing_minion_pool_mappings = (
-            transfer.instance_osmorphing_minion_pool_mappings)
-        if instance_osmorphing_minion_pool_mappings:
-            deployment.instance_osmorphing_minion_pool_mappings.update(
-                instance_osmorphing_minion_pool_mappings)
         self._check_minion_pools_for_action(ctxt, deployment)
         self._check_reservation_for_transfer(transfer)
 
+        destination_endpoint = self.get_endpoint(
+            ctxt, transfer.destination_endpoint_id)
+        destination_provider_types = self._get_provider_types(
+            ctxt, destination_endpoint)
+
         execution = models.TasksExecution()
         deployment.executions = [execution]
         execution.status = constants.EXECUTION_STATUS_UNEXECUTED
         execution.number = 1
         execution.type = constants.EXECUTION_TYPE_DEPLOYMENT
 
-        for instance in instances:
+        for instance in deployment.instances:
             deployment.info[instance]["clone_disks"] = clone_disks
             scripts = self._get_instance_scripts(user_scripts, instance)
             deployment.info[instance]["user_scripts"] = scripts
@@ -1451,7 +1421,7 @@ class ConductorServerEndpoint(object):
             # could be in the `.info` field instead of the old ones)
             deployment.info[instance].update({
                 "source_environment": deployment.source_environment,
-                "target_environment": dest_env})
+                "target_environment": deployment.destination_environment})
             # TODO(aznashwan): have these passed separately to the relevant
             # provider methods (they're currently passed directly inside
             # dest-env by the API service when accepting the call)
@@ -1599,8 +1569,7 @@ class ConductorServerEndpoint(object):
                     on_error=True)
 
         self._check_execution_tasks_sanity(execution, deployment.info)
-        db_api.add_deployment(ctxt, deployment)
-        LOG.info("Deployment created: %s", deployment.id)
+        db_api.add_transfer_tasks_execution(ctxt, execution)
 
         if not skip_os_morphing and (
                 deployment.instance_osmorphing_minion_pool_mappings):
@@ -1619,6 +1588,87 @@ class ConductorServerEndpoint(object):
         else:
             self._begin_tasks(ctxt, deployment, execution)
 
+    @deployment_synchronized
+    def confirm_deployer_completed(
+            self, ctxt, deployment_id, skip_os_morphing=False, force=False,
+            clone_disks=True, user_scripts=None):
+        """Fetch the deployment (with task info) and execute it.
+
+        A `user_scripts` of None is replaced by an empty dict before the
+        arguments are handed to `_execute_deployment`.
+        """
+        scripts = {} if user_scripts is None else user_scripts
+        target_deployment = self._get_deployment(
+            ctxt, deployment_id, include_task_info=True)
+        self._execute_deployment(
+            ctxt, target_deployment, skip_os_morphing, force, clone_disks,
+            scripts)
+
+    @deployment_synchronized
+    def report_deployer_failure(
+            self, ctxt, deployment_id, deployer_error_details):
+        """Set a deployment's last execution status to ERROR.
+
+        The status is only changed if the deployment's last execution
+        status is currently UNEXECUTED.
+
+        :param deployment_id: ID of the deployment to update.
+        :param deployer_error_details: error description; it is only
+            included in the warning that gets logged.
+        :raises exception.InvalidDeploymentState: if the deployment's last
+            execution status is not UNEXECUTED.
+        """
+        deployment = self._get_deployment(ctxt, deployment_id)
+        error_status = constants.EXECUTION_STATUS_ERROR
+        expected_status = constants.EXECUTION_STATUS_UNEXECUTED
+        if deployment.last_execution_status != expected_status:
+            raise exception.InvalidDeploymentState(
+                f"Deployment is in '{deployment.last_execution_status}' "
+                f"status instead of the expected '{expected_status}' to have "
+                "deployers fail for it.")
+        # `warn` is a deprecated alias; `warning` is the supported spelling.
+        LOG.warning(
+            "Error occurred while waiting for deployer to finish. Setting "
+            f"{error_status} to Deployment '{deployment_id}'. Error was: "
+            f"{deployer_error_details}")
+        db_api.set_action_last_execution_status(
+            ctxt, deployment.id, error_status)
+
+    @transfer_synchronized
+    def deploy_transfer_instances(
+            self, ctxt, transfer_id, force=False, wait_for_execution=None,
+            clone_disks=True, instance_osmorphing_minion_pool_mappings=None,
+            skip_os_morphing=False, user_scripts=None):
+        """Create a deployment for a transfer and optionally execute it.
+
+        A new `models.Deployment` is populated from the transfer and saved
+        through `db_api.add_deployment`. Unless `wait_for_execution` is
+        truthy, `_execute_deployment` is then invoked right away.
+
+        :param transfer_id: ID of the transfer to create the deployment for.
+        :param force: forwarded to `_execute_deployment`.
+        :param wait_for_execution: when truthy, the deployment is only
+            created (with an UNEXECUTED last execution status) and is not
+            executed by this call.
+        :param clone_disks: forwarded to `_execute_deployment`.
+        :param instance_osmorphing_minion_pool_mappings: optional overrides
+            merged on top of the transfer's own mappings.
+        :param skip_os_morphing: forwarded to `_execute_deployment`.
+        :param user_scripts: falls back to the transfer's user scripts when
+            falsy.
+        :returns: the result of `self.get_deployment` for the new
+            deployment's ID.
+        """
+        transfer = self._get_transfer(
+            ctxt, transfer_id, include_task_info=True)
+        user_scripts = user_scripts or transfer.user_scripts
+
+        instances = transfer.instances
+
+        deployment = models.Deployment()
+        deployment.id = str(uuid.uuid4())
+        deployment.base_id = deployment.id
+        deployment.origin_endpoint_id = transfer.origin_endpoint_id
+        deployment.destination_endpoint_id = transfer.destination_endpoint_id
+        # TODO(aznashwan): have these passed separately to the relevant
+        # provider methods instead of through the dest-env:
+        dest_env = copy.deepcopy(transfer.destination_environment)
+        dest_env['network_map'] = transfer.network_map
+        dest_env['storage_mappings'] = transfer.storage_mappings
+        deployment.destination_environment = dest_env
+        deployment.source_environment = transfer.source_environment
+        deployment.network_map = transfer.network_map
+        deployment.storage_mappings = transfer.storage_mappings
+        deployment.instances = instances
+        deployment.transfer = transfer
+        deployment.info = {}
+        deployment.notes = transfer.notes
+        deployment.user_scripts = user_scripts
+        deployment.last_execution_status = (
+            constants.EXECUTION_STATUS_UNEXECUTED)
+        # NOTE: Deployments have no use for the source/target
+        # pools of the parent Transfer so these can be omitted:
+        deployment.origin_minion_pool_id = None
+        deployment.destination_minion_pool_id = None
+        # Merge per-call overrides into a copy, so the mapping object held
+        # by the parent transfer is not mutated in place (and a transfer
+        # without any mappings does not break the merge):
+        osmorphing_pool_mappings = (
+            transfer.instance_osmorphing_minion_pool_mappings)
+        if instance_osmorphing_minion_pool_mappings:
+            osmorphing_pool_mappings = dict(osmorphing_pool_mappings or {})
+            osmorphing_pool_mappings.update(
+                instance_osmorphing_minion_pool_mappings)
+        deployment.instance_osmorphing_minion_pool_mappings = (
+            osmorphing_pool_mappings)
+
+        db_api.add_deployment(ctxt, deployment)
+        LOG.info("Deployment created: %s", deployment.id)
+
+        if not wait_for_execution:
+            self._execute_deployment(
+                ctxt, deployment, skip_os_morphing, force, clone_disks,
+                user_scripts)
+
         return self.get_deployment(ctxt, deployment.id)
 
     def _get_instance_scripts(self, user_scripts, instance):

+ 6 - 0
coriolis/worker/rpc/client.py

@@ -186,3 +186,9 @@ class WorkerClient(rpc.BaseRPCClient):
         return self._call(
             ctxt, 'validate_endpoint_destination_minion_pool_options',
             platform_name=platform_name, pool_environment=pool_environment)
+
+    def execute_auto_deployment(
+            self, ctxt, transfer_id, deployer_execution, **kwargs):
+        """Cast 'execute_auto_deployment' to the worker.
+
+        Any extra keyword arguments are forwarded with the cast. Nothing
+        is returned.
+        """
+        cast_args = {
+            'transfer_id': transfer_id,
+            'deployer_execution': deployer_execution,
+            **kwargs,
+        }
+        self._cast(ctxt, 'execute_auto_deployment', **cast_args)

+ 70 - 0
coriolis/worker/rpc/server.py

@@ -133,6 +133,76 @@ class WorkerServerEndpoint(object):
             self._rpc_conductor_client.confirm_task_cancellation(
                 ctxt, task_id, msg)
 
+    def _deploy_after_execution(
+            self, ctxt, deployment, transfer_id, execution_id, **kwargs):
+        """Poll a transfer execution and confirm the deployer on completion.
+
+        The execution is re-fetched through the conductor client, sleeping
+        5 seconds between polls, for as long as its status is one of the
+        active ones. Once the status is COMPLETED,
+        `confirm_deployer_completed` is called on the conductor client for
+        the deployment, with `kwargs` forwarded.
+
+        :param deployment: dict-like deployment; only its 'id' is read.
+        :param transfer_id: ID of the transfer owning the execution.
+        :param execution_id: ID of the execution to wait for.
+        :returns: the result of `confirm_deployer_completed`.
+        :raises exception.InvalidTransferState: if the execution is in one
+            of the error statuses, or in a status that is not recognised.
+        """
+        # NOTE(review): the trust ID is cleared on the context before
+        # polling; the reason is not visible from here - confirm.
+        ctxt.trust_id = None
+        active_statuses = [
+            constants.EXECUTION_STATUS_UNEXECUTED,
+            constants.EXECUTION_STATUS_RUNNING,
+            constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS
+        ]
+        error_statuses = [
+            constants.EXECUTION_STATUS_ERROR,
+            constants.EXECUTION_STATUS_DEADLOCKED,
+            constants.EXECUTION_STATUS_CANCELED,
+            constants.EXECUTION_STATUS_CANCELLING,
+            constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING,
+            constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS,
+        ]
+        LOG.debug(f"Waiting for deployer '{execution_id}' to complete.")
+        while True:
+            execution = (
+                self._rpc_conductor_client.get_transfer_tasks_execution(
+                    ctxt, transfer_id, execution_id))
+            ex_status = execution['status']
+            LOG.debug(
+                f"Deployer '{execution_id}' status is {ex_status}")
+            if ex_status in active_statuses:
+                time.sleep(5)
+            elif ex_status == constants.EXECUTION_STATUS_COMPLETED:
+                LOG.debug(f"Confirming deployer '{execution_id}' completed.")
+                return self._rpc_conductor_client.confirm_deployer_completed(
+                    ctxt, deployment['id'], **kwargs)
+            else:
+                if ex_status in error_statuses:
+                    # The execution is a dict, so its ID must be read by
+                    # key, and the string needs the f-prefix to interpolate.
+                    raise exception.InvalidTransferState(
+                        f"Got status '{ex_status}' for execution with "
+                        f"ID '{execution['id']}'. Deployment cannot occur.")
+                else:
+                    raise exception.InvalidTransferState(
+                        f"Unknown status '{ex_status}' of execution "
+                        f"with ID '{execution['id']}'. Deployment cannot "
+                        f"occur.")
+
+    def _execute_auto_deployment(
+            self, ctxt, transfer_id, deployer_execution, **kwargs):
+        """Create a deployment and start it once the execution completes.
+
+        The deployment is created through the conductor client with
+        `wait_for_execution` set to the given execution, after which
+        `_deploy_after_execution` waits for that execution. Any error is
+        logged; if the deployment had already been created, the failure is
+        also reported through `report_deployer_failure`.
+
+        :param transfer_id: ID of the transfer to deploy.
+        :param deployer_execution: ID of the execution to wait for.
+        :param kwargs: forwarded both to `deploy_transfer_instances` and to
+            `_deploy_after_execution`.
+        :returns: the result of `report_deployer_failure` when a failure
+            was reported, otherwise None.
+        """
+        # The conductor client's `deploy_transfer_instances` takes `force`
+        # as a required argument. Supply a default when the caller did not
+        # pass one, otherwise the call fails before reaching the conductor
+        # and no deployment is ever created:
+        kwargs.setdefault('force', False)
+        deployment = None
+        try:
+            LOG.debug(
+                f"Creating deployment for deployer ID '{deployer_execution}' "
+                f"of transfer '{transfer_id}'")
+            deployment = self._rpc_conductor_client.deploy_transfer_instances(
+                ctxt, transfer_id, wait_for_execution=deployer_execution,
+                **kwargs)
+            self._deploy_after_execution(
+                ctxt, deployment, transfer_id, deployer_execution, **kwargs)
+        # NOTE(review): BaseException also covers greenthread kills and
+        # interpreter exit signals; confirm swallowing those is intended.
+        except BaseException as ex:
+            LOG.warning(
+                f"Error occurred while waiting for deployer "
+                f"'{deployer_execution}'. Error was: "
+                f"{utils.get_exception_details()}")
+            if deployment:
+                LOG.debug(
+                    f"Reporting deployer '{deployer_execution}' failure")
+                return self._rpc_conductor_client.report_deployer_failure(
+                    ctxt, deployment['id'], str(ex))
+
+    def execute_auto_deployment(
+            self, ctxt, transfer_id, deployer_execution, **kwargs):
+        """Spawn a greenthread running `_execute_auto_deployment`.
+
+        The positional arguments and any extra keyword arguments are
+        handed to the spawned function unchanged. Nothing is returned.
+        """
+        spawn_args = (ctxt, transfer_id, deployer_execution)
+        eventlet.spawn(
+            self._execute_auto_deployment, *spawn_args, **kwargs)
+
     def _handle_mp_log_events(self, p, mp_log_q):
         while True:
             try: