Przeglądaj źródła

Add automatic upscaling on minion machine allocation requests.

Nashwan Azhari 5 lat temu
rodzic
commit
8085463820

+ 10 - 1
coriolis/conductor/rpc/server.py

@@ -1510,8 +1510,11 @@ class ConductorServerEndpoint(object):
             ctxt, action.base_id)
 
     def _check_minion_pools_for_action(self, ctxt, action):
-        return self._minion_manager_client.validate_minion_pool_selections_for_action(
+        self._minion_manager_client.validate_minion_pool_selections_for_action(
             ctxt, action)
+        LOG.debug(
+            "Successfully checked minion pool selection for action '%s'",
+            action.base_id)
 
     def _update_task_info_for_minion_allocations(
             self, ctxt, action, minion_machine_allocations):
@@ -1633,6 +1636,9 @@ class ConductorServerEndpoint(object):
             minion_allocation_error_details)
         self._cancel_tasks_execution(
             ctxt, last_replica_execution, requery=True)
+        self._set_tasks_execution_status(
+            ctxt, last_replica_execution.id,
+            constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS)
 
     @migration_synchronized
     def confirm_migration_minions_allocation(
@@ -1673,6 +1679,9 @@ class ConductorServerEndpoint(object):
             migration_id, execution.id, minion_allocation_error_details)
         self._cancel_tasks_execution(
             ctxt, execution, requery=True)
+        self._set_tasks_execution_status(
+            ctxt, execution.id,
+            constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS)
 
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, origin_minion_pool_id,

+ 4 - 1
coriolis/constants.py

@@ -270,6 +270,9 @@ VALID_COMPRESSION_FORMATS = [
     COMPRESSION_FORMAT_ZLIB
 ]
 
+TRANSFER_ACTION_TYPE_MIGRATION = "migration"
+TRANSFER_ACTION_TYPE_REPLICA = "replica"
+
 EXECUTION_TYPE_REPLICA_EXECUTION = "replica_execution"
 EXECUTION_TYPE_REPLICA_DISKS_DELETE = "replica_disks_delete"
 EXECUTION_TYPE_REPLICA_DEPLOY = "replica_deploy"
@@ -355,7 +358,7 @@ ACTIVE_MINION_POOL_STATUSES = [
     MINION_POOL_STATUS_DEALLOCATING_SHARED_RESOURCES]
 
 MINION_MACHINE_IDENTIFIER_FORMAT = "coriolis-pool-%(pool_id)s-minion-%(minion_id)s"
-MINION_MACHINE_STATUS_UNKNOWN = "UNKNOWN"
+MINION_MACHINE_STATUS_UNINITIALIZED = "UNINITIALIZED"
 MINION_MACHINE_STATUS_DEPLOYING = "DEPLOYING"
 MINION_MACHINE_STATUS_ERROR = "ERROR"
 MINION_MACHINE_STATUS_ERROR_DEPLOYING = "ERROR_DEPLOYING"

+ 4 - 0
coriolis/exception.py

@@ -153,6 +153,10 @@ class InvalidMinionPoolSelection(Invalid):
     message = _("The selected minion pool is incompatible.")
 
 
+class InvalidMinionMachineState(Invalid):
+    message = _("The selected minion machine is in an invalid state.")
+
+
 class MinionMachineAllocationFailure(Invalid):
     message = _("No minion machines were available for allocation")
 

+ 729 - 254
coriolis/minion_manager/rpc/server.py

@@ -5,7 +5,6 @@ import contextlib
 import itertools
 import uuid
 
-from oslo_concurrency import lockutils
 from oslo_config import cfg
 from oslo_log import log as logging
 from taskflow.patterns import graph_flow
@@ -43,6 +42,11 @@ class MinionManagerServerEndpoint(object):
             constants.MINION_MANAGER_MAIN_MESSAGING_TOPIC,
             max_workers=25)
 
+    # NOTE(aznashwan): it is unsafe to fork processes with pre-instantiated
+    # oslo_messaging clients as the underlying eventlet thread queues will
+    # be invalidated. Since this class both serves requests from a "main
+    # process" and forks child processes, it is safest to re-instantiate
+    # the clients on every access:
     @property
     def _rpc_worker_client(self):
         return rpc_worker_client.WorkerClient()
@@ -150,24 +154,30 @@ class MinionManagerServerEndpoint(object):
     def get_minion_pool_progress_step(self, ctxt, minion_pool_id):
         return db_api.get_minion_pool_progress_step(ctxt, minion_pool_id)
 
-    def validate_minion_pool_selections_for_action(self, ctxt, action):
-        """ Validates the minion pool selections for a given action. """
+    def _check_keys_for_action_dict(
+            self, action, required_action_properties, operation=None):
         if not isinstance(action, dict):
             raise exception.InvalidInput(
                 "Action must be a dict, got '%s': %s" % (
                     type(action), action))
-        required_action_properties = [
-            'id', 'origin_endpoint_id', 'destination_endpoint_id',
-            'origin_minion_pool_id', 'destination_minion_pool_id',
-            'instance_osmorphing_minion_pool_mappings', 'instances']
         missing = [
             prop for prop in required_action_properties
             if prop not in action]
         if missing:
             raise exception.InvalidInput(
                 "Missing the following required action properties for "
-                "minion pool selection validation: %s. Got %s" % (
-                    missing, action))
+                "%s: %s. Got %s" % (
+                    operation, missing, action))
+
+    def validate_minion_pool_selections_for_action(self, ctxt, action):
+        """ Validates the minion pool selections for a given action. """
+        required_action_properties = [
+            'id', 'origin_endpoint_id', 'destination_endpoint_id',
+            'origin_minion_pool_id', 'destination_minion_pool_id',
+            'instance_osmorphing_minion_pool_mappings', 'instances']
+        self._check_keys_for_action_dict(
+            action, required_action_properties,
+            operation="minion pool selection validation")
 
         minion_pools = {
             pool.id: pool
@@ -183,15 +193,24 @@ class MinionManagerServerEndpoint(object):
                     "Could not find minion pool with ID '%s'." % pool_id)
             return pool
         def _check_pool_minion_count(
-                minion_pool, instances, minion_pool_type=None):
+                minion_pool, instances, minion_pool_type=""):
             desired_minion_count = len(instances)
+            if minion_pool.status != constants.MINION_POOL_STATUS_ALLOCATED:
+                raise exception.InvalidMinionPoolState(
+                    "Minion Pool '%s' is an invalid state ('%s') to be "
+                    "used as a %s pool for action '%s'. The pool must be "
+                    "in '%s' status."  % (
+                        minion_pool.id, minion_pool.status,
+                        minion_pool_type.lower(), action['id'],
+                        constants.MINION_POOL_STATUS_ALLOCATED))
             if desired_minion_count > minion_pool.maximum_minions:
                 msg = (
                     "Minion Pool with ID '%s' has a lower maximum minion "
                     "count (%d) than the requested number of minions "
-                    "(%d) to handle all of the instances: %s" % (
+                    "(%d) to handle all of the instances of action '%s': "
+                    "%s" % (
                         minion_pool.id, minion_pool.maximum_minions,
-                        desired_minion_count, instances))
+                        desired_minion_count, action['id'], instances))
                 if minion_pool_type:
                     msg = "%s %s" % (minion_pool_type, msg)
                 raise exception.InvalidMinionPoolSelection(msg)
@@ -230,7 +249,7 @@ class MinionManagerServerEndpoint(object):
                 "'%s' for use with action '%s'." % (
                     action['origin_minion_pool_id'], action['id']))
 
-        # check target pool:
+        # check destination pool:
         if action['destination_minion_pool_id']:
             destination_pool = _get_pool(action['destination_minion_pool_id'])
             if destination_pool.endpoint_id != (
@@ -313,103 +332,92 @@ class MinionManagerServerEndpoint(object):
                     "minion pool '%s' for use as OSMorphing minion for "
                     "instances %s during action '%s'." % (
                         pool_id, instances_to_osmorph, action['id']))
+        LOG.debug(
+            "Successfully validated minion pool selections for action '%s' "
+            "with properties: %s", action['id'], action)
 
     def allocate_minion_machines_for_replica(
             self, ctxt, replica):
-        minion_allocations = self._allocate_minion_machines_for_action(
-            ctxt, replica, include_transfer_minions=True,
-            include_osmorphing_minions=False)
         try:
-            self._conductor_client.confirm_replica_minions_allocation(
-                ctxt, replica['id'], minion_allocations)
+            minion_allocations = self._run_machine_allocation_subflow_for_action(
+                ctxt, replica, constants.TRANSFER_ACTION_TYPE_REPLICA,
+                include_transfer_minions=True,
+                include_osmorphing_minions=False)
         except Exception as ex:
             LOG.warn(
                 "Error occured while reporting minion pool allocations for "
                 "Replica with ID '%s'. Removing all allocations. "
                 "Error was: %s" % (
                     replica['id'], utils.get_exception_details()))
+            self._cleanup_machines_with_statuses_for_action(
+                ctxt, replica['id'],
+                [constants.MINION_MACHINE_STATUS_UNINITIALIZED])
             self.deallocate_minion_machines_for_action(
                 ctxt, replica['id'])
+            self._conductor_client.report_replica_minions_allocation_error(
+                ctxt, replica['id'], str(ex))
             raise
 
     def allocate_minion_machines_for_migration(
             self, ctxt, migration, include_transfer_minions=True,
             include_osmorphing_minions=True):
-        minion_allocations = self._allocate_minion_machines_for_action(
-            ctxt, migration, include_transfer_minions=include_transfer_minions,
-            include_osmorphing_minions=include_osmorphing_minions)
         try:
-            self._conductor_client.confirm_migration_minions_allocation(
-                ctxt, migration['id'], minion_allocations)
+            self._run_machine_allocation_subflow_for_action(
+                ctxt, migration,
+                constants.TRANSFER_ACTION_TYPE_MIGRATION,
+                include_transfer_minions=include_transfer_minions,
+                include_osmorphing_minions=include_osmorphing_minions)
         except Exception as ex:
             LOG.warn(
                 "Error occured while reporting minion pool allocations for "
                 "Migration with ID '%s'. Removing all allocations. "
                 "Error was: %s" % (
                     migration['id'], utils.get_exception_details()))
+            self._cleanup_machines_with_statuses_for_action(
+                ctxt, migration['id'],
+                [constants.MINION_MACHINE_STATUS_UNINITIALIZED])
             self.deallocate_minion_machines_for_action(
                 ctxt, migration['id'])
+            self._conductor_client.report_migration_minions_allocation_error(
+                ctxt, migration['id'], str(ex))
             raise
 
-    def _allocate_minion_machines_for_action(
-            self, ctxt, action, include_transfer_minions=True,
-            include_osmorphing_minions=True):
-        """ Returns a dict of the form:
-        {
-            "instance_id": {
-                "source_minion": <source minion properties>,
-                "target_minion": <target minion properties>,
-                "osmorphing_minion": <osmorphing minion properties>
-            }
-        }
-        """
-        if not isinstance(action, dict):
-            raise exception.InvalidInput(
-                "Action must be a dict, got '%s': %s" % (
-                    type(action), action))
-        required_action_properties = [
-            'id', 'instances', 'origin_minion_pool_id',
-            'destination_minion_pool_id',
-            'instance_osmorphing_minion_pool_mappings']
-        missing = [
-            prop for prop in required_action_properties
-            if prop not in action]
-        if missing:
-            raise exception.InvalidInput(
-                "Missing the following required action properties for "
-                "minion pool machine allocation: %s. Got %s" % (
-                    missing, action))
+    def _make_minion_machine_allocation_subflow_for_action(
+            self, ctxt, minion_pool, action_id, action_instances,
+            subflow_name, inject_for_tasks=None):
+        """ Creates a subflow for allocating minion machines from the
+        provided minion pool to the given action (one for each instance).
 
-        instance_machine_allocations = {
-            instance: {} for instance in action['instances']}
-
-        minion_pool_ids = set()
-        if action['origin_minion_pool_id']:
-            minion_pool_ids.add(action['origin_minion_pool_id'])
-        if action['destination_minion_pool_id']:
-            minion_pool_ids.add(action['destination_minion_pool_id'])
-        if action['instance_osmorphing_minion_pool_mappings']:
-            minion_pool_ids = minion_pool_ids.union(set(
-                action['instance_osmorphing_minion_pool_mappings'].values()))
-        if None in minion_pool_ids:
-            minion_pool_ids.remove(None)
-
-        if not minion_pool_ids:
-            LOG.debug(
-                "No minion pool settings found for action '%s'. "
-                "Skipping minion machine allocations." % (
-                    action['id']))
-            return instance_machine_allocations
+        Returns a mapping between the action's instances' IDs and their minion
+        machine IDs, as well as the subflow to execute for said machines.
 
-        LOG.debug(
-            "All minion pool selections for action '%s': %s",
-            action['id'], minion_pool_ids)
+        Returns dict of the form: {
+            "flow": TheFlowClass(),
+            "action_instance_minion_allocation_mappings": {
+                "<action_instance_id>": "<allocated_minion_id>"}}
+        """
+        currently_available_machines = [
+            machine for machine in minion_pool.minion_machines
+            if machine.status == constants.MINION_MACHINE_STATUS_AVAILABLE]
+        extra_available_machine_slots = (
+            minion_pool.maximum_minions - len(minion_pool.minion_machines))
+        num_instances = len(action_instances)
+        num_currently_available_machines = len(currently_available_machines)
+        if num_instances > (len(currently_available_machines) + (
+                                extra_available_machine_slots)):
+            raise exception.InvalidMinionPoolState(
+                "Minion pool '%s' is unable to accommodate the requested "
+                "number of machines (%s) for transfer action '%s', as it only "
+                "has %d currently available machines, with room to upscale a "
+                "further %d until the maximum is reached. Please either "
+                "increase the number of maximum machines for the pool "
+                "or wait for other minions to become available before "
+                "retrying." % (
+                    minion_pool.id, num_instances, action_id,
+                    num_currently_available_machines,
+                    extra_available_machine_slots))
 
         def _select_machine(minion_pool, exclude=None):
-            if not minion_pool.minion_machines:
-                raise exception.InvalidMinionPoolSelection(
-                    "Minion pool with ID '%s' has no machines defined." % (
-                        minion_pool.id))
             selected_machine = None
             for machine in minion_pool.minion_machines:
                 if exclude and machine.id in exclude:
@@ -420,160 +428,390 @@ class MinionManagerServerEndpoint(object):
                 if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
                     LOG.debug(
                         "Minion machine with ID '%s' is in status '%s' "
-                        "instead of '%s'. Skipping.", machine.id,
-                        machine.status,
+                        "instead of '%s'. Skipping.",
+                        machine.id, machine.status,
                         constants.MINION_MACHINE_STATUS_AVAILABLE)
                     continue
                 selected_machine = machine
                 break
-            if not selected_machine:
-                raise exception.InvalidMinionPoolSelection(
-                    "There are no more available minion machines within minion"
-                    " pool with ID '%s' (excluding the following ones already "
-                    "planned for this transfer: %s). Please ensure that the "
-                    "minion pool has enough minion machines allocated and "
-                    "available (i.e. not being used for other operations) "
-                    "to satisfy the number of VMs required by the Migration or"
-                    " Replica." % (
-                        minion_pool.id, exclude))
             return selected_machine
 
-        osmorphing_pool_map = (
-            action['instance_osmorphing_minion_pool_mappings'])
-        with contextlib.ExitStack() as stack:
-            _ = [
-                stack.enter_context(
-                    lockutils.lock(
-                        constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
-                        external=True))
-                for pool_id in minion_pool_ids]
-
-            minion_pools = db_api.get_minion_pools(
-                ctxt, include_machines=True, to_dict=False)
-            minion_pool_id_mappings = {
-                pool.id: pool for pool in minion_pools
-                if pool.id in minion_pool_ids}
-
-            missing_pools = [
-                pool_id for pool_id in minion_pool_ids
-                if pool_id not in minion_pool_id_mappings]
-            if missing_pools:
-                raise exception.InvalidMinionPoolSelection(
-                    "The following minion pools could not be found: %s" % (
-                        missing_pools))
-
-            unallocated_pools = {
-                pool_id: pool.status
-                for (pool_id, pool) in minion_pool_id_mappings.items()
-                if pool.status != constants.MINION_POOL_STATUS_ALLOCATED}
-            if unallocated_pools:
-                raise exception.InvalidMinionPoolSelection(
-                    "The following minion pools have not had their machines "
-                    "allocated and thus cannot be used: %s" % (
-                        unallocated_pools))
-
-            allocated_source_machine_ids = set()
-            allocated_target_machine_ids = set()
-            allocated_osmorphing_machine_ids = set()
-            for instance in action['instances']:
-
-                if include_transfer_minions:
-                    if action['origin_minion_pool_id']:
-                        origin_pool = minion_pool_id_mappings[
-                            action['origin_minion_pool_id']]
-                        machine = _select_machine(
-                            origin_pool, exclude=allocated_source_machine_ids)
-                        allocated_source_machine_ids.add(machine.id)
-                        instance_machine_allocations[
-                            instance]['source_minion'] = machine
-                        LOG.debug(
-                            "Selected minion machine '%s' for source-side "
-                            "syncing of instance '%s' as part of transfer "
-                            "action '%s'.", machine.id, instance, action['id'])
-
-                    if action['destination_minion_pool_id']:
-                        dest_pool = minion_pool_id_mappings[
-                            action['destination_minion_pool_id']]
-                        machine = _select_machine(
-                            dest_pool, exclude=allocated_target_machine_ids)
-                        allocated_target_machine_ids.add(machine.id)
-                        instance_machine_allocations[
-                            instance]['target_minion'] = machine
-                        LOG.debug(
-                            "Selected minion machine '%s' for target-side "
-                            "syncing of instance '%s' as part of transfer "
-                            "action '%s'.", machine.id, instance, action['id'])
-
-                if include_osmorphing_minions:
-                    if instance not in osmorphing_pool_map:
-                        LOG.debug(
-                            "Instance '%s' is not listed in the OSMorphing "
-                            "minion pool mappings for action '%s'." % (
-                                instance, action['id']))
-                    elif osmorphing_pool_map[instance] is None:
-                        LOG.debug(
-                            "OSMorphing pool ID for instance '%s' is "
-                            "None in action '%s'. Ignoring." % (
-                                instance, action['id']))
-                    else:
-                        osmorphing_pool_id = osmorphing_pool_map[instance]
-                        # if the selected target and OSMorphing pools
-                        # are the same, reuse the same worker:
-                        ima = instance_machine_allocations[instance]
-                        if osmorphing_pool_id == (
-                                action['destination_minion_pool_id']) and (
-                                    'target_minion' in ima):
-                            allocated_target_machine = ima[
-                                'target_minion']
-                            LOG.debug(
-                                "Reusing disk sync minion '%s' for the "
-                                "OSMorphing of instance '%s' as part of "
-                                "transfer action '%s'",
-                                allocated_target_machine.id, instance,
-                                action['id'])
-                            instance_machine_allocations[
-                                instance]['osmorphing_minion'] = (
-                                    allocated_target_machine)
-                        # else, allocate a new minion from the selected pool:
-                        else:
-                            osmorphing_pool = minion_pool_id_mappings[
-                                osmorphing_pool_id]
-                            machine = _select_machine(
-                                osmorphing_pool,
-                                exclude=allocated_osmorphing_machine_ids)
-                            allocated_osmorphing_machine_ids.add(machine.id)
-                            instance_machine_allocations[
-                                instance]['osmorphing_minion'] = machine
-                            LOG.debug(
-                                "Selected minion machine '%s' for OSMorphing "
-                                " of instance '%s' as part of transfer "
-                                "action '%s'.",
-                                machine.id, instance, action['id'])
-
-            # mark the selected machines as allocated:
-            all_machine_ids = set(itertools.chain(
-                allocated_source_machine_ids,
-                allocated_target_machine_ids,
-                allocated_osmorphing_machine_ids))
+        allocation_subflow = unordered_flow.Flow(subflow_name)
+        instance_minion_allocations = {}
+        machine_db_entries_to_add = []
+        existing_machines_to_allocate = {}
+        for instance in action_instances:
+
+            if instance in instance_minion_allocations:
+                raise exception.InvalidInput(
+                    "Instance with identifier '%s' passed twice for "
+                    "minion machine allocation from pool '%s' for action "
+                    "'%s'. Full instances list was: %s" % (
+                        instance, minion_pool.id, action_id, action_instances))
+            minion_machine = _select_machine(
+                minion_pool, exclude=instance_minion_allocations.values())
+            if minion_machine:
+                # take note of the machine and setup a healthcheck:
+                instance_minion_allocations[instance] = minion_machine.id
+                existing_machines_to_allocate[minion_machine.id] = instance
+                allocation_subflow.add(
+                    minion_manager_tasks.HealthcheckMinionMachineTask(
+                        minion_pool.id, minion_machine.id,
+                        minion_pool.platform, fail_on_error=True,
+                        inject=inject_for_tasks))
+            else:
+                # add task which creates the new machine:
+                new_machine_id = str(uuid.uuid4())
+                LOG.debug(
+                    "New minion machine with ID '%s' will be created for "
+                    "minion pool '%s' for use with action '%s'.",
+                    new_machine_id, minion_pool.id, action_id)
+
+                new_minion_machine = models.MinionMachine()
+                new_minion_machine.id = new_machine_id
+                new_minion_machine.pool_id = minion_pool.id
+                new_minion_machine.status = (
+                    constants.MINION_MACHINE_STATUS_UNINITIALIZED)
+                new_minion_machine.allocated_action = action_id
+                machine_db_entries_to_add.append(new_minion_machine)
+
+                instance_minion_allocations[instance] = new_machine_id
+                allocation_subflow.add(
+                    minion_manager_tasks.AllocateMinionMachineTask(
+                        minion_pool.id, new_machine_id, minion_pool.platform,
+                        allocate_to_action=action_id,
+                        raise_on_cleanup_failure=False,
+                        inject=inject_for_tasks))
+
+        new_machine_db_entries_added = []
+        try:
+            # mark any existing machines as allocated:
+            LOG.debug(
+                "Marking the following pre-existing minion machines "
+                "from pool '%s' of action '%s' for each instance as "
+                "allocated with the DB: %s",
+                minion_pool.id, action_id, existing_machines_to_allocate)
             db_api.set_minion_machines_allocation_statuses(
-                ctxt, all_machine_ids, action['id'],
-                constants.MINION_MACHINE_STATUS_ALLOCATED,
+                ctxt, list(existing_machines_to_allocate.keys()),
+                action_id, constants.MINION_MACHINE_STATUS_ALLOCATED,
                 refresh_allocation_time=True)
 
-        # filter out redundancies:
+            # add new machine entries to the DB, tracking them for rollback:
+            for new_machine in machine_db_entries_to_add:
+                LOG.info(
+                    "Adding new minion machine with ID '%s' to the DB for pool "
+                    "'%s' for use with action '%s'.",
+                    new_machine.id, minion_pool.id, action_id)
+                db_api.add_minion_machine(ctxt, new_machine)
+                new_machine_db_entries_added.append(new_machine)
+        except Exception as ex:
+            LOG.warn(
+                "Exception occured while adding new minion machine entries to "
+                "the DB for pool '%s' for use with action '%s'. Clearing "
+                "any DB entries added so far (%s). Error was: %s",
+                minion_pool.id, action_id,
+                [m.id for m in new_machine_db_entries_added],
+                utils.get_exception_details())
+            try:
+                LOG.debug(
+                    "Reverting the following pre-existing minion machines from"
+                    " pool '%s' to '%s' due to allocation error for action "
+                    "'%s': %s",
+                    minion_pool.id,
+                    constants.MINION_MACHINE_STATUS_AVAILABLE,
+                    action_id,
+                    list(existing_machines_to_allocate.keys()))
+                db_api.set_minion_machines_allocation_statuses(
+                    ctxt, list(existing_machines_to_allocate.keys()),
+                    None, constants.MINION_MACHINE_STATUS_AVAILABLE,
+                    refresh_allocation_time=False)
+            except Exception:
+                LOG.warn(
+                    "Failed to deallocate the following machines from pool "
+                    "'%s' following allocation error for action '%s': %s. "
+                    "Error trace was: %s",
+                    minion_pool.id, action_id, existing_machines_to_allocate,
+                    utils.get_exception_details())
+            for new_machine in new_machine_db_entries_added:
+                try:
+                    db_api.delete_minion_machine(ctxt, new_machine.id)
+                except Exception as ex:
+                    LOG.warn(
+                        "Error occured while removing minion machine entry "
+                        "'%s' from the DB. This may leave the pool in an "
+                        "inconsistent state. Error trace was: %s" % (
+                            new_machine.id, utils.get_exception_details()))
+                    continue
+            raise
+
+        return {
+            "flow": allocation_subflow,
+            "action_instance_minion_allocation_mappings": (
+                instance_minion_allocations)}
+
+    def _run_machine_allocation_subflow_for_action(
+            self, ctxt, action, action_type, include_transfer_minions=True,
+            include_osmorphing_minions=True):
+        """ Defines and starts a taskflow subflow for allocating minion
+        machines for the given action.
+        If there are no more minion machines available, upscaling will occur.
+        Also adds to the DB/marks as allocated any minion machines on the
+        spot.
+        """
+        required_action_properties = [
+            'id', 'instances', 'origin_minion_pool_id',
+            'destination_minion_pool_id',
+            'instance_osmorphing_minion_pool_mappings']
+        self._check_keys_for_action_dict(
+            action, required_action_properties,
+            operation="minion machine selection")
+
+        allocation_flow_name_format = None
+        machines_allocation_subflow_name_format = None
+        machine_action_allocation_subflow_name_format = None
+        allocation_failure_reporting_task_class = None
+        allocation_confirmation_reporting_task_class = None
+        if action_type == constants.TRANSFER_ACTION_TYPE_MIGRATION:
+            allocation_flow_name_format = (
+                minion_manager_tasks.MINION_POOL_MIGRATION_ALLOCATION_FLOW_NAME_FORMAT)
+            allocation_failure_reporting_task_class = (
+                minion_manager_tasks.ReportMinionAllocationFailureForMigrationTask)
+            allocation_confirmation_reporting_task_class = (
+                minion_manager_tasks.ConfirmMinionAllocationForMigrationTask)
+            machines_allocation_subflow_name_format = (
+                minion_manager_tasks.MINION_POOL_MIGRATION_ALLOCATION_SUBFLOW_NAME_FORMAT)
+            machine_action_allocation_subflow_name_format = (
+                minion_manager_tasks.MINION_POOL_ALLOCATE_MACHINES_FOR_MIGRATION_SUBFLOW_NAME_FORMAT)
+        elif action_type == constants.TRANSFER_ACTION_TYPE_REPLICA:
+            allocation_flow_name_format = (
+                minion_manager_tasks.MINION_POOL_REPLICA_ALLOCATION_FLOW_NAME_FORMAT)
+            allocation_failure_reporting_task_class = (
+                minion_manager_tasks.ReportMinionAllocationFailureForReplicaTask)
+            allocation_confirmation_reporting_task_class = (
+                minion_manager_tasks.ConfirmMinionAllocationForReplicaTask)
+            machines_allocation_subflow_name_format = (
+                minion_manager_tasks.MINION_POOL_REPLICA_ALLOCATION_SUBFLOW_NAME_FORMAT)
+            machine_action_allocation_subflow_name_format = (
+                minion_manager_tasks.MINION_POOL_ALLOCATE_MACHINES_FOR_REPLICA_SUBFLOW_NAME_FORMAT)
+        else:
+            raise exception.InvalidInput(
+                "Unknown transfer action type '%s'" % action_type)
+
+        # define main flow:
+        main_allocation_flow_name = (
+            allocation_flow_name_format % action['id'])
+        main_allocation_flow = linear_flow.Flow(main_allocation_flow_name)
         instance_machine_allocations = {
-            instance: allocations
-            for (instance, allocations) in instance_machine_allocations.items()
-            if allocations}
+            instance: {} for instance in action['instances']}
 
-        LOG.debug(
-            "Allocated the following minion machines for action '%s': %s",
-            action['id'], {
-                instance: {
-                    typ: machine.id
-                    for (typ, machine) in allocation.items()}
-                for (instance, allocation) in instance_machine_allocations.items()})
-        return instance_machine_allocations
+        # add allocation failure reporting task:
+        main_allocation_flow.add(
+            allocation_failure_reporting_task_class(
+                action['id']))
+
+        # define subflow for all the pool minions allocations:
+        machines_subflow = unordered_flow.Flow(
+            machines_allocation_subflow_name_format % action['id'])
+        new_pools_machines_db_entries = {}
+
+        # add subflow for origin pool:
+        if include_transfer_minions and action['origin_minion_pool_id']:
+            with minion_manager_utils.get_minion_pool_lock(
+                    action['origin_minion_pool_id'], external=True):
+                # fetch pool, origin endpoint, and initial store:
+                minion_pool = self._get_minion_pool(
+                    ctxt, action['origin_minion_pool_id'],
+                    include_machines=True, include_events=False,
+                    include_progress_updates=False)
+                endpoint_dict = self._conductor_client.get_endpoint(
+                    ctxt, minion_pool.endpoint_id)
+                origin_pool_store = self._get_pool_initial_taskflow_store_base(
+                    ctxt, minion_pool, endpoint_dict)
+
+                # add subflow for machine allocations from origin pool:
+                subflow_name = machine_action_allocation_subflow_name_format % (
+                    minion_pool.id, action['id'])
+                # NOTE: required to avoid internal taskflow conflicts
+                subflow_name = "origin-%s" % subflow_name
+                allocations_subflow_result = (
+                    self._make_minion_machine_allocation_subflow_for_action(
+                        ctxt, minion_pool, action['id'], action['instances'],
+                        subflow_name, inject_for_tasks=origin_pool_store))
+                machines_subflow.add(allocations_subflow_result['flow'])
+
+                # register each instance's origin minion:
+                source_machine_allocations = allocations_subflow_result[
+                    'action_instance_minion_allocation_mappings']
+                for (action_instance_id, allocated_minion_id) in (
+                        source_machine_allocations.items()):
+                    instance_machine_allocations[
+                        action_instance_id]['origin_minion_id'] = (
+                            allocated_minion_id)
+
+        # add subflow for destination pool:
+        if include_transfer_minions and action['destination_minion_pool_id']:
+            with minion_manager_utils.get_minion_pool_lock(
+                    action['destination_minion_pool_id'], external=True):
+                # fetch pool, destination endpoint, and initial store:
+                minion_pool = self._get_minion_pool(
+                    ctxt, action['destination_minion_pool_id'],
+                    include_machines=True, include_events=False,
+                    include_progress_updates=False)
+                endpoint_dict = self._conductor_client.get_endpoint(
+                    ctxt, minion_pool.endpoint_id)
+                destination_pool_store = (
+                    self._get_pool_initial_taskflow_store_base(
+                        ctxt, minion_pool, endpoint_dict))
+
+                # add subflow for machine allocations from destination pool:
+                subflow_name = machine_action_allocation_subflow_name_format % (
+                    minion_pool.id, action['id'])
+                # NOTE: required to avoid internal taskflow conflicts
+                subflow_name = "destination-%s" % subflow_name
+                allocations_subflow_result = (
+                    self._make_minion_machine_allocation_subflow_for_action(
+                        ctxt, minion_pool, action['id'], action['instances'],
+                        subflow_name,
+                        inject_for_tasks=destination_pool_store))
+                machines_subflow.add(allocations_subflow_result['flow'])
+                destination_machine_allocations = allocations_subflow_result[
+                    'action_instance_minion_allocation_mappings']
+
+                # register each instance's destination minion:
+                for (action_instance_id, allocated_minion_id) in (
+                        destination_machine_allocations.items()):
+                    instance_machine_allocations[
+                        action_instance_id]['destination_minion_id'] = (
+                            allocated_minion_id)
+
+        # add subflow for OSMorphing minions:
+        osmorphing_pool_instance_mappings = {}
+        for (action_instance_id, mapped_pool_id) in action[
+                'instance_osmorphing_minion_pool_mappings'].items():
+            if mapped_pool_id not in osmorphing_pool_instance_mappings:
+                osmorphing_pool_instance_mappings[
+                    mapped_pool_id] = [action_instance_id]
+            else:
+                osmorphing_pool_instance_mappings[mapped_pool_id].append(
+                    action_instance_id)
+        if include_osmorphing_minions and osmorphing_pool_instance_mappings:
+            for (osmorphing_pool_id, action_instance_ids) in (
+                    osmorphing_pool_instance_mappings.items()):
+                # if the destination pool was selected as an OSMorphing pool
+                # for any instances, we simply re-use all of the destination
+                # minions for said instances:
+                if action['destination_minion_pool_id'] and (
+                        include_osmorphing_minions and (
+                            osmorphing_pool_id == (
+                                action['destination_minion_pool_id']))):
+                    LOG.debug(
+                        "Reusing destination minion pool with ID '%s' for the "
+                        "following instances which had it selected as an "
+                        "OSMorphing pool for action '%s': %s",
+                        osmorphing_pool_id, action['id'], action_instance_ids)
+                    for instance in action_instance_ids:
+                        instance_machine_allocations[
+                            instance]['osmorphing_minion_id'] = (
+                                instance_machine_allocations[
+                                    instance]['destination_minion_id'])
+                    continue
+
+                with minion_manager_utils.get_minion_pool_lock(
+                        osmorphing_pool_id, external=True):
+                    # fetch pool, destination endpoint, and initial store:
+                    minion_pool = self._get_minion_pool(
+                        ctxt, osmorphing_pool_id,
+                        include_machines=True, include_events=False,
+                        include_progress_updates=False)
+                    endpoint_dict = self._conductor_client.get_endpoint(
+                        ctxt, minion_pool.endpoint_id)
+                    osmorphing_pool_store = self._get_pool_initial_taskflow_store_base(
+                        ctxt, minion_pool, endpoint_dict)
+
+                    # add subflow for machine allocations from osmorphing pool:
+                    subflow_name = machine_action_allocation_subflow_name_format % (
+                        minion_pool.id, action['id'])
+                    # NOTE: required to avoid internal taskflow conflicts
+                    subflow_name = "osmorphing-%s" % subflow_name
+                    allocations_subflow_result = (
+                        self._make_minion_machine_allocation_subflow_for_action(
+                            ctxt, minion_pool, action['id'],
+                            action_instance_ids,
+                            subflow_name, inject_for_tasks=osmorphing_pool_store))
+                    machines_subflow.add(allocations_subflow_result['flow'])
+
+                    # register each instance's osmorphing minion:
+                    osmorphing_machine_allocations = allocations_subflow_result[
+                        'action_instance_minion_allocation_mappings']
+                    for (action_instance_id, allocated_minion_id) in (
+                            osmorphing_machine_allocations.items()):
+                        instance_machine_allocations[
+                            action_instance_id]['osmorphing_minion_id'] = (
+                                allocated_minion_id)
+
+        # add the machines subflow to the main flow:
+        main_allocation_flow.add(machines_subflow)
+
+        # add final task to report minion machine availability
+        # to the conductor at the end of the flow:
+        main_allocation_flow.add(
+            allocation_confirmation_reporting_task_class(
+                action['id'], instance_machine_allocations))
+
+        LOG.info(
+            "Starting main minion allocation flow '%s' for with ID '%s'. "
+            "The minion allocations will be: %s" % (
+                main_allocation_flow_name, action['id'],
+                instance_machine_allocations))
+
+        self._taskflow_runner.run_flow_in_background(
+            main_allocation_flow, store={"context": ctxt})
+
+        return main_allocation_flow
+
+    def _cleanup_machines_with_statuses_for_action(
+            self, ctxt, action_id, targeted_statuses, exclude_pools=None):
+        """ Deletes from the DB all minion machines of the given action whose
+        status is in `targeted_statuses`, skipping pools in `exclude_pools`.
+        """
+        if exclude_pools is None:
+            exclude_pools = []
+        machines = db_api.get_minion_machines(ctxt, action_id)
+        if not machines:
+            LOG.debug(
+                "No minion machines allocated to action '%s'. Returning.",
+                action_id)
+            return
+
+        pool_machine_mappings = {}
+        for machine in machines:
+            if machine.status not in targeted_statuses:
+                LOG.debug(
+                    "Skipping deletion of machine '%s' from pool '%s' as "
+                    "its status (%s) is not one of the targeted statuses (%s)",
+                    machine.id, machine.pool_id, machine.status,
+                    targeted_statuses)
+                continue
+            if machine.pool_id in exclude_pools:
+                LOG.debug(
+                    "Skipping deletion of machine '%s' (status '%s') from "
+                    "whitelisted pool '%s'", machine.id, machine.status,
+                    machine.pool_id)
+                continue
+
+            if machine.pool_id not in pool_machine_mappings:
+                pool_machine_mappings[machine.pool_id] = [machine]
+            else:
+                pool_machine_mappings[machine.pool_id].append(machine)
+
+        for (pool_id, machines) in pool_machine_mappings.items():
+            with minion_manager_utils.get_minion_pool_lock(
+                   pool_id, external=True):
+                for machine in machines:
+                    LOG.debug(
+                        "Deleting machine with ID '%s' (pool '%s', status '%s') "
+                        "from the DB.", machine.id, pool_id, machine.status)
+                    db_api.delete_minion_machine(ctxt, machine.id)
 
     def deallocate_minion_machine(self, ctxt, minion_machine_id):
 
@@ -587,11 +825,8 @@ class MinionManagerServerEndpoint(object):
             return
 
         machine_allocated_status = constants.MINION_MACHINE_STATUS_ALLOCATED
-        with lockutils.lock(
-                constants.MINION_POOL_LOCK_NAME_FORMAT % (
-                    minion_machine.pool_id),
-                external=True):
-
+        with minion_manager_utils.get_minion_pool_lock(
+                minion_machine.pool_id, external=True):
             if minion_machine.status != machine_allocated_status or (
                     not minion_machine.allocated_action):
                 LOG.warn(
@@ -625,33 +860,55 @@ class MinionManagerServerEndpoint(object):
                 action_id)
             return
 
-        minion_pool_ids = {
-            machine.pool_id for machine in allocated_minion_machines}
+        # categorise machine objects by pool:
+        pool_machine_mappings = {}
+        for machine in allocated_minion_machines:
+            if machine.pool_id not in pool_machine_mappings:
+                pool_machine_mappings[machine.pool_id] = []
+            pool_machine_mappings[machine.pool_id].append(machine)
+
+        # iterate over each pool and its machines allocated to this action:
+        for (pool_id, pool_machines) in pool_machine_mappings.items():
+            with minion_manager_utils.get_minion_pool_lock(
+                    pool_id, external=True):
+                machine_ids_to_deallocate = []
+                # NOTE: this is a workaround in case some crash/restart happens
+                # in the minion-manager service while new machine DB entries
+                # are added to the DB without their point of deployment being
+                # reached for them to ever get out of 'UNINITIALIZED' status:
+                for machine in pool_machines:
+                    if machine.status == (
+                            constants.MINION_MACHINE_STATUS_UNINITIALIZED):
+                        LOG.warn(
+                            "Found minion machine '%s' in pool '%s' which "
+                            "is in '%s' status. Removing from the DB "
+                            "entirely." % (
+                                machine.id, pool_id, machine.status))
+                        db_api.delete_minion_machine(
+                            ctxt, machine.id)
+                        LOG.info(
+                            "Successfully deleted minion machine entry '%s' "
+                            "from pool '%s' from the DB.", machine.id, pool_id)
+                        continue
+                    LOG.debug(
+                        "Going to mark minion machine '%s' (current status "
+                        "'%s') of pool '%s' as available following machine "
+                        "deallocation request for action '%s'.",
+                        machine.id, machine.status, pool_id, action_id)
+                    machine_ids_to_deallocate.append(machine.id)
+
+                LOG.info(
+                    "Marking minion machines '%s' from pool '%s' for "
+                    "as available after having been allocated to action '%s'.",
+                    machine_ids_to_deallocate, pool_id, action_id)
+                db_api.set_minion_machines_allocation_statuses(
+                    ctxt, machine_ids_to_deallocate, None,
+                    constants.MINION_MACHINE_STATUS_AVAILABLE,
+                    refresh_allocation_time=False)
 
         LOG.debug(
-            "Attempting to deallocate all minion pool machine selections "
-            "for action '%s'. Afferent pools are: %s",
-            action_id, minion_pool_ids)
-
-        with contextlib.ExitStack() as stack:
-            _ = [
-                stack.enter_context(
-                    lockutils.lock(
-                        constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
-                        external=True))
-                for pool_id in minion_pool_ids]
-
-            machine_ids = [m.id for m in allocated_minion_machines]
-            LOG.info(
-                "Releasing the following minion machines for "
-                "action '%s': %s", action_id, machine_ids)
-            db_api.set_minion_machines_allocation_statuses(
-                ctxt, machine_ids, None,
-                constants.MINION_MACHINE_STATUS_AVAILABLE,
-                refresh_allocation_time=False)
-            LOG.debug(
-                "Successfully released all minion machines associated "
-                "with action with base_id '%s'.", action_id)
+            "Successfully released all minion machines associated "
+            "with action with base_id '%s'.", action_id)
 
     def _get_minion_pool_healthcheck_flow(
             self, ctxt, minion_pool, requery=True):
@@ -847,19 +1104,23 @@ class MinionManagerServerEndpoint(object):
             self, ctxt, minion_pool, endpoint_dict):
         # NOTE: considering pools are associated to strictly one endpoint,
         # we can duplicate the 'origin/destination':
-        origin_dest_info = {
+        origin_info = {
             "id": endpoint_dict['id'],
             "connection_info": endpoint_dict['connection_info'],
             "mapped_regions": endpoint_dict['mapped_regions'],
             "type": endpoint_dict['type']}
         initial_store = {
             "context": ctxt,
-            "origin": origin_dest_info,
-            "destination": origin_dest_info,
+            "origin": origin_info,
+            "destination": origin_info,
             "task_info": {
                 "pool_identifier": minion_pool.id,
                 "pool_os_type": minion_pool.os_type,
                 "pool_environment_options": minion_pool.environment_options}}
+        shared_resources = minion_pool.shared_resources
+        if shared_resources is None:
+            shared_resources = {}
+        initial_store['task_info']['pool_shared_resources'] = shared_resources
         return initial_store
 
     def _check_pool_machines_in_use(
@@ -1529,3 +1790,217 @@ class MinionManagerServerEndpoint(object):
     #                 LOG.debug(
     #                     "No minion machines were found to be associated "
     #                     "with action with base_id '%s'.", action['base_id'])
+
+    # def _allocate_minion_machines_for_action(
+    #         self, ctxt, action, include_transfer_minions=True,
+    #         include_osmorphing_minions=True):
+    #     """ Returns a dict of the form:
+    #     {
+    #         "instance_id": {
+    #             "source_minion": <source minion properties>,
+    #             "destination_minion": <target minion properties>,
+    #             "osmorphing_minion": <osmorphing minion properties>
+    #         }
+    #     }
+    #     """
+    #     required_action_properties = [
+    #         'id', 'instances', 'origin_minion_pool_id',
+    #         'destination_minion_pool_id',
+    #         'instance_osmorphing_minion_pool_mappings']
+    #     self._check_keys_for_action_dict(
+    #         action, required_action_properties,
+    #         operation="minion machine selection")
+
+    #     instance_machine_allocations = {
+    #         instance: {} for instance in action['instances']}
+
+    #     minion_pool_ids = set()
+    #     if action['origin_minion_pool_id']:
+    #         minion_pool_ids.add(action['origin_minion_pool_id'])
+    #     if action['destination_minion_pool_id']:
+    #         minion_pool_ids.add(action['destination_minion_pool_id'])
+    #     if action['instance_osmorphing_minion_pool_mappings']:
+    #         minion_pool_ids = minion_pool_ids.union(set(
+    #             action['instance_osmorphing_minion_pool_mappings'].values()))
+    #     if None in minion_pool_ids:
+    #         minion_pool_ids.remove(None)
+
+    #     if not minion_pool_ids:
+    #         LOG.debug(
+    #             "No minion pool settings found for action '%s'. "
+    #             "Skipping minion machine allocations." % (
+    #                 action['id']))
+    #         return instance_machine_allocations
+
+    #     LOG.debug(
+    #         "All minion pool selections for action '%s': %s",
+    #         action['id'], minion_pool_ids)
+
+    #     def _select_machine(minion_pool, exclude=None):
+    #         if not minion_pool.minion_machines:
+    #             raise exception.InvalidMinionPoolSelection(
+    #                 "Minion pool with ID '%s' has no machines defined." % (
+    #                     minion_pool.id))
+    #         selected_machine = None
+    #         for machine in minion_pool.minion_machines:
+    #             if exclude and machine.id in exclude:
+    #                 LOG.debug(
+    #                     "Excluding minion machine '%s' from search.",
+    #                     machine.id)
+    #                 continue
+    #             if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
+    #                 LOG.debug(
+    #                     "Minion machine with ID '%s' is in status '%s' "
+    #                     "instead of '%s'. Skipping.", machine.id,
+    #                     machine.status,
+    #                     constants.MINION_MACHINE_STATUS_AVAILABLE)
+    #                 continue
+    #             selected_machine = machine
+    #             break
+    #         if not selected_machine:
+    #             raise exception.InvalidMinionPoolSelection(
+    #                 "There are no more available minion machines within minion"
+    #                 " pool with ID '%s' (excluding the following ones already "
+    #                 "planned for this transfer: %s). Please ensure that the "
+    #                 "minion pool has enough minion machines allocated and "
+    #                 "available (i.e. not being used for other operations) "
+    #                 "to satisfy the number of VMs required by the Migration or"
+    #                 " Replica." % (
+    #                     minion_pool.id, exclude))
+    #         return selected_machine
+
+    #     osmorphing_pool_map = (
+    #         action['instance_osmorphing_minion_pool_mappings'])
+    #     with contextlib.ExitStack() as stack:
+    #         _ = [
+    #             stack.enter_context(
+    #                 minion_manager_utils.get_minion_pool_lock(
+    #                     pool_id, external=True))
+    #             for pool_id in minion_pool_ids]
+
+    #         minion_pools = db_api.get_minion_pools(
+    #             ctxt, include_machines=True, to_dict=False)
+    #         minion_pool_id_mappings = {
+    #             pool.id: pool for pool in minion_pools
+    #             if pool.id in minion_pool_ids}
+
+    #         missing_pools = [
+    #             pool_id for pool_id in minion_pool_ids
+    #             if pool_id not in minion_pool_id_mappings]
+    #         if missing_pools:
+    #             raise exception.InvalidMinionPoolSelection(
+    #                 "The following minion pools could not be found: %s" % (
+    #                     missing_pools))
+
+    #         unallocated_pools = {
+    #             pool_id: pool.status
+    #             for (pool_id, pool) in minion_pool_id_mappings.items()
+    #             if pool.status != constants.MINION_POOL_STATUS_ALLOCATED}
+    #         if unallocated_pools:
+    #             raise exception.InvalidMinionPoolSelection(
+    #                 "The following minion pools have not had their machines "
+    #                 "allocated and thus cannot be used: %s" % (
+    #                     unallocated_pools))
+
+    #         allocated_source_machine_ids = set()
+    #         allocated_target_machine_ids = set()
+    #         allocated_osmorphing_machine_ids = set()
+    #         for instance in action['instances']:
+
+    #             if include_transfer_minions:
+    #                 if action['origin_minion_pool_id']:
+    #                     origin_pool = minion_pool_id_mappings[
+    #                         action['origin_minion_pool_id']]
+    #                     machine = _select_machine(
+    #                         origin_pool, exclude=allocated_source_machine_ids)
+    #                     allocated_source_machine_ids.add(machine.id)
+    #                     instance_machine_allocations[
+    #                         instance]['source_minion'] = machine
+    #                     LOG.debug(
+    #                         "Selected minion machine '%s' for source-side "
+    #                         "syncing of instance '%s' as part of transfer "
+    #                         "action '%s'.", machine.id, instance, action['id'])
+
+    #                 if action['destination_minion_pool_id']:
+    #                     dest_pool = minion_pool_id_mappings[
+    #                         action['destination_minion_pool_id']]
+    #                     machine = _select_machine(
+    #                         dest_pool, exclude=allocated_target_machine_ids)
+    #                     allocated_target_machine_ids.add(machine.id)
+    #                     instance_machine_allocations[
+    #                         instance]['destination_minion'] = machine
+    #                     LOG.debug(
+    #                         "Selected minion machine '%s' for target-side "
+    #                         "syncing of instance '%s' as part of transfer "
+    #                         "action '%s'.", machine.id, instance, action['id'])
+
+    #             if include_osmorphing_minions:
+    #                 if instance not in osmorphing_pool_map:
+    #                     LOG.debug(
+    #                         "Instance '%s' is not listed in the OSMorphing "
+    #                         "minion pool mappings for action '%s'." % (
+    #                             instance, action['id']))
+    #                 elif osmorphing_pool_map[instance] is None:
+    #                     LOG.debug(
+    #                         "OSMorphing pool ID for instance '%s' is "
+    #                         "None in action '%s'. Ignoring." % (
+    #                             instance, action['id']))
+    #                 else:
+    #                     osmorphing_pool_id = osmorphing_pool_map[instance]
+    #                     # if the selected target and OSMorphing pools
+    #                     # are the same, reuse the same worker:
+    #                     ima = instance_machine_allocations[instance]
+    #                     if osmorphing_pool_id == (
+    #                             action['destination_minion_pool_id']) and (
+    #                                 'destination_minion' in ima):
+    #                         allocated_target_machine = ima[
+    #                             'destination_minion']
+    #                         LOG.debug(
+    #                             "Reusing disk sync minion '%s' for the "
+    #                             "OSMorphing of instance '%s' as part of "
+    #                             "transfer action '%s'",
+    #                             allocated_target_machine.id, instance,
+    #                             action['id'])
+    #                         instance_machine_allocations[
+    #                             instance]['osmorphing_minion'] = (
+    #                                 allocated_target_machine)
+    #                     # else, allocate a new minion from the selected pool:
+    #                     else:
+    #                         osmorphing_pool = minion_pool_id_mappings[
+    #                             osmorphing_pool_id]
+    #                         machine = _select_machine(
+    #                             osmorphing_pool,
+    #                             exclude=allocated_osmorphing_machine_ids)
+    #                         allocated_osmorphing_machine_ids.add(machine.id)
+    #                         instance_machine_allocations[
+    #                             instance]['osmorphing_minion'] = machine
+    #                         LOG.debug(
+    #                             "Selected minion machine '%s' for OSMorphing "
+    #                             " of instance '%s' as part of transfer "
+    #                             "action '%s'.",
+    #                             machine.id, instance, action['id'])
+
+    #         # mark the selected machines as allocated:
+    #         all_machine_ids = set(itertools.chain(
+    #             allocated_source_machine_ids,
+    #             allocated_target_machine_ids,
+    #             allocated_osmorphing_machine_ids))
+    #         db_api.set_minion_machines_allocation_statuses(
+    #             ctxt, all_machine_ids, action['id'],
+    #             constants.MINION_MACHINE_STATUS_ALLOCATED,
+    #             refresh_allocation_time=True)
+
+    #     # filter out redundancies:
+    #     instance_machine_allocations = {
+    #         instance: allocations
+    #         for (instance, allocations) in instance_machine_allocations.items()
+    #         if allocations}
+
+    #     LOG.debug(
+    #         "Allocated the following minion machines for action '%s': %s",
+    #         action['id'], {
+    #             instance: {
+    #                 typ: machine.id
+    #                 for (typ, machine) in allocation.items()}
+    #             for (instance, allocation) in instance_machine_allocations.items()})
+    #     return instance_machine_allocations

+ 506 - 77
coriolis/minion_manager/rpc/tasks.py

@@ -10,14 +10,25 @@ from oslo_utils import timeutils
 from coriolis import constants
 from coriolis import exception
 from coriolis import utils
+from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
+from coriolis.minion_manager.rpc import client as rpc_minion_manager_client
 from coriolis.minion_manager.rpc import utils as minion_manager_utils
 from coriolis.taskflow import base as coriolis_taskflow_base
+from taskflow.types import failure
 
 
 LOG = logging.getLogger(__name__)
 
+MINION_POOL_MIGRATION_ALLOCATION_FLOW_NAME_FORMAT = (
+    "migration-%s-minions-allocation")
+MINION_POOL_REPLICA_ALLOCATION_FLOW_NAME_FORMAT = (
+    "replica-%s-minions-allocation")
+MINION_POOL_MIGRATION_ALLOCATION_SUBFLOW_NAME_FORMAT = (
+    "migration-%s-minions-machines-allocation")
+MINION_POOL_REPLICA_ALLOCATION_SUBFLOW_NAME_FORMAT = (
+    "replica-%s-minions-machines-allocation")
 MINION_POOL_ALLOCATION_FLOW_NAME_FORMAT = "pool-%s-allocation"
 MINION_POOL_DEALLOCATION_FLOW_NAME_FORMAT = "pool-%s-deallocation"
 MINION_POOL_HEALTHCHECK_FLOW_NAME_FORMAT = "pool-%s-healthcheck"
@@ -37,15 +48,293 @@ MINION_POOL_HEALTHCHECK_MACHINE_SUBFLOW_NAME_FORMAT = (
     "pool-%s-machine-%s-healthcheck")
 MINION_POOL_REALLOCATE_MACHINE_SUBFLOW_NAME_FORMAT = (
     "pool-%s-machine-%s-reallocation")
+MINION_POOL_ALLOCATE_MACHINES_FOR_REPLICA_SUBFLOW_NAME_FORMAT = (
+    "pool-%s-allocate-replica-%s-machines")
+MINION_POOL_ALLOCATE_MACHINES_FOR_MIGRATION_SUBFLOW_NAME_FORMAT = (
+    "pool-%s-allocate-migration-%s-machines")
 MINION_POOL_ALLOCATE_MACHINE_TASK_NAME_FORMAT = (
     "pool-%s-machine-%s-allocation")
 MINION_POOL_DEALLOCATE_MACHINE_TASK_NAME_FORMAT = (
     "pool-%s-machine-%s-deallocation")
+MINION_POOL_CONFIRM_MIGRATION_MINION_ALLOCATION_TASK_NAME_FORMAT = (
+    "migration-%s-minion-allocation-confirmation")
+MINION_POOL_CONFIRM_REPLICA_MINION_ALLOCATION_TASK_NAME_FORMAT = (
+    "replica-%s-minion-allocation-confirmation")
+MINION_POOL_REPORT_MIGRATION_ALLOCATION_FAILURE_TASK_NAME_FORMAT = (
+    "migration-%s-minion-allocation-failure")
+MINION_POOL_REPORT_REPLICA_ALLOCATION_FAILURE_TASK_NAME_FORMAT = (
+    "replica-%s-minion-allocation-failure")
+
+
+class MinionManagerTaskEventMixin(object):
+    """ Mixin offering RPC client accessors and small DB helpers to the
+    minion manager tasks.
+
+    `_add_minion_pool_event()` reads `self._minion_pool_id`, so classes which
+    call it must define that attribute themselves.
+    """
+
+    # NOTE(aznashwan): it is unsafe to fork processes with pre-instantiated
+    # oslo_messaging clients as the underlying eventlet thread queues will
+    # be invalidated. Considering this class both serves from a "main
+    # process" as well as forking child processes, it is safest to
+    # re-instantiate the clients every time:
+    @property
+    def _conductor_client(self):
+        """ Returns a newly-instantiated conductor RPC client. """
+        client = rpc_conductor_client.ConductorClient()
+        return client
+
+    @property
+    def _minion_manager_client(self):
+        """ Returns a newly-instantiated minion manager RPC client. """
+        client = rpc_minion_manager_client.MinionManagerClient()
+        return client
+
+    def _add_minion_pool_event(
+            self, context, message, level=constants.TASK_EVENT_INFO):
+        """ Logs the given message and records it in the DB as an event of
+        the pool with ID `self._minion_pool_id`.
+        """
+        pool_id = self._minion_pool_id
+        LOG.debug("Minion pool '%s' event: %s", pool_id, message)
+        db_api.add_minion_pool_event(context, pool_id, level, message)
+
+    def _get_minion_machine(
+            self, context, minion_machine_id,
+            raise_if_not_found=False):
+        """ Fetches the minion machine with the given ID from the DB.
+
+        Returns whatever the DB API returned for the ID, unless the result
+        is falsy and `raise_if_not_found` is set, in which case an
+        `exception.NotFound` is raised.
+        """
+        found = db_api.get_minion_machine(context, minion_machine_id)
+        if found or not raise_if_not_found:
+            return found
+        raise exception.NotFound(
+            "Could not find minion machine with ID '%s'" % (
+                minion_machine_id))
+
+
+class _BaseReportMinionAllocationFailureForActionTask(
+        coriolis_taskflow_base.BaseCoriolisTaskflowTask,
+        MinionManagerTaskEventMixin):
+    """ Task whose `execute()` does nothing, but whose `revert()` requests
+    the deallocation of the minion machines of the given transfer action and
+    then reports the minion allocation failure to the conductor.
+
+    Subclasses must provide the task name (`_get_task_name()`) and the
+    actual reporting call (`_report_machine_allocation_failure()`).
+    """
+
+    def __init__(self, action_id, **kwargs):
+        self._action_id = action_id
+        task_name = self._get_task_name(action_id)
+        self._task_name = task_name
+        super(_BaseReportMinionAllocationFailureForActionTask, self).__init__(
+            name=task_name, **kwargs)
+
+    @abc.abstractmethod
+    def _get_task_name(self, action_id):
+        """ Returns the name to use for the task of the given action. """
+        raise NotImplementedError(
+            "No allocation failure task name provided")
+
+    @abc.abstractmethod
+    def _report_machine_allocation_failure(
+            self, context, action_id, failure_str):
+        """ Reports the allocation failure for the given action. """
+        raise NotImplementedError(
+            "No allocation failure operation defined")
+
+    def execute(self, context):
+        """ No-op besides the parent's `execute()` and a debug message. """
+        parent = super(_BaseReportMinionAllocationFailureForActionTask, self)
+        parent.execute()
+        LOG.debug("Nothing to execute for task '%s'", self._task_name)
+
+    def revert(self, context, *args, **kwargs):
+        """ Deallocates the action's minion machines and reports the
+        failures found under the 'flow_failures' kwarg.
+        """
+        parent = super(_BaseReportMinionAllocationFailureForActionTask, self)
+        parent.revert(*args, **kwargs)
+
+        failures_description = self._get_error_str_for_flow_failures(
+            kwargs.get('flow_failures', {}), full_tracebacks=False)
+        LOG.info(
+            "Reporting minion allocation failure for action '%s': %s",
+            self._action_id, failures_description)
+
+        # the machines are released before the failure gets reported:
+        manager_client = self._minion_manager_client
+        manager_client.deallocate_minion_machines_for_action(
+            context, self._action_id)
+        self._report_machine_allocation_failure(
+            context, self._action_id, failures_description)
+
+
+class ReportMinionAllocationFailureForMigrationTask(
+        _BaseReportMinionAllocationFailureForActionTask):
+    """ Reports minion allocation failures for migrations. """
+
+    def _get_task_name(self, action_id):
+        name_format = (
+            MINION_POOL_REPORT_MIGRATION_ALLOCATION_FAILURE_TASK_NAME_FORMAT)
+        return name_format % (action_id)
+
+    def _report_machine_allocation_failure(
+            self, context, action_id, failure_str):
+        conductor = self._conductor_client
+        conductor.report_migration_minions_allocation_error(
+            context, action_id, failure_str)
+
+
+class ReportMinionAllocationFailureForReplicaTask(
+        _BaseReportMinionAllocationFailureForActionTask):
+    """ Reports minion allocation failures for replicas. """
+
+    def _get_task_name(self, action_id):
+        name_format = (
+            MINION_POOL_REPORT_REPLICA_ALLOCATION_FAILURE_TASK_NAME_FORMAT)
+        return name_format % (action_id)
+
+    def _report_machine_allocation_failure(
+            self, context, action_id, failure_str):
+        conductor = self._conductor_client
+        conductor.report_replica_minions_allocation_error(
+            context, action_id, failure_str)
+
+
+class _BaseConfirmMinionAllocationForActionTask(
+        coriolis_taskflow_base.BaseCoriolisTaskflowTask,
+        MinionManagerTaskEventMixin):
+    """ Task which confirms the minion machine allocations for the given action
+    to the conductor.
+
+    Subclasses must provide the task name (`_get_task_name()`) and the
+    actual confirmation call (`_confirm_machine_allocation_for_action()`).
+    """
+
+    # Each entry is of the form:
+    # (<key of the machine ID in the input mappings>,
+    #  <purpose label used in error messages>,
+    #  <key under which the machine is passed on for confirmation>)
+    # The order is the one in which the machines get fetched and checked.
+    _MINION_ALLOCATION_TYPES = (
+        ('origin_minion_id', "source", 'origin_minion'),
+        ('destination_minion_id', "destination", 'destination_minion'),
+        ('osmorphing_minion_id', "OSMorphing", 'osmorphing_minion'))
+
+    def __init__(self, action_id, allocated_machine_id_mappings, **kwargs):
+        """
+        param allocated_machine_id_mappings: dict of the form:
+        {
+            "<action_instance_identifier>": {
+                "origin_minion_id": "<origin_minion_id>",
+                "destination_minion_id": "<destination_minion_id>",
+                "osmorphing_minion_id": "<osmorphing_minion_id>"}}
+        """
+        self._action_id = action_id
+        self._task_name = self._get_task_name(action_id)
+        self._allocated_machine_id_mappings = allocated_machine_id_mappings
+        super(_BaseConfirmMinionAllocationForActionTask, self).__init__(
+            name=self._task_name, **kwargs)
+
+    @abc.abstractmethod
+    def _get_task_name(self, action_id):
+        """ Returns the name to use for the task of the given action. """
+        raise NotImplementedError(
+            "No minion allocation confirmation task name defined")
+
+    @abc.abstractmethod
+    def _confirm_machine_allocation_for_action(
+            self, context, action_id, machine_allocations):
+        """ Confirms the given machine allocations for the given action. """
+        raise NotImplementedError(
+            "No minion allocation confrimation operation defined")
+
+    def _check_minion_properties(
+            self, minion_machine, instance, minion_purpose="unknown"):
+        """ Checks that the given minion machine is usable for the action.
+
+        Raises `exception.InvalidMinionMachineState` if the machine is not
+        in ALLOCATED status, if it is allocated to an action other than
+        `self._action_id`, or if its 'provider_properties' or
+        'connection_info' are falsy.
+        """
+        if minion_machine.status != (
+                constants.MINION_MACHINE_STATUS_ALLOCATED):
+            raise exception.InvalidMinionMachineState(
+                "Minion machine with ID '%s' is in '%s' status instead "
+                "of the expected '%s' for it to be used as a '%s' "
+                "minion for instance '%s' of transfer action '%s'." % (
+                    minion_machine.id, minion_machine.status,
+                    constants.MINION_MACHINE_STATUS_ALLOCATED,
+                    minion_purpose, instance, self._action_id))
+
+        if minion_machine.allocated_action != self._action_id:
+            raise exception.InvalidMinionMachineState(
+                "Minion machine with ID '%s' appears to be allocated to "
+                "action with ID '%s' instead of the expected '%s' for it "
+                "to be used as a '%s' minion for instance '%s'." % (
+                    minion_machine.id, minion_machine.allocated_action,
+                    self._action_id, minion_purpose, instance))
+
+        # TODO(aznashwan): add extra checks for conn info schemas here?
+        required_props = {
+            "provider_properties": minion_machine.provider_properties,
+            "connection_info": minion_machine.connection_info}
+        if not all(required_props.values()):
+            raise exception.InvalidMinionMachineState(
+                "One or more required paroperties for minion machine '%s' "
+                "(to be used as a '%s' minion for instance '%s' of action "
+                "'%s') were missing: %s" % (
+                    minion_machine.id, minion_purpose, instance,
+                    self._action_id, required_props))
+
+    def execute(self, context):
+        """ Fetches and validates all the minion machines referenced by the
+        ID mappings given at instantiation, then confirms the resulting
+        allocations for the action.
+
+        Instances with no allocated machines are skipped with a warning.
+        """
+        # machines which were already fetched and validated, by ID:
+        machines_cache = {}
+        machine_allocations = {}
+
+        for (instance, allocated_machines_for_instance) in (
+                self._allocated_machine_id_mappings.items()):
+
+            if not allocated_machines_for_instance:
+                LOG.warn(
+                    "No machine allocations were provided for instance '%s' "
+                    "for action '%s'. Skipping. mappings were: %s",
+                    instance, self._action_id, allocated_machines_for_instance)
+                continue
+
+            instance_allocations = machine_allocations.setdefault(
+                instance, {})
+
+            for (id_key, purpose, allocation_key) in (
+                    self._MINION_ALLOCATION_TYPES):
+                machine_id = allocated_machines_for_instance.get(id_key)
+                if not machine_id:
+                    continue
+
+                machine = machines_cache.get(machine_id)
+                if not machine:
+                    machine = self._get_minion_machine(
+                        context, machine_id, raise_if_not_found=True)
+                    # only cache machines which have passed validation:
+                    self._check_minion_properties(
+                        machine, instance, minion_purpose=purpose)
+                    machines_cache[machine_id] = machine
+                instance_allocations[allocation_key] = machine.to_dict()
+
+        self._confirm_machine_allocation_for_action(
+            context, self._action_id, machine_allocations)
+
+
+class ConfirmMinionAllocationForMigrationTask(
+        _BaseConfirmMinionAllocationForActionTask):
+    """ Confirms minion machine allocations for migrations. """
+
+    def _get_task_name(self, action_id):
+        name_format = (
+            MINION_POOL_CONFIRM_MIGRATION_MINION_ALLOCATION_TASK_NAME_FORMAT)
+        return name_format % (action_id)
+
+    def _confirm_machine_allocation_for_action(
+            self, context, action_id, machine_allocations):
+        conductor = self._conductor_client
+        conductor.confirm_migration_minions_allocation(
+            context, action_id, machine_allocations)
+
+
+class ConfirmMinionAllocationForReplicaTask(
+        _BaseConfirmMinionAllocationForActionTask):
+    """ Confirms minion machine allocations for replicas. """
+
+    def _get_task_name(self, action_id):
+        name_format = (
+            MINION_POOL_CONFIRM_REPLICA_MINION_ALLOCATION_TASK_NAME_FORMAT)
+        return name_format % (action_id)
+
+    def _confirm_machine_allocation_for_action(
+            self, context, action_id, machine_allocations):
+        conductor = self._conductor_client
+        conductor.confirm_replica_minions_allocation(
+            context, action_id, machine_allocations)
 
 
 class UpdateMinionPoolStatusTask(
-        coriolis_taskflow_base.BaseCoriolisTaskflowTask):
-    """Task which updates the status of the given pool.
+        coriolis_taskflow_base.BaseCoriolisTaskflowTask,
+        MinionManagerTaskEventMixin):
+    """ Task which updates the status of the given pool.
     Is capable of recording and reverting the state.
     """
 
@@ -65,12 +354,6 @@ class UpdateMinionPoolStatusTask(
         super(UpdateMinionPoolStatusTask, self).__init__(
             name=self._task_name, **kwargs)
 
-    def _add_minion_pool_event(
-            self, ctxt, message, level=constants.TASK_EVENT_INFO):
-        LOG.debug("Minion pool '%s' event: %s", self._minion_pool_id, message)
-        db_api.add_minion_pool_event(
-            ctxt, self._minion_pool_id, level, message)
-
     def execute(self, context, *args):
         super(UpdateMinionPoolStatusTask, self).execute(*args)
 
@@ -147,7 +430,9 @@ class UpdateMinionPoolStatusTask(
                     minion_pool.status, previous_status))
 
 
-class BaseMinionManangerTask(coriolis_taskflow_base.BaseRunWorkerTask):
+class BaseMinionManangerTask(
+        coriolis_taskflow_base.BaseRunWorkerTask,
+        MinionManagerTaskEventMixin):
 
     """Base taskflow.Task implementation for Minion Mananger tasks.
 
@@ -174,22 +459,6 @@ class BaseMinionManangerTask(coriolis_taskflow_base.BaseRunWorkerTask):
     def _get_task_name(self, minion_pool_id, minion_machine_id):
         raise NotImplementedError("No task name providable")
 
-    def _get_minion_machine(
-            self, ctxt, minion_machine_id,
-            raise_if_not_found=False):
-        machine = db_api.get_minion_machine(ctxt, minion_machine_id)
-        if not machine and raise_if_not_found:
-            raise exception.NotFound(
-                "Could not find minion machine with ID '%s'" % (
-                    minion_machine_id))
-        return machine
-
-    def _add_minion_pool_event(
-            self, ctxt, message, level=constants.TASK_EVENT_INFO):
-        LOG.debug("Minion pool '%s' event: %s", self._minion_pool_id, message)
-        db_api.add_minion_pool_event(
-            ctxt, self._minion_pool_id, level, message)
-
     def execute(self, context, origin, destination, task_info):
         LOG.info(
             "Starting minion pool task '%s' (runner type '%s')",
@@ -269,6 +538,22 @@ class AllocateSharedPoolResourcesTask(BaseMinionManangerTask):
             minion_pool_id)
 
     def execute(self, context, origin, destination, task_info):
+        # Pre-flight check done under the pool lock: the pool must exist in
+        # the DB and must not already have shared resources recorded for it.
+        with minion_manager_utils.get_minion_pool_lock(
+                self._minion_pool_id, external=True):
+            minion_pool = db_api.get_minion_pool(
+                context, self._minion_pool_id)
+            if not minion_pool:
+                # NOTE: the message has two placeholders, so both the task
+                # name and the pool ID must be supplied:
+                raise exception.InvalidMinionPoolSelection(
+                    "[Task '%s'] Minion pool '%s' doesn't exist in the DB. "
+                    "It cannot have shared resources deployed for it." % (
+                        self._task_name, self._minion_pool_id))
+            if minion_pool.shared_resources:
+                raise exception.InvalidMinionPoolState(
+                    "[Task '%s'] Minion pool already has shared resources "
+                    "defined for it. Cannot re-deploy shared resources. "
+                    "DB entry is: %s" % (
+                        self._task_name, minion_pool.shared_resources))
+
         self._add_minion_pool_event(
             context, "Deploying shared pool resources")
         res = super(AllocateSharedPoolResourcesTask, self).execute(
@@ -279,34 +564,37 @@ class AllocateSharedPoolResourcesTask(BaseMinionManangerTask):
 
         updated_values = {
             "shared_resources": pool_shared_resources}
+
         db_api.add_minion_pool_event(
             context, self._minion_pool_id, constants.TASK_EVENT_INFO,
             "Successfully deployed shared pool resources" % (
                 pool_shared_resources))
-        db_api.update_minion_pool(
-            context, self._minion_pool_id, updated_values)
+        # the deployed resources get recorded in the DB under the pool lock:
+        with minion_manager_utils.get_minion_pool_lock(
+                self._minion_pool_id, external=True):
+            db_api.update_minion_pool(
+                context, self._minion_pool_id, updated_values)
 
         task_info['pool_shared_resources'] = res['pool_shared_resources']
         return task_info
 
     def revert(self, context, origin, destination, task_info, **kwargs):
+        # Runs the parent's reversion (with 'pool_shared_resources' defaulted
+        # to an empty dict if missing from the task_info) and then clears
+        # the shared resources recorded for the pool in the DB.
         if 'pool_shared_resources' not in task_info:
+            LOG.debug(
+                "[Task '%s'] Failed to find 'pool_shared_resources' in "
+                "provided task_info from original execution of allocation "
+                "task for pool '%s'. Defaulting to an empty dict.",
+                self._task_name, self._minion_pool_id)
             task_info['pool_shared_resources'] = {}
 
-        res = super(AllocateSharedPoolResourcesTask, self).revert(
+        super(AllocateSharedPoolResourcesTask, self).revert(
             context, origin, destination, task_info, **kwargs)
 
-        if res and res.get('pool_shared_resources'):
-            LOG.warn(
-                "Pool shared resources cleanup task has returned non-void "
-                "resources dict: %s", res.get['pool_shared_resources'])
-
-        updated_values = {
-            "pool_shared_resources": None}
-        db_api.update_minion_pool(
-            context, self._minion_pool_id, updated_values)
-
-        task_info['pool_shared_resources'] = None
+        with minion_manager_utils.get_minion_pool_lock(
+                self._minion_pool_id, external=True):
+            # NOTE: `execute()` stores (and checks) the resources under the
+            # 'shared_resources' key, so that is the one which gets reset:
+            updated_values = {
+                "shared_resources": None}
+            db_api.update_minion_pool(
+                context, self._minion_pool_id, updated_values)
 
 
 class DeallocateSharedPoolResourcesTask(BaseMinionManangerTask):
@@ -359,7 +647,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
 
     def __init__(
             self, minion_pool_id, minion_machine_id, minion_pool_type,
-            allocate_to_action=None, **kwargs):
+            raise_on_cleanup_failure=True, allocate_to_action=None, **kwargs):
         resource_deployment_task_type = (
             constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE)
         resource_cleanup_task_type = (
@@ -370,6 +658,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             resource_cleanup_task_type = (
                 constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
         self._allocate_to_action = allocate_to_action
+        self._raise_on_cleanup_failure = raise_on_cleanup_failure
         super(AllocateMinionMachineTask, self).__init__(
             minion_pool_id, minion_machine_id, resource_deployment_task_type,
             cleanup_task_runner_type=resource_cleanup_task_type, **kwargs)
@@ -379,12 +668,60 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             minion_pool_id, minion_machine_id)
 
     def execute(self, context, origin, destination, task_info):
-        minion_machine = models.MinionMachine()
-        minion_machine.id = self._minion_machine_id
-        minion_machine.pool_id = self._minion_pool_id
-        minion_machine.status = (
-            constants.MINION_MACHINE_STATUS_DEPLOYING)
-        db_api.add_minion_machine(context, minion_machine)
+        minion_machine = self._get_minion_machine(
+            context, self._minion_machine_id, raise_if_not_found=False)
+        if minion_machine:
+            if minion_machine.status != (
+                    constants.MINION_MACHINE_STATUS_UNINITIALIZED):
+                raise exception.InvalidMinionMachineState(
+                    "Minion machine entry with ID '%s' already exists within "
+                    "the DB and it is in '%s' status instead of the expected "
+                    "'%s' status. Existing machine's properties are: %s" % (
+                        self._minion_machine_id, minion_machine.status,
+                        constants.MINION_MACHINE_STATUS_UNINITIALIZED,
+                        minion_machine.to_dict()))
+            if minion_machine.pool_id != self._minion_pool_id:
+                raise exception.InvalidMinionMachineState(
+                    "Minion machine entry with ID '%s' already exists within "
+                    "the DB but it belongs to a different minion pool ('%s') "
+                    "from the one requested by this task ('%s')." % (
+                        self._minion_machine_id, minion_machine.pool_id,
+                        self._minion_pool_id))
+            if self._allocate_to_action and (
+                    minion_machine.allocated_action and (
+                        self._allocate_to_action != (
+                            minion_machine.allocated_action))):
+                raise exception.InvalidMinionMachineState(
+                    "Minion machine entry with ID '%s' already exists in the "
+                    "DB but it is already allocated to a different action "
+                    "('%s') from the one requested by the task ('%s')." % (
+                        self._minion_machine_id,
+                        minion_machine.allocated_action,
+                        self._allocate_to_action))
+            LOG.info(
+                "[Task '%s'] Found existing entry in DB for minion machine "
+                "'%s'. Reusing that for deployment task.",
+                self._task_name, self._minion_machine_id)
+            with minion_manager_utils.get_minion_pool_lock(
+                    self._minion_pool_id, external=True):
+                db_api.update_minion_machine(
+                    context, self._minion_machine_id,
+                    {"status": constants.MINION_MACHINE_STATUS_DEPLOYING})
+        else:
+            minion_machine = models.MinionMachine()
+            minion_machine.id = self._minion_machine_id
+            minion_machine.pool_id = self._minion_pool_id
+            minion_machine.status = (
+                constants.MINION_MACHINE_STATUS_DEPLOYING)
+            log_msg = (
+                "[Task '%s'] Adding new minion machine with ID '%s' "
+                "to the DB" % (self._task_name, self._minion_machine_id))
+            if self._allocate_to_action:
+                minion_machine.allocated_action = self._allocate_to_action
+                log_msg = "%s (allocated to action '%s')" % (
+                    log_msg, self._allocate_to_action)
+            LOG.info(log_msg)
+            db_api.add_minion_machine(context, minion_machine)
 
         execution_info = {
             "pool_environment_options": task_info["pool_environment_options"],
@@ -392,10 +729,15 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             "pool_shared_resources": task_info["pool_shared_resources"],
             "pool_os_type": task_info["pool_os_type"]}
 
-        self._add_minion_pool_event(
-            context,
+        event_message = (
             "Allocating minion machine with internal pool ID '%s'" % (
                 self._minion_machine_id))
+        if self._allocate_to_action:
+            event_message = (
+                "%s to be used for transfer action with ID '%s'" % (
+                    event_message, self._allocate_to_action))
+        self._add_minion_pool_event(
+            context, event_message)
 
         try:
             res = super(AllocateMinionMachineTask, self).execute(
@@ -421,43 +763,126 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
         if self._allocate_to_action:
             updated_values["allocated_action"] = self._allocate_to_action
             updated_values["status"] = (
-                constants.MINION_MACHINE_STATUS_RESERVED)
+                constants.MINION_MACHINE_STATUS_ALLOCATED)
         db_api.update_minion_machine(
             context, self._minion_machine_id, updated_values)
 
         return task_info
 
     def revert(self, context, origin, destination, task_info, **kwargs):
+        # Reverts the allocation of the minion machine:
+        #  1. determines the machine's provider properties from the result
+        #     of the original execution, the task_info, or the machine's DB
+        #     entry (in that order of preference);
+        #  2. deletes the machine's DB entry (failures are only logged);
+        #  3. if any provider properties were found, runs the parent's
+        #     reversion with them. Exceptions raised by it are re-raised
+        #     only if `self._raise_on_cleanup_failure` is set.
+
+        minion_provider_properties = None
+        task_info_minion_provider_properties = task_info.get(
+            'minion_provider_properties')
+
+        # check if the original result is a taskflow Failure object:
         original_result = kwargs.get('result', {})
-        if original_result:
-            if not isinstance(original_result, dict):
-                LOG.debug(
-                    "Reversion for Minion Machine '%s' (pool '%s') did not "
-                    "receive any dict result from the original run. Presuming "
-                    "that the task had not initially run successfully. "
-                    "Result was: %s",
-                    self._minion_machine_id, self._minion_pool_id,
+        if isinstance(original_result, failure.Failure):
+            LOG.debug(
+                "[Task '%s'] Reversion for allocation Minion Machine '%s' "
+                "(pool '%s') received a failure as the original result. "
+                "Presuming the original execution failed and found the "
+                "following 'machine_properties' key in task info: %s",
+                self._task_name, self._minion_machine_id,
+                self._minion_pool_id, task_info_minion_provider_properties)
+            LOG.warn(
+                "[Task '%s'] Allocation failed for machine '%s'. Error "
+                "details were: %s",
+                self._task_name, self._minion_machine_id,
+                original_result.traceback_str)
+        # else, if it's a dict, fetch it:
+        elif isinstance(original_result, dict):
+            minion_provider_properties = original_result.get(
+                'minion_provider_properties', None)
+        else:
+            LOG.warn(
+                "[Task '%s'] Allocation task reversion for machine '%s' "
+                "of pool '%s' got an unexpected task result type (%s): %s",
+                self._task_name, self._minion_machine_id,
+                self._minion_pool_id, type(original_result),
                     original_result)
-            elif 'minion_provider_properties' not in original_result:
+            minion_provider_properties = None
+
+        # default to any minion properties found in the task_info:
+        if not minion_provider_properties:
+            LOG.debug(
+                "[Task '%s'] Reversion for Minion Machine '%s' (pool '%s')"
+                " did not return any 'minion_provider_properties' after "
+                "its initial execution. Defaulting to task_info value: %s",
+                self._task_name, self._minion_machine_id,
+                self._minion_pool_id,
+                task_info_minion_provider_properties)
+            minion_provider_properties = task_info_minion_provider_properties
+
+        # lastly, if the machine entry exists in the DB:
+        with minion_manager_utils.get_minion_pool_lock(
+                self._minion_pool_id, external=True):
+            machine_db_entry = (
+                db_api.get_minion_machine(context, self._minion_machine_id))
+            if machine_db_entry:
                 LOG.debug(
-                    "Reversion for Minion Machine '%s' (pool '%s') did not "
-                    "receive any result from the original run. Presuming "
-                    "that the task had not initially run successfully. "
-                    "Result was: %s",
-                    self._minion_machine_id, self._minion_pool_id,
-                    original_result)
+                    "[Task %s] Removing minion machine entry with ID '%s' for "
+                    "minion pool '%s' from the DB as part of reversion of its "
+                    "allocation task. Machine properties at deletion time "
+                    "were: %s",
+                    self._task_name, self._minion_machine_id,
+                    self._minion_pool_id,
+                    machine_db_entry.to_dict())
+                if not minion_provider_properties and (
+                        machine_db_entry.provider_properties):
+                    minion_provider_properties = (
+                        machine_db_entry.provider_properties)
+                    LOG.debug(
+                        "[Task '%s'] Using minion provider properties of "
+                        "minion machine with ID '%s' from DB entry during the "
+                        "reversion of its allocation task. DB props are: %s",
+                        self._task_name, self._minion_machine_id,
+                        minion_provider_properties)
 
-        cleanup_info = copy.deepcopy(task_info)
-        cleanup_info['minion_provider_properties'] = original_result[
-            'minion_provider_properties']
-        _ = super(AllocateMinionMachineTask, self).revert(
-            context, origin, destination, cleanup_info, **kwargs)
+            LOG.debug(
+                "[Task %s] Deleting minion machine with ID '%s' from the DB.",
+                self._task_name, self._minion_machine_id)
+            # best-effort: a failed DB deletion must not prevent the cleanup
+            # of the machine itself below:
+            try:
+                db_api.delete_minion_machine(context, self._minion_machine_id)
+            except Exception:
+                LOG.warn(
+                    "[Task '%s'] Failed to delete DB entry for minion machine "
+                    "'%s' following reversion of its allocation task. Error "
+                    "trace was: %s",
+                    self._task_name, self._minion_machine_id,
+                    utils.get_exception_details())
 
-        if db_api.get_minion_machine(context, self._minion_machine_id):
+        if not minion_provider_properties:
             LOG.debug(
-                "Removing minion machine entry with ID '%s' for minion pool "
-                "'%s' from the DB.", self._minion_machine_id, self._minion_pool_id)
-            db_api.delete_minion_machine(context, self._minion_machine_id)
+                "[Task '%s'] Reversion for Minion Machine '%s' (pool '%s')"
+                " found no 'minion_provider_properties'. Presuming the machine "
+                "never got created on the cloud and skiping any deletion task",
+                self._task_name, self._minion_machine_id,
+                self._minion_pool_id)
+        else:
+            cleanup_info = copy.deepcopy(task_info)
+            cleanup_info['minion_provider_properties'] = (
+                minion_provider_properties)
+            try:
+                super(AllocateMinionMachineTask, self).revert(
+                    context, origin, destination, cleanup_info, **kwargs)
+            except Exception:
+                log_msg = (
+                    "[Task '%s'] Exception occured while attempting to revert "
+                    "deployment of minion machine with ID '%s' for pool '%s'." % (
+                        self._task_name, self._minion_machine_id,
+                        self._minion_pool_id))
+                if not self._raise_on_cleanup_failure:
+                    log_msg = (
+                        "%s Ignoring exception." % log_msg)
+                # NOTE: the helper must be called for the actual exception
+                # details (not the function's repr) to reach the log:
+                log_msg = (
+                    "%s Exception details were: %s" % (
+                        log_msg, utils.get_exception_details()))
+                LOG.warn(log_msg)
+                if self._raise_on_cleanup_failure:
+                    raise
 
 
 class DeallocateMinionMachineTask(BaseMinionManangerTask):
@@ -523,7 +948,8 @@ class HealthcheckMinionMachineTask(BaseMinionManangerTask):
 
     def __init__(
             self, minion_pool_id, minion_machine_id, minion_pool_type,
-            **kwargs):
+            fail_on_error=False, **kwargs):
+        self._fail_on_error = fail_on_error
         resource_healthcheck_task = (
             constants.TASK_TYPE_HEALTHCHECK_SOURCE_MINION)
         if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
@@ -574,9 +1000,12 @@ class HealthcheckMinionMachineTask(BaseMinionManangerTask):
                 "Full trace was:\n%s", self._task_name,
                 self._minion_machine_id, self._minion_pool_id,
                 utils.get_exception_details())
-            res = {
-                "healthy": False,
-                "error": str(ex)}
+            if not self._fail_on_error:
+                res = {
+                    "healthy": False,
+                    "error": str(ex)}
+            else:
+                raise
 
         return res
 

+ 6 - 0
coriolis/minion_manager/rpc/utils.py

@@ -8,6 +8,12 @@ from oslo_concurrency import lockutils
 from coriolis import constants
 
 
+def get_minion_pool_lock(minion_pool_id, external=True):
+    return lockutils.lock(
+        constants.MINION_POOL_LOCK_NAME_FORMAT % minion_pool_id,
+        external=external)
+
+
 def minion_pool_synchronized(minion_pool_id, func):
     @functools.wraps(func)
     def wrapper(*args, **kwargs):

+ 3 - 5
coriolis/taskflow/base.py

@@ -189,7 +189,7 @@ class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
         LOG.debug(
             "Was offered the following worker service for executing TaskFlow "
             "task '%s' (taskflow ID %s): %s",
-            self._task_name, task_id, worker_service)
+            self._task_name, task_id, worker_service['id'])
 
         return rpc_worker_client.WorkerClient.from_service_definition(
             worker_service, timeout=rpc_timeout)
@@ -227,13 +227,12 @@ class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
     def revert(self, context, origin, destination, task_info, **kwargs):
         super(BaseRunWorkerTask, self).revert(
             context, origin, destination, task_info, **kwargs)
-        original_result = kwargs.get('result')
         if not self._cleanup_task_runner_type:
             LOG.debug(
                 "Task '%s' (main type '%s') had no cleanup task runner "
                 "associated with it. Skipping any reversion logic",
                 self._task_name, self._main_task_runner_type)
-            return original_result
+            return
 
         try:
             res = self._execute_task(
@@ -247,11 +246,10 @@ class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
                 self._cleanup_task_runner_type, utils.get_exception_details())
             if self._raise_on_cleanup_failure:
                 raise
-            return original_result
+            return
 
         LOG.debug(
             "Reversion of taskflow task '%s' (ID '%s') was successfully "
             "executed using task runner '%s' with the following result: %s" % (
                 self._task_name, self._task_id, self._cleanup_task_runner_type,
                 res))
-        return res

+ 7 - 1
coriolis/taskflow/runner.py

@@ -74,7 +74,13 @@ class TaskFlowRunner(object):
         engine.prepare()
 
         LOG.debug("Running flow with name '%s'", flow.name)
-        engine.run()
+        try:
+            engine.run()
+        except Exception as ex:
+            LOG.warn(
+                "Fatal error occured while attempting to run flow '%s'. "
+                "Full trace was: %s", flow.name, utils.get_exception_details())
+            raise
         LOG.info(
             "Successfully ran flow with name '%s'. Statistics were: %s",
             flow.name, engine.statistics)

+ 1 - 1
coriolis/tasks/minion_pool_tasks.py

@@ -910,7 +910,7 @@ class _BaseHealthcheckMinionMachineTask(base.TaskRunner):
         provider.healthcheck_minion(
             ctxt, connection_info, minion_properties, minion_connection_info)
 
-        return task_info
+        return {}
 
 
 class HealthcheckSourceMinionMachineTask(_BaseHealthcheckMinionMachineTask):