Explorar el Código

Add minion machine selection logic for transfer actions.

Nashwan Azhari hace 5 años
padre
commit
ac32a03c2a

+ 13 - 3
coriolis/api/v1/minion_pools.py

@@ -52,12 +52,22 @@ class MinionPoolController(api_wsgi.Controller):
             self._endpoints_api.validate_endpoint_minion_pool_options(
             self._endpoints_api.validate_endpoint_minion_pool_options(
                 ctxt, endpoint_id, environment_options)
                 ctxt, endpoint_id, environment_options)
 
 
-            minimum_minions = minion_pool.get("minimum_minions", 0)
-            maximum_minions = minion_pool.get("maximum_minions", 1)
+            minimum_minions = minion_pool.get("minimum_minions", 1)
+            maximum_minions = minion_pool.get(
+                "maximum_minions", minimum_minions)
             minion_max_idle_time = minion_pool.get(
             minion_max_idle_time = minion_pool.get(
                 "minion_max_idle_time", 1)
                 "minion_max_idle_time", 1)
             minion_retention_strategy = minion_pool.get(
             minion_retention_strategy = minion_pool.get(
-                "minion_retention_strategy")
+                "minion_retention_strategy",
+                constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE)
+            suppoted_retention_strategies = [
+                constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE]
+            if minion_retention_strategy not in suppoted_retention_strategies:
+                raise Exception(
+                    "Unsupported minion retention strategy '%s'. Must be "
+                    "one of: %s" % (
+                        minion_retention_strategy,
+                        suppoted_retention_strategies))
             notes = minion_pool.get("notes")
             notes = minion_pool.get("notes")
             return (
             return (
                 name, endpoint_id, pool_os_type, environment_options,
                 name, endpoint_id, pool_os_type, environment_options,

+ 8 - 0
coriolis/api/v1/replicas.py

@@ -228,6 +228,14 @@ class ReplicaController(api_wsgi.Controller):
             final_values['destination_environment'][
             final_values['destination_environment'][
                 'network_map'] = final_network_map
                 'network_map'] = final_network_map
 
 
+        minion_pool_fields = [
+            "origin_minion_pool_id", "destination_minion_pool_id",
+            "instance_osmorphing_minion_pool_mappings"]
+        final_values.update({
+            mpf: updated_values[mpf]
+            for mpf in minion_pool_fields
+            if mpf in updated_values})
+
         return final_values
         return final_values
 
 
     def _validate_update_body(self, id, context, body):
     def _validate_update_body(self, id, context, body):

+ 406 - 48
coriolis/conductor/rpc/server.py

@@ -1,8 +1,10 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 # All Rights Reserved.
 
 
+import contextlib
 import copy
 import copy
 import functools
 import functools
+import itertools
 import random
 import random
 import time
 import time
 import uuid
 import uuid
@@ -907,7 +909,18 @@ class ConductorServerEndpoint(object):
         dest_env['network_map'] = replica.network_map
         dest_env['network_map'] = replica.network_map
         dest_env['storage_mappings'] = replica.storage_mappings
         dest_env['storage_mappings'] = replica.storage_mappings
 
 
+        minion_pool_allocations = self._allocate_minion_machines_for_action(
+            ctxt, replica, include_transfer_minions=True,
+            include_osmorphing_minions=False)
+
         for instance in execution.action.instances:
         for instance in execution.action.instances:
+            instance_minion_machines = minion_pool_allocations.get(
+                instance, {})
+            instance_source_minion = instance_minion_machines.get(
+                'source_minion')
+            instance_target_minion = instance_minion_machines.get(
+                'target_minion')
+
             # NOTE: we default/convert the volumes info to an empty list
             # NOTE: we default/convert the volumes info to an empty list
             # to preserve backwards-compatibility with older versions
             # to preserve backwards-compatibility with older versions
             # of Coriolis dating before the scheduling overhaul (PR##114)
             # of Coriolis dating before the scheduling overhaul (PR##114)
@@ -942,26 +955,81 @@ class ConductorServerEndpoint(object):
                 execution,
                 execution,
                 depends_on=[get_instance_info_task.id])
                 depends_on=[get_instance_info_task.id])
 
 
+            disk_deployment_depends_on = []
+            validate_source_minion_task = None
+            if instance_source_minion:
+                replica.info[instance].update({
+                    "source_minion_machine_id": instance_source_minion.id,
+                    "source_minion_provider_properties": (
+                        instance_source_minion.provider_properties),
+                    "source_minion_connection_info": (
+                        instance_source_minion.connection_info)})
+                validate_source_minion_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY,
+                    execution,
+                    depends_on=[
+                        get_instance_info_task.id,
+                        validate_replica_source_inputs_task.id])
+                disk_deployment_depends_on.append(
+                    validate_source_minion_task.id)
+            else:
+                disk_deployment_depends_on.append(
+                    validate_replica_source_inputs_task.id)
+
+            validate_target_minion_task = None
+            if instance_target_minion:
+                replica.info[instance].update({
+                    "target_minion_machine_id": instance_target_minion.id,
+                    "target_minion_provider_properties": (
+                        instance_target_minion.provider_properties),
+                    "target_minion_connection_info": (
+                        instance_target_minion.connection_info),
+                    "target_minion_backup_writer_connection_info": (
+                        instance_target_minion.backup_writer_connection_info)})
+                validate_target_minion_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY,
+                    execution,
+                    depends_on=[
+                        validate_replica_destination_inputs_task.id])
+                disk_deployment_depends_on.append(
+                    validate_target_minion_task.id)
+            else:
+                disk_deployment_depends_on.append(
+                    validate_replica_destination_inputs_task.id)
+
             deploy_replica_disks_task = self._create_task(
             deploy_replica_disks_task = self._create_task(
                 instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
                 instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
-                execution, depends_on=[
-                    validate_replica_source_inputs_task.id,
-                    validate_replica_destination_inputs_task.id])
+                execution, depends_on=disk_deployment_depends_on)
 
 
-            deploy_replica_source_resources_task = self._create_task(
-                instance,
-                constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES,
-                execution, depends_on=[deploy_replica_disks_task.id])
-
-            deploy_replica_target_resources_task = self._create_task(
-                instance,
-                constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES,
-                execution, depends_on=[
-                    deploy_replica_disks_task.id])
+            shutdown_deps = []
+            deploy_replica_source_resources_task = None
+            if not instance_source_minion:
+                deploy_replica_source_resources_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES,
+                    execution, depends_on=[
+                        deploy_replica_disks_task.id])
+                shutdown_deps.append(deploy_replica_source_resources_task)
+
+            attach_target_minion_disks_task = None
+            deploy_replica_target_resources_task = None
+            if instance_target_minion:
+                ttyp = constants.TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION
+                attach_target_minion_disks_task = self._create_task(
+                    instance, ttyp, execution, depends_on=[
+                        deploy_replica_disks_task.id])
+                shutdown_deps.append(attach_target_minion_disks_task)
+            else:
+                deploy_replica_target_resources_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES,
+                    execution, depends_on=[
+                        deploy_replica_disks_task.id])
+                shutdown_deps.append(deploy_replica_target_resources_task)
 
 
-            depends_on = [
-                deploy_replica_source_resources_task.id,
-                deploy_replica_target_resources_task.id]
+            depends_on = [t.id for t in shutdown_deps]
             if shutdown_instances:
             if shutdown_instances:
                 shutdown_instance_task = self._create_task(
                 shutdown_instance_task = self._create_task(
                     instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
                     instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
@@ -972,35 +1040,73 @@ class ConductorServerEndpoint(object):
                 instance, constants.TASK_TYPE_REPLICATE_DISKS,
                 instance, constants.TASK_TYPE_REPLICATE_DISKS,
                 execution, depends_on=depends_on)
                 execution, depends_on=depends_on)
 
 
-            self._create_task(
-                instance,
-                constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
-                execution,
-                depends_on=[
-                    deploy_replica_source_resources_task.id,
-                    replicate_disks_task.id],
-                on_error=True)
+            if instance_source_minion:
+                self._create_task(
+                    instance,
+                    constants.TASK_TYPE_RELEASE_SOURCE_MINION,
+                    execution,
+                    depends_on=[
+                        validate_source_minion_task.id,
+                        replicate_disks_task.id],
+                    on_error=True)
+            else:
+                self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
+                    execution,
+                    depends_on=[
+                        deploy_replica_source_resources_task.id,
+                        replicate_disks_task.id],
+                    on_error=True)
 
 
-            self._create_task(
-                instance,
-                constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
-                execution, depends_on=[
-                    deploy_replica_target_resources_task.id,
-                    replicate_disks_task.id],
-                on_error=True)
+            if instance_target_minion:
+                detach_volumes_from_minion_task = self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION,
+                    execution,
+                    depends_on=[
+                        attach_target_minion_disks_task.id,
+                        replicate_disks_task.id],
+                    on_error=True)
 
 
-        self._check_execution_tasks_sanity(execution, replica.info)
+                self._create_task(
+                    instance,
+                    constants.TASK_TYPE_RELEASE_DESTINATION_MINION,
+                    execution,
+                    depends_on=[
+                        validate_target_minion_task.id,
+                        detach_volumes_from_minion_task.id],
+                    on_error=True)
+            else:
+                self._create_task(
+                    instance,
+                    constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
+                    execution, depends_on=[
+                        deploy_replica_target_resources_task.id,
+                        replicate_disks_task.id],
+                    on_error=True)
 
 
-        # update the action info for all of the Replicas:
-        for instance in execution.action.instances:
-            db_api.update_transfer_action_info_for_instance(
-                ctxt, replica.id, instance, replica.info[instance])
+        try:
+            self._check_execution_tasks_sanity(execution, replica.info)
 
 
-        # add new execution to DB:
-        db_api.add_replica_tasks_execution(ctxt, execution)
-        LOG.info("Replica tasks execution created: %s", execution.id)
+            # update the action info for all of the Replicas:
+            for instance in execution.action.instances:
+                db_api.update_transfer_action_info_for_instance(
+                    ctxt, replica.id, instance, replica.info[instance])
+
+            # add new execution to DB:
+            db_api.add_replica_tasks_execution(ctxt, execution)
+            LOG.info("Replica tasks execution created: %s", execution.id)
+
+            self._begin_tasks(ctxt, execution, task_info=replica.info)
+        except Exception:
+            LOG.warn(
+                "Exception occured while verifying/registering Replica tasks "
+                "execution for Replica '%s'. Cleaning up all minion "
+                "allocations from DB." % replica.id)
+            self._deallocate_minion_machines_for_action(ctxt, replica)
+            raise
 
 
-        self._begin_tasks(ctxt, execution, task_info=replica.info)
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
 
 
     @replica_synchronized
     @replica_synchronized
@@ -1131,7 +1237,7 @@ class ConductorServerEndpoint(object):
 
 
     def _check_minion_pools_for_action(self, ctxt, action):
     def _check_minion_pools_for_action(self, ctxt, action):
         minion_pools = {
         minion_pools = {
-            pool.id
+            pool.id: pool
             for pool in db_api.get_minion_pool_lifecycles(
             for pool in db_api.get_minion_pool_lifecycles(
                 ctxt, include_tasks_executions=False,
                 ctxt, include_tasks_executions=False,
                 include_info=False, include_machines=False,
                 include_info=False, include_machines=False,
@@ -1140,41 +1246,43 @@ class ConductorServerEndpoint(object):
             pool = minion_pools.get(pool_id)
             pool = minion_pools.get(pool_id)
             if not pool:
             if not pool:
                 raise exception.NotFound(
                 raise exception.NotFound(
-                    "Could not find minion pool with ID '%s'." % minion_pools)
+                    "Could not find minion pool with ID '%s'." % pool_id)
             return pool
             return pool
 
 
         if action.origin_minion_pool_id:
         if action.origin_minion_pool_id:
             origin_pool = _get_pool(action.origin_minion_pool_id)
             origin_pool = _get_pool(action.origin_minion_pool_id)
-            if origin_pool.endpoint_id != action.origin_endpoint_id:
+            if origin_pool.origin_endpoint_id != action.origin_endpoint_id:
                 raise exception.InvalidMinionPoolSelection(
                 raise exception.InvalidMinionPoolSelection(
                     "The selected origin minion pool ('%s') belongs to a "
                     "The selected origin minion pool ('%s') belongs to a "
                     "different Coriolis endpoint ('%s') than the requested "
                     "different Coriolis endpoint ('%s') than the requested "
                     "origin endpoint ('%s')" % (
                     "origin endpoint ('%s')" % (
-                        action.origin_minion_pool_id, origin_pool.endpoint_id,
+                        action.origin_minion_pool_id,
+                        origin_pool.origin_endpoint_id,
                         action.origin_endpoint_id))
                         action.origin_endpoint_id))
 
 
         if action.destination_minion_pool_id:
         if action.destination_minion_pool_id:
             destination_pool = _get_pool(action.destination_minion_pool_id)
             destination_pool = _get_pool(action.destination_minion_pool_id)
-            if destination_pool.endpoint_id != action.destination_endpoint_id:
+            if destination_pool.origin_endpoint_id != action.destination_endpoint_id:
                 raise exception.InvalidMinionPoolSelection(
                 raise exception.InvalidMinionPoolSelection(
                     "The selected destination minion pool ('%s') belongs to a "
                     "The selected destination minion pool ('%s') belongs to a "
                     "different Coriolis endpoint ('%s') than the requested "
                     "different Coriolis endpoint ('%s') than the requested "
                     "destination endpoint ('%s')" % (
                     "destination endpoint ('%s')" % (
                         action.destination_minion_pool_id,
                         action.destination_minion_pool_id,
-                        destination_pool.endpoint_id,
+                        destination_pool.origin_endpoint_id,
                         action.destination_endpoint_id))
                         action.destination_endpoint_id))
 
 
         if action.instance_osmorphing_minion_pool_mappings:
         if action.instance_osmorphing_minion_pool_mappings:
             for (instance, pool_id) in (
             for (instance, pool_id) in (
                     action.instance_osmorphing_minion_pool_mappings):
                     action.instance_osmorphing_minion_pool_mappings):
                 osmorphing_pool = _get_pool(pool_id)
                 osmorphing_pool = _get_pool(pool_id)
-                if osmorphing_pool.endpoint_id != action.destination_endpoint_id:
+                if osmorphing_pool.origin_endpoint_id != (
+                        action.destination_endpoint_id):
                     raise exception.InvalidMinionPoolSelection(
                     raise exception.InvalidMinionPoolSelection(
                         "The selected OSMorphing minion pool for instance '%s'"
                         "The selected OSMorphing minion pool for instance '%s'"
                         " ('%s') belongs to a different Coriolis endpoint "
                         " ('%s') belongs to a different Coriolis endpoint "
                         "('%s') than the destination endpoint ('%s')" % (
                         "('%s') than the destination endpoint ('%s')" % (
                             instance, pool_id,
                             instance, pool_id,
-                            osmorphing_pool.endpoint_id,
+                            osmorphing_pool.origin_endpoint_id,
                             action.destination_endpoint_id))
                             action.destination_endpoint_id))
 
 
     def create_instances_replica(self, ctxt, origin_endpoint_id,
     def create_instances_replica(self, ctxt, origin_endpoint_id,
@@ -1447,6 +1555,202 @@ class ConductorServerEndpoint(object):
                 ret["instances"][instance] = instance_script
                 ret["instances"][instance] = instance_script
         return ret
         return ret
 
 
+    def _allocate_minion_machines_for_action(
+            self, ctxt, action, include_transfer_minions=True,
+            include_osmorphing_minions=True):
+        """ Returns a dict of the form:
+        {
+            "instance_id": {
+                "source_minion": <source minion properties>,
+                "target_minion": <target minion properties>,
+                "osmorphing_minion": <osmorphing minion properties>
+            }
+        }
+        """
+        minion_pool_ids = set()
+        if action.origin_minion_pool_id:
+            minion_pool_ids.add(action.origin_minion_pool_id)
+        if action.destination_minion_pool_id:
+            minion_pool_ids.add(action.destination_minion_pool_id)
+        if action.instance_osmorphing_minion_pool_mappings:
+            minion_pool_ids = minion_pool_ids.union(set(
+                action.instance_osmorphing_minion_pool_mappings.values()))
+
+        def _select_machine(minion_pool, exclude=None):
+            if not minion_pool.minion_machines:
+                raise exception.InvalidMinionPoolSelection(
+                    "Minion pool with ID '%s' has no machines defined." % (
+                        minion_pool.id))
+            selected_machine = None
+            for machine in minion_pool.minion_machines:
+                if exclude and machine.id in exclude:
+                    LOG.debug(
+                        "Excluding minion machine '%s' from search.",
+                        machine.id)
+                    continue
+                if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
+                    LOG.debug(
+                        "Minion machine with ID '%s' is in status '%s' "
+                        "instead of '%s'. Skipping.", machine.id,
+                        machine.status,
+                        constants.MINION_MACHINE_STATUS_AVAILABLE)
+                    continue
+                selected_machine = machine
+                break
+            if not selected_machine:
+                raise exception.InvalidMinionPoolSelection(
+                    "There are no more available minion machines within minion"
+                    " pool with ID '%s' (excluding the following already "
+                    "allocated ones: %s)" % (minion_pool.id, exclude))
+            return selected_machine
+
+        instance_pool_allocations = {
+            instance: {} for instance in action.instances}
+        osmorphing_pool_map = (
+            action.instance_osmorphing_minion_pool_mappings)
+        with contextlib.ExitStack() as stack:
+            _ = [
+                stack.enter_context(
+                    lockutils.lock(
+                        constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
+                        external=True))
+                for pool_id in minion_pool_ids]
+
+            minion_pools = db_api.get_minion_pool_lifecycles(
+                ctxt, include_tasks_executions=False, include_info=False,
+                include_machines=True, to_dict=False)
+            minion_pool_id_mappings = {
+                pool.id: pool for pool in minion_pools}
+
+            missing_pools = [
+                pool_id for pool_id in minion_pool_ids
+                if pool_id not in minion_pool_id_mappings]
+            if missing_pools:
+                raise exception.InvalidMinionPoolSelection(
+                    "The following minion pools could not be found: %s" % (
+                        missing_pools))
+
+            unallocated_pools = {
+                pool.id: pool.pool_status for pool in minion_pools
+                if pool.pool_status != constants.MINION_POOL_STATUS_ALLOCATED}
+            if unallocated_pools:
+                raise exception.InvalidMinionPoolSelection(
+                    "The following minion pools have not had their machines "
+                    "allocated and thus cannot be used: %s" % (
+                        unallocated_pools))
+
+            allocated_source_machine_ids = set()
+            allocated_target_machine_ids = set()
+            allocated_osmorphing_machine_ids = set()
+            for instance in action.instances:
+
+                if include_transfer_minions:
+                    if action.origin_minion_pool_id:
+                        origin_pool = minion_pool_id_mappings[
+                            action.origin_minion_pool_id]
+                        machine = _select_machine(
+                            origin_pool, exclude=allocated_source_machine_ids)
+                        allocated_source_machine_ids.add(machine.id)
+                        instance_pool_allocations[
+                            instance]['source_minion'] = machine
+                        LOG.debug(
+                            "Selected minion machine '%s' for source-side "
+                            "syncing of instance '%s' as part of transfer "
+                            "action '%s'.", machine.id, instance, action.id)
+
+                    if action.destination_minion_pool_id:
+                        dest_pool = minion_pool_id_mappings[
+                            action.destination_minion_pool_id]
+                        machine = _select_machine(
+                            dest_pool, exclude=allocated_target_machine_ids)
+                        allocated_target_machine_ids.add(machine.id)
+                        instance_pool_allocations[
+                            instance]['target_minion'] = machine
+                        LOG.debug(
+                            "Selected minion machine '%s' for target-side "
+                            "syncing of instance '%s' as part of transfer "
+                            "action '%s'.", machine.id, instance, action.id)
+
+                if include_osmorphing_minions:
+                    if instance not in osmorphing_pool_map:
+                        LOG.debug(
+                            "Instance '%s' is not listed in the OSMorphing "
+                            "minion pool mappings. Skipping." % instance)
+                    else:
+                        osmorphing_pool_id = osmorphing_pool_map[instance]
+                        # if the selected target and OSMorphing pools
+                        # are the same, reuse the same worker:
+                        if osmorphing_pool_id == (
+                                action.destination_minion_pool_id):
+                            allocated_target_machine = (
+                                instance_pool_allocations[
+                                    instance].get('target_minion'))
+                            LOG.debug(
+                                "Reusing disk sync minion '%s' for the "
+                                "OSMorphing of instance '%s' as port of "
+                                "transfer action '%s'",
+                                allocated_target_machine.id, instance,
+                                action.id)
+                            instance_pool_allocations[
+                                instance]['osmorphing_minion'] = (
+                                    allocated_target_machine)
+                        # else, allocate a new minion from the selected pool:
+                        else:
+                            osmorphing_pool = minion_pool_id_mappings[
+                                osmorphing_pool_id]
+                            machine = _select_machine(
+                                osmorphing_pool,
+                                exclude=allocated_osmorphing_machine_ids)
+                            allocated_osmorphing_machine_ids.add(machine.id)
+                            instance_pool_allocations[
+                                instance]['osmorphing_minion'] = machine
+                            LOG.debug(
+                                "Selected minion machine '%s' for OSMorphing "
+                                " of instance '%s' as part of transfer "
+                                "action '%s'.",
+                                machine.id, instance, action.id)
+
+            # mark the selected machines as allocated:
+            all_machine_ids = set(itertools.chain(
+                allocated_source_machine_ids,
+                allocated_target_machine_ids,
+                allocated_osmorphing_machine_ids))
+            db_api.set_minion_machines_allocation_statuses(
+                ctxt, all_machine_ids, action.id,
+                constants.MINION_MACHINE_STATUS_ALLOCATED)
+
+        return instance_pool_allocations
+
+    def _deallocate_minion_machines_for_action(self, ctxt, action):
+        minion_pool_ids = set()
+        if action.origin_minion_pool_id:
+            minion_pool_ids.add(action.origin_minion_pool_id)
+        if action.destination_minion_pool_id:
+            minion_pool_ids.add(action.destination_minion_pool_id)
+        if action.instance_osmorphing_minion_pool_mappings:
+            minion_pool_ids = minion_pool_ids.union(set(
+                action.instance_osmorphing_minion_pool_mappings.values()))
+
+        if minion_pool_ids:
+            with contextlib.ExitStack() as stack:
+                _ = [
+                    stack.enter_context(
+                        lockutils.lock(
+                            constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
+                            external=True))
+                    for pool_id in minion_pool_ids]
+
+                minion_machines = db_api.get_minion_machines(
+                    ctxt, allocated_action_id=action.base_id)
+                machine_ids = [m.id for m in minion_machines]
+                if machine_ids:
+                    LOG.info(
+                        "Releasing the following minion machines for "
+                        "action '%s': %s", action.base_id, machine_ids)
+                    db_api.set_minion_machines_allocation_statuses(
+                        ctxt, machine_ids, None,
+                        constants.MINION_MACHINE_STATUS_AVAILABLE)
+
     def migrate_instances(self, ctxt, origin_endpoint_id,
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, origin_minion_pool_id,
                           destination_endpoint_id, origin_minion_pool_id,
                           destination_minion_pool_id,
                           destination_minion_pool_id,
@@ -1837,6 +2141,11 @@ class ConductorServerEndpoint(object):
         if execution.type in constants.MINION_POOL_EXECUTION_TYPES:
         if execution.type in constants.MINION_POOL_EXECUTION_TYPES:
             self._update_minion_pool_status_for_finished_execution(
             self._update_minion_pool_status_for_finished_execution(
                 ctxt, execution, execution_status)
                 ctxt, execution, execution_status)
+        else:
+            if execution_status in constants.FINALIZED_EXECUTION_STATUSES:
+                action = db_api.get_action(ctxt, execution.action_id)
+                self._deallocate_minion_machines_for_action(
+                    ctxt, action)
 
 
     @staticmethod
     @staticmethod
     def _update_minion_pool_status_for_finished_execution(
     def _update_minion_pool_status_for_finished_execution(
@@ -2516,6 +2825,55 @@ class ConductorServerEndpoint(object):
                 constants.TASK_TYPE_DELETE_MINION, task.instance)
                 constants.TASK_TYPE_DELETE_MINION, task.instance)
             db_api.delete_minion_machine(ctxt, task.instance)
             db_api.delete_minion_machine(ctxt, task.instance)
 
 
+        elif task_type in (
+                constants.TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION,
+                constants.TASK_TYPE_DETACH_VOLUMES_FROM_SOURCE_MINION):
+
+            updated_values = {
+                "provider_properties": task_info[
+                    "source_minion_provider_properties"]}
+
+            LOG.debug(
+                "Updating minion provider properties of minion machine '%s' "
+                "following the completion of task '%s' (type '%s') to %s",
+                task_info['source_minion_machine_id'],
+                task.id, task_type, updated_values)
+            db_api.update_minion_machine(
+                ctxt, task_info['source_minion_machine_id'], updated_values)
+
+        elif task_type in (
+                constants.TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION,
+                constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION):
+
+            updated_values = {
+                "provider_properties": task_info[
+                    "target_minion_provider_properties"]}
+
+            LOG.debug(
+                "Updating minion provider properties of minion machine '%s' "
+                "following the completion of task '%s' (type '%s') to %s",
+                task_info['target_minion_machine_id'],
+                task.id, task_type, updated_values)
+            db_api.update_minion_machine(
+                ctxt, task_info['target_minion_machine_id'], updated_values)
+
+        elif task_type in (
+                constants.TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION,
+                constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION):
+
+            updated_values = {
+                "provider_properties": task_info[
+                    "osmorphing_minion_provider_properties"]}
+
+            LOG.debug(
+                "Updating minion provider properties of minion machine '%s' "
+                "following the completion of task '%s' (type '%s') to %s",
+                task_info['osmorphing_minion_machine_id'],
+                task.id, task_type, updated_values)
+            db_api.update_minion_machine(
+                ctxt, task_info['osmorphing_minion_machine_id'],
+                updated_values)
+
         else:
         else:
             LOG.debug(
             LOG.debug(
                 "No post-task actions required for task '%s' of type '%s'",
                 "No post-task actions required for task '%s' of type '%s'",

+ 10 - 0
coriolis/constants.py

@@ -146,12 +146,19 @@ TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION = (
     "ATTACH_VOLUMES_TO_DESTINATION_MINION")
     "ATTACH_VOLUMES_TO_DESTINATION_MINION")
 TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION = (
 TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION = (
     "DETACH_VOLUMES_FROM_DESTINATION_MINION")
     "DETACH_VOLUMES_FROM_DESTINATION_MINION")
+TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION = (
+    "ATTACH_VOLUMES_TO_OSMORPHING_MINION")
+TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION = (
+    "DETACH_VOLUMES_FROM_OSMORPHING_MINION")
 TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY = (
 TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY = (
     "VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY")
     "VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY")
 TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY = (
 TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY = (
     "VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY")
     "VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY")
 TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY = (
 TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY = (
     "VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY")
     "VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY")
+TASK_TYPE_RELEASE_SOURCE_MINION = "RELEASE_SOURCE_MINION"
+TASK_TYPE_RELEASE_DESTINATION_MINION = "RELEASE_DESTINATION_MINION"
+TASK_TYPE_RELEASE_OSMORPHING_MINION = "RELEASE_OSMORPHING_MINION"
 
 
 TASK_PLATFORM_SOURCE = "source"
 TASK_PLATFORM_SOURCE = "source"
 TASK_PLATFORM_DESTINATION = "destination"
 TASK_PLATFORM_DESTINATION = "destination"
@@ -286,6 +293,9 @@ WORKER_MAIN_MESSAGING_TOPIC = "coriolis_worker"
 SCHEDULER_MAIN_MESSAGING_TOPIC = "coriolis_scheduler"
 SCHEDULER_MAIN_MESSAGING_TOPIC = "coriolis_scheduler"
 REPLICA_CRON_MAIN_MESSAGING_TOPIC = "coriolis_replica_cron_worker"
 REPLICA_CRON_MAIN_MESSAGING_TOPIC = "coriolis_replica_cron_worker"
 
 
+MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE = "delete"
+MINION_POOL_MACHINE_RETENTION_STRATEGY_POWEROFF = "poweroff"
+
 MINION_POOL_STATUS_UNKNOWN = "UNKNOWN"
 MINION_POOL_STATUS_UNKNOWN = "UNKNOWN"
 MINION_POOL_STATUS_ERROR = "ERROR"
 MINION_POOL_STATUS_ERROR = "ERROR"
 MINION_POOL_STATUS_UNINITIALIZED = "UNINITIALIZED"
 MINION_POOL_STATUS_UNINITIALIZED = "UNINITIALIZED"

+ 24 - 2
coriolis/db/api.py

@@ -1102,8 +1102,11 @@ def add_minion_machine(context, minion_machine):
 
 
 
 
 @enginefacade.reader
 @enginefacade.reader
-def get_minion_machines(context):
+def get_minion_machines(context, allocated_action_id=None):
     q = _soft_delete_aware_query(context, models.MinionMachine)
     q = _soft_delete_aware_query(context, models.MinionMachine)
+    if allocated_action_id:
+        q = q.filter(
+            models.MinionMachine.allocated_action == allocated_action_id)
     return q.all()
     return q.all()
 
 
 
 
@@ -1126,11 +1129,30 @@ def update_minion_machine(context, minion_machine_id, updated_values):
 
 
     updateable_fields = [
     updateable_fields = [
         "connection_info", "provider_properties", "status",
         "connection_info", "provider_properties", "status",
-        "backup_writer_connection_info"]
+        "backup_writer_connection_info", "allocated_action"]
     _update_sqlalchemy_object_fields(
     _update_sqlalchemy_object_fields(
         minion_machine, updateable_fields, updated_values)
         minion_machine, updateable_fields, updated_values)
 
 
 
 
+@enginefacade.writer
+def set_minion_machines_allocation_statuses(
+        context, minion_machine_ids, action_id, allocation_status):
+    machines = get_minion_machines(context)
+    existing_machine_ids = [
+        machine.id for machine in machines]
+    missing = [
+        mid for mid in minion_machine_ids
+        if mid not in existing_machine_ids]
+    if missing:
+        raise exception.NotFound(
+            "The following minion machines could not be found: %s" % (
+                missing))
+
+    for machine in machines:
+        machine.allocated_action = action_id
+        machine.status = allocation_status
+
+
 @enginefacade.writer
 @enginefacade.writer
 def delete_minion_machine(context, minion_machine_id):
 def delete_minion_machine(context, minion_machine_id):
     minion_machine = get_minion_machine(context, minion_machine_id)
     minion_machine = get_minion_machine(context, minion_machine_id)

+ 2 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -82,6 +82,8 @@ def upgrade(migrate_engine):
                 'pool_id', sqlalchemy.String(36),
                 'pool_id', sqlalchemy.String(36),
                 sqlalchemy.ForeignKey('minion_pool_lifecycle.id'),
                 sqlalchemy.ForeignKey('minion_pool_lifecycle.id'),
                 nullable=False),
                 nullable=False),
+            sqlalchemy.Column(
+                'allocated_action', sqlalchemy.String(36), nullable=True),
             sqlalchemy.Column(
             sqlalchemy.Column(
                 'status', sqlalchemy.String(255), nullable=False,
                 'status', sqlalchemy.String(255), nullable=False,
                 default=lambda: "UNKNOWN"),
                 default=lambda: "UNKNOWN"),

+ 4 - 0
coriolis/db/sqlalchemy/models.py

@@ -467,6 +467,9 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
         sqlalchemy.String(255), nullable=False,
         sqlalchemy.String(255), nullable=False,
         default=lambda: constants.MINION_MACHINE_STATUS_UNKNOWN)
         default=lambda: constants.MINION_MACHINE_STATUS_UNKNOWN)
 
 
+    allocated_action = sqlalchemy.Column(
+        sqlalchemy.String(36), nullable=True)
+
     connection_info = sqlalchemy.Column(
     connection_info = sqlalchemy.Column(
         types.Json, nullable=True)
         types.Json, nullable=True)
 
 
@@ -488,6 +491,7 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
             "pool_id": self.pool_id,
             "pool_id": self.pool_id,
             "status": self.status,
             "status": self.status,
             "connection_info": self.connection_info,
             "connection_info": self.connection_info,
+            "allocated_action": self.allocated_action,
             "backup_writer_connection_info": (
             "backup_writer_connection_info": (
                 self.backup_writer_connection_info),
                 self.backup_writer_connection_info),
             "provider_properties": self.provider_properties
             "provider_properties": self.provider_properties

+ 4 - 0
coriolis/exception.py

@@ -153,6 +153,10 @@ class InvalidMinionPoolSelection(Invalid):
     message = _("The selected minion pool is incompatible.")
     message = _("The selected minion pool is incompatible.")
 
 
 
 
+class MinionMachineAllocationFailure(Invalid):
+    message = _("No minion machines were available for allocation")
+
+
 class InvalidCustomOSDetectTools(Invalid):
 class InvalidCustomOSDetectTools(Invalid):
     message = _("The provided custom OS detect tools are invalid.")
     message = _("The provided custom OS detect tools are invalid.")
 
 

+ 12 - 1
coriolis/providers/base.py

@@ -561,6 +561,17 @@ class BaseMinionPoolProvider(
         """
         """
         pass
         pass
 
 
+    @abc.abstractmethod
+    def validate_osmorphing_minion_compatibility_for_transfer(
+            self, ctxt, connection_info, export_info, environment_options,
+            minion_properties):
+        """ Validates compatibility between the OSMorphing pool's options and
+        the options selected for a given transfer. Should raise if any options
+        of the minions in the pool might be deemed incompatible with the
+        desired transfer options.
+        """
+        pass
+
     @abc.abstractmethod
     @abc.abstractmethod
     def validate_minion_pool_options(
     def validate_minion_pool_options(
             self, ctxt, connection_info, environment_options):
             self, ctxt, connection_info, environment_options):
@@ -585,7 +596,7 @@ class BaseMinionPoolProvider(
     @abc.abstractmethod
     @abc.abstractmethod
     def create_minion(
     def create_minion(
             self, ctxt, connection_info, environment_options,
             self, ctxt, connection_info, environment_options,
-            pool_identifier, pool_shared_resources,
+            pool_identifier, pool_os_type, pool_shared_resources,
             new_minion_identifier):
             new_minion_identifier):
         pass
         pass
 
 

+ 11 - 1
coriolis/tasks/factory.py

@@ -101,12 +101,22 @@ _TASKS_MAP = {
         minion_pool_tasks.AttachVolumesToDestinationMinionTask,
         minion_pool_tasks.AttachVolumesToDestinationMinionTask,
     constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION:
     constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION:
         minion_pool_tasks.DetachVolumesFromDestinationMinionTask,
         minion_pool_tasks.DetachVolumesFromDestinationMinionTask,
+    constants.TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION:
+        minion_pool_tasks.AttachVolumesToOSMorphingMinionTask,
+    constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION:
+        minion_pool_tasks.DetachVolumesFromOSMorphingMinionTask,
     constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY:
     constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY:
         minion_pool_tasks.ValidateSourceMinionCompatibilityTask,
         minion_pool_tasks.ValidateSourceMinionCompatibilityTask,
     constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY:
     constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY:
         minion_pool_tasks.ValidateDestinationMinionCompatibilityTask,
         minion_pool_tasks.ValidateDestinationMinionCompatibilityTask,
     constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY:
     constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY:
-        minion_pool_tasks.ValidateOSMorphingMinionCompatibilityTask
+        minion_pool_tasks.ValidateOSMorphingMinionCompatibilityTask,
+    constants.TASK_TYPE_RELEASE_SOURCE_MINION:
+        minion_pool_tasks.ReleaseSourceMinionTask,
+    constants.TASK_TYPE_RELEASE_DESTINATION_MINION:
+        minion_pool_tasks.ReleaseDestinationMinionTask,
+    constants.TASK_TYPE_RELEASE_OSMORPHING_MINION:
+        minion_pool_tasks.ReleaseOSMorphingMinionTask
 }
 }
 
 
 
 

+ 220 - 15
coriolis/tasks/minion_pool_tasks.py

@@ -1,4 +1,4 @@
-# Copyright 2016 Cloudbase Solutions Srl
+# Copyright 2020 Cloudbase Solutions Srl
 # All Rights Reserved.
 # All Rights Reserved.
 
 
 from oslo_log import log as logging
 from oslo_log import log as logging
@@ -12,6 +12,22 @@ from coriolis.tasks import base
 LOG = logging.getLogger(__name__)
 LOG = logging.getLogger(__name__)
 
 
 
 
+SOURCE_MINION_TASK_INFO_FIELD_MAPPINGS = {
+    # NOTE: these redundancies are in place so as to have the
+    # 'Release*' task classes clear these fields after they run:
+    "source_minion_machine_id": "source_minion_machine_id",
+    "source_minion_provider_properties": "source_resources",
+    "source_minion_connection_info": "source_resources_connection_info"}
+TARGET_MINION_TASK_INFO_FIELD_MAPPINGS = {
+    "target_minion_machine_id": "target_minion_machine_id",
+    "target_minion_provider_properties": "target_resources",
+    "target_minion_backup_writer_connection_info": (
+        "target_resources_connection_info")}
+OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS = {
+    "osmorphing_minion_machine_id": "osmorphing_minion_machine_id",
+    "osmorphing_minion_provider_properties": "os_morphing_resources",
+    "osmorphing_minion_connection_info": "osmorphing_connection_info"}
+
 
 
 class ValidateMinionPoolOptionsTask(base.TaskRunner):
 class ValidateMinionPoolOptionsTask(base.TaskRunner):
 
 
@@ -72,7 +88,7 @@ class CreateMinionTask(base.TaskRunner):
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
         return [
         return [
             "pool_environment_options", "pool_shared_resources",
             "pool_environment_options", "pool_shared_resources",
-            "pool_identifier"]
+            "pool_identifier", "pool_os_type"]
 
 
     @classmethod
     @classmethod
     def get_returned_task_info_properties(cls):
     def get_returned_task_info_properties(cls):
@@ -103,9 +119,10 @@ class CreateMinionTask(base.TaskRunner):
         pool_identifier = task_info['pool_identifier']
         pool_identifier = task_info['pool_identifier']
         environment_options = task_info['pool_environment_options']
         environment_options = task_info['pool_environment_options']
         pool_shared_resources = task_info['pool_shared_resources']
         pool_shared_resources = task_info['pool_shared_resources']
+        pool_os_type = task_info["pool_os_type"]
         minion_properties = provider.create_minion(
         minion_properties = provider.create_minion(
             ctxt, connection_info, environment_options, pool_identifier,
             ctxt, connection_info, environment_options, pool_identifier,
-            pool_shared_resources, minion_pool_machine_id)
+            pool_os_type, pool_shared_resources, minion_pool_machine_id)
 
 
         missing = [
         missing = [
             key for key in [
             key for key in [
@@ -278,6 +295,10 @@ class TearDownPoolSupportingResourcesTask(base.TaskRunner):
 
 
 
 
 class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
 class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
+    """ The purposes of the volume attachment tasks are to:
+    1) attach the volumes of the minions
+    2) return any updated properties for the minions if needed
+    """
 
 
     @classmethod
     @classmethod
     def get_required_platform(cls):
     def get_required_platform(cls):
@@ -286,11 +307,16 @@ class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
-        return ["volumes_info", cls._get_minion_properties_task_info_field()]
+        fields = list(cls._get_minion_task_info_field_mappings().keys())
+        fields.append("volumes_info")
+        return fields
 
 
     @classmethod
     @classmethod
     def get_returned_task_info_properties(cls):
     def get_returned_task_info_properties(cls):
-        return ["volumes_info", cls._get_minion_properties_task_info_field()]
+        fields = list(cls._get_minion_task_info_field_mappings().values())
+        fields.append(cls._get_minion_properties_task_info_field())
+        fields.append("volumes_info")
+        return fields
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
@@ -307,6 +333,11 @@ class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
         raise NotImplementedError(
         raise NotImplementedError(
             "No minion disk attachment provider operation specified.")
             "No minion disk attachment provider operation specified.")
 
 
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        raise NotImplementedError(
+            "No minion task info field mappings provided.")
+
     def _run(self, ctxt, instance, origin, destination,
     def _run(self, ctxt, instance, origin, destination,
              task_info, event_handler):
              task_info, event_handler):
 
 
@@ -342,10 +373,21 @@ class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
                     self._get_provider_disk_operation.__name__,
                     self._get_provider_disk_operation.__name__,
                     platform_to_target))
                     platform_to_target))
 
 
-        return {
+        field_name_map = self._get_minion_task_info_field_mappings()
+        result = {
             "volumes_info": res['volumes_info'],
             "volumes_info": res['volumes_info'],
             self._get_minion_properties_task_info_field(): res[
             self._get_minion_properties_task_info_field(): res[
-                "minion_properties"]}
+                "minion_properties"],
+            field_name_map[
+                self._get_minion_properties_task_info_field()]: res[
+                    "minion_properties"]}
+
+        result.update({
+            field_name_map[field]: task_info[field]
+            for field in field_name_map
+            if field_name_map[field] not in result})
+
+        return result
 
 
 
 
 class AttachVolumesToSourceMinionTask(_BaseVolumesMinionMachineAttachmentTask):
 class AttachVolumesToSourceMinionTask(_BaseVolumesMinionMachineAttachmentTask):
@@ -358,6 +400,10 @@ class AttachVolumesToSourceMinionTask(_BaseVolumesMinionMachineAttachmentTask):
     def _get_minion_properties_task_info_field(cls):
     def _get_minion_properties_task_info_field(cls):
         return "source_minion_provider_properties"
         return "source_minion_provider_properties"
 
 
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return SOURCE_MINION_TASK_INFO_FIELD_MAPPINGS
+
     @classmethod
     @classmethod
     def _get_provider_disk_operation(cls, provider):
     def _get_provider_disk_operation(cls, provider):
         return provider.attach_volumes_to_minion
         return provider.attach_volumes_to_minion
@@ -378,12 +424,16 @@ class AttachVolumesToDestinationMinionTask(_BaseVolumesMinionMachineAttachmentTa
 
 
     @classmethod
     @classmethod
     def _get_minion_properties_task_info_field(cls):
     def _get_minion_properties_task_info_field(cls):
-        return "destination_minion_provider_properties"
+        return "target_minion_provider_properties"
 
 
     @classmethod
     @classmethod
     def _get_provider_disk_operation(cls, provider):
     def _get_provider_disk_operation(cls, provider):
         return provider.attach_volumes_to_minion
         return provider.attach_volumes_to_minion
 
 
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return TARGET_MINION_TASK_INFO_FIELD_MAPPINGS
+
 
 
 class DetachVolumesFromDestinationMinionTask(AttachVolumesToDestinationMinionTask):
 class DetachVolumesFromDestinationMinionTask(AttachVolumesToDestinationMinionTask):
 
 
@@ -392,7 +442,49 @@ class DetachVolumesFromDestinationMinionTask(AttachVolumesToDestinationMinionTas
         return provider.detach_volumes_from_minion
         return provider.detach_volumes_from_minion
 
 
 
 
+class AttachVolumesToOSMorphingMinionTask(
+        _BaseVolumesMinionMachineAttachmentTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def _get_minion_properties_task_info_field(cls):
+        return "osmorphing_minion_provider_properties"
+
+    @classmethod
+    def _get_provider_disk_operation(cls, provider):
+        return provider.attach_volumes_to_minion
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS
+
+    @classmethod
+    def _clear_mapped_minion_task_info_field(cls):
+        return False
+
+
+class DetachVolumesFromOSMorphingMinionTask(
+        AttachVolumesToOSMorphingMinionTask):
+
+    @classmethod
+    def _get_provider_disk_operation(cls, provider):
+        return provider.detach_volumes_from_minion
+
+    @classmethod
+    def _clear_mapped_minion_task_info_field(cls):
+        return True
+
+
 class _BaseValidateMinionCompatibilityTask(base.TaskRunner):
 class _BaseValidateMinionCompatibilityTask(base.TaskRunner):
+    """ The purposes of the minion validation tasks are to:
+    1) run the afferent validation method on the provider
+    2) "translate" the fields related to the minion into fields
+    which are to be consumed by the other tasks
+    (e.g. REPLICATE_DISKS or OS_MORPHING)
+    """
 
 
     @classmethod
     @classmethod
     def get_required_platform(cls):
     def get_required_platform(cls):
@@ -401,20 +493,29 @@ class _BaseValidateMinionCompatibilityTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
-        return [
+        base_props = set([
             "export_info",
             "export_info",
             cls._get_transfer_properties_task_info_field(),
             cls._get_transfer_properties_task_info_field(),
-            cls._get_minion_properties_task_info_field()]
+            cls._get_minion_properties_task_info_field()])
+        base_props.union(set(
+            cls._get_minion_task_info_field_mappings().keys()))
+        return list(base_props)
 
 
     @classmethod
     @classmethod
     def get_returned_task_info_properties(cls):
     def get_returned_task_info_properties(cls):
-        return []
+        return list(
+            cls._get_minion_task_info_field_mappings().values())
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
         return {
         return {
             cls.get_required_platform(): [constants.PROVIDER_TYPE_MINION_POOL]}
             cls.get_required_platform(): [constants.PROVIDER_TYPE_MINION_POOL]}
 
 
+    @classmethod
+    def _get_provider_pool_validation_operation(cls, provider):
+        raise NotImplementedError(
+            "No minion pool provider validation method was specified.")
+
     @classmethod
     @classmethod
     def _get_transfer_properties_task_info_field(cls):
     def _get_transfer_properties_task_info_field(cls):
         platform = cls.get_required_platform()
         platform = cls.get_required_platform()
@@ -431,6 +532,11 @@ class _BaseValidateMinionCompatibilityTask(base.TaskRunner):
         raise NotImplementedError(
         raise NotImplementedError(
             "No minion validation task info field specified.")
             "No minion validation task info field specified.")
 
 
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        raise NotImplementedError(
+            "No minion task info field mappings provided.")
+
     def _run(self, ctxt, instance, origin, destination,
     def _run(self, ctxt, instance, origin, destination,
              task_info, event_handler):
              task_info, event_handler):
 
 
@@ -453,13 +559,17 @@ class _BaseValidateMinionCompatibilityTask(base.TaskRunner):
         export_info = task_info["export_info"]
         export_info = task_info["export_info"]
         minion_properties = task_info[
         minion_properties = task_info[
             self._get_minion_properties_task_info_field()]
             self._get_minion_properties_task_info_field()]
-        transfer_properties = [
+        transfer_properties = task_info[
             self._get_transfer_properties_task_info_field()]
             self._get_transfer_properties_task_info_field()]
-        provider.validate_minion_compatibility_for_transfer(
+        validation_op = self._get_provider_pool_validation_operation(provider)
+        validation_op(
             ctxt, connection_info, export_info, transfer_properties,
             ctxt, connection_info, export_info, transfer_properties,
             minion_properties)
             minion_properties)
 
 
-        return {}
+        field_mappings = self._get_minion_task_info_field_mappings()
+        return {
+            field_mappings[field]: task_info[field]
+            for field in field_mappings}
 
 
 
 
 class ValidateSourceMinionCompatibilityTask(
 class ValidateSourceMinionCompatibilityTask(
@@ -473,6 +583,14 @@ class ValidateSourceMinionCompatibilityTask(
     def _get_minion_properties_task_info_field(cls):
     def _get_minion_properties_task_info_field(cls):
         return "source_minion_provider_properties"
         return "source_minion_provider_properties"
 
 
+    @classmethod
+    def _get_provider_pool_validation_operation(cls, provider):
+        return provider.validate_minion_compatibility_for_transfer
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return SOURCE_MINION_TASK_INFO_FIELD_MAPPINGS
+
 
 
 class ValidateDestinationMinionCompatibilityTask(
 class ValidateDestinationMinionCompatibilityTask(
         _BaseValidateMinionCompatibilityTask):
         _BaseValidateMinionCompatibilityTask):
@@ -483,7 +601,15 @@ class ValidateDestinationMinionCompatibilityTask(
 
 
     @classmethod
     @classmethod
     def _get_minion_properties_task_info_field(cls):
     def _get_minion_properties_task_info_field(cls):
-        return "destination_minion_provider_properties"
+        return "target_minion_provider_properties"
+
+    @classmethod
+    def _get_provider_pool_validation_operation(cls, provider):
+        return provider.validate_minion_compatibility_for_transfer
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return TARGET_MINION_TASK_INFO_FIELD_MAPPINGS
 
 
 
 
 class ValidateOSMorphingMinionCompatibilityTask(
 class ValidateOSMorphingMinionCompatibilityTask(
@@ -496,3 +622,82 @@ class ValidateOSMorphingMinionCompatibilityTask(
     @classmethod
     @classmethod
     def _get_minion_properties_task_info_field(cls):
     def _get_minion_properties_task_info_field(cls):
         return "osmorphing_minion_provider_properties"
         return "osmorphing_minion_provider_properties"
+
+    @classmethod
+    def _get_provider_pool_validation_operation(cls, provider):
+        return provider.validate_osmorphing_minion_compatibility_for_transfer
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return  OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class _BaseReleaseMinionTask(base.TaskRunner):
+    """ The purpose of releasal tasks is to clear (set to None) all of the
+    fields afferent to the minion for the respective task type.
+    """
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No minion releasing platform specified")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        prop_mappings = cls._get_minion_task_info_field_mappings()
+        return list(
+            set(prop_mappings.keys()).union(
+                prop_mappings.values()))
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return cls.get_required_task_info_properties()
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            cls.get_required_platform(): [constants.PROVIDER_TYPE_MINION_POOL]}
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        raise NotImplementedError(
+            "No minion task info field mappings provided.")
+
+    def _run(self, ctxt, instance, origin, destination,
+             task_info, event_handler):
+        return {
+            field: None
+            for field in self.get_returned_task_info_properties()}
+
+
+class ReleaseSourceMinionTask(_BaseReleaseMinionTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return SOURCE_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class ReleaseDestinationMinionTask(_BaseReleaseMinionTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return TARGET_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class ReleaseOSMorphingMinionTask(_BaseReleaseMinionTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS