Просмотр исходного кода

Make minion machine allocations asynchronous.

Nashwan Azhari 5 лет назад
Родитель
Сommit
f1ef142c16

+ 20 - 10
coriolis/api/v1/migrations.py

@@ -61,9 +61,17 @@ class MigrationController(api_wsgi.Controller):
             'destination_minion_pool_id')
         instance_osmorphing_minion_pool_mappings = migration.get(
             'instance_osmorphing_minion_pool_mappings', {})
-        destination_environment = migration.get(
-            "destination_environment", {})
         instances = migration["instances"]
+        extras = [
+            instance
+            for instance in instance_osmorphing_minion_pool_mappings
+            if instance not in instances]
+        if extras:
+            raise ValueError(
+                "One or more instance OSMorphing pool mappings were "
+                "provided for instances (%s) which are not part of the "
+                "migration's declared instances (%s)" % (extras, instances))
+
         notes = migration.get("notes")
         skip_os_morphing = migration.get("skip_os_morphing", False)
         shutdown_instances = migration.get(
@@ -80,20 +88,22 @@ class MigrationController(api_wsgi.Controller):
 
         network_map = migration.get("network_map", {})
         api_utils.validate_network_map(network_map)
-        destination_environment['network_map'] = network_map
-
-        # NOTE(aznashwan): we validate the destination environment for the
-        # import provider before appending the 'storage_mappings' parameter
-        # for plugins with strict property name checks which do not yet
-        # support storage mapping features:
-        self._endpoints_api.validate_target_environment(
-            context, destination_endpoint_id, destination_environment)
 
         # TODO(aznashwan): until the provider plugin interface is updated
         # to have separate 'network_map' and 'storage_mappings' fields,
         # we add them as part of the destination environment:
+        destination_environment = migration.get(
+            "destination_environment", {})
+        destination_environment['network_map'] = network_map
+        self._endpoints_api.validate_target_environment(
+            context, destination_endpoint_id, destination_environment)
+
         storage_mappings = migration.get("storage_mappings", {})
         api_utils.validate_storage_mappings(storage_mappings)
+        # NOTE(aznashwan): we validate the destination environment for the
+        # import provider before appending the 'storage_mappings' parameter
+        # for plugins with strict property name checks which do not yet
+        # support storage mapping features:
         destination_environment['storage_mappings'] = storage_mappings
 
         return (origin_endpoint_id, destination_endpoint_id,

+ 2 - 6
coriolis/api/v1/minion_pools.py

@@ -174,16 +174,12 @@ class MinionPoolController(api_wsgi.Controller):
                 if minion_pool['platform'] == (
                         constants.PROVIDER_PLATFORM_SOURCE):
                     self._endpoints_api.validate_endpoint_source_minion_pool_options(
-                        # TODO(aznashwan): remove endpoint ID fields redundancy
-                        # once DB models are overhauled:
-                        context, minion_pool['origin_endpoint_id'],
+                        context, minion_pool['endpoint_id'],
                         vals['environment_options'])
                 elif minion_pool['platform'] == (
                         constants.PROVIDER_PLATFORM_DESTINATION):
                     self._endpoints_api.validate_endpoint_destination_minion_pool_options(
-                        # TODO(aznashwan): remove endpoint ID fields redundancy
-                        # once DB models are overhauled:
-                        context, minion_pool['origin_endpoint_id'],
+                        context, minion_pool['endpoint_id'],
                         vals['environment_options'])
                 else:
                     raise Exception(

+ 19 - 10
coriolis/api/v1/replicas.py

@@ -67,16 +67,31 @@ class ReplicaController(api_wsgi.Controller):
         self._endpoints_api.validate_source_environment(
             context, origin_endpoint_id, source_environment)
 
-        network_map = replica.get("network_map", {})
-        api_utils.validate_network_map(network_map)
-        destination_environment['network_map'] = network_map
-
         origin_minion_pool_id = replica.get(
             'origin_minion_pool_id')
         destination_minion_pool_id = replica.get(
             'destination_minion_pool_id')
         instance_osmorphing_minion_pool_mappings = replica.get(
             'instance_osmorphing_minion_pool_mappings', {})
+        extras = [
+            instance
+            for instance in instance_osmorphing_minion_pool_mappings
+            if instance not in instances]
+        if extras:
+            raise ValueError(
+                "One or more instance OSMorphing pool mappings were "
+                "provided for instances (%s) which are not part of the "
+                "Replicas's declared instances (%s)" % (extras, instances))
+
+
+        # TODO(aznashwan): until the provider plugin interface is updated
+        # to have separate 'network_map' and 'storage_mappings' fields,
+        # we add them as part of the destination environment:
+        network_map = replica.get("network_map", {})
+        api_utils.validate_network_map(network_map)
+        destination_environment['network_map'] = network_map
+        self._endpoints_api.validate_target_environment(
+            context, destination_endpoint_id, destination_environment)
 
         user_scripts = replica.get('user_scripts', {})
         api_utils.validate_user_scripts(user_scripts)
@@ -87,15 +102,9 @@ class ReplicaController(api_wsgi.Controller):
         # import provider before appending the 'storage_mappings' parameter
         # for plugins with strict property name checks which do not yet
         # support storage mapping features:
-        self._endpoints_api.validate_target_environment(
-            context, destination_endpoint_id, destination_environment)
-
         storage_mappings = replica.get("storage_mappings", {})
         api_utils.validate_storage_mappings(storage_mappings)
 
-        # TODO(aznashwan): until the provider plugin interface is updated
-        # to have separate 'network_map' and 'storage_mappings' fields,
-        # we add them as part of the destination environment:
         destination_environment['storage_mappings'] = storage_mappings
 
         return (origin_endpoint_id, destination_endpoint_id,

+ 26 - 0
coriolis/conductor/rpc/client.py

@@ -402,3 +402,29 @@ class ConductorClient(rpc.BaseRPCClient):
     def delete_service(self, ctxt, service_id):
         return self._call(
             ctxt, 'delete_service', service_id=service_id)
+
+    def confirm_replica_minions_allocation(
+            self, ctxt, replica_id, minion_machine_allocations):
+        self._client.call(
+            ctxt, 'confirm_replica_minions_allocation', replica_id=replica_id,
+            minion_machine_allocations=minion_machine_allocations)
+
+    def report_replica_minions_allocation_error(
+            self, ctxt, replica_id, minion_allocation_error_details):
+        self._client.call(
+            ctxt, 'report_replica_minions_allocation_error', replica_id=replica_id,
+            minion_allocation_error_details=minion_allocation_error_details)
+
+    def confirm_migration_minions_allocation(
+            self, ctxt, migration_id, minion_machine_allocations):
+        self._client.call(
+            ctxt, 'confirm_migration_minions_allocation',
+            migration_id=migration_id,
+            minion_machine_allocations=minion_machine_allocations)
+
+    def report_migration_minions_allocation_error(
+            self, ctxt, migration_id, minion_allocation_error_details):
+        self._client.call(
+            ctxt, 'report_migration_minions_allocation_error',
+            migration_id=migration_id,
+            minion_allocation_error_details=minion_allocation_error_details)

+ 287 - 213
coriolis/conductor/rpc/server.py

@@ -821,18 +821,7 @@ class ConductorServerEndpoint(object):
         dest_env['network_map'] = replica.network_map
         dest_env['storage_mappings'] = replica.storage_mappings
 
-        minion_pool_allocations = self._allocate_minion_machines_for_action(
-            ctxt, replica, include_transfer_minions=True,
-            include_osmorphing_minions=False)
-
         for instance in execution.action.instances:
-            instance_minion_machines = minion_pool_allocations.get(
-                instance, {})
-            instance_source_minion = instance_minion_machines.get(
-                'source_minion')
-            instance_target_minion = instance_minion_machines.get(
-                'target_minion')
-
             # NOTE: we default/convert the volumes info to an empty list
             # to preserve backwards-compatibility with older versions
             # of Coriolis dating before the scheduling overhaul (PR##114)
@@ -869,13 +858,15 @@ class ConductorServerEndpoint(object):
 
             disk_deployment_depends_on = []
             validate_source_minion_task = None
-            if instance_source_minion:
+            if replica.origin_minion_pool_id:
+                # NOTE: these values are required for the
+                # _check_execution_tasks_sanity call but
+                # will be populated later when the pool
+                # allocations actually happen:
                 replica.info[instance].update({
-                    "source_minion_machine_id": instance_source_minion['id'],
-                    "source_minion_provider_properties": (
-                        instance_source_minion['provider_properties']),
-                    "source_minion_connection_info": (
-                        instance_source_minion['connection_info'])})
+                    "source_minion_machine_id": None,
+                    "source_minion_provider_properties": None,
+                    "source_minion_connection_info": None})
                 validate_source_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY,
@@ -890,15 +881,16 @@ class ConductorServerEndpoint(object):
                     validate_replica_source_inputs_task.id)
 
             validate_target_minion_task = None
-            if instance_target_minion:
+            if replica.destination_minion_pool_id:
+                # NOTE: these values are required for the
+                # _check_execution_tasks_sanity call but
+                # will be populated later when the pool
+                # allocations actually happen:
                 replica.info[instance].update({
-                    "target_minion_machine_id": instance_target_minion['id'],
-                    "target_minion_provider_properties": (
-                        instance_target_minion['provider_properties']),
-                    "target_minion_connection_info": (
-                        instance_target_minion['connection_info']),
-                    "target_minion_backup_writer_connection_info": (
-                        instance_target_minion['backup_writer_connection_info'])})
+                    "target_minion_machine_id": None,
+                    "target_minion_provider_properties": None,
+                    "target_minion_connection_info": None,
+                    "target_minion_backup_writer_connection_info": None})
                 validate_target_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY,
@@ -917,7 +909,7 @@ class ConductorServerEndpoint(object):
 
             shutdown_deps = []
             deploy_replica_source_resources_task = None
-            if not instance_source_minion:
+            if not replica.origin_minion_pool_id:
                 deploy_replica_source_resources_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES,
@@ -927,7 +919,7 @@ class ConductorServerEndpoint(object):
 
             attach_target_minion_disks_task = None
             deploy_replica_target_resources_task = None
-            if instance_target_minion:
+            if replica.destination_minion_pool_id:
                 ttyp = constants.TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION
                 attach_target_minion_disks_task = self._create_task(
                     instance, ttyp, execution, depends_on=[
@@ -952,7 +944,7 @@ class ConductorServerEndpoint(object):
                 instance, constants.TASK_TYPE_REPLICATE_DISKS,
                 execution, depends_on=depends_on)
 
-            if instance_source_minion:
+            if replica.origin_minion_pool_id:
                 self._create_task(
                     instance,
                     constants.TASK_TYPE_RELEASE_SOURCE_MINION,
@@ -971,7 +963,7 @@ class ConductorServerEndpoint(object):
                         replicate_disks_task.id],
                     on_error=True)
 
-            if instance_target_minion:
+            if replica.destination_minion_pool_id:
                 detach_volumes_from_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION,
@@ -998,28 +990,28 @@ class ConductorServerEndpoint(object):
                         replicate_disks_task.id],
                     on_error=True)
 
-        try:
-            self._check_execution_tasks_sanity(execution, replica.info)
-
-            # update the action info for all of the Replicas:
-            for instance in execution.action.instances:
-                db_api.update_transfer_action_info_for_instance(
-                    ctxt, replica.id, instance, replica.info[instance])
+        self._check_execution_tasks_sanity(execution, replica.info)
 
-            # add new execution to DB:
-            db_api.add_replica_tasks_execution(ctxt, execution)
-            LOG.info("Replica tasks execution created: %s", execution.id)
+        # update the action info for all of the Replicas:
+        for instance in execution.action.instances:
+            db_api.update_transfer_action_info_for_instance(
+                ctxt, replica.id, instance, replica.info[instance])
 
+        # add new execution to DB:
+        db_api.add_replica_tasks_execution(ctxt, execution)
+        LOG.info("Replica tasks execution added to DB: %s", execution.id)
+
+        uses_minion_pools = any([
+            replica.origin_minion_pool_id,
+            replica.destination_minion_pool_id])
+        if uses_minion_pools:
+            self._minion_manager_client.allocate_minion_machines_for_replica(
+                ctxt, replica)
+            self._set_tasks_execution_status(
+                ctxt, execution.id,
+                constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
+        else:
             self._begin_tasks(ctxt, replica, execution)
-        except Exception:
-            if minion_pool_allocations:
-                LOG.warn(
-                    "Exception occured while verifying/registering Replica "
-                    "tasks execution for Replica '%s'. Cleaning up all minion "
-                    "allocations from DB: %s" % (
-                        replica.id, minion_pool_allocations))
-                self._deallocate_minion_machines_for_action(ctxt, replica)
-            raise
 
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
 
@@ -1310,10 +1302,6 @@ class ConductorServerEndpoint(object):
             migration.instance_osmorphing_minion_pool_mappings.update(
                 instance_osmorphing_minion_pool_mappings)
 
-        minion_pool_allocations = self._allocate_minion_machines_for_action(
-            ctxt, migration, include_transfer_minions=False,
-            include_osmorphing_minions=not skip_os_morphing)
-
         execution = models.TasksExecution()
         migration.executions = [execution]
         execution.status = constants.EXECUTION_STATUS_UNEXECUTED
@@ -1343,11 +1331,6 @@ class ConductorServerEndpoint(object):
                 # "network_map": network_map,
                 # "storage_mappings": storage_mappings,
 
-            instance_minion_machines = minion_pool_allocations.get(
-                instance, {})
-            instance_osmorphing_minion = instance_minion_machines.get(
-                'osmorphing_minion')
-
             validate_replica_deployment_inputs_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS,
@@ -1355,13 +1338,16 @@ class ConductorServerEndpoint(object):
 
             validate_osmorphing_minion_task = None
             last_validation_task = validate_replica_deployment_inputs_task
-            if not skip_os_morphing and instance_osmorphing_minion:
+            if not skip_os_morphing and instance in (
+                    migration.instance_osmorphing_minion_pool_mappings):
+                # NOTE: these values are required for the
+                # _check_execution_tasks_sanity call but
+                # will be populated later when the pool
+                # allocations actually happen:
                 migration.info[instance].update({
-                    "osmorphing_minion_machine_id": instance_osmorphing_minion['id'],
-                    "osmorphing_minion_provider_properties": (
-                        instance_osmorphing_minion['provider_properties']),
-                    "osmorphing_minion_connection_info": (
-                        instance_osmorphing_minion['connection_info'])})
+                    "osmorphing_minion_machine_id": None,
+                    "osmorphing_minion_provider_properties": None,
+                    "osmorphing_minion_connection_info": None})
                 validate_osmorphing_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY,
@@ -1385,7 +1371,8 @@ class ConductorServerEndpoint(object):
                 task_deploy_os_morphing_resources = None
                 attach_osmorphing_minion_volumes_task = None
                 last_osmorphing_resources_deployment_task = None
-                if instance_osmorphing_minion:
+                if instance in (
+                        migration.instance_osmorphing_minion_pool_mappings):
                     osmorphing_vol_attachment_deps = [
                         validate_osmorphing_minion_task.id]
                     osmorphing_vol_attachment_deps.extend(depends_on)
@@ -1418,7 +1405,8 @@ class ConductorServerEndpoint(object):
 
                 depends_on = [task_osmorphing.id]
 
-                if instance_osmorphing_minion:
+                if instance in (
+                        migration.instance_osmorphing_minion_pool_mappings):
                     detach_osmorphing_minion_volumes_task = self._create_task(
                         instance,
                         constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION,
@@ -1482,21 +1470,25 @@ class ConductorServerEndpoint(object):
                     depends_on=[cleanup_deployment_task.id],
                     on_error=True)
 
-        try:
-            self._check_execution_tasks_sanity(execution, migration.info)
-            db_api.add_migration(ctxt, migration)
-            LOG.info("Migration created: %s", migration.id)
-
+        self._check_execution_tasks_sanity(execution, migration.info)
+        db_api.add_migration(ctxt, migration)
+        LOG.info("Migration created: %s", migration.id)
+
+        if not skip_os_morphing and (
+                migration.instance_osmorphing_minion_pool_mappings):
+            # NOTE: we lock on the migration ID to ensure the minion
+            # allocation confirmations don't come in too early:
+            with lockutils.lock(
+                    constants.MIGRATION_LOCK_NAME_FORMAT % migration.id,
+                    external=True):
+                self._minion_manager_client.allocate_minion_machines_for_migration(
+                    ctxt, migration, include_transfer_minions=False,
+                    include_osmorphing_minions=True)
+                self._set_tasks_execution_status(
+                    ctxt, execution.id,
+                    constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
+        else:
             self._begin_tasks(ctxt, migration, execution)
-        except Exception:
-            if minion_pool_allocations:
-                LOG.warn(
-                    "Exception occured while verifying/registering tasks "
-                    "execution for Migration '%s' from Replica '%s'. "
-                    "Cleaning up all minion allocations from DB: %s" % (
-                        migration.id, replica.id, minion_pool_allocations))
-                self._deallocate_minion_machines_for_action(ctxt, migration)
-            raise
 
         return self.get_migration(ctxt, migration.id)
 
@@ -1513,36 +1505,174 @@ class ConductorServerEndpoint(object):
                 ret["instances"][instance] = instance_script
         return ret
 
-    def _allocate_minion_machines_for_action(
-            self, ctxt, action, include_transfer_minions=True,
-            include_osmorphing_minions=True):
-        return self._minion_manager_client.allocate_minion_machines_for_action(
-            ctxt, action, include_transfer_minions=include_transfer_minions,
-            include_osmorphing_minions=include_osmorphing_minions)
-
     def _deallocate_minion_machines_for_action(self, ctxt, action):
-        # NOTE: we only send the required info for deallocation to avoid
-        # extraneous background loads from the DB of uneeded attributes
-        action_deallocation_info = {
-            "id": action.base_id,
-            "instances": action.instances,
-            "origin_minion_pool_id": action.origin_minion_pool_id,
-            "destination_minion_pool_id": action.destination_minion_pool_id,
-            "instance_osmorphing_minion_pool_mappings": (
-                action.instance_osmorphing_minion_pool_mappings)}
         return self._minion_manager_client.deallocate_minion_machines_for_action(
-            ctxt, action_deallocation_info)
+            ctxt, action.base_id)
 
     def _check_minion_pools_for_action(self, ctxt, action):
         return self._minion_manager_client.validate_minion_pool_selections_for_action(
             ctxt, action)
 
-    def accept_minion_allocations_for_execution(
-            self, ctxt, execution_id, action_id, pool_allocations):
-        # TODO: fetch execution
-        # update action_info
-        # call begin_tasks
-        pass
+    def _update_task_info_for_minion_allocations(
+            self, ctxt, action, minion_machine_allocations):
+
+        for instance in action.instances:
+            instance_minion_machines = minion_machine_allocations.get(
+                instance, {})
+            instance_source_minion = instance_minion_machines.get(
+                'source_minion')
+            if instance_source_minion:
+                action.info[instance].update({
+                    "source_minion_machine_id": instance_source_minion['id'],
+                    "source_minion_provider_properties": (
+                        instance_source_minion['provider_properties']),
+                    "source_minion_connection_info": (
+                        instance_source_minion['connection_info'])})
+
+            instance_target_minion = instance_minion_machines.get(
+                'target_minion')
+            if instance_target_minion:
+                action.info[instance].update({
+                    "target_minion_machine_id": instance_target_minion['id'],
+                    "target_minion_provider_properties": (
+                        instance_target_minion['provider_properties']),
+                    "target_minion_connection_info": (
+                        instance_target_minion['connection_info']),
+                    "target_minion_backup_writer_connection_info": (
+                        instance_target_minion['backup_writer_connection_info'])})
+
+            instance_osmorphing_minion = instance_minion_machines.get(
+                'osmorphing_minion')
+            if instance_osmorphing_minion:
+                action.info[instance].update({
+                    "osmorphing_minion_machine_id": instance_osmorphing_minion['id'],
+                    "osmorphing_minion_provider_properties": (
+                        instance_osmorphing_minion['provider_properties']),
+                    "osmorphing_minion_connection_info": (
+                        instance_osmorphing_minion['connection_info'])})
+
+        # update the action info for all of the instances:
+        for instance in minion_machine_allocations:
+            if instance not in action.instances:
+                LOG.warn(
+                    "Got minion pool allocations for machine(s) which are not "
+                    "part of action '%s'. Ignoring: %s", action.id,
+                    minion_machine_allocations[instance])
+                continue
+
+            db_api.update_transfer_action_info_for_instance(
+                ctxt, action.id, instance, action.info[instance])
+
+    def _get_last_execution_for_replica(self, ctxt, replica, requery=False):
+        if requery:
+            replica = self._get_replica(ctxt, replica.id)
+        last_replica_execution = None
+        if not replica.executions:
+            raise exception.InvalidReplicaState(
+                "Replica with ID '%s' has no existing Replica "
+                "executions." % (replica.id))
+        last_replica_execution = sorted(
+            replica.executions, key=lambda e: e.number)[-1]
+        return last_replica_execution
+
+    def _get_execution_for_migration(self, ctxt, migration, requery=False):
+        if requery:
+            migration = self._get_migration(ctxt, migration.id)
+
+        if not migration.executions:
+            raise exception.InvalidMigrationState(
+                "Migration with ID '%s' has no existing executions." % (
+                    migration.id))
+        if len(migration.executions) > 1:
+            raise exception.InvalidMigrationState(
+                "Migration with ID '%s' has more than one execution:"
+                " %s" % (migration.id, [e.id for e in migration.executions]))
+        return migration.executions[0]
+
+    @replica_synchronized
+    def confirm_replica_minions_allocation(
+            self, ctxt, replica_id, minion_machine_allocations):
+        replica = self._get_replica(ctxt, replica_id)
+
+        awaiting_minions_status = (
+            constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
+        if replica.last_execution_status != awaiting_minions_status:
+            raise exception.InvalidReplicaState(
+                "Replica is in '%s' status instead of the expected '%s' to "
+                "have minion machines allocated for it." % (
+                    replica.last_execution_status, awaiting_minions_status))
+
+        last_replica_execution = self._get_last_execution_for_replica(
+            ctxt, replica, requery=False)
+        self._update_task_info_for_minion_allocations(
+            ctxt, replica, minion_machine_allocations)
+
+        last_replica_execution = db_api.get_replica_tasks_execution(
+            ctxt, replica.id, last_replica_execution.id)
+        self._begin_tasks(
+            ctxt, replica, last_replica_execution)
+
+    @replica_synchronized
+    def report_replica_minions_allocation_error(
+            self, ctxt, replica_id, minion_allocation_error_details):
+        replica = self._get_replica(ctxt, replica_id)
+        awaiting_minions_status = (
+            constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
+        if replica.last_execution_status != awaiting_minions_status:
+            raise exception.InvalidReplicaState(
+                "Replica is in '%s' status instead of the expected '%s' to "
+                "have minion machines allocations fail for it." % (
+                    replica.last_execution_status, awaiting_minions_status))
+
+        last_replica_execution = self._get_last_execution_for_replica(
+            ctxt, replica, requery=False)
+        LOG.warn(
+            "Error occured while allocating minion machines for Replica '%s'. "
+            "Cancelling the current Replica Execution ('%s'). Error was: %s",
+            replica_id, last_replica_execution.id,
+            minion_allocation_error_details)
+        self._cancel_tasks_execution(
+            ctxt, last_replica_execution, requery=True)
+
+    @migration_synchronized
+    def confirm_migration_minions_allocation(
+            self, ctxt, migration_id, minion_machine_allocations):
+        migration = self._get_migration(ctxt, migration_id)
+
+        awaiting_minions_status = (
+            constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
+        if migration.last_execution_status != awaiting_minions_status:
+            raise exception.InvalidMigrationState(
+                "Migration is in '%s' status instead of the expected '%s' to "
+                "have minion machines allocated for it." % (
+                    migration.last_execution_status, awaiting_minions_status))
+
+        execution = self._get_execution_for_migration(
+            ctxt, migration, requery=False)
+        self._update_task_info_for_minion_allocations(
+            ctxt, migration, minion_machine_allocations)
+        self._begin_tasks(ctxt, migration, execution)
+
+    @migration_synchronized
+    def report_migration_minions_allocation_error(
+            self, ctxt, migration_id, minion_allocation_error_details):
+        migration = self._get_migration(ctxt, migration_id)
+        awaiting_minions_status = (
+            constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
+        if migration.last_execution_status != awaiting_minions_status:
+            raise exception.InvalidReplicaState(
+                "Machine is in '%s' status instead of the expected '%s' to "
+                "have minion machines allocations fail for it." % (
+                    migration.last_execution_status, awaiting_minions_status))
+
+        execution = self._get_execution_for_migration(
+            ctxt, migration, requery=False)
+        LOG.warn(
+            "Error occured while allocating minion machines for Migration '%s'. "
+            "Cancelling the current Execution ('%s'). Error was: %s",
+            migration_id, execution.id, minion_allocation_error_details)
+        self._cancel_tasks_execution(
+            ctxt, execution, requery=True)
 
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, origin_minion_pool_id,
@@ -1582,6 +1712,8 @@ class ConductorServerEndpoint(object):
         migration.replication_count = replication_count
         migration.origin_minion_pool_id = origin_minion_pool_id
         migration.destination_minion_pool_id = destination_minion_pool_id
+        if instance_osmorphing_minion_pool_mappings is None:
+            instance_osmorphing_minion_pool_mappings = {}
         migration.instance_osmorphing_minion_pool_mappings = (
             instance_osmorphing_minion_pool_mappings)
 
@@ -1590,10 +1722,6 @@ class ConductorServerEndpoint(object):
 
         self._check_minion_pools_for_action(ctxt, migration)
 
-        minion_pool_allocations = self._allocate_minion_machines_for_action(
-            ctxt, migration, include_transfer_minions=True,
-            include_osmorphing_minions=not skip_os_morphing)
-
         for instance in instances:
             migration.info[instance] = {
                 "volumes_info": [],
@@ -1610,15 +1738,6 @@ class ConductorServerEndpoint(object):
                 # "network_map": network_map,
                 # "storage_mappings": storage_mappings,
 
-            instance_minion_machines = minion_pool_allocations.get(
-                instance, {})
-            instance_source_minion = instance_minion_machines.get(
-                'source_minion')
-            instance_target_minion = instance_minion_machines.get(
-                'target_minion')
-            instance_osmorphing_minion = instance_minion_machines.get(
-                'osmorphing_minion')
-
             get_instance_info_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_GET_INSTANCE_INFO,
@@ -1641,13 +1760,15 @@ class ConductorServerEndpoint(object):
             migration_resources_task_deps = [
                 get_instance_info_task.id,
                 validate_migration_source_inputs_task.id]
-            if instance_source_minion:
+            if migration.origin_minion_pool_id:
+                # NOTE: these values are required for the
+                # _check_execution_tasks_sanity call but
+                # will be populated later when the pool
+                # allocations actually happen:
                 migration.info[instance].update({
-                    "source_minion_machine_id": instance_source_minion['id'],
-                    "source_minion_provider_properties": (
-                        instance_source_minion['provider_properties']),
-                    "source_minion_connection_info": (
-                        instance_source_minion['connection_info'])})
+                    "source_minion_machine_id": None,
+                    "source_minion_provider_properties": None,
+                    "source_minion_connection_info": None})
                 validate_source_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY,
@@ -1672,15 +1793,16 @@ class ConductorServerEndpoint(object):
             validate_target_minion_task = None
             attach_target_minion_disks_task = None
             deploy_migration_target_resources_task = None
-            if instance_target_minion:
+            if migration.destination_minion_pool_id:
+                # NOTE: these values are required for the
+                # _check_execution_tasks_sanity call but
+                # will be populated later when the pool
+                # allocations actually happen:
                 migration.info[instance].update({
-                    "target_minion_machine_id": instance_target_minion['id'],
-                    "target_minion_provider_properties": (
-                        instance_target_minion['provider_properties']),
-                    "target_minion_connection_info": (
-                        instance_target_minion['connection_info']),
-                    "target_minion_backup_writer_connection_info": (
-                        instance_target_minion['backup_writer_connection_info'])})
+                    "target_minion_machine_id": None,
+                    "target_minion_provider_properties": None,
+                    "target_minion_connection_info": None,
+                    "target_minion_backup_writer_connection_info": None})
                 ttyp = (
                     constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY)
                 validate_target_minion_task = self._create_task(
@@ -1704,13 +1826,16 @@ class ConductorServerEndpoint(object):
                     deploy_migration_target_resources_task.id)
 
             validate_osmorphing_minion_task = None
-            if not skip_os_morphing and instance_osmorphing_minion:
+            if not skip_os_morphing and (
+                    instance in instance_osmorphing_minion_pool_mappings):
+                # NOTE: these values are required for the
+                # _check_execution_tasks_sanity call but
+                # will be populated later when the pool
+                # allocations actually happen:
                 migration.info[instance].update({
-                    "osmorphing_minion_machine_id": instance_osmorphing_minion['id'],
-                    "osmorphing_minion_provider_properties": (
-                        instance_osmorphing_minion['provider_properties']),
-                    "osmorphing_minion_connection_info": (
-                        instance_osmorphing_minion['connection_info'])})
+                    "osmorphing_minion_machine_id": None,
+                    "osmorphing_minion_provider_properties": None,
+                    "osmorphing_minion_connection_info": None})
                 validate_osmorphing_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY,
@@ -1719,10 +1844,6 @@ class ConductorServerEndpoint(object):
                 migration_resources_task_ids.append(
                     validate_osmorphing_minion_task.id)
 
-            # NOTE(aznashwan): re-executing the REPLICATE_DISKS task only works
-            # if all the source disk snapshotting and worker setup steps are
-            # performed by the source plugin in REPLICATE_DISKS.
-            # This should no longer be a problem when worker pooling lands.
             last_sync_task = None
             first_sync_task = None
             for i in range(migration.replication_count):
@@ -1749,7 +1870,7 @@ class ConductorServerEndpoint(object):
             release_source_minion_task = None
             delete_source_resources_task = None
             source_resource_cleanup_task = None
-            if instance_source_minion:
+            if migration.origin_minion_pool_id:
                 release_source_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_RELEASE_SOURCE_MINION,
@@ -1777,7 +1898,7 @@ class ConductorServerEndpoint(object):
                 on_error=True)
 
             target_resources_cleanup_task = None
-            if instance_target_minion:
+            if migration.destination_minion_pool_id:
                 detach_volumes_from_target_minion_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION,
@@ -1819,7 +1940,8 @@ class ConductorServerEndpoint(object):
                 task_delete_os_morphing_resources = None
                 attach_osmorphing_minion_volumes_task = None
                 last_osmorphing_resources_deployment_task = None
-                if instance_osmorphing_minion:
+                if instance in (
+                        migration.instance_osmorphing_minion_pool_mappings):
                     osmorphing_vol_attachment_deps = [
                         validate_osmorphing_minion_task.id]
                     osmorphing_vol_attachment_deps.extend(depends_on)
@@ -1852,7 +1974,8 @@ class ConductorServerEndpoint(object):
 
                 depends_on = [task_osmorphing.id]
 
-                if instance_osmorphing_minion:
+                if instance in (
+                        migration.instance_osmorphing_minion_pool_mappings):
                     detach_osmorphing_minion_volumes_task = self._create_task(
                         instance,
                         constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION,
@@ -1914,21 +2037,28 @@ class ConductorServerEndpoint(object):
                 execution, depends_on=cleanup_deps,
                 on_error_only=True)
 
-        try:
-            self._check_execution_tasks_sanity(execution, migration.info)
-            db_api.add_migration(ctxt, migration)
-
-            LOG.info("Migration created: %s", migration.id)
+        self._check_execution_tasks_sanity(execution, migration.info)
+        db_api.add_migration(ctxt, migration)
+        LOG.info("Migration added to DB: %s", migration.id)
+
+        uses_minion_pools = any([
+            migration.origin_minion_pool_id,
+            migration.destination_minion_pool_id,
+            migration.instance_osmorphing_minion_pool_mappings])
+        if uses_minion_pools:
+            # NOTE: we lock on the migration ID to ensure the minion
+            # allocation confirmations don't come in too early:
+            with lockutils.lock(
+                    constants.MIGRATION_LOCK_NAME_FORMAT % migration.id,
+                    external=True):
+                self._minion_manager_client.allocate_minion_machines_for_migration(
+                    ctxt, migration, include_transfer_minions=True,
+                    include_osmorphing_minions=not skip_os_morphing)
+                self._set_tasks_execution_status(
+                    ctxt, execution.id,
+                    constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
+        else:
             self._begin_tasks(ctxt, migration, execution)
-        except Exception:
-            if minion_pool_allocations:
-                LOG.warn(
-                    "Exception occured while verifying/registering tasks "
-                    "execution for Migration '%s'. Cleaning up all minion "
-                    "allocations from DB: %s" % (
-                        migration.id, minion_pool_allocations))
-                self._deallocate_minion_machines_for_action(ctxt, migration)
-            raise
 
         return self.get_migration(ctxt, migration.id)
 
@@ -2741,62 +2871,6 @@ class ConductorServerEndpoint(object):
                     db_api.update_replica(
                         ctxt, execution.action_id, task_info)
 
-        # elif task_type in (
-        #         constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES,
-        #         constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES):
-        #     still_running = _check_other_tasks_running(execution, task)
-        #     if not still_running:
-        #         LOG.info(
-        #             "Updating 'pool_shared_resources' for pool %s after "
-        #             "completion of task '%s' (type '%s').",
-        #             execution.action_id, task.id, task_type)
-        #         db_api.update_minion_pool_lifecycle(
-        #             ctxt, execution.action_id, {
-        #                 "pool_shared_resources": task_info.get(
-        #                     "pool_shared_resources", {})})
-
-        # elif task_type in (
-        #         constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES,
-        #         constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES):
-        #     still_running = _check_other_tasks_running(execution, task)
-        #     if not still_running:
-        #         LOG.info(
-        #             "Clearing 'pool_shared_resources' for pool %s following "
-        #             "completion of task '%s' (type %s)",
-        #             execution.action_id, task.id, task_type)
-        #         db_api.update_minion_pool_lifecycle(
-        #             ctxt, execution.action_id, {
-        #                 "pool_shared_resources": {}})
-
-        # elif task_type in (
-        #         constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE,
-        #         constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE):
-        #     LOG.info(
-        #         "Adding DB entry for Minion Machine '%s' of pool %s "
-        #         "following completion of task '%s' (type %s).",
-        #         task.instance, execution.action_id, task.id, task_type)
-        #     minion_machine = models.MinionMachine()
-        #     minion_machine.id = task.instance
-        #     minion_machine.pool_id = execution.action_id
-        #     minion_machine.status = (
-        #         constants.MINION_MACHINE_STATUS_AVAILABLE)
-        #     minion_machine.connection_info = task_info[
-        #         "minion_connection_info"]
-        #     minion_machine.provider_properties = task_info[
-        #         "minion_provider_properties"]
-        #     minion_machine.backup_writer_connection_info = task_info[
-        #         "minion_backup_writer_connection_info"]
-        #     db_api.add_minion_machine(ctxt, minion_machine)
-
-        # elif task_type in (
-        #         constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE,
-        #         constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE):
-        #     LOG.info(
-        #         "%s task for Minon Machine '%s' has completed successfully. "
-        #         "Deleting minion machine from DB.",
-        #         task_type, task.instance)
-        #     db_api.delete_minion_machine(ctxt, task.instance)
-
         elif task_type in (
                 constants.TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION,
                 constants.TASK_TYPE_DETACH_VOLUMES_FROM_SOURCE_MINION):

+ 4 - 1
coriolis/constants.py

@@ -11,10 +11,13 @@ EXECUTION_STATUS_DEADLOCKED = "DEADLOCKED"
 EXECUTION_STATUS_CANCELED = "CANCELED"
 EXECUTION_STATUS_CANCELLING = "CANCELLING"
 EXECUTION_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
+EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS = "AWAITING_MINION_ALLOCATIONS"
+EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS = "ERROR_ALLOCATING_MINIONS"
 
 ACTIVE_EXECUTION_STATUSES = [
     EXECUTION_STATUS_RUNNING,
-    EXECUTION_STATUS_CANCELLING
+    EXECUTION_STATUS_CANCELLING,
+    EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS
 ]
 
 FINALIZED_EXECUTION_STATUSES = [

+ 6 - 2
coriolis/db/api.py

@@ -1305,8 +1305,12 @@ def get_minion_pools(
         q = q.options(orm.joinedload('progress_updates'))
     db_result = q.all()
     if to_dict:
-        return [i.to_dict(
-            include_machines=include_machines) for i in db_result]
+        return [
+            i.to_dict(
+                include_machines=include_machines,
+                include_events=include_events,
+                include_progress_updates=include_progress_updates)
+            for i in db_result]
     return db_result
 
 

+ 11 - 5
coriolis/minion_manager/rpc/client.py

@@ -57,17 +57,23 @@ class MinionManagerClient(object):
             ctxt, 'validate_minion_pool_selections_for_action',
             action=action)
 
-    def allocate_minion_machines_for_action(
-            self, ctxt, action, include_transfer_minions=True,
+    def allocate_minion_machines_for_replica(
+            self, ctxt, replica):
+        return self._client.cast(
+            ctxt, 'allocate_minion_machines_for_replica', replica=replica)
+
+    def allocate_minion_machines_for_migration(
+            self, ctxt, migration, include_transfer_minions=True,
             include_osmorphing_minions=True):
         return self._client.cast(
-            ctxt, 'allocate_minion_machines_for_action', action=action,
+            ctxt, 'allocate_minion_machines_for_migration',
+            migration=migration,
             include_transfer_minions=include_transfer_minions,
             include_osmorphing_minions=include_osmorphing_minions)
 
-    def deallocate_minion_machines_for_action(self, ctxt, action):
+    def deallocate_minion_machines_for_action(self, ctxt, action_id):
         return self._client.cast(
-            ctxt, 'deallocate_minion_machines_for_action', action=action)
+            ctxt, 'deallocate_minion_machines_for_action', action_id=action_id)
 
     def create_minion_pool(
             self, ctxt, name, endpoint_id, pool_platform, pool_os_type,

+ 135 - 58
coriolis/minion_manager/rpc/server.py

@@ -151,6 +151,7 @@ class MinionManagerServerEndpoint(object):
         return db_api.get_minion_pool_progress_step(ctxt, minion_pool_id)
 
     def validate_minion_pool_selections_for_action(self, ctxt, action):
+        """ Validates the minion pool selections for a given action. """
         if not isinstance(action, dict):
             raise exception.InvalidInput(
                 "Action must be a dict, got '%s': %s" % (
@@ -274,7 +275,44 @@ class MinionManagerServerEndpoint(object):
                     "instance '%s' during action '%s'." % (
                         pool_id, instance, action['id']))
 
-    def allocate_minion_machines_for_action(
+    def allocate_minion_machines_for_replica(
+            self, ctxt, replica):
+        minion_allocations = self._allocate_minion_machines_for_action(
+            ctxt, replica, include_transfer_minions=True,
+            include_osmorphing_minions=False)
+        try:
+            self._conductor_client.confirm_replica_minions_allocation(
+                ctxt, replica['id'], minion_allocations)
+        except Exception as ex:
+            LOG.warn(
+                "Error occured while reporting minion pool allocations for "
+                "Replica with ID '%s'. Removing all allocations. "
+                "Error was: %s" % (
+                    replica['id'], utils.get_exception_details()))
+            self.deallocate_minion_machines_for_action(
+                ctxt, replica['id'])
+            raise
+
+    def allocate_minion_machines_for_migration(
+            self, ctxt, migration, include_transfer_minions=True,
+            include_osmorphing_minions=True):
+        minion_allocations = self._allocate_minion_machines_for_action(
+            ctxt, migration, include_transfer_minions=include_transfer_minions,
+            include_osmorphing_minions=include_osmorphing_minions)
+        try:
+            self._conductor_client.confirm_migration_minions_allocation(
+                ctxt, migration['id'], minion_allocations)
+        except Exception as ex:
+            LOG.warn(
+                "Error occured while reporting minion pool allocations for "
+                "Migration with ID '%s'. Removing all allocations. "
+                "Error was: %s" % (
+                    migration['id'], utils.get_exception_details()))
+            self.deallocate_minion_machines_for_action(
+                ctxt, migration['id'])
+            raise
+
+    def _allocate_minion_machines_for_action(
             self, ctxt, action, include_transfer_minions=True,
             include_osmorphing_minions=True):
         """ Returns a dict of the form:
@@ -497,68 +535,44 @@ class MinionManagerServerEndpoint(object):
                 for (instance, allocation) in instance_machine_allocations.items()})
         return instance_machine_allocations
 
-    def deallocate_minion_machines_for_action(self, ctxt, action):
-        if not isinstance(action, dict):
-            raise exception.InvalidInput(
-                "Action must be a dict, got '%s': %s" % (
-                    type(action), action))
-        required_action_properties = [
-            'id', 'instances', 'origin_minion_pool_id',
-            'destination_minion_pool_id',
-            'instance_osmorphing_minion_pool_mappings']
-        missing = [
-            prop for prop in required_action_properties
-            if prop not in action]
-        if missing:
-            raise exception.InvalidInput(
-                "Missing the following required action properties for "
-                "minion pool machine allocation: %s. Got %s" % (
-                    missing, action))
+    def deallocate_minion_machines_for_action(self, ctxt, action_id):
 
-        minion_pool_ids = set()
-        if action['origin_minion_pool_id']:
-            minion_pool_ids.add(action['origin_minion_pool_id'])
-        if action['destination_minion_pool_id']:
-            minion_pool_ids.add(action['destination_minion_pool_id'])
-        if action['instance_osmorphing_minion_pool_mappings']:
-            minion_pool_ids = minion_pool_ids.union(set(
-                action['instance_osmorphing_minion_pool_mappings'].values()))
-        if None in minion_pool_ids:
-            minion_pool_ids.remove(None)
+        allocated_minion_machines = db_api.get_minion_machines(
+            ctxt, allocated_action_id=action_id)
 
-        if not minion_pool_ids:
+        if not allocated_minion_machines:
             LOG.debug(
-                "No minion pools seem to have been used for action with "
+                "No minion machines seem to have been used for action with "
                 "base_id '%s'. Skipping minion machine deallocation.",
-                action['id'])
-        else:
+                action_id)
+            return
+
+        minion_pool_ids = {
+            machine.pool_id for machine in allocated_minion_machines}
+
+        LOG.debug(
+            "Attempting to deallocate all minion pool machine selections "
+            "for action '%s'. Afferent pools are: %s",
+            action_id, minion_pool_ids)
+
+        with contextlib.ExitStack() as stack:
+            _ = [
+                stack.enter_context(
+                    lockutils.lock(
+                        constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
+                        external=True))
+                for pool_id in minion_pool_ids]
+
+            machine_ids = [m.id for m in allocated_minion_machines]
+            LOG.info(
+                "Releasing the following minion machines for "
+                "action '%s': %s", action_id, machine_ids)
+            db_api.set_minion_machines_allocation_statuses(
+                ctxt, machine_ids, None,
+                constants.MINION_MACHINE_STATUS_AVAILABLE)
             LOG.debug(
-                "Attempting to deallocate all minion pool machine selections "
-                "for action '%s'. Afferent pools are: %s",
-                action['id'], minion_pool_ids)
-
-            with contextlib.ExitStack() as stack:
-                _ = [
-                    stack.enter_context(
-                        lockutils.lock(
-                            constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
-                            external=True))
-                    for pool_id in minion_pool_ids]
-
-                minion_machines = db_api.get_minion_machines(
-                    ctxt, allocated_action_id=action['id'])
-                machine_ids = [m.id for m in minion_machines]
-                if machine_ids:
-                    LOG.info(
-                        "Releasing the following minion machines for "
-                        "action '%s': %s", action['base_id'], machine_ids)
-                    db_api.set_minion_machines_allocation_statuses(
-                        ctxt, machine_ids, None,
-                        constants.MINION_MACHINE_STATUS_AVAILABLE)
-                else:
-                    LOG.debug(
-                        "No minion machines were found to be associated "
-                        "with action with base_id '%s'.", action['base_id'])
+                "Successfully released all minion machines associated "
+                "with action with base_id '%s'.", action_id)
 
     def _get_minion_pool_healthcheck_flow(
             self, ctxt, minion_pool, requery=True):
@@ -1373,3 +1387,66 @@ class MinionManagerServerEndpoint(object):
     #         execution.type, new_execution_status)
     #     db_api.set_minion_pool_status(
     #         ctxt, execution.action_id, final_pool_status)
+
+    # def deallocate_minion_machines_for_action(self, ctxt, action_id):
+    #     if not isinstance(action, dict):
+    #         raise exception.InvalidInput(
+    #             "Action must be a dict, got '%s': %s" % (
+    #                 type(action), action))
+    #     required_action_properties = [
+    #         'id', 'instances', 'origin_minion_pool_id',
+    #         'destination_minion_pool_id',
+    #         'instance_osmorphing_minion_pool_mappings']
+    #     missing = [
+    #         prop for prop in required_action_properties
+    #         if prop not in action]
+    #     if missing:
+    #         raise exception.InvalidInput(
+    #             "Missing the following required action properties for "
+    #             "minion pool machine deallocation: %s. Got %s" % (
+    #                 missing, action))
+
+    #     minion_pool_ids = set()
+    #     if action['origin_minion_pool_id']:
+    #         minion_pool_ids.add(action['origin_minion_pool_id'])
+    #     if action['destination_minion_pool_id']:
+    #         minion_pool_ids.add(action['destination_minion_pool_id'])
+    #     if action['instance_osmorphing_minion_pool_mappings']:
+    #         minion_pool_ids = minion_pool_ids.union(set(
+    #             action['instance_osmorphing_minion_pool_mappings'].values()))
+    #     if None in minion_pool_ids:
+    #         minion_pool_ids.remove(None)
+
+    #     if not minion_pool_ids:
+    #         LOG.debug(
+    #             "No minion pools seem to have been used for action with "
+    #             "base_id '%s'. Skipping minion machine deallocation.",
+    #             action['id'])
+    #     else:
+    #         LOG.debug(
+    #             "Attempting to deallocate all minion pool machine selections "
+    #             "for action '%s'. Afferent pools are: %s",
+    #             action['id'], minion_pool_ids)
+
+    #         with contextlib.ExitStack() as stack:
+    #             _ = [
+    #                 stack.enter_context(
+    #                     lockutils.lock(
+    #                         constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
+    #                         external=True))
+    #                 for pool_id in minion_pool_ids]
+
+    #             minion_machines = db_api.get_minion_machines(
+    #                 ctxt, allocated_action_id=action['id'])
+    #             machine_ids = [m.id for m in minion_machines]
+    #             if machine_ids:
+    #                 LOG.info(
+    #                     "Releasing the following minion machines for "
+    #                     "action '%s': %s", action['base_id'], machine_ids)
+    #                 db_api.set_minion_machines_allocation_statuses(
+    #                     ctxt, machine_ids, None,
+    #                     constants.MINION_MACHINE_STATUS_AVAILABLE)
+    #             else:
+    #                 LOG.debug(
+    #                     "No minion machines were found to be associated "
+    #                     "with action with base_id '%s'.", action['base_id'])

+ 16 - 9
coriolis/minion_manager/rpc/tasks.py

@@ -59,7 +59,8 @@ class UpdateMinionPoolStatusTask(
         self._minion_pool_id = minion_pool_id
         self._task_name = (MINION_POOL_UPDATE_STATUS_TASK_NAME_FORMAT % (
             self._minion_pool_id, self._target_status)).lower()
-        self._previous_status = status_to_revert_to
+        self._previous_status = None
+        self._status_to_revert_to = status_to_revert_to
 
         super(UpdateMinionPoolStatusTask, self).__init__(
             name=self._task_name, **kwargs)
@@ -112,12 +113,19 @@ class UpdateMinionPoolStatusTask(
                 "reversion." % (self._task_name, self._minion_pool_id))
             return
 
-        if minion_pool.status == self._previous_status:
+        previous_status = self._previous_status
+        if self._status_to_revert_to:
+            LOG.debug(
+                "Forcibly reverting pool to status '%s' despite previous "
+                "status being '%s'",
+                self._status_to_revert_to, self._previous_status)
+            previous_status = self._status_to_revert_to
+        if minion_pool.status == previous_status:
             LOG.debug(
                 "[Task '%s'] Minion pool '%s' is/was already reverted to "
                 "'%s'." % (
                     self._task_name, self._minion_pool_id,
-                    self._previous_status))
+                    previous_status))
         else:
             if minion_pool.status != self._target_status:
                 LOG.warn(
@@ -125,19 +133,18 @@ class UpdateMinionPoolStatusTask(
                     "neither the previous status ('%s'), nor the newly-set "
                     "status ('%s'). Reverting to '%s' anyway.",
                     self._task_name, self._minion_pool_id, minion_pool.status,
-                    self._previous_status, self._target_status,
-                    self._previous_status)
+                    previous_status, self._target_status, previous_status)
             LOG.debug(
                 "[Task '%s'] Reverting pool '%s' status from '%s' to "
                 "'%s'" % (
                     self._task_name, self._minion_pool_id, minion_pool.status,
-                    self._previous_status))
+                    previous_status))
             db_api.set_minion_pool_status(
-                context, self._minion_pool_id, self._previous_status)
+                context, self._minion_pool_id, previous_status)
             self._add_minion_pool_event(
                 context,
                 "Pool status reverted from '%s' to '%s'" % (
-                    minion_pool.status, self._previous_status))
+                    minion_pool.status, previous_status))
 
 
 class BaseMinionManangerTask(coriolis_taskflow_base.BaseRunWorkerTask):
@@ -200,7 +207,7 @@ class BaseMinionManangerTask(coriolis_taskflow_base.BaseRunWorkerTask):
             context,
             "Failure occurred for one or more operations on minion pool '%s'. "
             "Please check the logs for additional details. Error messages "
-            "were: %s" % (
+            "were:\n%s" % (
                 self._minion_pool_id,
                 self._get_error_str_for_flow_failures(
                     flow_failures, full_tracebacks=False)),