فهرست منبع

Add minion manager skeleton.

Nashwan Azhari 5 سال پیش
والد
کامیت
2599ca85ff

+ 0 - 9
coriolis/api/v1/views/minion_pool_view.py

@@ -13,15 +13,6 @@ def _format_minion_pool(req, minion_pool, keys=None):
     minion_pool_dict = dict(itertools.chain.from_iterable(
         transform(k, v) for k, v in minion_pool.items()))
 
-    # TODO(aznashwan): remove these redundancies once the base
-    # DB action model hierarchy will be overhauled:
-    for key in ["origin_endpoint_id", "destination_endpoint_id"]:
-        if key in minion_pool_dict:
-            minion_pool_dict["endpoint_id"] = minion_pool_dict.pop(key)
-    for key in ["source_environment", "destination_environment"]:
-        if key in minion_pool_dict:
-            minion_pool_dict["environment_options"] = minion_pool_dict.pop(key)
-
     def _hide_minion_creds(minion_conn):
         if 'pkey' in minion_conn:
             minion_conn['pkey'] = '***'

+ 31 - 0
coriolis/cmd/minion_manager.py

@@ -0,0 +1,31 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import sys
+
+from oslo_config import cfg
+
+from coriolis import constants
+from coriolis import service
+from coriolis import utils
+from coriolis.minion_manager.rpc import server as rpc_server
+
+CONF = cfg.CONF
+
+
+def main():
+    CONF(sys.argv[1:], project='coriolis',
+         version="1.0.0")
+    utils.setup_logging()
+
+    server = service.MessagingService(
+        constants.MINION_MANAGER_MAIN_MESSAGING_TOPIC,
+        [rpc_server.MinionManagerServerEndpoint()],
+        rpc_server.VERSION, worker_count=1)
+    launcher = service.service.launch(
+        CONF, server, workers=server.get_workers_count())
+    launcher.wait()
+
+
+if __name__ == "__main__":
+    main()

+ 0 - 102
coriolis/conductor/rpc/client.py

@@ -402,105 +402,3 @@ class ConductorClient(rpc.BaseRPCClient):
     def delete_service(self, ctxt, service_id):
         return self._call(
             ctxt, 'delete_service', service_id=service_id)
-
-    def create_minion_pool(
-            self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
-            environment_options, minimum_minions, maximum_minions,
-            minion_max_idle_time, minion_retention_strategy, notes=None):
-        return self._call(
-            ctxt, 'create_minion_pool', name=name, endpoint_id=endpoint_id,
-            pool_platform=pool_platform, pool_os_type=pool_os_type,
-            environment_options=environment_options,
-            minimum_minions=minimum_minions,
-            maximum_minions=maximum_minions,
-            minion_max_idle_time=minion_max_idle_time,
-            minion_retention_strategy=minion_retention_strategy,
-            notes=notes)
-
-    def set_up_shared_minion_pool_resources(self, ctxt, minion_pool_id):
-        return self._call(
-            ctxt, "set_up_shared_minion_pool_resources",
-            minion_pool_id=minion_pool_id)
-
-    def tear_down_shared_minion_pool_resources(
-            self, ctxt, minion_pool_id, force=False):
-        return self._call(
-            ctxt, "tear_down_shared_minion_pool_resources",
-            minion_pool_id=minion_pool_id, force=force)
-
-    def allocate_minion_pool_machines(self, ctxt, minion_pool_id):
-        return self._call(
-            ctxt, "allocate_minion_pool_machines",
-            minion_pool_id=minion_pool_id)
-
-    def deallocate_minion_pool_machines(
-            self, ctxt, minion_pool_id, force=False):
-        return self._call(
-            ctxt, "deallocate_minion_pool_machines",
-            minion_pool_id=minion_pool_id,
-            force=force)
-
-    def get_minion_pools(self, ctxt):
-        return self._call(ctxt, 'get_minion_pools')
-
-    def get_minion_pool(self, ctxt, minion_pool_id):
-        return self._call(
-            ctxt, 'get_minion_pool', minion_pool_id=minion_pool_id)
-
-    def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
-        return self._call(
-            ctxt, 'update_minion_pool',
-            minion_pool_id=minion_pool_id, updated_values=updated_values)
-
-    def delete_minion_pool(self, ctxt, minion_pool_id):
-        return self._call(
-            ctxt, 'delete_minion_pool', minion_pool_id=minion_pool_id)
-
-    def get_minion_pool_lifecycle_executions(
-            self, ctxt, minion_pool_id, include_tasks=False):
-        return self._call(
-            ctxt, 'get_minion_pool_lifecycle_executions',
-            minion_pool_id=minion_pool_id, include_tasks=include_tasks)
-
-    def get_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id):
-        return self._call(
-            ctxt, 'get_minion_pool_lifecycle_execution',
-            minion_pool_id=minion_pool_id, execution_id=execution_id)
-
-    def delete_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id):
-        return self._call(
-            ctxt, 'delete_minion_pool_lifecycle_execution',
-            minion_pool_id=minion_pool_id, execution_id=execution_id)
-
-    def cancel_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id, force):
-        return self._call(
-            ctxt, 'cancel_minion_pool_lifecycle_execution',
-            minion_pool_id=minion_pool_id, execution_id=execution_id,
-            force=force)
-
-    def get_endpoint_source_minion_pool_options(
-            self, ctxt, endpoint_id, env, option_names):
-        return self._call(
-            ctxt, 'get_endpoint_source_minion_pool_options',
-            endpoint_id=endpoint_id, env=env, option_names=option_names)
-
-    def get_endpoint_destination_minion_pool_options(
-            self, ctxt, endpoint_id, env, option_names):
-        return self._call(
-            ctxt, 'get_endpoint_destination_minion_pool_options',
-            endpoint_id=endpoint_id, env=env, option_names=option_names)
-
-    def validate_endpoint_source_minion_pool_options(
-            self, ctxt, endpoint_id, pool_environment):
-        return self._call(
-            ctxt, 'validate_endpoint_source_minion_pool_options',
-            endpoint_id=endpoint_id, pool_environment=pool_environment)
-
-    def validate_endpoint_destination_minion_pool_options(
-            self, ctxt, endpoint_id, pool_environment):
-        return self._call(
-            ctxt, 'validate_endpoint_destination_minion_pool_options',
-            endpoint_id=endpoint_id, pool_environment=pool_environment)

+ 106 - 942
coriolis/conductor/rpc/server.py

@@ -20,6 +20,7 @@ from coriolis.db.sqlalchemy import models
 from coriolis import exception
 from coriolis import keystone
 from coriolis.licensing import client as licensing_client
+from coriolis.minion_manager.rpc import client as minion_manager_client
 from coriolis.replica_cron.rpc import client as rpc_cron_client
 from coriolis.scheduler.rpc import client as rpc_scheduler_client
 from coriolis import schemas
@@ -145,18 +146,6 @@ def tasks_execution_synchronized(func):
     return wrapper
 
 
-def minion_pool_tasks_execution_synchronized(func):
-    @functools.wraps(func)
-    def wrapper(self, ctxt, minion_pool_id, execution_id, *args, **kwargs):
-        @lockutils.synchronized(
-            constants.EXECUTION_LOCK_NAME_FORMAT % execution_id,
-            external=True)
-        def inner():
-            return func(self, ctxt, minion_pool_id, execution_id, *args, **kwargs)
-        return inner()
-    return wrapper
-
-
 def region_synchronized(func):
     @functools.wraps(func)
     def wrapper(self, ctxt, region_id, *args, **kwargs):
@@ -181,18 +170,6 @@ def service_synchronized(func):
     return wrapper
 
 
-def minion_pool_synchronized(func):
-    @functools.wraps(func)
-    def wrapper(self, ctxt, minion_pool_id, *args, **kwargs):
-        @lockutils.synchronized(
-            constants.MINION_POOL_LOCK_NAME_FORMAT % minion_pool_id,
-            external=True)
-        def inner():
-            return func(self, ctxt, minion_pool_id, *args, **kwargs)
-        return inner()
-    return wrapper
-
-
 class ConductorServerEndpoint(object):
     def __init__(self):
         self._licensing_client = licensing_client.LicensingClient.from_env()
@@ -226,6 +203,10 @@ class ConductorServerEndpoint(object):
                 rpc_cron_client.ReplicaCronClient())
         return self._replica_cron_client_instance
 
+    @property
+    def _minion_manager_client(self):
+        return minion_manager_client.MinionManagerClient()
+
     def get_all_diagnostics(self, ctxt):
         diagnostics = [
             self.get_diagnostics(ctxt),
@@ -740,19 +721,23 @@ class ConductorServerEndpoint(object):
         raise exception.NoSuitableWorkerServiceError(message)
 
     def _begin_tasks(
-            self, ctxt, execution, task_info={},
+            self, ctxt, action, execution, task_info_override=None,
             scheduling_retry_count=5, scheduling_retry_period=2):
         """ Starts all non-error-only tasks which have no depencies. """
         if not ctxt.trust_id:
             keystone.create_trust(ctxt)
             ctxt.delete_trust_id = True
 
-        origin = self._get_task_origin(ctxt, execution.action)
-        destination = self._get_task_destination(ctxt, execution.action)
+        task_info = action.info
+        if task_info_override is not None:
+            task_info = task_info_override
+
+        origin = self._get_task_origin(ctxt, action)
+        destination = self._get_task_destination(ctxt, action)
         origin_endpoint = db_api.get_endpoint(
-            ctxt, execution.action.origin_endpoint_id)
+            ctxt, action.origin_endpoint_id)
         destination_endpoint = db_api.get_endpoint(
-            ctxt, execution.action.destination_endpoint_id)
+            ctxt, action.destination_endpoint_id)
 
         newly_started_tasks = []
         for task in execution.tasks:
@@ -1143,7 +1128,7 @@ class ConductorServerEndpoint(object):
             db_api.add_replica_tasks_execution(ctxt, execution)
             LOG.info("Replica tasks execution created: %s", execution.id)
 
-            self._begin_tasks(ctxt, execution, task_info=replica.info)
+            self._begin_tasks(ctxt, replica, execution)
         except Exception:
             if minion_pool_allocations:
                 LOG.warn(
@@ -1266,7 +1251,7 @@ class ConductorServerEndpoint(object):
         db_api.add_replica_tasks_execution(ctxt, execution)
         LOG.info("Replica tasks execution created: %s", execution.id)
 
-        self._begin_tasks(ctxt, execution, task_info=replica.info)
+        self._begin_tasks(ctxt, replica, execution)
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
 
     @staticmethod
@@ -1282,114 +1267,6 @@ class ConductorServerEndpoint(object):
                 destination_endpoint.connection_info):
             raise exception.SameDestination()
 
-    def _check_minion_pools_for_action(self, ctxt, action):
-        minion_pools = {
-            pool.id: pool
-            for pool in db_api.get_minion_pool_lifecycles(
-                ctxt, include_tasks_executions=False,
-                include_info=False, include_machines=False,
-                to_dict=False)}
-        def _get_pool(pool_id):
-            pool = minion_pools.get(pool_id)
-            if not pool:
-                raise exception.NotFound(
-                    "Could not find minion pool with ID '%s'." % pool_id)
-            return pool
-
-        if action.origin_minion_pool_id:
-            origin_pool = _get_pool(action.origin_minion_pool_id)
-            if origin_pool.origin_endpoint_id != action.origin_endpoint_id:
-                raise exception.InvalidMinionPoolSelection(
-                    "The selected origin minion pool ('%s') belongs to a "
-                    "different Coriolis endpoint ('%s') than the requested "
-                    "origin endpoint ('%s')" % (
-                        action.origin_minion_pool_id,
-                        origin_pool.origin_endpoint_id,
-                        action.origin_endpoint_id))
-            if origin_pool.pool_platform != constants.PROVIDER_PLATFORM_SOURCE:
-                raise exception.InvalidMinionPoolSelection(
-                    "The selected origin minion pool ('%s') is configured as a"
-                    " '%s' pool. The pool must be of type %s to be used for "
-                    "data exports." % (
-                        action.origin_minion_pool_id,
-                        origin_pool.pool_platform,
-                        constants.PROVIDER_PLATFORM_SOURCE))
-            if origin_pool.pool_os_type != constants.OS_TYPE_LINUX:
-                raise exception.InvalidMinionPoolSelection(
-                    "The selected origin minion pool ('%s') is of OS type '%s'"
-                    " instead of the Linux OS type required for a source "
-                    "transfer minion pool." % (
-                        action.origin_minion_pool_id,
-                        origin_pool.pool_os_type))
-            LOG.debug(
-                "Successfully validated compatibility of origin minion pool "
-                "'%s' for use with action '%s'." % (
-                    action.origin_minion_pool_id, action.id))
-
-        if action.destination_minion_pool_id:
-            destination_pool = _get_pool(action.destination_minion_pool_id)
-            if destination_pool.origin_endpoint_id != (
-                    action.destination_endpoint_id):
-                raise exception.InvalidMinionPoolSelection(
-                    "The selected destination minion pool ('%s') belongs to a "
-                    "different Coriolis endpoint ('%s') than the requested "
-                    "destination endpoint ('%s')" % (
-                        action.destination_minion_pool_id,
-                        destination_pool.origin_endpoint_id,
-                        action.destination_endpoint_id))
-            if destination_pool.pool_platform != (
-                    constants.PROVIDER_PLATFORM_DESTINATION):
-                raise exception.InvalidMinionPoolSelection(
-                    "The selected destination minion pool ('%s') is configured"
-                    " as a '%s'. The pool must be of type %s to be used for "
-                    "data imports." % (
-                        action.destination_minion_pool_id,
-                        destination_pool.pool_platform,
-                        constants.PROVIDER_PLATFORM_DESTINATION))
-            if destination_pool.pool_os_type != constants.OS_TYPE_LINUX:
-                raise exception.InvalidMinionPoolSelection(
-                    "The selected destination minion pool ('%s') is of OS type"
-                    " '%s' instead of the Linux OS type required for a source "
-                    "transfer minion pool." % (
-                        action.destination_minion_pool_id,
-                        destination_pool.pool_os_type))
-            LOG.debug(
-                "Successfully validated compatibility of destination minion "
-                "pool '%s' for use with action '%s'." % (
-                    action.origin_minion_pool_id, action.id))
-
-        if action.instance_osmorphing_minion_pool_mappings:
-            osmorphing_pool_mappings = {
-                instance_id: pool_id
-                for (instance_id, pool_id) in (
-                    action.instance_osmorphing_minion_pool_mappings.items())
-                if pool_id}
-            for (instance, pool_id) in osmorphing_pool_mappings.items():
-                osmorphing_pool = _get_pool(pool_id)
-                if osmorphing_pool.origin_endpoint_id != (
-                        action.destination_endpoint_id):
-                    raise exception.InvalidMinionPoolSelection(
-                        "The selected OSMorphing minion pool for instance '%s'"
-                        " ('%s') belongs to a different Coriolis endpoint "
-                        "('%s') than the destination endpoint ('%s')" % (
-                            instance, pool_id,
-                            osmorphing_pool.origin_endpoint_id,
-                            action.destination_endpoint_id))
-                if osmorphing_pool.pool_platform != (
-                        constants.PROVIDER_PLATFORM_DESTINATION):
-                    raise exception.InvalidMinionPoolSelection(
-                        "The selected OSMorphing minion pool for instance '%s'"
-                        "  ('%s') is configured as a '%s' pool. The pool must "
-                        "be of type %s to be used for OSMorphing." % (
-                            instance, pool_id,
-                            osmorphing_pool.pool_platform,
-                            constants.PROVIDER_PLATFORM_DESTINATION))
-                LOG.debug(
-                    "Successfully validated compatibility of destination "
-                    "minion pool '%s' for use as OSMorphing minion for "
-                    "instance '%s' during action '%s'." % (
-                        action.origin_minion_pool_id, instance, action.id))
-
     def create_instances_replica(self, ctxt, origin_endpoint_id,
                                  destination_endpoint_id,
                                  origin_minion_pool_id,
@@ -1728,7 +1605,7 @@ class ConductorServerEndpoint(object):
             db_api.add_migration(ctxt, migration)
             LOG.info("Migration created: %s", migration.id)
 
-            self._begin_tasks(ctxt, execution, task_info=migration.info)
+            self._begin_tasks(ctxt, migration, execution)
         except Exception:
             if minion_pool_allocations:
                 LOG.warn(
@@ -1757,255 +1634,17 @@ class ConductorServerEndpoint(object):
     def _allocate_minion_machines_for_action(
             self, ctxt, action, include_transfer_minions=True,
             include_osmorphing_minions=True):
-        """ Returns a dict of the form:
-        {
-            "instance_id": {
-                "source_minion": <source minion properties>,
-                "target_minion": <target minion properties>,
-                "osmorphing_minion": <osmorphing minion properties>
-            }
-        }
-        """
-        instance_machine_allocations = {
-            instance: {} for instance in action.instances}
-
-        minion_pool_ids = set()
-        if action.origin_minion_pool_id:
-            minion_pool_ids.add(action.origin_minion_pool_id)
-        if action.destination_minion_pool_id:
-            minion_pool_ids.add(action.destination_minion_pool_id)
-        if action.instance_osmorphing_minion_pool_mappings:
-            minion_pool_ids = minion_pool_ids.union(set(
-                action.instance_osmorphing_minion_pool_mappings.values()))
-        if None in minion_pool_ids:
-            minion_pool_ids.remove(None)
-
-        if not minion_pool_ids:
-            LOG.debug(
-                "No minion pool settings found for action '%s'. "
-                "Skipping minion machine allocations." % (
-                    action.id))
-            return instance_machine_allocations
-
-        LOG.debug(
-            "All minion pool selections for action '%s': %s",
-            action.id, minion_pool_ids)
-
-        def _select_machine(minion_pool, exclude=None):
-            if not minion_pool.minion_machines:
-                raise exception.InvalidMinionPoolSelection(
-                    "Minion pool with ID '%s' has no machines defined." % (
-                        minion_pool.id))
-            selected_machine = None
-            for machine in minion_pool.minion_machines:
-                if exclude and machine.id in exclude:
-                    LOG.debug(
-                        "Excluding minion machine '%s' from search.",
-                        machine.id)
-                    continue
-                if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
-                    LOG.debug(
-                        "Minion machine with ID '%s' is in status '%s' "
-                        "instead of '%s'. Skipping.", machine.id,
-                        machine.status,
-                        constants.MINION_MACHINE_STATUS_AVAILABLE)
-                    continue
-                selected_machine = machine
-                break
-            if not selected_machine:
-                raise exception.InvalidMinionPoolSelection(
-                    "There are no more available minion machines within minion"
-                    " pool with ID '%s' (excluding the following ones already "
-                    "planned for this transfer: %s). Please ensure that the "
-                    "minion pool has enough minion machines allocated and "
-                    "available (i.e. not being used for other operations) "
-                    "to satisfy the number of VMs required by the Migration or"
-                    " Replica." % (
-                        minion_pool.id, exclude))
-            return selected_machine
-
-        osmorphing_pool_map = (
-            action.instance_osmorphing_minion_pool_mappings)
-        with contextlib.ExitStack() as stack:
-            _ = [
-                stack.enter_context(
-                    lockutils.lock(
-                        constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
-                        external=True))
-                for pool_id in minion_pool_ids]
-
-            minion_pools = db_api.get_minion_pool_lifecycles(
-                ctxt, include_tasks_executions=False, include_info=False,
-                include_machines=True, to_dict=False)
-            minion_pool_id_mappings = {
-                pool.id: pool for pool in minion_pools
-                if pool.id in minion_pool_ids}
-
-            missing_pools = [
-                pool_id for pool_id in minion_pool_ids
-                if pool_id not in minion_pool_id_mappings]
-            if missing_pools:
-                raise exception.InvalidMinionPoolSelection(
-                    "The following minion pools could not be found: %s" % (
-                        missing_pools))
-
-            unallocated_pools = {
-                pool_id: pool.pool_status
-                for (pool_id, pool) in minion_pool_id_mappings.items()
-                if pool.pool_status != constants.MINION_POOL_STATUS_ALLOCATED}
-            if unallocated_pools:
-                raise exception.InvalidMinionPoolSelection(
-                    "The following minion pools have not had their machines "
-                    "allocated and thus cannot be used: %s" % (
-                        unallocated_pools))
-
-            allocated_source_machine_ids = set()
-            allocated_target_machine_ids = set()
-            allocated_osmorphing_machine_ids = set()
-            for instance in action.instances:
-
-                if include_transfer_minions:
-                    if action.origin_minion_pool_id:
-                        origin_pool = minion_pool_id_mappings[
-                            action.origin_minion_pool_id]
-                        machine = _select_machine(
-                            origin_pool, exclude=allocated_source_machine_ids)
-                        allocated_source_machine_ids.add(machine.id)
-                        instance_machine_allocations[
-                            instance]['source_minion'] = machine
-                        LOG.debug(
-                            "Selected minion machine '%s' for source-side "
-                            "syncing of instance '%s' as part of transfer "
-                            "action '%s'.", machine.id, instance, action.id)
-
-                    if action.destination_minion_pool_id:
-                        dest_pool = minion_pool_id_mappings[
-                            action.destination_minion_pool_id]
-                        machine = _select_machine(
-                            dest_pool, exclude=allocated_target_machine_ids)
-                        allocated_target_machine_ids.add(machine.id)
-                        instance_machine_allocations[
-                            instance]['target_minion'] = machine
-                        LOG.debug(
-                            "Selected minion machine '%s' for target-side "
-                            "syncing of instance '%s' as part of transfer "
-                            "action '%s'.", machine.id, instance, action.id)
-
-                if include_osmorphing_minions:
-                    if instance not in osmorphing_pool_map:
-                        LOG.debug(
-                            "Instance '%s' is not listed in the OSMorphing "
-                            "minion pool mappings for action '%s'." % (
-                                instance, action.id))
-                    elif osmorphing_pool_map[instance] is None:
-                        LOG.debug(
-                            "OSMorphing pool ID for instance '%s' is "
-                            "None in action '%s'. Ignoring." % (
-                                instance, action.id))
-                    else:
-                        osmorphing_pool_id = osmorphing_pool_map[instance]
-                        # if the selected target and OSMorphing pools
-                        # are the same, reuse the same worker:
-                        ima = instance_machine_allocations[instance]
-                        if osmorphing_pool_id == (
-                                action.destination_minion_pool_id) and (
-                                    'target_minion' in ima):
-                            allocated_target_machine = ima[
-                                'target_minion']
-                            LOG.debug(
-                                "Reusing disk sync minion '%s' for the "
-                                "OSMorphing of instance '%s' as port of "
-                                "transfer action '%s'",
-                                allocated_target_machine.id, instance,
-                                action.id)
-                            instance_machine_allocations[
-                                instance]['osmorphing_minion'] = (
-                                    allocated_target_machine)
-                        # else, allocate a new minion from the selected pool:
-                        else:
-                            osmorphing_pool = minion_pool_id_mappings[
-                                osmorphing_pool_id]
-                            machine = _select_machine(
-                                osmorphing_pool,
-                                exclude=allocated_osmorphing_machine_ids)
-                            allocated_osmorphing_machine_ids.add(machine.id)
-                            instance_machine_allocations[
-                                instance]['osmorphing_minion'] = machine
-                            LOG.debug(
-                                "Selected minion machine '%s' for OSMorphing "
-                                " of instance '%s' as part of transfer "
-                                "action '%s'.",
-                                machine.id, instance, action.id)
-
-            # mark the selected machines as allocated:
-            all_machine_ids = set(itertools.chain(
-                allocated_source_machine_ids,
-                allocated_target_machine_ids,
-                allocated_osmorphing_machine_ids))
-            db_api.set_minion_machines_allocation_statuses(
-                ctxt, all_machine_ids, action.id,
-                constants.MINION_MACHINE_STATUS_ALLOCATED)
-
-        # filter out redundancies:
-        instance_machine_allocations = {
-            instance: allocations
-            for (instance, allocations) in instance_machine_allocations.items()
-            if allocations}
-
-        LOG.debug(
-            "Allocated the following minion machines for action '%s': %s",
-            action.id, {
-                instance: {
-                    typ: machine.id
-                    for (typ, machine) in allocation.items()}
-                for (instance, allocation) in instance_machine_allocations.items()})
-        return instance_machine_allocations
+        return self._minion_manager_client.allocate_minion_machines_for_action(
+            ctxt, action.id, include_transfer_minions=include_transfer_minions,
+            include_osmorphing_minions=include_osmorphing_minions)
 
     def _deallocate_minion_machines_for_action(self, ctxt, action):
-        minion_pool_ids = set()
-        if action.origin_minion_pool_id:
-            minion_pool_ids.add(action.origin_minion_pool_id)
-        if action.destination_minion_pool_id:
-            minion_pool_ids.add(action.destination_minion_pool_id)
-        if action.instance_osmorphing_minion_pool_mappings:
-            minion_pool_ids = minion_pool_ids.union(set(
-                action.instance_osmorphing_minion_pool_mappings.values()))
-        if None in minion_pool_ids:
-            minion_pool_ids.remove(None)
-
-        if not minion_pool_ids:
-            LOG.debug(
-                "No minion pools seem to have been used for action with "
-                "base_id '%s'. Skipping minion machine deallocation.",
-                action.base_id)
-        else:
-            LOG.debug(
-                "Attempting to deallocate all minion pool machine selections "
-                "for action '%s'. Afferent pools are: %s",
-                action.base_id, minion_pool_ids)
-
-            with contextlib.ExitStack() as stack:
-                _ = [
-                    stack.enter_context(
-                        lockutils.lock(
-                            constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
-                            external=True))
-                    for pool_id in minion_pool_ids]
-
-                minion_machines = db_api.get_minion_machines(
-                    ctxt, allocated_action_id=action.base_id)
-                machine_ids = [m.id for m in minion_machines]
-                if machine_ids:
-                    LOG.info(
-                        "Releasing the following minion machines for "
-                        "action '%s': %s", action.base_id, machine_ids)
-                    db_api.set_minion_machines_allocation_statuses(
-                        ctxt, machine_ids, None,
-                        constants.MINION_MACHINE_STATUS_AVAILABLE)
-                else:
-                    LOG.debug(
-                        "No minion machines were found to be associated "
-                        "with action with base_id '%s'.", action.base_id)
+        return self._minion_manager_client.deallocate_minion_machines_for_action(
+            ctxt, action.id)
+
+    def _check_minion_pools_for_action(self, ctxt, action):
+        return self._minion_manager_client.validate_minion_pool_selections_for_action(
+            ctxt, action.id)
 
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, origin_minion_pool_id,
@@ -2382,7 +2021,7 @@ class ConductorServerEndpoint(object):
             db_api.add_migration(ctxt, migration)
 
             LOG.info("Migration created: %s", migration.id)
-            self._begin_tasks(ctxt, execution, task_info=migration.info)
+            self._begin_tasks(ctxt, migration, execution)
         except Exception:
             if minion_pool_allocations:
                 LOG.warn(
@@ -2599,84 +2238,30 @@ class ConductorServerEndpoint(object):
             {"id": execution_id, "status": execution_status,
              "action": execution.action_id})
 
-        if execution.type in constants.MINION_POOL_EXECUTION_TYPES:
-            self._update_minion_pool_status_for_finished_execution(
-                ctxt, execution, execution_status)
-        else:
-            if execution_status in constants.FINALIZED_EXECUTION_STATUSES:
-                LOG.debug(
-                    "Attempting to deallocate minion machines for finalized "
-                    "Execution '%s' of type '%s' (action '%s') following "
-                    "its transition into finalized status '%s'",
-                    execution.id, execution.type, execution.action_id,
-                    execution_status)
-                action = db_api.get_action(ctxt, execution.action_id)
-                self._deallocate_minion_machines_for_action(
-                    ctxt, action)
-
-                if ctxt.delete_trust_id:
-                    LOG.debug(
-                        "Deleting Keystone trust following status change "
-                        "for Execution '%s' (action '%s') to '%s'",
-                        execution_id, execution.action_id, execution_status)
-                    keystone.delete_trust(ctxt)
-            else:
+        if execution_status in constants.FINALIZED_EXECUTION_STATUSES:
+            LOG.debug(
+                "Attempting to deallocate minion machines for finalized "
+                "Execution '%s' of type '%s' (action '%s') following "
+                "its transition into finalized status '%s'",
+                execution.id, execution.type, execution.action_id,
+                execution_status)
+            action = db_api.get_action(ctxt, execution.action_id)
+            self._deallocate_minion_machines_for_action(
+                ctxt, action)
+
+            if ctxt.delete_trust_id:
                 LOG.debug(
-                    "Not deallocating minion machines for Execution '%s' "
-                    "of type '%s' (action '%s') yet as it is still in an "
-                    "active Execution status (%s)",
-                    execution.id, execution.type, execution.action_id,
-                    execution_status)
-
-    @staticmethod
-    def _update_minion_pool_status_for_finished_execution(
-            ctxt, execution, new_execution_status):
-        # status map if execution is active:
-        stat_map = {
-            constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
-                constants.MINION_POOL_STATUS_ALLOCATING,
-            constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
-                constants.MINION_POOL_STATUS_DEALLOCATING,
-            constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
-                constants.MINION_POOL_STATUS_INITIALIZING,
-            constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
-                constants.MINION_POOL_STATUS_UNINITIALIZING}
-        if new_execution_status == constants.EXECUTION_STATUS_COMPLETED:
-            stat_map = {
-                constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
-                    constants.MINION_POOL_STATUS_ALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
-                    constants.MINION_POOL_STATUS_DEALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
-                    constants.MINION_POOL_STATUS_DEALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
-                    constants.MINION_POOL_STATUS_UNINITIALIZED}
-        elif new_execution_status in constants.FINALIZED_TASK_STATUSES:
-            stat_map = {
-                constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
-                    constants.MINION_POOL_STATUS_DEALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
-                    constants.MINION_POOL_STATUS_ALLOCATED,
-                constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
-                    constants.MINION_POOL_STATUS_UNINITIALIZED,
-                constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
-                    constants.MINION_POOL_STATUS_UNINITIALIZED}
-        final_pool_status = stat_map.get(execution.type)
-        if not final_pool_status:
-            LOG.error(
-                "Could not determine pool status following transition of "
-                "execution '%s' (type '%s') to status '%s'. Presuming error "
-                "has occured. Marking piil as error'd.",
-                execution.id, execution.type, new_execution_status)
-            final_pool_status = constants.MINION_POOL_STATUS_ERROR
-
-        LOG.info(
-            "Marking minion pool '%s' status as '%s' in the DB following the "
-            "transition of execution '%s' (type '%s') to status '%s'.",
-            execution.action_id, final_pool_status, execution.id,
-            execution.type, new_execution_status)
-        db_api.set_minion_pool_lifecycle_status(
-            ctxt, execution.action_id, final_pool_status)
+                    "Deleting Keystone trust following status change "
+                    "for Execution '%s' (action '%s') to '%s'",
+                    execution_id, execution.action_id, execution_status)
+                keystone.delete_trust(ctxt)
+        else:
+            LOG.debug(
+                "Not deallocating minion machines for Execution '%s' "
+                "of type '%s' (action '%s') yet as it is still in an "
+                "active Execution status (%s)",
+                execution.id, execution.type, execution.action_id,
+                execution_status)
 
     @parent_tasks_execution_synchronized
     def set_task_host(self, ctxt, task_id, host):
@@ -3258,61 +2843,61 @@ class ConductorServerEndpoint(object):
                     db_api.update_replica(
                         ctxt, execution.action_id, task_info)
 
-        elif task_type in (
-                constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES,
-                constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES):
-            still_running = _check_other_tasks_running(execution, task)
-            if not still_running:
-                LOG.info(
-                    "Updating 'pool_shared_resources' for pool %s after "
-                    "completion of task '%s' (type '%s').",
-                    execution.action_id, task.id, task_type)
-                db_api.update_minion_pool_lifecycle(
-                    ctxt, execution.action_id, {
-                        "pool_shared_resources": task_info.get(
-                            "pool_shared_resources", {})})
-
-        elif task_type in (
-                constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES,
-                constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES):
-            still_running = _check_other_tasks_running(execution, task)
-            if not still_running:
-                LOG.info(
-                    "Clearing 'pool_shared_resources' for pool %s following "
-                    "completion of task '%s' (type %s)",
-                    execution.action_id, task.id, task_type)
-                db_api.update_minion_pool_lifecycle(
-                    ctxt, execution.action_id, {
-                        "pool_shared_resources": {}})
-
-        elif task_type in (
-                constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE,
-                constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE):
-            LOG.info(
-                "Adding DB entry for Minion Machine '%s' of pool %s "
-                "following completion of task '%s' (type %s).",
-                task.instance, execution.action_id, task.id, task_type)
-            minion_machine = models.MinionMachine()
-            minion_machine.id = task.instance
-            minion_machine.pool_id = execution.action_id
-            minion_machine.status = (
-                constants.MINION_MACHINE_STATUS_AVAILABLE)
-            minion_machine.connection_info = task_info[
-                "minion_connection_info"]
-            minion_machine.provider_properties = task_info[
-                "minion_provider_properties"]
-            minion_machine.backup_writer_connection_info = task_info[
-                "minion_backup_writer_connection_info"]
-            db_api.add_minion_machine(ctxt, minion_machine)
-
-        elif task_type in (
-                constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE,
-                constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE):
-            LOG.info(
-                "%s task for Minon Machine '%s' has completed successfully. "
-                "Deleting minion machine from DB.",
-                task_type, task.instance)
-            db_api.delete_minion_machine(ctxt, task.instance)
+        # elif task_type in (
+        #         constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES,
+        #         constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES):
+        #     still_running = _check_other_tasks_running(execution, task)
+        #     if not still_running:
+        #         LOG.info(
+        #             "Updating 'pool_shared_resources' for pool %s after "
+        #             "completion of task '%s' (type '%s').",
+        #             execution.action_id, task.id, task_type)
+        #         db_api.update_minion_pool_lifecycle(
+        #             ctxt, execution.action_id, {
+        #                 "pool_shared_resources": task_info.get(
+        #                     "pool_shared_resources", {})})
+
+        # elif task_type in (
+        #         constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES,
+        #         constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES):
+        #     still_running = _check_other_tasks_running(execution, task)
+        #     if not still_running:
+        #         LOG.info(
+        #             "Clearing 'pool_shared_resources' for pool %s following "
+        #             "completion of task '%s' (type %s)",
+        #             execution.action_id, task.id, task_type)
+        #         db_api.update_minion_pool_lifecycle(
+        #             ctxt, execution.action_id, {
+        #                 "pool_shared_resources": {}})
+
+        # elif task_type in (
+        #         constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE,
+        #         constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE):
+        #     LOG.info(
+        #         "Adding DB entry for Minion Machine '%s' of pool %s "
+        #         "following completion of task '%s' (type %s).",
+        #         task.instance, execution.action_id, task.id, task_type)
+        #     minion_machine = models.MinionMachine()
+        #     minion_machine.id = task.instance
+        #     minion_machine.pool_id = execution.action_id
+        #     minion_machine.status = (
+        #         constants.MINION_MACHINE_STATUS_AVAILABLE)
+        #     minion_machine.connection_info = task_info[
+        #         "minion_connection_info"]
+        #     minion_machine.provider_properties = task_info[
+        #         "minion_provider_properties"]
+        #     minion_machine.backup_writer_connection_info = task_info[
+        #         "minion_backup_writer_connection_info"]
+        #     db_api.add_minion_machine(ctxt, minion_machine)
+
+        # elif task_type in (
+        #         constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE,
+        #         constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE):
+        #     LOG.info(
+        #         "%s task for Minon Machine '%s' has completed successfully. "
+        #         "Deleting minion machine from DB.",
+        #         task_type, task.instance)
+        #     db_api.delete_minion_machine(ctxt, task.instance)
 
         elif task_type in (
                 constants.TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION,
@@ -3921,7 +3506,7 @@ class ConductorServerEndpoint(object):
         LOG.debug("Execution for Replica update tasks created: %s",
                   execution.id)
 
-        self._begin_tasks(ctxt, execution, task_info=replica.info)
+        self._begin_tasks(ctxt, replica, execution)
 
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
 
@@ -4064,424 +3649,3 @@ class ConductorServerEndpoint(object):
     @service_synchronized
     def delete_service(self, ctxt, service_id):
         db_api.delete_service(ctxt, service_id)
-
-    def create_minion_pool(
-            self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
-            environment_options, minimum_minions, maximum_minions,
-            minion_max_idle_time, minion_retention_strategy, notes=None):
-        endpoint = db_api.get_endpoint(ctxt, endpoint_id)
-
-        minion_pool = models.MinionPoolLifecycle()
-        minion_pool.id = str(uuid.uuid4())
-        minion_pool.pool_name = name
-        minion_pool.notes = notes
-        minion_pool.pool_platform = pool_platform
-        minion_pool.pool_os_type = pool_os_type
-        minion_pool.pool_status = constants.MINION_POOL_STATUS_UNINITIALIZED
-        minion_pool.minimum_minions = minimum_minions
-        minion_pool.maximum_minions = maximum_minions
-        minion_pool.minion_max_idle_time = minion_max_idle_time
-        minion_pool.minion_retention_strategy = minion_retention_strategy
-
-        # TODO(aznashwan): These field redundancies should be
-        # eliminated once the DB model hirearchy is overhauled:
-        minion_pool.origin_endpoint_id = endpoint_id
-        minion_pool.destination_endpoint_id = endpoint_id
-        minion_pool.source_environment = environment_options
-        minion_pool.destination_environment = environment_options
-        minion_pool.instances = []
-        minion_pool.info = {}
-
-        db_api.add_minion_pool_lifecycle(ctxt, minion_pool)
-        return self.get_minion_pool(ctxt, minion_pool.id)
-
-    def get_minion_pools(self, ctxt, include_tasks_executions=False):
-        return db_api.get_minion_pool_lifecycles(
-            ctxt, include_tasks_executions=include_tasks_executions,
-            include_machines=True)
-
-    def _get_minion_pool(
-            self, ctxt, minion_pool_id, include_tasks_executions=True,
-            include_machines=True):
-        minion_pool = db_api.get_minion_pool_lifecycle(
-            ctxt, minion_pool_id, include_machines=include_machines,
-            include_tasks_executions=include_tasks_executions)
-        if not minion_pool:
-            raise exception.NotFound(
-                "Minion pool with ID '%s' not found." % minion_pool_id)
-        return minion_pool
-
-    @minion_pool_synchronized
-    def set_up_shared_minion_pool_resources(self, ctxt, minion_pool_id):
-        LOG.info(
-            "Attempting to set up shared resources for Minion Pool '%s'.",
-            minion_pool_id)
-        minion_pool = db_api.get_minion_pool_lifecycle(
-            ctxt, minion_pool_id, include_tasks_executions=False,
-            include_machines=False)
-        if minion_pool.pool_status != constants.MINION_POOL_STATUS_UNINITIALIZED:
-            raise exception.InvalidMinionPoolState(
-                "Minion Pool '%s' cannot have shared resources set up as it "
-                "is in '%s' state instead of the expected %s."% (
-                    minion_pool_id, minion_pool.pool_status,
-                    constants.MINION_POOL_STATUS_UNINITIALIZED))
-
-        execution = models.TasksExecution()
-        execution.id = str(uuid.uuid4())
-        execution.action = minion_pool
-        execution.status = constants.EXECUTION_STATUS_UNEXECUTED
-        execution.type = (
-            constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES)
-
-        minion_pool.info[minion_pool_id] = {
-            "pool_os_type": minion_pool.pool_os_type,
-            "pool_identifier": minion_pool.id,
-            # TODO(aznashwan): remove redundancy once transfer
-            # action DB models have been overhauled:
-            "pool_environment_options": minion_pool.source_environment}
-
-        validate_task_type = (
-            constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS)
-        set_up_task_type = (
-            constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES)
-        if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
-            validate_task_type = (
-                constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS)
-            set_up_task_type = (
-                constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES)
-
-        validate_pool_options_task = self._create_task(
-            minion_pool.id, validate_task_type, execution)
-
-        setup_pool_resources_task = self._create_task(
-            minion_pool.id,
-            set_up_task_type,
-            execution,
-            depends_on=[validate_pool_options_task.id])
-
-        self._check_execution_tasks_sanity(execution, minion_pool.info)
-
-        # update the action info for the pool's instance:
-        db_api.update_transfer_action_info_for_instance(
-            ctxt, minion_pool.id, minion_pool.id,
-            minion_pool.info[minion_pool.id])
-
-        # add new execution to DB:
-        db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
-        LOG.info(
-            "Minion pool shared resource creation execution created: %s",
-            execution.id)
-
-        self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
-        db_api.set_minion_pool_lifecycle_status(
-            ctxt, minion_pool.id, constants.MINION_POOL_STATUS_INITIALIZING)
-
-        return self._get_minion_pool_lifecycle_execution(
-            ctxt, minion_pool_id, execution.id).to_dict()
-
-    @minion_pool_synchronized
-    def tear_down_shared_minion_pool_resources(
-            self, ctxt, minion_pool_id, force=False):
-        minion_pool = db_api.get_minion_pool_lifecycle(
-            ctxt, minion_pool_id, include_tasks_executions=False,
-            include_machines=False)
-        if minion_pool.pool_status != (
-                constants.MINION_POOL_STATUS_DEALLOCATED) and not force:
-            raise exception.InvalidMinionPoolState(
-                "Minion Pool '%s' cannot have shared resources torn down as it"
-                " is in '%s' state instead of the expected %s. "
-                "Please use the force flag if you are certain you want "
-                "to tear down the shared resources for this pool." % (
-                    minion_pool_id, minion_pool.pool_status,
-                    constants.MINION_POOL_STATUS_DEALLOCATED))
-
-        LOG.info(
-            "Attempting to tear down shared resources for Minion Pool '%s'.",
-            minion_pool_id)
-
-        execution = models.TasksExecution()
-        execution.id = str(uuid.uuid4())
-        execution.action = minion_pool
-        execution.status = constants.EXECUTION_STATUS_UNEXECUTED
-        execution.type = (
-            constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES)
-
-        tear_down_task_type = (
-            constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES)
-        if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
-            tear_down_task_type = (
-                constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES)
-
-        self._create_task(
-            minion_pool.id, tear_down_task_type, execution)
-
-        self._check_execution_tasks_sanity(execution, minion_pool.info)
-
-        # update the action info for the pool's instance:
-        db_api.update_transfer_action_info_for_instance(
-            ctxt, minion_pool.id, minion_pool.id,
-            minion_pool.info[minion_pool.id])
-
-        # add new execution to DB:
-        db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
-        LOG.info(
-            "Minion pool shared resource teardown execution created: %s",
-            execution.id)
-
-        self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
-        db_api.set_minion_pool_lifecycle_status(
-            ctxt, minion_pool.id, constants.MINION_POOL_STATUS_UNINITIALIZING)
-
-        return self._get_minion_pool_lifecycle_execution(
-            ctxt, minion_pool_id, execution.id).to_dict()
-
-    @minion_pool_synchronized
-    def allocate_minion_pool_machines(self, ctxt, minion_pool_id):
-        LOG.info("Attempting to allocate Minion Pool '%s'.", minion_pool_id)
-        minion_pool = self._get_minion_pool(
-            ctxt, minion_pool_id, include_tasks_executions=False,
-            include_machines=True)
-        if minion_pool.pool_status != constants.MINION_POOL_STATUS_DEALLOCATED:
-            raise exception.InvalidMinionPoolState(
-                "Minion machines for pool '%s' cannot be allocated as the pool"
-                " is in '%s' state instead of the expected %s."% (
-                    minion_pool_id, minion_pool.pool_status,
-                    constants.MINION_POOL_STATUS_DEALLOCATED))
-
-        execution = models.TasksExecution()
-        execution.id = str(uuid.uuid4())
-        execution.action = minion_pool
-        execution.status = constants.EXECUTION_STATUS_UNEXECUTED
-        execution.type = constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS
-
-        new_minion_machine_ids = [
-            str(uuid.uuid4()) for _ in range(minion_pool.minimum_minions)]
-
-        create_minion_task_type = (
-            constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE)
-        delete_minion_task_type = (
-            constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
-        if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
-            create_minion_task_type = (
-                constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE)
-            delete_minion_task_type = (
-                constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
-
-        for minion_machine_id in new_minion_machine_ids:
-            minion_pool.info[minion_machine_id] = {
-                "pool_identifier": minion_pool_id,
-                "pool_os_type": minion_pool.pool_os_type,
-                "pool_shared_resources": minion_pool.pool_shared_resources,
-                "pool_environment_options": minion_pool.source_environment,
-                # NOTE: we default this to an empty dict here to avoid possible
-                # task info conflicts on the cleanup task below for minions
-                # which were slower to deploy:
-                "minion_provider_properties": {}}
-
-            create_minion_task = self._create_task(
-                minion_machine_id, create_minion_task_type, execution)
-
-            self._create_task(
-                minion_machine_id,
-                delete_minion_task_type,
-                execution, on_error_only=True,
-                depends_on=[create_minion_task.id])
-
-        self._check_execution_tasks_sanity(execution, minion_pool.info)
-
-        # update the action info for all of the pool's minions:
-        for minion_machine_id in new_minion_machine_ids:
-            db_api.update_transfer_action_info_for_instance(
-                ctxt, minion_pool.id, minion_machine_id,
-                minion_pool.info[minion_machine_id])
-
-        # add new execution to DB:
-        db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
-        LOG.info("Minion pool allocation execution created: %s", execution.id)
-
-        self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
-        db_api.set_minion_pool_lifecycle_status(
-            ctxt, minion_pool.id, constants.MINION_POOL_STATUS_ALLOCATING)
-
-        return self._get_minion_pool_lifecycle_execution(
-            ctxt, minion_pool_id, execution.id).to_dict()
-
-    def _check_all_pool_minion_machines_available(self, minion_pool):
-        if not minion_pool.minion_machines:
-            LOG.debug(
-                "Minion pool '%s' does not have any allocated machines.",
-                minion_pool.id)
-            return
-
-        allocated_machine_statuses = {
-            machine.id: machine.status
-            for machine in minion_pool.minion_machines
-            if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE}
-
-        if allocated_machine_statuses:
-            raise exception.InvalidMinionPoolState(
-                "Minion pool with ID '%s' has one or more machines which are "
-                "in-use or otherwise unmodifiable: %s" % (
-                    minion_pool.id,
-                    allocated_machine_statuses))
-
-    @minion_pool_synchronized
-    def deallocate_minion_pool_machines(self, ctxt, minion_pool_id, force=False):
-        LOG.info("Attempting to deallocate Minion Pool '%s'.", minion_pool_id)
-        minion_pool = db_api.get_minion_pool_lifecycle(
-            ctxt, minion_pool_id, include_tasks_executions=False,
-            include_machines=True)
-        if minion_pool.pool_status not in (
-                constants.MINION_POOL_STATUS_ALLOCATED) and not force:
-            raise exception.InvalidMinionPoolState(
-                "Minion Pool '%s' cannot be deallocated as it is in '%s' "
-                "state instead of the expected '%s'. Please use the "
-                "force flag if you are certain you want to deallocate "
-                "the minion pool's machines." % (
-                    minion_pool_id, minion_pool.pool_status,
-                    constants.MINION_POOL_STATUS_ALLOCATED))
-
-        if not force:
-            self._check_all_pool_minion_machines_available(minion_pool)
-
-        execution = models.TasksExecution()
-        execution.id = str(uuid.uuid4())
-        execution.action = minion_pool
-        execution.status = constants.EXECUTION_STATUS_UNEXECUTED
-        execution.type = (
-            constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS)
-
-        delete_minion_task_type = (
-            constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
-        if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
-            delete_minion_task_type = (
-                constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
-
-        for minion_machine in minion_pool.minion_machines:
-            minion_machine_id = minion_machine.id
-            minion_pool.info[minion_machine_id] = {
-                "pool_environment_options": minion_pool.source_environment,
-                "minion_provider_properties": (
-                    minion_machine.provider_properties)}
-            self._create_task(
-                minion_machine_id, delete_minion_task_type,
-                # NOTE: we set 'on_error=True' to allow for the completion of
-                # already running deletion tasks to prevent partial deletes:
-                execution, on_error=True)
-
-        self._check_execution_tasks_sanity(execution, minion_pool.info)
-
-        # update the action info for all of the pool's minions:
-        for minion_machine in minion_pool.minion_machines:
-            db_api.update_transfer_action_info_for_instance(
-                ctxt, minion_pool.id, minion_machine.id,
-                minion_pool.info[minion_machine.id])
-
-        # add new execution to DB:
-        db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
-        LOG.info(
-            "Minion pool deallocation execution created: %s", execution.id)
-
-        self._begin_tasks(ctxt, execution, task_info=minion_pool.info)
-        db_api.set_minion_pool_lifecycle_status(
-            ctxt, minion_pool.id, constants.MINION_POOL_STATUS_DEALLOCATING)
-
-        return self._get_minion_pool_lifecycle_execution(
-            ctxt, minion_pool_id, execution.id).to_dict()
-
-    @minion_pool_synchronized
-    def get_minion_pool(self, ctxt, minion_pool_id):
-        return self._get_minion_pool(
-            ctxt, minion_pool_id, include_tasks_executions=True,
-            include_machines=True)
-
-    @minion_pool_synchronized
-    def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
-        minion_pool = self._get_minion_pool(
-            ctxt, minion_pool_id, include_tasks_executions=False,
-            include_machines=False)
-        if minion_pool.pool_status != constants.MINION_POOL_STATUS_UNINITIALIZED:
-            raise exception.InvalidMinionPoolState(
-                "Minion Pool '%s' cannot be updated as it is in '%s' status "
-                "instead of the expected '%s'. Please ensure the pool machines"
-                "have been deallocated and the pool's supporting resources "
-                "have been torn down before updating the pool." % (
-                    minion_pool_id, minion_pool.pool_status,
-                    constants.MINION_POOL_STATUS_UNINITIALIZED))
-        LOG.info(
-            "Attempting to update minion_pool '%s' with payload: %s",
-            minion_pool_id, updated_values)
-        db_api.update_minion_pool_lifecycle(ctxt, minion_pool_id, updated_values)
-        LOG.info("Minion Pool '%s' successfully updated", minion_pool_id)
-        return db_api.get_minion_pool_lifecycle(ctxt, minion_pool_id)
-
-    @minion_pool_synchronized
-    def delete_minion_pool(self, ctxt, minion_pool_id):
-        minion_pool = self._get_minion_pool(
-            ctxt, minion_pool_id, include_tasks_executions=False,
-            include_machines=True)
-        acceptable_deletion_statuses = [
-            constants.MINION_POOL_STATUS_UNINITIALIZED,
-            constants.MINION_POOL_STATUS_ERROR]
-        if minion_pool.pool_status not in acceptable_deletion_statuses:
-            raise exception.InvalidMinionPoolState(
-                "Minion Pool '%s' cannot be deleted as it is in '%s' status "
-                "instead of one of the expected '%s'. Please ensure the pool "
-                "machines have been deallocated and the pool's supporting "
-                "resources have been torn down before deleting the pool." % (
-                    minion_pool_id, minion_pool.pool_status,
-                    acceptable_deletion_statuses))
-
-        LOG.info("Deleting minion pool with ID '%s'" % minion_pool_id)
-        db_api.delete_minion_pool_lifecycle(ctxt, minion_pool_id)
-
-    @minion_pool_synchronized
-    def get_minion_pool_lifecycle_executions(
-            self, ctxt, minion_pool_id, include_tasks=False):
-        return db_api.get_minion_pool_lifecycle_executions(
-            ctxt, minion_pool_id, include_tasks)
-
-    def _get_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id):
-        execution = db_api.get_minion_pool_lifecycle_execution(
-            ctxt, minion_pool_id, execution_id)
-        if not execution:
-            raise exception.NotFound(
-                "Execution with ID '%s' for Minion Pool '%s' not found." % (
-                    execution_id, minion_pool_id))
-        return execution
-
-    @minion_pool_tasks_execution_synchronized
-    def get_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id):
-        return self._get_minion_pool_lifecycle_execution(
-            ctxt, minion_pool_id, execution_id).to_dict()
-
-    @minion_pool_tasks_execution_synchronized
-    def delete_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id):
-        execution = self._get_minion_pool_lifecycle_execution(
-            ctxt, minion_pool_id, execution_id)
-        if execution.status in constants.ACTIVE_EXECUTION_STATUSES:
-            raise exception.InvalidMigrationState(
-                "Cannot delete execution '%s' for Minion pool '%s' as it is "
-                "currently in '%s' state." % (
-                    execution_id, minion_pool_id, execution.status))
-        db_api.delete_minion_pool_lifecycle_execution(ctxt, execution_id)
-
-    @minion_pool_tasks_execution_synchronized
-    def cancel_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id, force):
-        execution = self._get_minion_pool_lifecycle_execution(
-            ctxt, minion_pool_id, execution_id)
-        if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
-            raise exception.InvalidMinionPoolState(
-                "Minion pool '%s' has no running execution to cancel." % (
-                    minion_pool_id))
-        if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
-                not force):
-            raise exception.InvalidMinionPoolState(
-                "Execution for Minion Pool '%s' is already being cancelled. "
-                "Please use the force option if you'd like to force-cancel "
-                "it." % (minion_pool_id))
-        self._cancel_tasks_execution(ctxt, execution, force=force)

+ 1 - 0
coriolis/constants.py

@@ -306,6 +306,7 @@ CONDUCTOR_MAIN_MESSAGING_TOPIC = "coriolis_conductor"
 WORKER_MAIN_MESSAGING_TOPIC = "coriolis_worker"
 SCHEDULER_MAIN_MESSAGING_TOPIC = "coriolis_scheduler"
 REPLICA_CRON_MAIN_MESSAGING_TOPIC = "coriolis_replica_cron_worker"
+MINION_MANAGER_MAIN_MESSAGING_TOPIC = "coriolis_minion_manager"
 
 MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE = "delete"
 MINION_POOL_MACHINE_RETENTION_STRATEGY_POWEROFF = "poweroff"

+ 56 - 68
coriolis/db/api.py

@@ -1195,43 +1195,35 @@ def delete_minion_machine(context, minion_machine_id):
 
 
 @enginefacade.writer
-def add_minion_pool_lifecycle(context, minion_pool_lifecycle):
-    minion_pool_lifecycle.user_id = context.user
-    minion_pool_lifecycle.project_id = context.tenant
-    _session(context).add(minion_pool_lifecycle)
+def add_minion_pool(context, minion_pool):
+    minion_pool.user_id = context.user
+    minion_pool.project_id = context.tenant
+    _session(context).add(minion_pool)
 
 
 @enginefacade.writer
-def delete_minion_pool_lifecycle(context, minion_pool_id):
+def delete_minion_pool(context, minion_pool_id):
     _delete_transfer_action(
-        context, models.MinionPoolLifecycle, minion_pool_id)
+        context, models.MinionPool, minion_pool_id)
 
 
 @enginefacade.reader
-def get_minion_pool_lifecycle(
-        context, minion_pool_id, include_tasks_executions=True,
-        include_machines=True):
-    q = _soft_delete_aware_query(context, models.MinionPoolLifecycle)
-    if include_tasks_executions:
-        q = q.options(orm.joinedload(models.MinionPoolLifecycle.executions))
+def get_minion_pool(
+        context, minion_pool_id, include_machines=True):
+    q = _soft_delete_aware_query(context, models.MinionPool)
     if include_machines:
         q = q.options(orm.joinedload('minion_machines'))
     if is_user_context(context):
         q = q.filter(
-            models.MinionPoolLifecycle.project_id == context.tenant)
+            models.MinionPool.project_id == context.tenant)
     return q.filter(
-        models.MinionPoolLifecycle.id == minion_pool_id).first()
+        models.MinionPool.id == minion_pool_id).first()
 
 
 @enginefacade.reader
-def get_minion_pool_lifecycles(
-        context, include_tasks_executions=False, include_info=False,
-        include_machines=False, to_dict=True):
-    q = _soft_delete_aware_query(context, models.MinionPoolLifecycle)
-    if include_tasks_executions:
-        q = q.options(orm.joinedload(models.MinionPoolLifecycle.executions))
-    if include_info is False:
-        q = q.options(orm.defer('info'))
+def get_minion_pools(
+        context, include_machines=False, to_dict=True):
+    q = _soft_delete_aware_query(context, models.MinionPool)
     q = q.filter()
     if is_user_context(context):
         q = q.filter(
@@ -1241,14 +1233,12 @@ def get_minion_pool_lifecycles(
     db_result = q.all()
     if to_dict:
         return [i.to_dict(
-            include_info=include_info,
-            include_executions=include_tasks_executions,
             include_machines=include_machines) for i in db_result]
     return db_result
 
 
 @enginefacade.writer
-def add_minion_pool_lifecycle_execution(context, execution):
+def add_minion_pool_execution(context, execution):
     if is_user_context(context):
         if execution.action.project_id != context.tenant:
             raise exception.NotAuthorized()
@@ -1263,10 +1253,9 @@ def add_minion_pool_lifecycle_execution(context, execution):
 
 
 @enginefacade.writer
-def set_minion_pool_lifecycle_status(context, minion_pool_id, status):
-    pool = get_minion_pool_lifecycle(
-        context, minion_pool_id, include_tasks_executions=False,
-        include_machines=False)
+def set_minion_pool_status(context, minion_pool_id, status):
+    pool = get_minion_pool(
+        context, minion_pool_id, include_machines=False)
     LOG.debug(
         "Transitioning minion pool '%s' from status '%s' to '%s'in DB",
         minion_pool_id, pool.pool_status, status)
@@ -1275,10 +1264,9 @@ def set_minion_pool_lifecycle_status(context, minion_pool_id, status):
 
 
 @enginefacade.writer
-def update_minion_pool_lifecycle(context, minion_pool_id, updated_values):
-    lifecycle = get_minion_pool_lifecycle(
-        context, minion_pool_id, include_tasks_executions=False,
-        include_machines=False)
+def update_minion_pool(context, minion_pool_id, updated_values):
+    lifecycle = get_minion_pool(
+        context, minion_pool_id, include_machines=False)
     if not lifecycle:
         raise exception.NotFound(
             "Minion pool '%s' not found" % minion_pool_id)
@@ -1317,38 +1305,38 @@ def update_minion_pool_lifecycle(context, minion_pool_id, updated_values):
     # `updated_at` fields
     setattr(lifecycle, 'updated_at', timeutils.utcnow())
 
-@enginefacade.reader
-def get_minion_pool_lifecycle_executions(
-        context, lifecycle_id, include_tasks=True):
-    q = _soft_delete_aware_query(context, models.TasksExecution)
-    q = q.join(models.MinionPoolLifecycle)
-    if include_tasks:
-        q = _get_tasks_with_details_options(q)
-    if is_user_context(context):
-        q = q.filter(models.MinionPoolLifecycle.project_id == context.tenant)
-    return q.filter(
-        models.MinionPoolLifecycle.id == lifecycle_id).all()
-
-@enginefacade.reader
-def get_minion_pool_lifecycle_execution(context, lifecycle_id, execution_id):
-    q = _soft_delete_aware_query(context, models.TasksExecution).join(
-        models.MinionPoolLifecycle)
-    q = _get_tasks_with_details_options(q)
-    if is_user_context(context):
-        q = q.filter(models.MinionPoolLifecycle.project_id == context.tenant)
-    return q.filter(
-        models.MinionPoolLifecycle.id == lifecycle_id,
-        models.TasksExecution.id == execution_id).first()
-
-@enginefacade.writer
-def delete_minion_pool_lifecycle_execution(context, execution_id):
-    q = _soft_delete_aware_query(context, models.TasksExecution).filter(
-        models.TasksExecution.id == execution_id)
-    if is_user_context(context):
-        if not q.join(models.MinionPoolLifecycle).filter(
-                models.MinionPoolLifecycle.project_id == (
-                    context.tenant)).first():
-            raise exception.NotAuthorized()
-    count = q.soft_delete()
-    if count == 0:
-        raise exception.NotFound("0 entries were soft deleted")
+# @enginefacade.reader
+# def get_minion_pool_executions(
+#         context, lifecycle_id, include_tasks=True):
+#     q = _soft_delete_aware_query(context, models.TasksExecution)
+#     q = q.join(models.MinionPool)
+#     if include_tasks:
+#         q = _get_tasks_with_details_options(q)
+#     if is_user_context(context):
+#         q = q.filter(models.MinionPool.project_id == context.tenant)
+#     return q.filter(
+#         models.MinionPool.id == lifecycle_id).all()
+
+# @enginefacade.reader
+# def get_minion_pool_execution(context, lifecycle_id, execution_id):
+#     q = _soft_delete_aware_query(context, models.TasksExecution).join(
+#         models.MinionPool)
+#     q = _get_tasks_with_details_options(q)
+#     if is_user_context(context):
+#         q = q.filter(models.MinionPool.project_id == context.tenant)
+#     return q.filter(
+#         models.MinionPool.id == lifecycle_id,
+#         models.TasksExecution.id == execution_id).first()
+
+# @enginefacade.writer
+# def delete_minion_pool_execution(context, execution_id):
+#     q = _soft_delete_aware_query(context, models.TasksExecution).filter(
+#         models.TasksExecution.id == execution_id)
+#     if is_user_context(context):
+#         if not q.join(models.MinionPool).filter(
+#                 models.MinionPool.project_id == (
+#                     context.tenant)).first():
+#             raise exception.NotAuthorized()
+#     count = q.soft_delete()
+#     if count == 0:
+#         raise exception.NotFound("0 entries were soft deleted")

+ 39 - 29
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -15,19 +15,6 @@ def upgrade(migrate_engine):
     base_transfer_action = sqlalchemy.Table(
         'base_transfer_action', meta, autoload=True)
 
-    # add the pool option properties for the transfer:
-    origin_minion_pool_id = sqlalchemy.Column(
-        "origin_minion_pool_id", sqlalchemy.String(36), nullable=True)
-    destination_minion_pool_id = sqlalchemy.Column(
-        "destination_minion_pool_id", sqlalchemy.String(36), nullable=True)
-    instance_osmorphing_minion_pool_mappings = sqlalchemy.Column(
-        "instance_osmorphing_minion_pool_mappings", sqlalchemy.Text,
-        nullable=False, default='{}')
-    for col in [
-            origin_minion_pool_id, destination_minion_pool_id,
-            instance_osmorphing_minion_pool_mappings]:
-        base_transfer_action.create_column(col)
-
     # extend tasks execution 'type' column:
     tasks_execution = sqlalchemy.Table(
         'tasks_execution', meta, autoload=True)
@@ -38,23 +25,27 @@ def upgrade(migrate_engine):
     # add table for pool lifecycles:
     tables.append(
         sqlalchemy.Table(
-            'minion_pool_lifecycle',
+            'minion_pool',
             meta,
             sqlalchemy.Column(
                 "id", sqlalchemy.String(36),
-                sqlalchemy.ForeignKey('base_transfer_action.base_id'),
-                primary_key=True),
+                default=lambda: str(uuid.uuid4()), primary_key=True),
+            sqlalchemy.Column(
+                "name", sqlalchemy.String(255), nullable=False),
             sqlalchemy.Column(
-                "pool_name", sqlalchemy.String(255), nullable=False),
+                "endpoint_id", sqlalchemy.String(36),
+                sqlalchemy.ForeignKey('endpoint.id'), nullable=False),
             sqlalchemy.Column(
-                "pool_os_type", sqlalchemy.String(255), nullable=False),
+                "environment_options", sqlalchemy.Text, nullable=False),
             sqlalchemy.Column(
-                "pool_platform", sqlalchemy.String(255), nullable=True),
+                "os_type", sqlalchemy.String(255), nullable=False),
             sqlalchemy.Column(
-                "pool_status", sqlalchemy.String(255), nullable=False,
+                "platform", sqlalchemy.String(255), nullable=True),
+            sqlalchemy.Column(
+                "status", sqlalchemy.String(255), nullable=False,
                 default=lambda: "UNKNOWN"),
             sqlalchemy.Column(
-                "pool_shared_resources", sqlalchemy.Text, nullable=True),
+                "shared_resources", sqlalchemy.Text, nullable=True),
             sqlalchemy.Column(
                 'minimum_minions', sqlalchemy.Integer, nullable=False),
             sqlalchemy.Column(
@@ -82,7 +73,7 @@ def upgrade(migrate_engine):
             sqlalchemy.Column('deleted', sqlalchemy.String(36)),
             sqlalchemy.Column(
                 'pool_id', sqlalchemy.String(36),
-                sqlalchemy.ForeignKey('minion_pool_lifecycle.id'),
+                sqlalchemy.ForeignKey('minion_pool.id'),
                 nullable=False),
             sqlalchemy.Column(
                 'allocated_action', sqlalchemy.String(36), nullable=True),
@@ -97,11 +88,30 @@ def upgrade(migrate_engine):
                 'provider_properties', sqlalchemy.Text,
                 nullable=True)))
 
-    for index, table in enumerate(tables):
-        try:
+    # add the pool option properties for the transfer:
+    origin_minion_pool_id = sqlalchemy.Column(
+        "origin_minion_pool_id", sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('minion_pool.id'), nullable=True)
+    destination_minion_pool_id = sqlalchemy.Column(
+        "destination_minion_pool_id", sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('minion_pool.id'), nullable=True)
+    instance_osmorphing_minion_pool_mappings = sqlalchemy.Column(
+        "instance_osmorphing_minion_pool_mappings", sqlalchemy.Text,
+        nullable=False, default='{}')
+
+    created_columns = []
+    created_tables = []
+    try:
+        for table in tables:
             table.create()
-        except Exception:
-            # If an error occurs, drop all tables created so far to return
-            # to the previously existing state.
-            meta.drop_all(tables=tables[:index])
-            raise
+            created_tables.append(table)
+        for col in [
+                origin_minion_pool_id, destination_minion_pool_id,
+                instance_osmorphing_minion_pool_mappings]:
+            base_transfer_action.create_column(col)
+            created_columns.append(col)
+    except Exception:
+        # If an error occurs, drop all of the tables and columns created so
+        # far to return the schema to its previously existing state:
+        for col in created_columns:
+            base_transfer_action.drop_column(col)
+        meta.drop_all(tables=created_tables)
+        raise

+ 81 - 90
coriolis/db/sqlalchemy/models.py

@@ -201,9 +201,11 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
     storage_mappings = sqlalchemy.Column(types.Json, nullable=True)
     source_environment = sqlalchemy.Column(types.Json, nullable=True)
     origin_minion_pool_id = sqlalchemy.Column(
-        sqlalchemy.String(36), nullable=True)
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('minion_pool.id'), nullable=True)
     destination_minion_pool_id = sqlalchemy.Column(
-        sqlalchemy.String(36), nullable=True)
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('minion_pool.id'), nullable=True)
     instance_osmorphing_minion_pool_mappings = sqlalchemy.Column(
         types.Json, nullable=False, default=lambda: {})
     user_scripts = sqlalchemy.Column(types.Json, nullable=True)
@@ -407,54 +409,6 @@ class Region(
         secondary="service_region_mapping")
 
 
-class Endpoint(BASE, models.TimestampMixin, models.ModelBase,
-               models.SoftDeleteMixin):
-    __tablename__ = 'endpoint'
-
-    id = sqlalchemy.Column(sqlalchemy.String(36),
-                           default=lambda: str(uuid.uuid4()),
-                           primary_key=True)
-    user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
-    project_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
-    connection_info = sqlalchemy.Column(types.Json, nullable=False)
-    type = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
-    name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
-    description = sqlalchemy.Column(sqlalchemy.String(1024), nullable=True)
-    origin_actions = orm.relationship(
-        BaseTransferAction, backref=orm.backref('origin_endpoint'),
-        primaryjoin="and_(BaseTransferAction.origin_endpoint_id==Endpoint.id, "
-                    "BaseTransferAction.deleted=='0')")
-    destination_actions = orm.relationship(
-        BaseTransferAction, backref=orm.backref('destination_endpoint'),
-        primaryjoin="and_(BaseTransferAction.destination_endpoint_id=="
-                    "Endpoint.id, BaseTransferAction.deleted=='0')")
-    mapped_regions = orm.relationship(
-        'Region', back_populates='mapped_endpoints',
-        secondary="endpoint_region_mapping")
-
-
-class ReplicaSchedule(BASE, models.TimestampMixin, models.ModelBase,
-                      models.SoftDeleteMixin):
-    __tablename__ = "replica_schedules"
-
-    id = sqlalchemy.Column(sqlalchemy.String(36),
-                           default=lambda: str(uuid.uuid4()),
-                           primary_key=True)
-    replica_id = sqlalchemy.Column(
-        sqlalchemy.String(36),
-        sqlalchemy.ForeignKey('replica.id'), nullable=False)
-    replica = orm.relationship(
-        Replica, backref=orm.backref("schedules"), foreign_keys=[replica_id])
-    schedule = sqlalchemy.Column(types.Json, nullable=False)
-    expiration_date = sqlalchemy.Column(
-        sqlalchemy.types.DateTime, nullable=True)
-    enabled = sqlalchemy.Column(
-        sqlalchemy.Boolean, nullable=False, default=lambda: False)
-    shutdown_instance = sqlalchemy.Column(
-        sqlalchemy.Boolean, nullable=False, default=False)
-    trust_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
-
-
 class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
                     models.SoftDeleteMixin):
     __tablename__ = "minion_machine"
@@ -467,7 +421,7 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
 
     pool_id = sqlalchemy.Column(
         sqlalchemy.String(36),
-        sqlalchemy.ForeignKey('minion_pool_lifecycle.id'),
+        sqlalchemy.ForeignKey('minion_pool.id'),
         nullable=False)
 
     status = sqlalchemy.Column(
@@ -506,29 +460,30 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
         return result
 
 
-class MinionPoolLifecycle(BaseTransferAction):
-    # TODO(aznashwan): this class inherits numerous redundant fields from
-    # BaseTransferAction. Ideally, the upper hirearchy should be split into a
-    # BaseAction, and a separate inheriting BaseTransferAction.
-    __tablename__ = 'minion_pool_lifecycle'
+class MinionPool(
+            BASE, models.TimestampMixin, models.ModelBase,
+            models.SoftDeleteMixin):
+    __tablename__ = 'minion_pool'
 
     id = sqlalchemy.Column(
         sqlalchemy.String(36),
-        sqlalchemy.ForeignKey(
-            'base_transfer_action.base_id'),
         primary_key=True)
 
-    pool_name = sqlalchemy.Column(
+    name = sqlalchemy.Column(
         sqlalchemy.String(255),
         nullable=False)
-    pool_os_type = sqlalchemy.Column(
+    endpoint_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('endpoint.id'), nullable=False)
+    os_type = sqlalchemy.Column(
         sqlalchemy.String(255), nullable=False)
-    pool_platform = sqlalchemy.Column(
+    platform = sqlalchemy.Column(
         sqlalchemy.String(255), nullable=False)
-    pool_status = sqlalchemy.Column(
+    environment_options = sqlalchemy.Column(types.Json, nullable=False)
+    status = sqlalchemy.Column(
         sqlalchemy.String(255), nullable=False,
         default=lambda: constants.MINION_POOL_STATUS_UNKNOWN)
-    pool_shared_resources = sqlalchemy.Column(
+    shared_resources = sqlalchemy.Column(
         types.Json, nullable=True)
     minimum_minions = sqlalchemy.Column(
         sqlalchemy.Integer, nullable=False)
@@ -540,41 +495,77 @@ class MinionPoolLifecycle(BaseTransferAction):
         sqlalchemy.String(255), nullable=False)
     minion_machines = orm.relationship(
         MinionMachine, backref=orm.backref('minion_pool'),
-        primaryjoin="and_(MinionMachine.pool_id==MinionPoolLifecycle.id, "
+        primaryjoin="and_(MinionMachine.pool_id==MinionPool.id, "
                     "MinionMachine.deleted=='0')")
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'minion_pool_lifecycle'}
-
-    def to_dict(
-            self, include_info=True, include_machines=True,
-            include_executions=True):
-        base = super(MinionPoolLifecycle, self).to_dict(
-            include_info=include_info, include_executions=include_executions)
-        base.update({
+    def to_dict(self, include_machines=True):
+        base = {
             "id": self.id,
-            "pool_name": self.pool_name,
-            "pool_os_type": self.pool_os_type,
-            "pool_platform": self.pool_platform,
-            "pool_shared_resources": self.pool_shared_resources,
-            "pool_status": self.pool_status,
+            "name": self.name,
+            "endpoint_id": self.endpoint_id,
+            "environment_options": self.environment_options,
+            "os_type": self.os_type,
+            "platform": self.platform,
+            "shared_resources": self.shared_resources,
+            "status": self.status,
             "minimum_minions": self.minimum_minions,
             "maximum_minions": self.maximum_minions,
             "minion_max_idle_time": self.minion_max_idle_time,
-            "minion_retention_strategy": self.minion_retention_strategy})
+            "minion_retention_strategy": self.minion_retention_strategy}
         base["minion_machines"] = []
         if include_machines:
             base["minion_machines"] = [
                 machine.to_dict() for machine in self.minion_machines]
-        # TODO(aznashwan): these nits should be avoided by splitting the
-        # BaseTransferAction class into a more specialized hireachy:
-        redundancies = {
-            "environment_options": [
-                "source_environment", "destination_environment"],
-            "endpoint_id": [
-                "origin_endpoint_id", "destination_endpoint_id"]}
-        for new_key, old_keys in redundancies.items():
-            for old_key in old_keys:
-                if old_key in base:
-                    base[new_key] = base.pop(old_key)
         return base
+
+
+class Endpoint(BASE, models.TimestampMixin, models.ModelBase,
+               models.SoftDeleteMixin):
+    __tablename__ = 'endpoint'
+
+    id = sqlalchemy.Column(sqlalchemy.String(36),
+                           default=lambda: str(uuid.uuid4()),
+                           primary_key=True)
+    user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
+    project_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
+    connection_info = sqlalchemy.Column(types.Json, nullable=False)
+    type = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
+    name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
+    description = sqlalchemy.Column(sqlalchemy.String(1024), nullable=True)
+    origin_actions = orm.relationship(
+        BaseTransferAction, backref=orm.backref('origin_endpoint'),
+        primaryjoin="and_(BaseTransferAction.origin_endpoint_id==Endpoint.id, "
+                    "BaseTransferAction.deleted=='0')")
+    destination_actions = orm.relationship(
+        BaseTransferAction, backref=orm.backref('destination_endpoint'),
+        primaryjoin="and_(BaseTransferAction.destination_endpoint_id=="
+                    "Endpoint.id, BaseTransferAction.deleted=='0')")
+    minion_pools = orm.relationship(
+        MinionPool, backref=orm.backref('endpoint'),
+        primaryjoin="and_(MinionPool.endpoint_id=="
+                    "Endpoint.id, MinionPool.deleted=='0')")
+    mapped_regions = orm.relationship(
+        'Region', back_populates='mapped_endpoints',
+        secondary="endpoint_region_mapping")
+
+
+class ReplicaSchedule(BASE, models.TimestampMixin, models.ModelBase,
+                      models.SoftDeleteMixin):
+    __tablename__ = "replica_schedules"
+
+    id = sqlalchemy.Column(sqlalchemy.String(36),
+                           default=lambda: str(uuid.uuid4()),
+                           primary_key=True)
+    replica_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('replica.id'), nullable=False)
+    replica = orm.relationship(
+        Replica, backref=orm.backref("schedules"), foreign_keys=[replica_id])
+    schedule = sqlalchemy.Column(types.Json, nullable=False)
+    expiration_date = sqlalchemy.Column(
+        sqlalchemy.types.DateTime, nullable=True)
+    enabled = sqlalchemy.Column(
+        sqlalchemy.Boolean, nullable=False, default=lambda: False)
+    shutdown_instance = sqlalchemy.Column(
+        sqlalchemy.Boolean, nullable=False, default=False)
+    trust_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)

+ 0 - 0
coriolis/minion_manager/__init__.py


+ 0 - 0
coriolis/minion_manager/rpc/__init__.py


+ 148 - 0
coriolis/minion_manager/rpc/client.py

@@ -0,0 +1,148 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_config import cfg
+import oslo_messaging as messaging
+
+from coriolis import rpc
+
+VERSION = "1.0"
+
+minion_manager_opts = [
+    cfg.IntOpt("minion_manager_rpc_timeout",
+               help="Number of seconds until RPC calls to the "
+                    "minion manager timeout.")
+]
+
+CONF = cfg.CONF
+CONF.register_opts(minion_manager_opts, 'minion_manager')
+
+
+class MinionManagerClient(object):
+    def __init__(self, timeout=None):
+        target = messaging.Target(
+            topic='coriolis_minion_manager', version=VERSION)
+        if timeout is None:
+            timeout = CONF.minion_manager.minion_manager_rpc_timeout
+        self._client = rpc.get_client(target, timeout=timeout)
+
+    def get_diagnostics(self, ctxt):
+        return self._client.call(ctxt, 'get_diagnostics')
+
+    def validate_minion_pool_selections_for_action(self, ctxt, action_id):
+        return self._client.call(
+            ctxt, 'validate_minion_pool_selections_for_action',
+            action_id=action_id)
+
+    def allocate_minion_machines_for_action(
+            self, ctxt, action_id, include_transfer_minions=True,
+            include_osmorphing_minions=True):
+        return self._client.call(
+            ctxt, 'allocate_minion_machines_for_action', action_id=action_id,
+            include_transfer_minions=include_transfer_minions,
+            include_osmorphing_minions=include_osmorphing_minions)
+
+    def deallocate_minion_machines_for_action(self, ctxt, action_id):
+        return self._client.call(
+            ctxt, 'deallocate_minion_machines_for_action', action_id=action_id)
+
+    def create_minion_pool(
+            self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=None):
+        return self._client.call(
+            ctxt, 'create_minion_pool', name=name, endpoint_id=endpoint_id,
+            pool_platform=pool_platform, pool_os_type=pool_os_type,
+            environment_options=environment_options,
+            minimum_minions=minimum_minions,
+            maximum_minions=maximum_minions,
+            minion_max_idle_time=minion_max_idle_time,
+            minion_retention_strategy=minion_retention_strategy,
+            notes=notes)
+
+    def set_up_shared_minion_pool_resources(self, ctxt, minion_pool_id):
+        return self._client.call(
+            ctxt, "set_up_shared_minion_pool_resources",
+            minion_pool_id=minion_pool_id)
+
+    def tear_down_shared_minion_pool_resources(
+            self, ctxt, minion_pool_id, force=False):
+        return self._client.call(
+            ctxt, "tear_down_shared_minion_pool_resources",
+            minion_pool_id=minion_pool_id, force=force)
+
+    def allocate_minion_pool_machines(self, ctxt, minion_pool_id):
+        return self._client.call(
+            ctxt, "allocate_minion_pool_machines",
+            minion_pool_id=minion_pool_id)
+
+    def deallocate_minion_pool_machines(
+            self, ctxt, minion_pool_id, force=False):
+        return self._client.call(
+            ctxt, "deallocate_minion_pool_machines",
+            minion_pool_id=minion_pool_id,
+            force=force)
+
+    def get_minion_pools(self, ctxt):
+        return self._client.call(ctxt, 'get_minion_pools')
+
+    def get_minion_pool(self, ctxt, minion_pool_id):
+        return self._client.call(
+            ctxt, 'get_minion_pool', minion_pool_id=minion_pool_id)
+
+    def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
+        return self._client.call(
+            ctxt, 'update_minion_pool',
+            minion_pool_id=minion_pool_id, updated_values=updated_values)
+
+    def delete_minion_pool(self, ctxt, minion_pool_id):
+        return self._client.call(
+            ctxt, 'delete_minion_pool', minion_pool_id=minion_pool_id)
+
+    def get_minion_pool_lifecycle_executions(
+            self, ctxt, minion_pool_id, include_tasks=False):
+        return self._client.call(
+            ctxt, 'get_minion_pool_lifecycle_executions',
+            minion_pool_id=minion_pool_id, include_tasks=include_tasks)
+
+    def get_minion_pool_lifecycle_execution(
+            self, ctxt, minion_pool_id, execution_id):
+        return self._client.call(
+            ctxt, 'get_minion_pool_lifecycle_execution',
+            minion_pool_id=minion_pool_id, execution_id=execution_id)
+
+    def delete_minion_pool_lifecycle_execution(
+            self, ctxt, minion_pool_id, execution_id):
+        return self._client.call(
+            ctxt, 'delete_minion_pool_lifecycle_execution',
+            minion_pool_id=minion_pool_id, execution_id=execution_id)
+
+    def cancel_minion_pool_lifecycle_execution(
+            self, ctxt, minion_pool_id, execution_id, force):
+        return self._client.call(
+            ctxt, 'cancel_minion_pool_lifecycle_execution',
+            minion_pool_id=minion_pool_id, execution_id=execution_id,
+            force=force)
+
+    def get_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, env, option_names):
+        return self._client.call(
+            ctxt, 'get_endpoint_source_minion_pool_options',
+            endpoint_id=endpoint_id, env=env, option_names=option_names)
+
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, env, option_names):
+        return self._client.call(
+            ctxt, 'get_endpoint_destination_minion_pool_options',
+            endpoint_id=endpoint_id, env=env, option_names=option_names)
+
+    def validate_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        return self._client.call(
+            ctxt, 'validate_endpoint_source_minion_pool_options',
+            endpoint_id=endpoint_id, pool_environment=pool_environment)
+
+    def validate_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        return self._client.call(
+            ctxt, 'validate_endpoint_destination_minion_pool_options',
+            endpoint_id=endpoint_id, pool_environment=pool_environment)

+ 867 - 0
coriolis/minion_manager/rpc/server.py

@@ -0,0 +1,867 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import contextlib
+import functools
+import itertools
+import uuid
+
+from oslo_concurrency import lockutils
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from coriolis import constants
+from coriolis import exception
+from coriolis import utils
+from coriolis.conductor.rpc import client as rpc_conductor_client
+from coriolis.db import api as db_api
+from coriolis.db.sqlalchemy import models
+
+
+VERSION = "1.0"
+
+LOG = logging.getLogger(__name__)
+
+
+MINION_MANAGER_OPTS = []
+
+CONF = cfg.CONF
+CONF.register_opts(MINION_MANAGER_OPTS, 'minion_manager')
+
+
def minion_pool_synchronized(func):
    """Decorator which serializes calls to the wrapped method on a
    per-minion-pool basis, using an external lock named after the
    `minion_pool_id` argument of the wrapped method.
    """
    @functools.wraps(func)
    def wrapper(self, ctxt, minion_pool_id, *args, **kwargs):
        lock_name = constants.MINION_POOL_LOCK_NAME_FORMAT % minion_pool_id

        @lockutils.synchronized(lock_name, external=True)
        def inner():
            return func(self, ctxt, minion_pool_id, *args, **kwargs)

        return inner()
    return wrapper
+
+
+class MinionManagerServerEndpoint(object):
    def __init__(self):
        # RPC client used to relay calls to the Coriolis conductor service:
        self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
+
    def get_diagnostics(self, ctxt):
        """Return diagnostics information for the current service host."""
        return utils.get_diagnostics_info()
+
+    @staticmethod
+    def _update_minion_pool_status_for_finished_execution(
+            ctxt, execution, new_execution_status):
+        # status map if execution is active:
+        stat_map = {
+            constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
+                constants.MINION_POOL_STATUS_ALLOCATING,
+            constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
+                constants.MINION_POOL_STATUS_DEALLOCATING,
+            constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
+                constants.MINION_POOL_STATUS_INITIALIZING,
+            constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
+                constants.MINION_POOL_STATUS_UNINITIALIZING}
+        if new_execution_status == constants.EXECUTION_STATUS_COMPLETED:
+            stat_map = {
+                constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
+                    constants.MINION_POOL_STATUS_ALLOCATED,
+                constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
+                    constants.MINION_POOL_STATUS_DEALLOCATED,
+                constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
+                    constants.MINION_POOL_STATUS_DEALLOCATED,
+                constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
+                    constants.MINION_POOL_STATUS_UNINITIALIZED}
+        elif new_execution_status in constants.FINALIZED_TASK_STATUSES:
+            stat_map = {
+                constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS:
+                    constants.MINION_POOL_STATUS_DEALLOCATED,
+                constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS:
+                    constants.MINION_POOL_STATUS_ALLOCATED,
+                constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES:
+                    constants.MINION_POOL_STATUS_UNINITIALIZED,
+                constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES:
+                    constants.MINION_POOL_STATUS_UNINITIALIZED}
+        final_pool_status = stat_map.get(execution.type)
+        if not final_pool_status:
+            LOG.error(
+                "Could not determine pool status following transition of "
+                "execution '%s' (type '%s') to status '%s'. Presuming error "
+                "has occured. Marking piil as error'd.",
+                execution.id, execution.type, new_execution_status)
+            final_pool_status = constants.MINION_POOL_STATUS_ERROR
+
+        LOG.info(
+            "Marking minion pool '%s' status as '%s' in the DB following the "
+            "transition of execution '%s' (type '%s') to status '%s'.",
+            execution.action_id, final_pool_status, execution.id,
+            execution.type, new_execution_status)
+        db_api.set_minion_pool_status(
+            ctxt, execution.action_id, final_pool_status)
+
+    def validate_minion_pool_selections_for_action(self, ctxt, action_id):
+        action = db_api.get_action(ctxt, action_id)
+        minion_pools = {
+            pool.id: pool
+            for pool in db_api.get_minion_pools(
+                ctxt, include_machines=False, to_dict=False)}
+        def _get_pool(pool_id):
+            pool = minion_pools.get(pool_id)
+            if not pool:
+                raise exception.NotFound(
+                    "Could not find minion pool with ID '%s'." % pool_id)
+            return pool
+
+        if action.origin_minion_pool_id:
+            origin_pool = _get_pool(action.origin_minion_pool_id)
+            if origin_pool.endpoint_id != action.origin_endpoint_id:
+                raise exception.InvalidMinionPoolSelection(
+                    "The selected origin minion pool ('%s') belongs to a "
+                    "different Coriolis endpoint ('%s') than the requested "
+                    "origin endpoint ('%s')" % (
+                        action.origin_minion_pool_id,
+                        origin_pool.endpoint_id,
+                        action.origin_endpoint_id))
+            if origin_pool.pool_platform != constants.PROVIDER_PLATFORM_SOURCE:
+                raise exception.InvalidMinionPoolSelection(
+                    "The selected origin minion pool ('%s') is configured as a"
+                    " '%s' pool. The pool must be of type %s to be used for "
+                    "data exports." % (
+                        action.origin_minion_pool_id,
+                        origin_pool.pool_platform,
+                        constants.PROVIDER_PLATFORM_SOURCE))
+            if origin_pool.pool_os_type != constants.OS_TYPE_LINUX:
+                raise exception.InvalidMinionPoolSelection(
+                    "The selected origin minion pool ('%s') is of OS type '%s'"
+                    " instead of the Linux OS type required for a source "
+                    "transfer minion pool." % (
+                        action.origin_minion_pool_id,
+                        origin_pool.pool_os_type))
+            LOG.debug(
+                "Successfully validated compatibility of origin minion pool "
+                "'%s' for use with action '%s'." % (
+                    action.origin_minion_pool_id, action.id))
+
+        if action.destination_minion_pool_id:
+            destination_pool = _get_pool(action.destination_minion_pool_id)
+            if destination_pool.endpoint_id != (
+                    action.destination_endpoint_id):
+                raise exception.InvalidMinionPoolSelection(
+                    "The selected destination minion pool ('%s') belongs to a "
+                    "different Coriolis endpoint ('%s') than the requested "
+                    "destination endpoint ('%s')" % (
+                        action.destination_minion_pool_id,
+                        destination_pool.endpoint_id,
+                        action.destination_endpoint_id))
+            if destination_pool.pool_platform != (
+                    constants.PROVIDER_PLATFORM_DESTINATION):
+                raise exception.InvalidMinionPoolSelection(
+                    "The selected destination minion pool ('%s') is configured"
+                    " as a '%s'. The pool must be of type %s to be used for "
+                    "data imports." % (
+                        action.destination_minion_pool_id,
+                        destination_pool.pool_platform,
+                        constants.PROVIDER_PLATFORM_DESTINATION))
+            if destination_pool.pool_os_type != constants.OS_TYPE_LINUX:
+                raise exception.InvalidMinionPoolSelection(
+                    "The selected destination minion pool ('%s') is of OS type"
+                    " '%s' instead of the Linux OS type required for a source "
+                    "transfer minion pool." % (
+                        action.destination_minion_pool_id,
+                        destination_pool.pool_os_type))
+            LOG.debug(
+                "Successfully validated compatibility of destination minion "
+                "pool '%s' for use with action '%s'." % (
+                    action.origin_minion_pool_id, action.id))
+
+        if action.instance_osmorphing_minion_pool_mappings:
+            osmorphing_pool_mappings = {
+                instance_id: pool_id
+                for (instance_id, pool_id) in (
+                    action.instance_osmorphing_minion_pool_mappings.items())
+                if pool_id}
+            for (instance, pool_id) in osmorphing_pool_mappings.items():
+                osmorphing_pool = _get_pool(pool_id)
+                if osmorphing_pool.endpoint_id != (
+                        action.destination_endpoint_id):
+                    raise exception.InvalidMinionPoolSelection(
+                        "The selected OSMorphing minion pool for instance '%s'"
+                        " ('%s') belongs to a different Coriolis endpoint "
+                        "('%s') than the destination endpoint ('%s')" % (
+                            instance, pool_id,
+                            osmorphing_pool.endpoint_id,
+                            action.destination_endpoint_id))
+                if osmorphing_pool.pool_platform != (
+                        constants.PROVIDER_PLATFORM_DESTINATION):
+                    raise exception.InvalidMinionPoolSelection(
+                        "The selected OSMorphing minion pool for instance '%s'"
+                        "  ('%s') is configured as a '%s' pool. The pool must "
+                        "be of type %s to be used for OSMorphing." % (
+                            instance, pool_id,
+                            osmorphing_pool.pool_platform,
+                            constants.PROVIDER_PLATFORM_DESTINATION))
+                LOG.debug(
+                    "Successfully validated compatibility of destination "
+                    "minion pool '%s' for use as OSMorphing minion for "
+                    "instance '%s' during action '%s'." % (
+                        action.origin_minion_pool_id, instance, action.id))
+
+    def allocate_minion_machines_for_action(
+            self, ctxt, action_id, include_transfer_minions=True,
+            include_osmorphing_minions=True):
+        """ Returns a dict of the form:
+        {
+            "instance_id": {
+                "source_minion": <source minion properties>,
+                "target_minion": <target minion properties>,
+                "osmorphing_minion": <osmorphing minion properties>
+            }
+        }
+        """
+        action = db_api.get_action(ctxt, action_id)
+        instance_machine_allocations = {
+            instance: {} for instance in action.instances}
+
+        minion_pool_ids = set()
+        if action.origin_minion_pool_id:
+            minion_pool_ids.add(action.origin_minion_pool_id)
+        if action.destination_minion_pool_id:
+            minion_pool_ids.add(action.destination_minion_pool_id)
+        if action.instance_osmorphing_minion_pool_mappings:
+            minion_pool_ids = minion_pool_ids.union(set(
+                action.instance_osmorphing_minion_pool_mappings.values()))
+        if None in minion_pool_ids:
+            minion_pool_ids.remove(None)
+
+        if not minion_pool_ids:
+            LOG.debug(
+                "No minion pool settings found for action '%s'. "
+                "Skipping minion machine allocations." % (
+                    action.id))
+            return instance_machine_allocations
+
+        LOG.debug(
+            "All minion pool selections for action '%s': %s",
+            action.id, minion_pool_ids)
+
+        def _select_machine(minion_pool, exclude=None):
+            if not minion_pool.minion_machines:
+                raise exception.InvalidMinionPoolSelection(
+                    "Minion pool with ID '%s' has no machines defined." % (
+                        minion_pool.id))
+            selected_machine = None
+            for machine in minion_pool.minion_machines:
+                if exclude and machine.id in exclude:
+                    LOG.debug(
+                        "Excluding minion machine '%s' from search.",
+                        machine.id)
+                    continue
+                if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
+                    LOG.debug(
+                        "Minion machine with ID '%s' is in status '%s' "
+                        "instead of '%s'. Skipping.", machine.id,
+                        machine.status,
+                        constants.MINION_MACHINE_STATUS_AVAILABLE)
+                    continue
+                selected_machine = machine
+                break
+            if not selected_machine:
+                raise exception.InvalidMinionPoolSelection(
+                    "There are no more available minion machines within minion"
+                    " pool with ID '%s' (excluding the following ones already "
+                    "planned for this transfer: %s). Please ensure that the "
+                    "minion pool has enough minion machines allocated and "
+                    "available (i.e. not being used for other operations) "
+                    "to satisfy the number of VMs required by the Migration or"
+                    " Replica." % (
+                        minion_pool.id, exclude))
+            return selected_machine
+
+        osmorphing_pool_map = (
+            action.instance_osmorphing_minion_pool_mappings)
+        with contextlib.ExitStack() as stack:
+            _ = [
+                stack.enter_context(
+                    lockutils.lock(
+                        constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
+                        external=True))
+                for pool_id in minion_pool_ids]
+
+            minion_pools = db_api.get_minion_pools(
+                ctxt, include_machines=True, to_dict=False)
+            minion_pool_id_mappings = {
+                pool.id: pool for pool in minion_pools
+                if pool.id in minion_pool_ids}
+
+            missing_pools = [
+                pool_id for pool_id in minion_pool_ids
+                if pool_id not in minion_pool_id_mappings]
+            if missing_pools:
+                raise exception.InvalidMinionPoolSelection(
+                    "The following minion pools could not be found: %s" % (
+                        missing_pools))
+
+            unallocated_pools = {
+                pool_id: pool.pool_status
+                for (pool_id, pool) in minion_pool_id_mappings.items()
+                if pool.pool_status != constants.MINION_POOL_STATUS_ALLOCATED}
+            if unallocated_pools:
+                raise exception.InvalidMinionPoolSelection(
+                    "The following minion pools have not had their machines "
+                    "allocated and thus cannot be used: %s" % (
+                        unallocated_pools))
+
+            allocated_source_machine_ids = set()
+            allocated_target_machine_ids = set()
+            allocated_osmorphing_machine_ids = set()
+            for instance in action.instances:
+
+                if include_transfer_minions:
+                    if action.origin_minion_pool_id:
+                        origin_pool = minion_pool_id_mappings[
+                            action.origin_minion_pool_id]
+                        machine = _select_machine(
+                            origin_pool, exclude=allocated_source_machine_ids)
+                        allocated_source_machine_ids.add(machine.id)
+                        instance_machine_allocations[
+                            instance]['source_minion'] = machine
+                        LOG.debug(
+                            "Selected minion machine '%s' for source-side "
+                            "syncing of instance '%s' as part of transfer "
+                            "action '%s'.", machine.id, instance, action.id)
+
+                    if action.destination_minion_pool_id:
+                        dest_pool = minion_pool_id_mappings[
+                            action.destination_minion_pool_id]
+                        machine = _select_machine(
+                            dest_pool, exclude=allocated_target_machine_ids)
+                        allocated_target_machine_ids.add(machine.id)
+                        instance_machine_allocations[
+                            instance]['target_minion'] = machine
+                        LOG.debug(
+                            "Selected minion machine '%s' for target-side "
+                            "syncing of instance '%s' as part of transfer "
+                            "action '%s'.", machine.id, instance, action.id)
+
+                if include_osmorphing_minions:
+                    if instance not in osmorphing_pool_map:
+                        LOG.debug(
+                            "Instance '%s' is not listed in the OSMorphing "
+                            "minion pool mappings for action '%s'." % (
+                                instance, action.id))
+                    elif osmorphing_pool_map[instance] is None:
+                        LOG.debug(
+                            "OSMorphing pool ID for instance '%s' is "
+                            "None in action '%s'. Ignoring." % (
+                                instance, action.id))
+                    else:
+                        osmorphing_pool_id = osmorphing_pool_map[instance]
+                        # if the selected target and OSMorphing pools
+                        # are the same, reuse the same worker:
+                        ima = instance_machine_allocations[instance]
+                        if osmorphing_pool_id == (
+                                action.destination_minion_pool_id) and (
+                                    'target_minion' in ima):
+                            allocated_target_machine = ima[
+                                'target_minion']
+                            LOG.debug(
+                                "Reusing disk sync minion '%s' for the "
+                                "OSMorphing of instance '%s' as port of "
+                                "transfer action '%s'",
+                                allocated_target_machine.id, instance,
+                                action.id)
+                            instance_machine_allocations[
+                                instance]['osmorphing_minion'] = (
+                                    allocated_target_machine)
+                        # else, allocate a new minion from the selected pool:
+                        else:
+                            osmorphing_pool = minion_pool_id_mappings[
+                                osmorphing_pool_id]
+                            machine = _select_machine(
+                                osmorphing_pool,
+                                exclude=allocated_osmorphing_machine_ids)
+                            allocated_osmorphing_machine_ids.add(machine.id)
+                            instance_machine_allocations[
+                                instance]['osmorphing_minion'] = machine
+                            LOG.debug(
+                                "Selected minion machine '%s' for OSMorphing "
+                                " of instance '%s' as part of transfer "
+                                "action '%s'.",
+                                machine.id, instance, action.id)
+
+            # mark the selected machines as allocated:
+            all_machine_ids = set(itertools.chain(
+                allocated_source_machine_ids,
+                allocated_target_machine_ids,
+                allocated_osmorphing_machine_ids))
+            db_api.set_minion_machines_allocation_statuses(
+                ctxt, all_machine_ids, action.id,
+                constants.MINION_MACHINE_STATUS_ALLOCATED)
+
+        # filter out redundancies:
+        instance_machine_allocations = {
+            instance: allocations
+            for (instance, allocations) in instance_machine_allocations.items()
+            if allocations}
+
+        LOG.debug(
+            "Allocated the following minion machines for action '%s': %s",
+            action.id, {
+                instance: {
+                    typ: machine.id
+                    for (typ, machine) in allocation.items()}
+                for (instance, allocation) in instance_machine_allocations.items()})
+        return instance_machine_allocations
+
+    def deallocate_minion_machines_for_action(self, ctxt, action_id):
+        action = db_api.get_action(ctxt, action_id)
+        minion_pool_ids = set()
+        if action.origin_minion_pool_id:
+            minion_pool_ids.add(action.origin_minion_pool_id)
+        if action.destination_minion_pool_id:
+            minion_pool_ids.add(action.destination_minion_pool_id)
+        if action.instance_osmorphing_minion_pool_mappings:
+            minion_pool_ids = minion_pool_ids.union(set(
+                action.instance_osmorphing_minion_pool_mappings.values()))
+        if None in minion_pool_ids:
+            minion_pool_ids.remove(None)
+
+        if not minion_pool_ids:
+            LOG.debug(
+                "No minion pools seem to have been used for action with "
+                "base_id '%s'. Skipping minion machine deallocation.",
+                action.base_id)
+        else:
+            LOG.debug(
+                "Attempting to deallocate all minion pool machine selections "
+                "for action '%s'. Afferent pools are: %s",
+                action.base_id, minion_pool_ids)
+
+            with contextlib.ExitStack() as stack:
+                _ = [
+                    stack.enter_context(
+                        lockutils.lock(
+                            constants.MINION_POOL_LOCK_NAME_FORMAT % pool_id,
+                            external=True))
+                    for pool_id in minion_pool_ids]
+
+                minion_machines = db_api.get_minion_machines(
+                    ctxt, allocated_action_id=action.base_id)
+                machine_ids = [m.id for m in minion_machines]
+                if machine_ids:
+                    LOG.info(
+                        "Releasing the following minion machines for "
+                        "action '%s': %s", action.base_id, machine_ids)
+                    db_api.set_minion_machines_allocation_statuses(
+                        ctxt, machine_ids, None,
+                        constants.MINION_MACHINE_STATUS_AVAILABLE)
+                else:
+                    LOG.debug(
+                        "No minion machines were found to be associated "
+                        "with action with base_id '%s'.", action.base_id)
+
+    def create_minion_pool(
+            self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=None):
+        endpoint = db_api.get_endpoint(ctxt, endpoint_id)
+
+        minion_pool = models.MinionPool()
+        minion_pool.id = str(uuid.uuid4())
+        minion_pool.pool_name = name
+        minion_pool.notes = notes
+        minion_pool.pool_platform = pool_platform
+        minion_pool.pool_os_type = pool_os_type
+        minion_pool.endpoint_id = endpoint_id
+        minion_pool.environment_options = environment_options
+        minion_pool.pool_status = constants.MINION_POOL_STATUS_UNINITIALIZED
+        minion_pool.minimum_minions = minimum_minions
+        minion_pool.maximum_minions = maximum_minions
+        minion_pool.minion_max_idle_time = minion_max_idle_time
+        minion_pool.minion_retention_strategy = minion_retention_strategy
+
+        db_api.add_minion_pool(ctxt, minion_pool)
+        return self.get_minion_pool(ctxt, minion_pool.id)
+
+    def get_minion_pools(self, ctxt, include_machines=True):
+        return db_api.get_minion_pools(ctxt, include_machines=include_machines)
+
+    def _get_minion_pool(
+            self, ctxt, minion_pool_id, include_machines=True):
+        minion_pool = db_api.get_minion_pool(
+            ctxt, minion_pool_id, include_machines=include_machines)
+        if not minion_pool:
+            raise exception.NotFound(
+                "Minion pool with ID '%s' not found." % minion_pool_id)
+        return minion_pool
+
+    # @minion_pool_synchronized
+    # def set_up_shared_minion_pool_resources(self, ctxt, minion_pool_id):
+    #     LOG.info(
+    #         "Attempting to set up shared resources for Minion Pool '%s'.",
+    #         minion_pool_id)
+    #     minion_pool = db_api.get_minion_pool_lifecycle(
+    #         ctxt, minion_pool_id, include_tasks_executions=False,
+    #         include_machines=False)
+    #     if minion_pool.pool_status != constants.MINION_POOL_STATUS_UNINITIALIZED:
+    #         raise exception.InvalidMinionPoolState(
+    #             "Minion Pool '%s' cannot have shared resources set up as it "
+    #             "is in '%s' state instead of the expected %s."% (
+    #                 minion_pool_id, minion_pool.pool_status,
+    #                 constants.MINION_POOL_STATUS_UNINITIALIZED))
+
+    #     execution = models.TasksExecution()
+    #     execution.id = str(uuid.uuid4())
+    #     execution.action = minion_pool
+    #     execution.status = constants.EXECUTION_STATUS_UNEXECUTED
+    #     execution.type = (
+    #         constants.EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES)
+
+    #     minion_pool.info[minion_pool_id] = {
+    #         "pool_os_type": minion_pool.pool_os_type,
+    #         "pool_identifier": minion_pool.id,
+    #         # TODO(aznashwan): remove redundancy once transfer
+    #         # action DB models have been overhauled:
+    #         "pool_environment_options": minion_pool.source_environment}
+
+    #     validate_task_type = (
+    #         constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS)
+    #     set_up_task_type = (
+    #         constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES)
+    #     if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+    #         validate_task_type = (
+    #             constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS)
+    #         set_up_task_type = (
+    #             constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES)
+
+    #     validate_pool_options_task = self._create_task(
+    #         minion_pool.id, validate_task_type, execution)
+
+    #     setup_pool_resources_task = self._create_task(
+    #         minion_pool.id,
+    #         set_up_task_type,
+    #         execution,
+    #         depends_on=[validate_pool_options_task.id])
+
+    #     self._check_execution_tasks_sanity(execution, minion_pool.info)
+
+    #     # update the action info for the pool's instance:
+    #     db_api.update_transfer_action_info_for_instance(
+    #         ctxt, minion_pool.id, minion_pool.id,
+    #         minion_pool.info[minion_pool.id])
+
+    #     # add new execution to DB:
+    #     db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
+    #     LOG.info(
+    #         "Minion pool shared resource creation execution created: %s",
+    #         execution.id)
+
+    #     self._begin_tasks(ctxt, minion_pool, execution)
+    #     db_api.set_minion_pool_lifecycle_status(
+    #         ctxt, minion_pool.id, constants.MINION_POOL_STATUS_INITIALIZING)
+
+    #     return self._get_minion_pool_lifecycle_execution(
+    #         ctxt, minion_pool_id, execution.id).to_dict()
+
+    # @minion_pool_synchronized
+    # def tear_down_shared_minion_pool_resources(
+    #         self, ctxt, minion_pool_id, force=False):
+    #     minion_pool = db_api.get_minion_pool_lifecycle(
+    #         ctxt, minion_pool_id, include_tasks_executions=False,
+    #         include_machines=False)
+    #     if minion_pool.pool_status != (
+    #             constants.MINION_POOL_STATUS_DEALLOCATED) and not force:
+    #         raise exception.InvalidMinionPoolState(
+    #             "Minion Pool '%s' cannot have shared resources torn down as it"
+    #             " is in '%s' state instead of the expected %s. "
+    #             "Please use the force flag if you are certain you want "
+    #             "to tear down the shared resources for this pool." % (
+    #                 minion_pool_id, minion_pool.pool_status,
+    #                 constants.MINION_POOL_STATUS_DEALLOCATED))
+
+    #     LOG.info(
+    #         "Attempting to tear down shared resources for Minion Pool '%s'.",
+    #         minion_pool_id)
+
+    #     execution = models.TasksExecution()
+    #     execution.id = str(uuid.uuid4())
+    #     execution.action = minion_pool
+    #     execution.status = constants.EXECUTION_STATUS_UNEXECUTED
+    #     execution.type = (
+    #         constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES)
+
+    #     tear_down_task_type = (
+    #         constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES)
+    #     if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+    #         tear_down_task_type = (
+    #             constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES)
+
+    #     self._create_task(
+    #         minion_pool.id, tear_down_task_type, execution)
+
+    #     self._check_execution_tasks_sanity(execution, minion_pool.info)
+
+    #     # update the action info for the pool's instance:
+    #     db_api.update_transfer_action_info_for_instance(
+    #         ctxt, minion_pool.id, minion_pool.id,
+    #         minion_pool.info[minion_pool.id])
+
+    #     # add new execution to DB:
+    #     db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
+    #     LOG.info(
+    #         "Minion pool shared resource teardown execution created: %s",
+    #         execution.id)
+
+    #     self._begin_tasks(ctxt, minion_pool, execution)
+    #     db_api.set_minion_pool_lifecycle_status(
+    #         ctxt, minion_pool.id, constants.MINION_POOL_STATUS_UNINITIALIZING)
+
+    #     return self._get_minion_pool_lifecycle_execution(
+    #         ctxt, minion_pool_id, execution.id).to_dict()
+
+    # @minion_pool_synchronized
+    # def allocate_minion_pool_machines(self, ctxt, minion_pool_id):
+    #     LOG.info("Attempting to allocate Minion Pool '%s'.", minion_pool_id)
+    #     minion_pool = self._get_minion_pool(
+    #         ctxt, minion_pool_id, include_tasks_executions=False,
+    #         include_machines=True)
+    #     if minion_pool.pool_status != constants.MINION_POOL_STATUS_DEALLOCATED:
+    #         raise exception.InvalidMinionPoolState(
+    #             "Minion machines for pool '%s' cannot be allocated as the pool"
+    #             " is in '%s' state instead of the expected %s."% (
+    #                 minion_pool_id, minion_pool.pool_status,
+    #                 constants.MINION_POOL_STATUS_DEALLOCATED))
+
+    #     execution = models.TasksExecution()
+    #     execution.id = str(uuid.uuid4())
+    #     execution.action = minion_pool
+    #     execution.status = constants.EXECUTION_STATUS_UNEXECUTED
+    #     execution.type = constants.EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS
+
+    #     new_minion_machine_ids = [
+    #         str(uuid.uuid4()) for _ in range(minion_pool.minimum_minions)]
+
+    #     create_minion_task_type = (
+    #         constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE)
+    #     delete_minion_task_type = (
+    #         constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
+    #     if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+    #         create_minion_task_type = (
+    #             constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE)
+    #         delete_minion_task_type = (
+    #             constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE)
+
+    #     for minion_machine_id in new_minion_machine_ids:
+    #         minion_pool.info[minion_machine_id] = {
+    #             "pool_identifier": minion_pool_id,
+    #             "pool_os_type": minion_pool.pool_os_type,
+    #             "pool_shared_resources": minion_pool.pool_shared_resources,
+    #             "pool_environment_options": minion_pool.source_environment,
+    #             # NOTE: we default this to an empty dict here to avoid possible
+    #             # task info conflicts on the cleanup task below for minions
+    #             # which were slower to deploy:
+    #             "minion_provider_properties": {}}
+
+    #         create_minion_task = self._create_task(
+    #             minion_machine_id, create_minion_task_type, execution)
+
+    #         self._create_task(
+    #             minion_machine_id,
+    #             delete_minion_task_type,
+    #             execution, on_error_only=True,
+    #             depends_on=[create_minion_task.id])
+
+    #     self._check_execution_tasks_sanity(execution, minion_pool.info)
+
+    #     # update the action info for all of the pool's minions:
+    #     for minion_machine_id in new_minion_machine_ids:
+    #         db_api.update_transfer_action_info_for_instance(
+    #             ctxt, minion_pool.id, minion_machine_id,
+    #             minion_pool.info[minion_machine_id])
+
+    #     # add new execution to DB:
+    #     db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
+    #     LOG.info("Minion pool allocation execution created: %s", execution.id)
+
+    #     self._begin_tasks(ctxt, minion_pool, execution)
+    #     db_api.set_minion_pool_lifecycle_status(
+    #         ctxt, minion_pool.id, constants.MINION_POOL_STATUS_ALLOCATING)
+
+    #     return self._get_minion_pool_lifecycle_execution(
+    #         ctxt, minion_pool_id, execution.id).to_dict()
+
+    # def _check_all_pool_minion_machines_available(self, minion_pool):
+    #     if not minion_pool.minion_machines:
+    #         LOG.debug(
+    #             "Minion pool '%s' does not have any allocated machines.",
+    #             minion_pool.id)
+    #         return
+
+    #     allocated_machine_statuses = {
+    #         machine.id: machine.status
+    #         for machine in minion_pool.minion_machines
+    #         if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE}
+
+    #     if allocated_machine_statuses:
+    #         raise exception.InvalidMinionPoolState(
+    #             "Minion pool with ID '%s' has one or more machines which are "
+    #             "in-use or otherwise unmodifiable: %s" % (
+    #                 minion_pool.id,
+    #                 allocated_machine_statuses))
+
+    # @minion_pool_synchronized
+    # def deallocate_minion_pool_machines(self, ctxt, minion_pool_id, force=False):
+    #     LOG.info("Attempting to deallocate Minion Pool '%s'.", minion_pool_id)
+    #     minion_pool = db_api.get_minion_pool_lifecycle(
+    #         ctxt, minion_pool_id, include_tasks_executions=False,
+    #         include_machines=True)
+    #     if minion_pool.pool_status not in (
+    #             constants.MINION_POOL_STATUS_ALLOCATED,) and not force:
+    #         raise exception.InvalidMinionPoolState(
+    #             "Minion Pool '%s' cannot be deallocated as it is in '%s' "
+    #             "state instead of the expected '%s'. Please use the "
+    #             "force flag if you are certain you want to deallocate "
+    #             "the minion pool's machines." % (
+    #                 minion_pool_id, minion_pool.pool_status,
+    #                 constants.MINION_POOL_STATUS_ALLOCATED))
+
+    #     if not force:
+    #         self._check_all_pool_minion_machines_available(minion_pool)
+
+    #     execution = models.TasksExecution()
+    #     execution.id = str(uuid.uuid4())
+    #     execution.action = minion_pool
+    #     execution.status = constants.EXECUTION_STATUS_UNEXECUTED
+    #     execution.type = (
+    #         constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS)
+
+    #     delete_minion_task_type = (
+    #         constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
+    #     if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+    #         delete_minion_task_type = (
+    #             constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE)
+
+    #     for minion_machine in minion_pool.minion_machines:
+    #         minion_machine_id = minion_machine.id
+    #         minion_pool.info[minion_machine_id] = {
+    #             "pool_environment_options": minion_pool.source_environment,
+    #             "minion_provider_properties": (
+    #                 minion_machine.provider_properties)}
+    #         self._create_task(
+    #             minion_machine_id, delete_minion_task_type,
+    #             # NOTE: we set 'on_error=True' to allow for the completion of
+    #             # already running deletion tasks to prevent partial deletes:
+    #             execution, on_error=True)
+
+    #     self._check_execution_tasks_sanity(execution, minion_pool.info)
+
+    #     # update the action info for all of the pool's minions:
+    #     for minion_machine in minion_pool.minion_machines:
+    #         db_api.update_transfer_action_info_for_instance(
+    #             ctxt, minion_pool.id, minion_machine.id,
+    #             minion_pool.info[minion_machine.id])
+
+    #     # add new execution to DB:
+    #     db_api.add_minion_pool_lifecycle_execution(ctxt, execution)
+    #     LOG.info(
+    #         "Minion pool deallocation execution created: %s", execution.id)
+
+    #     self._begin_tasks(ctxt, minion_pool, execution)
+    #     db_api.set_minion_pool_lifecycle_status(
+    #         ctxt, minion_pool.id, constants.MINION_POOL_STATUS_DEALLOCATING)
+
+    #     return self._get_minion_pool_lifecycle_execution(
+    #         ctxt, minion_pool_id, execution.id).to_dict()
+
+    @minion_pool_synchronized
+    def get_minion_pool(self, ctxt, minion_pool_id):
+        return self._get_minion_pool(
+            ctxt, minion_pool_id, include_machines=True)
+
+    @minion_pool_synchronized
+    def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
+        minion_pool = self._get_minion_pool(
+            ctxt, minion_pool_id, include_machines=False)
+        if minion_pool.pool_status != constants.MINION_POOL_STATUS_UNINITIALIZED:
+            raise exception.InvalidMinionPoolState(
+                "Minion Pool '%s' cannot be updated as it is in '%s' status "
+                "instead of the expected '%s'. Please ensure the pool machines"
+                "have been deallocated and the pool's supporting resources "
+                "have been torn down before updating the pool." % (
+                    minion_pool_id, minion_pool.pool_status,
+                    constants.MINION_POOL_STATUS_UNINITIALIZED))
+        LOG.info(
+            "Attempting to update minion_pool '%s' with payload: %s",
+            minion_pool_id, updated_values)
+        db_api.update_minion_pool(ctxt, minion_pool_id, updated_values)
+        LOG.info("Minion Pool '%s' successfully updated", minion_pool_id)
+        return db_api.get_minion_pool(ctxt, minion_pool_id)
+
+    @minion_pool_synchronized
+    def delete_minion_pool(self, ctxt, minion_pool_id):
+        minion_pool = self._get_minion_pool(
+            ctxt, minion_pool_id, include_machines=True)
+        acceptable_deletion_statuses = [
+            constants.MINION_POOL_STATUS_UNINITIALIZED,
+            constants.MINION_POOL_STATUS_ERROR]
+        if minion_pool.pool_status not in acceptable_deletion_statuses:
+            raise exception.InvalidMinionPoolState(
+                "Minion Pool '%s' cannot be deleted as it is in '%s' status "
+                "instead of one of the expected '%s'. Please ensure the pool "
+                "machines have been deallocated and the pool's supporting "
+                "resources have been torn down before deleting the pool." % (
+                    minion_pool_id, minion_pool.pool_status,
+                    acceptable_deletion_statuses))
+
+        LOG.info("Deleting minion pool with ID '%s'" % minion_pool_id)
+        db_api.delete_minion_pool(ctxt, minion_pool_id)
+
+    # @minion_pool_synchronized
+    # def get_minion_pool_lifecycle_executions(
+    #         self, ctxt, minion_pool_id, include_tasks=False):
+    #     return db_api.get_minion_pool_lifecycle_executions(
+    #         ctxt, minion_pool_id, include_tasks)
+
+    # def _get_minion_pool_lifecycle_execution(
+    #         self, ctxt, minion_pool_id, execution_id):
+    #     execution = db_api.get_minion_pool_lifecycle_execution(
+    #         ctxt, minion_pool_id, execution_id)
+    #     if not execution:
+    #         raise exception.NotFound(
+    #             "Execution with ID '%s' for Minion Pool '%s' not found." % (
+    #                 execution_id, minion_pool_id))
+    #     return execution
+
+    # @minion_pool_tasks_execution_synchronized
+    # def get_minion_pool_lifecycle_execution(
+    #         self, ctxt, minion_pool_id, execution_id):
+    #     return self._get_minion_pool_lifecycle_execution(
+    #         ctxt, minion_pool_id, execution_id).to_dict()
+
+    # @minion_pool_tasks_execution_synchronized
+    # def delete_minion_pool_lifecycle_execution(
+    #         self, ctxt, minion_pool_id, execution_id):
+    #     execution = self._get_minion_pool_lifecycle_execution(
+    #         ctxt, minion_pool_id, execution_id)
+    #     if execution.status in constants.ACTIVE_EXECUTION_STATUSES:
+    #         raise exception.InvalidMigrationState(
+    #             "Cannot delete execution '%s' for Minion pool '%s' as it is "
+    #             "currently in '%s' state." % (
+    #                 execution_id, minion_pool_id, execution.status))
+    #     db_api.delete_minion_pool_lifecycle_execution(ctxt, execution_id)
+
+    # @minion_pool_tasks_execution_synchronized
+    # def cancel_minion_pool_lifecycle_execution(
+    #         self, ctxt, minion_pool_id, execution_id, force):
+    #     execution = self._get_minion_pool_lifecycle_execution(
+    #         ctxt, minion_pool_id, execution_id)
+    #     if execution.status not in constants.ACTIVE_EXECUTION_STATUSES:
+    #         raise exception.InvalidMinionPoolState(
+    #             "Minion pool '%s' has no running execution to cancel." % (
+    #                 minion_pool_id))
+    #     if execution.status == constants.EXECUTION_STATUS_CANCELLING and (
+    #             not force):
+    #         raise exception.InvalidMinionPoolState(
+    #             "Execution for Minion Pool '%s' is already being cancelled. "
+    #             "Please use the force option if you'd like to force-cancel "
+    #             "it." % (minion_pool_id))
+    #     self._cancel_tasks_execution(ctxt, execution, force=force)

+ 2 - 2
coriolis/minion_pools/api.py

@@ -2,12 +2,12 @@
 # All Rights Reserved.
 
 from coriolis import utils
-from coriolis.conductor.rpc import client as rpc_client
+from coriolis.minion_manager.rpc import client as rpc_client
 
 
 class API(object):
     def __init__(self):
-        self._rpc_client = rpc_client.ConductorClient()
+        self._rpc_client = rpc_client.MinionManagerClient()
 
     def create(
             self, ctxt, name, endpoint_id, pool_platform, pool_os_type,

+ 1 - 0
setup.cfg

@@ -30,6 +30,7 @@ console_scripts =
     coriolis-worker = coriolis.cmd.worker:main
     coriolis-replica-cron = coriolis.cmd.replica_cron:main
     coriolis-scheduler= coriolis.cmd.scheduler:main
+    coriolis-minion-manager= coriolis.cmd.minion_manager:main
     coriolis-dbsync = coriolis.cmd.db_sync:main
 
 [wheel]