Przeglądaj źródła

Slot in minion pool tasks to Migrations.

Nashwan Azhari 5 lat temu
rodzic
commit
6c8ed0fb5e

+ 4 - 0
coriolis/api/v1/minion_pools.py

@@ -96,6 +96,10 @@ class MinionPoolController(api_wsgi.Controller):
     def _validate_update_body(self, id, context, body):
         try:
             minion_pool = body["minion_pool"]
+            if 'endpoint_id' in minion_pool:
+                raise exception.InvalidInput(
+                    "The 'endpoint_id' of a minion pool cannot be "
+                    "updated.")
             vals = {k: minion_pool[k] for k in minion_pool.keys() &
                     {"name", "environment_options", "minimum_minions",
                      "maximum_minions", "minion_max_idle_time",

+ 408 - 96
coriolis/conductor/rpc/server.py

@@ -1100,11 +1100,13 @@ class ConductorServerEndpoint(object):
 
             self._begin_tasks(ctxt, execution, task_info=replica.info)
         except Exception:
-            LOG.warn(
-                "Exception occured while verifying/registering Replica tasks "
-                "execution for Replica '%s'. Cleaning up all minion "
-                "allocations from DB." % replica.id)
-            self._deallocate_minion_machines_for_action(ctxt, replica)
+            if minion_pool_allocations:
+                LOG.warn(
+                    "Exception occured while verifying/registering Replica "
+                    "tasks execution for Replica '%s'. Cleaning up all minion "
+                    "allocations from DB: %s" % (
+                        replica.id, minion_pool_allocations))
+                self._deallocate_minion_machines_for_action(ctxt, replica)
             raise
 
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
@@ -1273,7 +1275,7 @@ class ConductorServerEndpoint(object):
 
         if action.instance_osmorphing_minion_pool_mappings:
             for (instance, pool_id) in (
-                    action.instance_osmorphing_minion_pool_mappings):
+                    action.instance_osmorphing_minion_pool_mappings.items()):
                 osmorphing_pool = _get_pool(pool_id)
                 if osmorphing_pool.origin_endpoint_id != (
                         action.destination_endpoint_id):
@@ -1299,6 +1301,7 @@ class ConductorServerEndpoint(object):
 
         replica = models.Replica()
         replica.id = str(uuid.uuid4())
+        replica.base_id = replica.id
         replica.origin_endpoint_id = origin_endpoint_id
         replica.origin_minion_pool_id = origin_minion_pool_id
         replica.destination_endpoint_id = destination_endpoint_id
@@ -1414,6 +1417,7 @@ class ConductorServerEndpoint(object):
 
         migration = models.Migration()
         migration.id = str(uuid.uuid4())
+        migration.base_id = migration.id
         migration.origin_endpoint_id = replica.origin_endpoint_id
         migration.destination_endpoint_id = replica.destination_endpoint_id
         # TODO(aznashwan): have these passed separately to the relevant
@@ -1428,6 +1432,18 @@ class ConductorServerEndpoint(object):
         migration.instances = instances
         migration.replica = replica
         migration.info = replica.info
+        migration.origin_minion_pool_id = replica.origin_minion_pool_id
+        migration.destination_minion_pool_id = (
+            replica.destination_minion_pool_id)
+        migration.instance_osmorphing_minion_pool_mappings = (
+            replica.instance_osmorphing_minion_pool_mappings)
+        if instance_osmorphing_minion_pool_mappings:
+            migration.instance_osmorphing_minion_pool_mappings.update(
+                instance_osmorphing_minion_pool_mappings)
+
+        minion_pool_allocations = self._allocate_minion_machines_for_action(
+            ctxt, migration, include_transfer_minions=False,
+            include_osmorphing_minions=not skip_os_morphing)
 
         execution = models.TasksExecution()
         migration.executions = [execution]
@@ -1458,15 +1474,36 @@ class ConductorServerEndpoint(object):
                 # "network_map": network_map,
                 # "storage_mappings": storage_mappings,
 
+            instance_minion_machines = minion_pool_allocations.get(
+                instance, {})
+            instance_osmorphing_minion = instance_minion_machines.get(
+                'osmorphing_minion')
+
             validate_replica_deployment_inputs_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS,
                 execution)
 
+            validate_osmorphing_minion_task = None
+            last_validation_task = validate_replica_deployment_inputs_task
+            if not skip_os_morphing and instance_osmorphing_minion:
+                migration.info[instance].update({
+                    "osmorphing_minion_machine_id": instance_osmorphing_minion.id,
+                    "osmorphing_minion_provider_properties": (
+                        instance_osmorphing_minion.provider_properties),
+                    "osmorphing_minion_connection_info": (
+                        instance_osmorphing_minion.connection_info)})
+                validate_osmorphing_minion_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY,
+                    execution, depends_on=[
+                        validate_replica_deployment_inputs_task.id])
+                last_validation_task = validate_osmorphing_minion_task
+
             create_snapshot_task = self._create_task(
                 instance, constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS,
                 execution, depends_on=[
-                    validate_replica_deployment_inputs_task.id])
+                    last_validation_task.id])
 
             deploy_replica_task = self._create_task(
                 instance,
@@ -1476,25 +1513,67 @@ class ConductorServerEndpoint(object):
 
             depends_on = [deploy_replica_task.id]
             if not skip_os_morphing:
-                task_deploy_os_morphing_resources = self._create_task(
-                    instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
-                    execution, depends_on=depends_on)
+                task_deploy_os_morphing_resources = None
+                attach_osmorphing_minion_volumes_task = None
+                last_osmorphing_resources_deployment_task = None
+                if instance_osmorphing_minion:
+                    osmorphing_vol_attachment_deps = [
+                        validate_osmorphing_minion_task.id]
+                    osmorphing_vol_attachment_deps.extend(depends_on)
+                    attach_osmorphing_minion_volumes_task = self._create_task(
+                        instance,
+                        constants.TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION,
+                        execution, depends_on=osmorphing_vol_attachment_deps)
+                    last_osmorphing_resources_deployment_task = (
+                        attach_osmorphing_minion_volumes_task)
+
+                    collect_osmorphing_info_task = self._create_task(
+                        instance,
+                        constants.TASK_TYPE_COLLECT_OSMORPHING_INFO,
+                        execution,
+                        depends_on=[attach_osmorphing_minion_volumes_task.id])
+                    last_osmorphing_resources_deployment_task = (
+                        collect_osmorphing_info_task)
+                else:
+                    task_deploy_os_morphing_resources = self._create_task(
+                        instance,
+                        constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
+                        execution, depends_on=depends_on)
+                    last_osmorphing_resources_deployment_task = (
+                        task_deploy_os_morphing_resources)
 
                 task_osmorphing = self._create_task(
                     instance, constants.TASK_TYPE_OS_MORPHING,
                     execution, depends_on=[
-                        task_deploy_os_morphing_resources.id])
-
-                task_delete_os_morphing_resources = self._create_task(
-                    instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
-                    execution, depends_on=[
-                        task_deploy_os_morphing_resources.id,
-                        task_osmorphing.id],
-                    on_error=True)
-
-                depends_on = [
-                    task_osmorphing.id,
-                    task_delete_os_morphing_resources.id]
+                        last_osmorphing_resources_deployment_task.id])
+
+                depends_on = [task_osmorphing.id]
+
+                if instance_osmorphing_minion:
+                    detach_osmorphing_minion_volumes_task = self._create_task(
+                        instance,
+                        constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION,
+                        execution, depends_on=[
+                            attach_osmorphing_minion_volumes_task.id,
+                            task_osmorphing.id],
+                        on_error=True)
+                    depends_on.append(detach_osmorphing_minion_volumes_task.id)
+
+                    self._create_task(
+                        instance,
+                        constants.TASK_TYPE_RELEASE_OSMORPHING_MINION,
+                        execution, depends_on=[
+                            validate_osmorphing_minion_task.id,
+                            detach_osmorphing_minion_volumes_task.id],
+                        on_error=True)
+                else:
+                    task_delete_os_morphing_resources = self._create_task(
+                        instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
+                        execution, depends_on=[
+                            task_deploy_os_morphing_resources.id,
+                            task_osmorphing.id],
+                        on_error=True)
+                    depends_on.append(task_delete_os_morphing_resources.id)
 
             if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
                     destination_provider_types):
@@ -1534,11 +1613,21 @@ class ConductorServerEndpoint(object):
                     depends_on=[cleanup_deployment_task.id],
                     on_error=True)
 
-        self._check_execution_tasks_sanity(execution, migration.info)
-        db_api.add_migration(ctxt, migration)
-        LOG.info("Migration created: %s", migration.id)
+        try:
+            self._check_execution_tasks_sanity(execution, migration.info)
+            db_api.add_migration(ctxt, migration)
+            LOG.info("Migration created: %s", migration.id)
 
-        self._begin_tasks(ctxt, execution, task_info=migration.info)
+            self._begin_tasks(ctxt, execution, task_info=migration.info)
+        except Exception:
+            if minion_pool_allocations:
+                LOG.warn(
+                    "Exception occured while verifying/registering tasks "
+                    "execution for Migration '%s' from Replica '%s'. "
+                    "Cleaning up all minion allocations from DB: %s" % (
+                        migration.id, replica.id, minion_pool_allocations))
+                self._deallocate_minion_machines_for_action(ctxt, migration)
+            raise
 
         return self.get_migration(ctxt, migration.id)
 
@@ -1575,6 +1664,8 @@ class ConductorServerEndpoint(object):
         if action.instance_osmorphing_minion_pool_mappings:
             minion_pool_ids = minion_pool_ids.union(set(
                 action.instance_osmorphing_minion_pool_mappings.values()))
+        if None in minion_pool_ids:
+            minion_pool_ids.remove(None)
 
         def _select_machine(minion_pool, exclude=None):
             if not minion_pool.minion_machines:
@@ -1604,7 +1695,7 @@ class ConductorServerEndpoint(object):
                     "allocated ones: %s)" % (minion_pool.id, exclude))
             return selected_machine
 
-        instance_pool_allocations = {
+        instance_machine_allocations = {
             instance: {} for instance in action.instances}
         osmorphing_pool_map = (
             action.instance_osmorphing_minion_pool_mappings)
@@ -1651,7 +1742,7 @@ class ConductorServerEndpoint(object):
                         machine = _select_machine(
                             origin_pool, exclude=allocated_source_machine_ids)
                         allocated_source_machine_ids.add(machine.id)
-                        instance_pool_allocations[
+                        instance_machine_allocations[
                             instance]['source_minion'] = machine
                         LOG.debug(
                             "Selected minion machine '%s' for source-side "
@@ -1664,7 +1755,7 @@ class ConductorServerEndpoint(object):
                         machine = _select_machine(
                             dest_pool, exclude=allocated_target_machine_ids)
                         allocated_target_machine_ids.add(machine.id)
-                        instance_pool_allocations[
+                        instance_machine_allocations[
                             instance]['target_minion'] = machine
                         LOG.debug(
                             "Selected minion machine '%s' for target-side "
@@ -1676,6 +1767,10 @@ class ConductorServerEndpoint(object):
                         LOG.debug(
                             "Instance '%s' is not listed in the OSMorphing "
                             "minion pool mappings. Skipping." % instance)
+                    elif osmorphing_pool_map[instance] is None:
+                        LOG.debug(
+                            "OSMorphing pool ID for instance '%s' is "
+                            "None. Ignoring." % instance)
                     else:
                         osmorphing_pool_id = osmorphing_pool_map[instance]
                         # if the selected target and OSMorphing pools
@@ -1683,7 +1778,7 @@ class ConductorServerEndpoint(object):
                         if osmorphing_pool_id == (
                                 action.destination_minion_pool_id):
                             allocated_target_machine = (
-                                instance_pool_allocations[
+                                instance_machine_allocations[
                                     instance].get('target_minion'))
                             LOG.debug(
                                 "Reusing disk sync minion '%s' for the "
@@ -1691,7 +1786,7 @@ class ConductorServerEndpoint(object):
                                 "transfer action '%s'",
                                 allocated_target_machine.id, instance,
                                 action.id)
-                            instance_pool_allocations[
+                            instance_machine_allocations[
                                 instance]['osmorphing_minion'] = (
                                     allocated_target_machine)
                         # else, allocate a new minion from the selected pool:
@@ -1702,7 +1797,7 @@ class ConductorServerEndpoint(object):
                                 osmorphing_pool,
                                 exclude=allocated_osmorphing_machine_ids)
                             allocated_osmorphing_machine_ids.add(machine.id)
-                            instance_pool_allocations[
+                            instance_machine_allocations[
                                 instance]['osmorphing_minion'] = machine
                             LOG.debug(
                                 "Selected minion machine '%s' for OSMorphing "
@@ -1719,7 +1814,20 @@ class ConductorServerEndpoint(object):
                 ctxt, all_machine_ids, action.id,
                 constants.MINION_MACHINE_STATUS_ALLOCATED)
 
-        return instance_pool_allocations
+        # filter out redundancies:
+        instance_machine_allocations = {
+            instance: allocations
+            for (instance, allocations) in instance_machine_allocations.items()
+            if allocations}
+
+        LOG.debug(
+            "Allocated the following minion machines for action '%s': %s",
+            action.id, {
+                instance: {
+                    typ: machine.id
+                    for (typ, machine) in allocation.items()}
+                for (instance, allocation) in instance_machine_allocations.items()})
+        return instance_machine_allocations
 
     def _deallocate_minion_machines_for_action(self, ctxt, action):
         minion_pool_ids = set()
@@ -1731,7 +1839,12 @@ class ConductorServerEndpoint(object):
             minion_pool_ids = minion_pool_ids.union(set(
                 action.instance_osmorphing_minion_pool_mappings.values()))
 
-        if minion_pool_ids:
+        if not minion_pool_ids:
+            LOG.debug(
+                "No minion pools seem to have been used for action with "
+                "base_id '%s'. Skipping minion machine deallocation.",
+                action.base_id)
+        else:
             with contextlib.ExitStack() as stack:
                 _ = [
                     stack.enter_context(
@@ -1750,6 +1863,10 @@ class ConductorServerEndpoint(object):
                     db_api.set_minion_machines_allocation_statuses(
                         ctxt, machine_ids, None,
                         constants.MINION_MACHINE_STATUS_AVAILABLE)
+                else:
+                    LOG.debug(
+                        "No minion machines were found to be associated "
+                        "with action with base_id '%s'.", action.base_id)
 
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, origin_minion_pool_id,
@@ -1768,6 +1885,7 @@ class ConductorServerEndpoint(object):
 
         migration = models.Migration()
         migration.id = str(uuid.uuid4())
+        migration.base_id = migration.id
         migration.origin_endpoint_id = origin_endpoint_id
         migration.destination_endpoint_id = destination_endpoint_id
         migration.destination_environment = destination_environment
@@ -1795,6 +1913,10 @@ class ConductorServerEndpoint(object):
 
         self._check_minion_pools_for_action(ctxt, migration)
 
+        minion_pool_allocations = self._allocate_minion_machines_for_action(
+            ctxt, migration, include_transfer_minions=True,
+            include_osmorphing_minions=not skip_os_morphing)
+
         for instance in instances:
             migration.info[instance] = {
                 "volumes_info": [],
@@ -1811,26 +1933,58 @@ class ConductorServerEndpoint(object):
                 # "network_map": network_map,
                 # "storage_mappings": storage_mappings,
 
+            instance_minion_machines = minion_pool_allocations.get(
+                instance, {})
+            instance_source_minion = instance_minion_machines.get(
+                'source_minion')
+            instance_target_minion = instance_minion_machines.get(
+                'target_minion')
+            instance_osmorphing_minion = instance_minion_machines.get(
+                'osmorphing_minion')
+
             get_instance_info_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_GET_INSTANCE_INFO,
                 execution)
 
-            validate_migration_destination_inputs_task = self._create_task(
-                instance,
-                constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS,
-                execution,
-                depends_on=[get_instance_info_task.id])
-
             validate_migration_source_inputs_task = self._create_task(
                 instance,
                 constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS,
                 execution)
 
-            deploy_migration_source_resources_task = self._create_task(
+            validate_migration_destination_inputs_task = self._create_task(
                 instance,
-                constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES,
-                execution, depends_on=[validate_migration_source_inputs_task.id])
+                constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS,
+                execution,
+                depends_on=[get_instance_info_task.id])
+
+            migration_resources_task_ids = []
+            validate_source_minion_task = None
+            deploy_migration_source_resources_task = None
+            if instance_source_minion:
+                migration.info[instance].update({
+                    "source_minion_machine_id": instance_source_minion.id,
+                    "source_minion_provider_properties": (
+                        instance_source_minion.provider_properties),
+                    "source_minion_connection_info": (
+                        instance_source_minion.connection_info)})
+                validate_source_minion_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY,
+                    execution,
+                    depends_on=[
+                        get_instance_info_task.id,
+                        validate_migration_source_inputs_task.id])
+                migration_resources_task_ids.append(
+                    validate_source_minion_task.id)
+            else:
+                deploy_migration_source_resources_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES,
+                    execution, depends_on=[
+                        validate_migration_source_inputs_task.id])
+                migration_resources_task_ids.append(
+                    deploy_migration_source_resources_task.id)
 
             create_instance_disks_task = self._create_task(
                 instance, constants.TASK_TYPE_CREATE_INSTANCE_DISKS,
@@ -1838,10 +1992,55 @@ class ConductorServerEndpoint(object):
                     validate_migration_source_inputs_task.id,
                     validate_migration_destination_inputs_task.id])
 
-            deploy_migration_target_resources_task = self._create_task(
-                instance,
-                constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES,
-                execution, depends_on=[create_instance_disks_task.id])
+            validate_target_minion_task = None
+            attach_target_minion_disks_task = None
+            deploy_migration_target_resources_task = None
+            if instance_target_minion:
+                migration.info[instance].update({
+                    "target_minion_machine_id": instance_target_minion.id,
+                    "target_minion_provider_properties": (
+                        instance_target_minion.provider_properties),
+                    "target_minion_connection_info": (
+                        instance_target_minion.connection_info),
+                    "target_minion_backup_writer_connection_info": (
+                        instance_target_minion.backup_writer_connection_info)})
+                ttyp = (
+                    constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY)
+                validate_target_minion_task = self._create_task(
+                    instance, ttyp, execution, depends_on=[
+                        validate_migration_destination_inputs_task.id])
+
+                attach_target_minion_disks_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION,
+                    execution, depends_on=[
+                        validate_target_minion_task.id,
+                        create_instance_disks_task.id])
+                migration_resources_task_ids.append(
+                    attach_target_minion_disks_task.id)
+            else:
+                deploy_migration_target_resources_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES,
+                    execution, depends_on=[create_instance_disks_task.id])
+                migration_resources_task_ids.append(
+                    deploy_migration_target_resources_task.id)
+
+            validate_osmorphing_minion_task = None
+            if not skip_os_morphing and instance_osmorphing_minion:
+                migration.info[instance].update({
+                    "osmorphing_minion_machine_id": instance_osmorphing_minion.id,
+                    "osmorphing_minion_provider_properties": (
+                        instance_osmorphing_minion.provider_properties),
+                    "osmorphing_minion_connection_info": (
+                        instance_osmorphing_minion.connection_info)})
+                validate_osmorphing_minion_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY,
+                    execution, depends_on=[
+                        validate_migration_destination_inputs_task.id])
+                migration_resources_task_ids.append(
+                    validate_osmorphing_minion_task.id)
 
             # NOTE(aznashwan): re-executing the REPLICATE_DISKS task only works
             # if all the source disk snapshotting and worker setup steps are
@@ -1849,21 +2048,18 @@ class ConductorServerEndpoint(object):
             # This should no longer be a problem when worker pooling lands.
             last_sync_task = None
             first_sync_task = None
-            migration_resources_tasks = [
-                deploy_migration_source_resources_task.id,
-                deploy_migration_target_resources_task.id]
             for i in range(migration.replication_count):
                 # insert SHUTDOWN_INSTANCES task before the last sync:
                 if i == (migration.replication_count - 1) and (
                         migration.shutdown_instances):
-                    shutdown_deps = migration_resources_tasks
+                    shutdown_deps = migration_resources_task_ids
                     if last_sync_task:
                         shutdown_deps = [last_sync_task.id]
                     last_sync_task = self._create_task(
                         instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
                         execution, depends_on=shutdown_deps)
 
-                replication_deps = migration_resources_tasks
+                replication_deps = migration_resources_task_ids
                 if last_sync_task:
                     replication_deps = [last_sync_task.id]
 
@@ -1873,57 +2069,142 @@ class ConductorServerEndpoint(object):
                 if not first_sync_task:
                     first_sync_task = last_sync_task
 
-            delete_source_resources_task = self._create_task(
-                instance,
-                constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES,
-                execution, depends_on=[
-                    deploy_migration_source_resources_task.id,
-                    last_sync_task.id],
-                on_error=True)
+            release_source_minion_task = None
+            delete_source_resources_task = None
+            source_resource_cleanup_task = None
+            if instance_source_minion:
+                release_source_minion_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_RELEASE_SOURCE_MINION,
+                    execution,
+                    depends_on=[
+                        validate_source_minion_task.id,
+                        last_sync_task.id],
+                    on_error=True)
+                source_resource_cleanup_task = release_source_minion_task
+            else:
+                delete_source_resources_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES,
+                    execution, depends_on=[
+                        deploy_migration_source_resources_task.id,
+                        last_sync_task.id],
+                    on_error=True)
+                source_resource_cleanup_task = delete_source_resources_task
 
             cleanup_source_storage_task = self._create_task(
                 instance, constants.TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE,
                 execution, depends_on=[
                     first_sync_task.id,
-                    delete_source_resources_task.id],
+                    source_resource_cleanup_task.id],
                 on_error=True)
 
-            delete_destination_resources_task = self._create_task(
-                instance,
-                constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES,
-                execution, depends_on=[
-                    deploy_migration_target_resources_task.id,
-                    last_sync_task.id],
-                on_error=True)
+            target_resources_cleanup_task = None
+            if instance_target_minion:
+                detach_volumes_from_target_minion_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION,
+                    execution,
+                    depends_on=[
+                        attach_target_minion_disks_task.id,
+                        last_sync_task.id],
+                    on_error=True)
+                target_resources_cleanup_task = (
+                    detach_volumes_from_target_minion_task)
+
+                self._create_task(
+                    instance,
+                    constants.TASK_TYPE_RELEASE_DESTINATION_MINION,
+                    execution, depends_on=[
+                        validate_target_minion_task.id,
+                        detach_volumes_from_target_minion_task.id],
+                    on_error=True)
+            else:
+                delete_destination_resources_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES,
+                    execution, depends_on=[
+                        deploy_migration_target_resources_task.id,
+                        last_sync_task.id],
+                    on_error=True)
+                target_resources_cleanup_task = (
+                    delete_destination_resources_task)
 
             deploy_instance_task = self._create_task(
                 instance, constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES,
                 execution, depends_on=[
                     last_sync_task.id,
-                    delete_destination_resources_task.id])
+                    target_resources_cleanup_task.id])
 
             depends_on = [deploy_instance_task.id]
-            task_delete_os_morphing_resources = None
+            osmorphing_resources_cleanup_task = None
             if not skip_os_morphing:
-                task_deploy_os_morphing_resources = self._create_task(
-                    instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
-                    execution, depends_on=depends_on)
+                task_deploy_os_morphing_resources = None
+                task_delete_os_morphing_resources = None
+                attach_osmorphing_minion_volumes_task = None
+                last_osmorphing_resources_deployment_task = None
+                if instance_osmorphing_minion:
+                    osmorphing_vol_attachment_deps = [
+                        validate_osmorphing_minion_task.id]
+                    osmorphing_vol_attachment_deps.extend(depends_on)
+                    attach_osmorphing_minion_volumes_task = self._create_task(
+                        instance,
+                        constants.TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION,
+                        execution, depends_on=osmorphing_vol_attachment_deps)
+                    last_osmorphing_resources_deployment_task = (
+                        attach_osmorphing_minion_volumes_task)
+
+                    collect_osmorphing_info_task = self._create_task(
+                        instance,
+                        constants.TASK_TYPE_COLLECT_OSMORPHING_INFO,
+                        execution,
+                        depends_on=[attach_osmorphing_minion_volumes_task.id])
+                    last_osmorphing_resources_deployment_task = (
+                        collect_osmorphing_info_task)
+                else:
+                    task_deploy_os_morphing_resources = self._create_task(
+                        instance,
+                        constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
+                        execution, depends_on=depends_on)
+                    last_osmorphing_resources_deployment_task = (
+                        task_deploy_os_morphing_resources)
 
                 task_osmorphing = self._create_task(
                     instance, constants.TASK_TYPE_OS_MORPHING,
                     execution, depends_on=[
-                        task_deploy_os_morphing_resources.id])
-
-                task_delete_os_morphing_resources = self._create_task(
-                    instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
-                    execution, depends_on=[
-                        task_deploy_os_morphing_resources.id,
-                        task_osmorphing.id],
-                    on_error=True)
-
-                depends_on = [
-                    task_osmorphing.id,
-                    task_delete_os_morphing_resources.id]
+                        last_osmorphing_resources_deployment_task.id])
+
+                depends_on = [task_osmorphing.id]
+
+                if instance_osmorphing_minion:
+                    detach_osmorphing_minion_volumes_task = self._create_task(
+                        instance,
+                        constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION,
+                        execution, depends_on=[
+                            attach_osmorphing_minion_volumes_task.id,
+                            task_osmorphing.id],
+                        on_error=True)
+                    depends_on.append(detach_osmorphing_minion_volumes_task.id)
+                    osmorphing_resources_cleanup_task = (
+                        detach_osmorphing_minion_volumes_task)
+
+                    self._create_task(
+                        instance,
+                        constants.TASK_TYPE_RELEASE_OSMORPHING_MINION,
+                        execution, depends_on=[
+                            validate_osmorphing_minion_task.id,
+                            detach_osmorphing_minion_volumes_task.id],
+                        on_error=True)
+                else:
+                    task_delete_os_morphing_resources = self._create_task(
+                        instance, constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES,
+                        execution, depends_on=[
+                            task_deploy_os_morphing_resources.id,
+                            task_osmorphing.id],
+                        on_error=True)
+                    depends_on.append(task_delete_os_morphing_resources.id)
+                    osmorphing_resources_cleanup_task = (
+                        task_delete_os_morphing_resources)
 
             if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
                     destination_provider_types):
@@ -1948,20 +2229,30 @@ class ConductorServerEndpoint(object):
             cleanup_deps = [
                 create_instance_disks_task.id,
                 cleanup_source_storage_task.id,
-                delete_destination_resources_task.id,
+                target_resources_cleanup_task.id,
                 cleanup_failed_deployment_task.id]
-            if task_delete_os_morphing_resources:
-                cleanup_deps.append(task_delete_os_morphing_resources.id)
+            if osmorphing_resources_cleanup_task:
+                cleanup_deps.append(osmorphing_resources_cleanup_task.id)
             self._create_task(
                 instance, constants.TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE,
                 execution, depends_on=cleanup_deps,
                 on_error_only=True)
 
-        self._check_execution_tasks_sanity(execution, migration.info)
-        db_api.add_migration(ctxt, migration)
+        try:
+            self._check_execution_tasks_sanity(execution, migration.info)
+            db_api.add_migration(ctxt, migration)
 
-        LOG.info("Migration created: %s", migration.id)
-        self._begin_tasks(ctxt, execution, task_info=migration.info)
+            LOG.info("Migration created: %s", migration.id)
+            self._begin_tasks(ctxt, execution, task_info=migration.info)
+        except Exception:
+            if minion_pool_allocations:
+                LOG.warn(
+                    "Exception occured while verifying/registering tasks "
+                    "execution for Migration '%s'. Cleaning up all minion "
+                    "allocations from DB: %s" % (
+                        migration.id, minion_pool_allocations))
+                self._deallocate_minion_machines_for_action(ctxt, migration)
+            raise
 
         return self.get_migration(ctxt, migration.id)
 
@@ -2130,12 +2421,18 @@ class ConductorServerEndpoint(object):
                 "state advancement after cancellation.", execution.id)
 
     def _set_tasks_execution_status(self, ctxt, execution_id, execution_status):
-        LOG.info(
-            "Tasks execution %(id)s status updated to: %(status)s",
-            {"id": execution_id, "status": execution_status})
         execution = db_api.set_execution_status(
             ctxt, execution_id, execution_status)
+        LOG.info(
+            "Tasks execution %(id)s (action %(action)s) status updated "
+            "to: %(status)s",
+            {"id": execution_id, "status": execution_status,
+             "action": execution.action_id})
         if ctxt.delete_trust_id:
+            LOG.debug(
+                "Deleting Keystone trust following status change "
+                "for Execution '%s' (action '%s') to '%s'",
+                execution_id, execution.action_id, execution_status)
             keystone.delete_trust(ctxt)
 
         if execution.type in constants.MINION_POOL_EXECUTION_TYPES:
@@ -2143,9 +2440,22 @@ class ConductorServerEndpoint(object):
                 ctxt, execution, execution_status)
         else:
             if execution_status in constants.FINALIZED_EXECUTION_STATUSES:
+                LOG.debug(
+                    "Attempting to deallocate minion machines for finalized "
+                    "Execution '%s' of type '%s' (action '%s') following "
+                    "its transition into finalized status '%s'",
+                    execution.id, execution.type, execution.action_id,
+                    execution_status)
                 action = db_api.get_action(ctxt, execution.action_id)
                 self._deallocate_minion_machines_for_action(
                     ctxt, action)
+            else:
+                LOG.debug(
+                    "Not deallocating minion machines for Execution '%s' "
+                    "of type '%s' (action '%s') yet as it is still in an "
+                    "active Execution status (%s)",
+                    execution.id, execution.type, execution.action_id,
+                    execution_status)
 
     @staticmethod
     def _update_minion_pool_status_for_finished_execution(
@@ -2953,10 +3263,12 @@ class ConductorServerEndpoint(object):
             updated_task_info = None
             if task_result:
                 LOG.info(
-                    "Setting task %(task_id)s result for instance %(instance)s "
-                    "into action %(action_id)s info: %(task_result)s", {
+                    "Setting task %(task_id)s (type %(task_type)s)result for "
+                    "instance %(instance)s into action %(action_id)s info: "
+                    "%(task_result)s", {
                         "task_id": task_id,
                         "instance": task.instance,
+                        "task_type": task.task_type,
                         "action_id": action_id,
                         "task_result": utils.sanitize_task_info(
                             task_result)})

+ 1 - 0
coriolis/constants.py

@@ -159,6 +159,7 @@ TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY = (
 TASK_TYPE_RELEASE_SOURCE_MINION = "RELEASE_SOURCE_MINION"
 TASK_TYPE_RELEASE_DESTINATION_MINION = "RELEASE_DESTINATION_MINION"
 TASK_TYPE_RELEASE_OSMORPHING_MINION = "RELEASE_OSMORPHING_MINION"
+TASK_TYPE_COLLECT_OSMORPHING_INFO = "COLLECT_OS_MORPHING_INFO"
 
 TASK_PLATFORM_SOURCE = "source"
 TASK_PLATFORM_DESTINATION = "destination"

+ 12 - 0
coriolis/providers/base.py

@@ -624,3 +624,15 @@ class BaseMinionPoolProvider(
     def detach_volumes_from_minion(
             self, ctxt, connection_info, minion_properties, volumes_info):
         pass
+
+    @abc.abstractmethod
+    def get_additional_os_morphing_info(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
+        """ This method should return any additional 'osmorphing_info'
+        as defined in coriolis.schemas.CORIOLIS_OS_MORPHING_RESOURCES_SCHEMA
+        Source-only providers can safely implement a stub method which returns
+        nothing, as this will only ever be called during OSMorphing for a
+        target plugin.
+        """
+        pass

+ 3 - 1
coriolis/tasks/factory.py

@@ -116,7 +116,9 @@ _TASKS_MAP = {
     constants.TASK_TYPE_RELEASE_DESTINATION_MINION:
         minion_pool_tasks.ReleaseDestinationMinionTask,
     constants.TASK_TYPE_RELEASE_OSMORPHING_MINION:
-        minion_pool_tasks.ReleaseOSMorphingMinionTask
+        minion_pool_tasks.ReleaseOSMorphingMinionTask,
+    constants.TASK_TYPE_COLLECT_OSMORPHING_INFO:
+        minion_pool_tasks.CollectOSMorphingInfoTask
 }
 
 

+ 42 - 0
coriolis/tasks/minion_pool_tasks.py

@@ -701,3 +701,45 @@ class ReleaseOSMorphingMinionTask(_BaseReleaseMinionTask):
     @classmethod
     def _get_minion_task_info_field_mappings(cls):
         return OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class CollectOSMorphingInfoTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        return ["target_environment", "instance_deployment_info"]
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return ["osmorphing_info"]
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_MINION_POOL]}
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_MINION_POOL,
+            event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+        target_environment = task_info["target_environment"]
+        instance_deployment_info = task_info["instance_deployment_info"]
+
+        result = provider.get_additional_os_morphing_info(
+            ctxt, connection_info, target_environment,
+            instance_deployment_info)
+
+        if not isinstance(result, dict) or 'osmorphing_info' not in result:
+            raise exception.CoriolisException(
+                "'get_additional_os_morphing_info' method for provider of type"
+                " '%s' failed to return OSMorphing info.")
+
+        return {
+            "osmorphing_info": result["osmorphing_info"]}