Просмотр исходного кода

Merge pull request #145 from aznashwan/minion-pooling

Add Minion Machine Pooling features.
Nashwan Azhari 5 лет назад
Родитель
Сommit
cfeccada09
42 измененных файлов с 3684 добавлено и 128 удалено
  1. 46 0
      coriolis/api/v1/endpoint_destination_minion_pool_options.py
  2. 46 0
      coriolis/api/v1/endpoint_source_minion_pool_options.py
  3. 19 4
      coriolis/api/v1/migrations.py
  4. 83 0
      coriolis/api/v1/minion_pool_actions.py
  5. 37 0
      coriolis/api/v1/minion_pool_tasks_execution_actions.py
  6. 65 0
      coriolis/api/v1/minion_pool_tasks_executions.py
  7. 223 0
      coriolis/api/v1/minion_pools.py
  8. 5 2
      coriolis/api/v1/replica_tasks_executions.py
  9. 26 4
      coriolis/api/v1/replicas.py
  10. 56 0
      coriolis/api/v1/router.py
  11. 20 0
      coriolis/api/v1/views/endpoint_destination_minion_pool_options_view.py
  12. 20 0
      coriolis/api/v1/views/endpoint_source_minion_pool_options_view.py
  13. 45 0
      coriolis/api/v1/views/minion_pool_tasks_execution_view.py
  14. 54 0
      coriolis/api/v1/views/minion_pool_view.py
  15. 126 6
      coriolis/conductor/rpc/client.py
  16. 732 70
      coriolis/conductor/rpc/server.py
  17. 104 3
      coriolis/constants.py
  18. 259 18
      coriolis/db/api.py
  19. 107 0
      coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py
  20. 136 1
      coriolis/db/sqlalchemy/models.py
  21. 0 0
      coriolis/endpoint_minion_pool_options/__init__.py
  22. 19 0
      coriolis/endpoint_minion_pool_options/api.py
  23. 13 0
      coriolis/endpoints/api.py
  24. 13 0
      coriolis/exception.py
  25. 19 10
      coriolis/migrations/api.py
  26. 0 0
      coriolis/minion_pool_tasks_executions/__init__.py
  27. 26 0
      coriolis/minion_pool_tasks_executions/api.py
  28. 0 0
      coriolis/minion_pools/__init__.py
  29. 49 0
      coriolis/minion_pools/api.py
  30. 24 1
      coriolis/policies/endpoints.py
  31. 83 0
      coriolis/policies/minion_pool_tasks_executions.py
  32. 123 0
      coriolis/policies/minion_pools.py
  33. 4 1
      coriolis/policy.py
  34. 109 0
      coriolis/providers/base.py
  35. 5 1
      coriolis/providers/factory.py
  36. 4 0
      coriolis/replicas/api.py
  37. 5 1
      coriolis/schemas.py
  38. 15 5
      coriolis/tasks/base.py
  39. 48 1
      coriolis/tasks/factory.py
  40. 800 0
      coriolis/tasks/minion_pool_tasks.py
  41. 26 0
      coriolis/worker/rpc/client.py
  42. 90 0
      coriolis/worker/rpc/server.py

+ 46 - 0
coriolis/api/v1/endpoint_destination_minion_pool_options.py

@@ -0,0 +1,46 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_log import log as logging
+
+from coriolis import utils
+from coriolis.api.v1.views import (
+    endpoint_destination_minion_pool_options_view)
+from coriolis.api import wsgi as api_wsgi
+from coriolis.endpoint_minion_pool_options import api
+from coriolis.policies import endpoints as endpoint_policies
+
+
+LOG = logging.getLogger(__name__)
+
+
+class EndpointDestinationMinionPoolOptionsController(api_wsgi.Controller):
+    def __init__(self):
+        self._minion_pool_options_api = api.API()
+        super(EndpointDestinationMinionPoolOptionsController, self).__init__()
+
+    def index(self, req, endpoint_id):
+        context = req.environ['coriolis.context']
+        context.can("%s:list_destination_minion_pool_options" % (
+            endpoint_policies.ENDPOINTS_POLICY_PREFIX))
+
+        env = req.GET.get("env")
+        if env is not None:
+            env = utils.decode_base64_param(env, is_json=True)
+        else:
+            env = {}
+
+        options = req.GET.get("options")
+        if options is not None:
+            options = utils.decode_base64_param(options, is_json=True)
+        else:
+            options = {}
+
+        return endpoint_destination_minion_pool_options_view.collection(
+            req,
+            self._minion_pool_options_api.get_endpoint_destination_minion_pool_options(
+                context, endpoint_id, env=env, option_names=options))
+
+
+def create_resource():
+    return api_wsgi.Resource(EndpointDestinationMinionPoolOptionsController())

+ 46 - 0
coriolis/api/v1/endpoint_source_minion_pool_options.py

@@ -0,0 +1,46 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_log import log as logging
+
+from coriolis import utils
+from coriolis.api.v1.views import (
+        endpoint_source_minion_pool_options_view)
+from coriolis.api import wsgi as api_wsgi
+from coriolis.endpoint_minion_pool_options import api
+from coriolis.policies import endpoints as endpoint_policies
+
+
+LOG = logging.getLogger(__name__)
+
+
+class EndpointSourceMinionPoolOptionsController(api_wsgi.Controller):
+    def __init__(self):
+        self._minion_pool_options_api = api.API()
+        super(EndpointSourceMinionPoolOptionsController, self).__init__()
+
+    def index(self, req, endpoint_id):
+        context = req.environ['coriolis.context']
+        context.can("%s:list_source_minion_pool_options" % (
+            endpoint_policies.ENDPOINTS_POLICY_PREFIX))
+
+        env = req.GET.get("env")
+        if env is not None:
+            env = utils.decode_base64_param(env, is_json=True)
+        else:
+            env = {}
+
+        options = req.GET.get("options")
+        if options is not None:
+            options = utils.decode_base64_param(options, is_json=True)
+        else:
+            options = {}
+
+        return endpoint_source_minion_pool_options_view.collection(
+            req,
+            self._minion_pool_options_api.get_endpoint_source_minion_pool_options(
+                context, endpoint_id, env=env, option_names=options))
+
+
+def create_resource():
+    return api_wsgi.Resource(EndpointSourceMinionPoolOptionsController())

+ 19 - 4
coriolis/api/v1/migrations.py

@@ -1,5 +1,6 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 # All Rights Reserved.
+
 import json
 import json
 
 
 from oslo_log import log as logging
 from oslo_log import log as logging
@@ -56,6 +57,11 @@ class MigrationController(api_wsgi.Controller):
         try:
         try:
             origin_endpoint_id = migration["origin_endpoint_id"]
             origin_endpoint_id = migration["origin_endpoint_id"]
             destination_endpoint_id = migration["destination_endpoint_id"]
             destination_endpoint_id = migration["destination_endpoint_id"]
+            origin_minion_pool_id = migration.get('origin_minion_pool_id')
+            destination_minion_pool_id = migration.get(
+                'destination_minion_pool_id')
+            instance_osmorphing_minion_pool_mappings = migration.get(
+                'instance_osmorphing_minion_pool_mappings', {})
             destination_environment = migration.get(
             destination_environment = migration.get(
                 "destination_environment", {})
                 "destination_environment", {})
             instances = migration["instances"]
             instances = migration["instances"]
@@ -92,8 +98,10 @@ class MigrationController(api_wsgi.Controller):
             destination_environment['storage_mappings'] = storage_mappings
             destination_environment['storage_mappings'] = storage_mappings
 
 
             return (origin_endpoint_id, destination_endpoint_id,
             return (origin_endpoint_id, destination_endpoint_id,
-                    source_environment, destination_environment, instances,
-                    notes, skip_os_morphing, replication_count,
+                    origin_minion_pool_id, destination_minion_pool_id,
+                    instance_osmorphing_minion_pool_mappings, source_environment,
+                    destination_environment, instances, notes,
+                    skip_os_morphing, replication_count,
                     shutdown_instances, network_map, storage_mappings)
                     shutdown_instances, network_map, storage_mappings)
         except Exception as ex:
         except Exception as ex:
             LOG.exception(ex)
             LOG.exception(ex)
@@ -111,15 +119,20 @@ class MigrationController(api_wsgi.Controller):
             clone_disks = migration_body.get("clone_disks", True)
             clone_disks = migration_body.get("clone_disks", True)
             force = migration_body.get("force", False)
             force = migration_body.get("force", False)
             skip_os_morphing = migration_body.get("skip_os_morphing", False)
             skip_os_morphing = migration_body.get("skip_os_morphing", False)
+            instance_osmorphing_minion_pool_mappings = migration_body.get(
+                'instance_osmorphing_minion_pool_mappings', {})
 
 
             # NOTE: destination environment for replica should have been
             # NOTE: destination environment for replica should have been
             # validated upon its creation.
             # validated upon its creation.
             migration = self._migration_api.deploy_replica_instances(
             migration = self._migration_api.deploy_replica_instances(
-                context, replica_id, clone_disks, force, skip_os_morphing,
-                user_scripts=user_scripts)
+                context, replica_id, instance_osmorphing_minion_pool_mappings, clone_disks,
+                force, skip_os_morphing, user_scripts=user_scripts)
         else:
         else:
             (origin_endpoint_id,
             (origin_endpoint_id,
              destination_endpoint_id,
              destination_endpoint_id,
+             origin_minion_pool_id,
+             destination_minion_pool_id,
+             instance_osmorphing_minion_pool_mappings,
              source_environment,
              source_environment,
              destination_environment,
              destination_environment,
              instances,
              instances,
@@ -132,6 +145,8 @@ class MigrationController(api_wsgi.Controller):
                  context, migration_body)
                  context, migration_body)
             migration = self._migration_api.migrate_instances(
             migration = self._migration_api.migrate_instances(
                 context, origin_endpoint_id, destination_endpoint_id,
                 context, origin_endpoint_id, destination_endpoint_id,
+                origin_minion_pool_id, destination_minion_pool_id,
+                instance_osmorphing_minion_pool_mappings,
                 source_environment, destination_environment, instances,
                 source_environment, destination_environment, instances,
                 network_map, storage_mappings, replication_count,
                 network_map, storage_mappings, replication_count,
                 shutdown_instances, notes=notes,
                 shutdown_instances, notes=notes,

+ 83 - 0
coriolis/api/v1/minion_pool_actions.py

@@ -0,0 +1,83 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from webob import exc
+
+from coriolis import exception
+from coriolis.api.v1.views import minion_pool_tasks_execution_view
+from coriolis.api import wsgi as api_wsgi
+from coriolis.policies import minion_pools as minion_pool_policies
+from coriolis.minion_pools import api
+
+
+class MinionPoolActionsController(api_wsgi.Controller):
+    def __init__(self):
+        self.minion_pool_api = api.API()
+        super(MinionPoolActionsController, self).__init__()
+
+    @api_wsgi.action('set-up-shared-resources')
+    def _set_up_shared_resources(self, req, id, body):
+        context = req.environ['coriolis.context']
+        context.can(
+            minion_pool_policies.get_minion_pools_policy_label(
+                "set_up_shared_resources"))
+        try:
+            return minion_pool_tasks_execution_view.single(
+                req, self.minion_pool_api.set_up_shared_pool_resources(
+                    context, id))
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+        except exception.InvalidParameterValue as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+    @api_wsgi.action('tear-down-shared-resources')
+    def _tear_down_shared_resources(self, req, id, body):
+        context = req.environ['coriolis.context']
+        context.can(
+            minion_pool_policies.get_minion_pools_policy_label(
+                "tear_down_shared_resources"))
+        force = (body["tear-down-shared-resources"] or {}).get(
+            "force", False)
+        try:
+            return minion_pool_tasks_execution_view.single(
+                req, self.minion_pool_api.tear_down_shared_pool_resources(
+                    context, id, force=force))
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+        except exception.InvalidParameterValue as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+    @api_wsgi.action('allocate-machines')
+    def _allocate_pool_machines(self, req, id, body):
+        context = req.environ['coriolis.context']
+        context.can(
+            minion_pool_policies.get_minion_pools_policy_label(
+                "allocate_machines"))
+        try:
+            return minion_pool_tasks_execution_view.single(
+                req, self.minion_pool_api.allocate_machines(
+                    context, id))
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+        except exception.InvalidParameterValue as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+    @api_wsgi.action('deallocate-machines')
+    def _deallocate_pool_machines(self, req, id, body):
+        context = req.environ['coriolis.context']
+        context.can(
+            minion_pool_policies.get_minion_pools_policy_label(
+                "deallocate_machines"))
+        force = (body["deallocate-machines"] or {}).get("force", False)
+        try:
+            return minion_pool_tasks_execution_view.single(
+                req, self.minion_pool_api.deallocate_machines(
+                    context, id, force=force))
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+        except exception.InvalidParameterValue as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(MinionPoolActionsController())

+ 37 - 0
coriolis/api/v1/minion_pool_tasks_execution_actions.py

@@ -0,0 +1,37 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from webob import exc
+
+from coriolis import exception
+from coriolis.api import wsgi as api_wsgi
+from coriolis.policies \
+    import minion_pool_tasks_executions as pool_execution_policies
+from coriolis.minion_pool_tasks_executions import api
+
+
+class MinionPoolTasksExecutionActionsController(api_wsgi.Controller):
+    def __init__(self):
+        self._minion_pool_tasks_executions_api = api.API()
+        super(MinionPoolTasksExecutionActionsController, self).__init__()
+
+    @api_wsgi.action('cancel')
+    def _cancel(self, req, minion_pool_id, id, body):
+        context = req.environ['coriolis.context']
+        context.can(
+            pool_execution_policies.get_minion_pool_executions_policy_label(
+                'cancel'))
+        try:
+            force = (body["cancel"] or {}).get("force", False)
+
+            self._minion_pool_tasks_executions_api.cancel(
+                context, minion_pool_id, id, force)
+            raise exc.HTTPNoContent()
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+        except exception.InvalidParameterValue as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(MinionPoolTasksExecutionActionsController())

+ 65 - 0
coriolis/api/v1/minion_pool_tasks_executions.py

@@ -0,0 +1,65 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from webob import exc
+
+from coriolis.api import wsgi as api_wsgi
+from coriolis.api.v1.views import minion_pool_tasks_execution_view
+from coriolis import exception
+from coriolis.minion_pool_tasks_executions import api
+from coriolis.policies \
+    import minion_pool_tasks_executions as pool_execution_policies
+
+
+class MinionPoolTasksExecutionController(api_wsgi.Controller):
+    def __init__(self):
+        self._pool_tasks_execution_api = api.API()
+        super(MinionPoolTasksExecutionController, self).__init__()
+
+    def show(self, req, minion_pool_id, id):
+        context = req.environ["coriolis.context"]
+        context.can(
+            pool_execution_policies.get_minion_pool_executions_policy_label(
+                "show"))
+        execution = self._pool_tasks_execution_api.get(
+            context, minion_pool_id, id)
+        if not execution:
+            raise exc.HTTPNotFound()
+
+        return minion_pool_tasks_execution_view.single(req, execution)
+
+    def index(self, req, minion_pool_id):
+        context = req.environ["coriolis.context"]
+        context.can(
+            pool_execution_policies.get_minion_pool_executions_policy_label(
+                "list"))
+
+        return minion_pool_tasks_execution_view.collection(
+            req, self._pool_tasks_execution_api.list(
+                context, minion_pool_id, include_tasks=False))
+
+    def detail(self, req, minion_pool_id):
+        context = req.environ["coriolis.context"]
+        context.can(
+            pool_execution_policies.get_minion_pool_executions_policy_label(
+                "show"))
+        return minion_pool_tasks_execution_view.collection(
+            req, self._pool_tasks_execution_api.list(
+                req.environ['coriolis.context'], minion_pool_id,
+                include_tasks=True))
+
+    def delete(self, req, minion_pool_id, id):
+        context = req.environ["coriolis.context"]
+        context.can(
+            pool_execution_policies.get_minion_pool_executions_policy_label(
+                "delete"))
+
+        try:
+            self._pool_tasks_execution_api.delete(context, minion_pool_id, id)
+            raise exc.HTTPNoContent()
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(MinionPoolTasksExecutionController())

+ 223 - 0
coriolis/api/v1/minion_pools.py

@@ -0,0 +1,223 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_log import log as logging
+from webob import exc
+
+from coriolis import constants
+from coriolis import exception
+from coriolis.api.v1.views import minion_pool_view
+from coriolis.api.v1.views import minion_pool_tasks_execution_view
+from coriolis.api import wsgi as api_wsgi
+from coriolis.endpoints import api as endpoints_api
+from coriolis.policies import minion_pools as pools_policies
+from coriolis.minion_pools import api
+
+LOG = logging.getLogger(__name__)
+
+
+class MinionPoolController(api_wsgi.Controller):
+    def __init__(self):
+        self._minion_pool_api = api.API()
+        self._endpoints_api = endpoints_api.API()
+        super(MinionPoolController, self).__init__()
+
+    def show(self, req, id):
+        context = req.environ["coriolis.context"]
+        context.can(pools_policies.get_minion_pools_policy_label("show"))
+        minion_pool = self._minion_pool_api.get_minion_pool(context, id)
+        if not minion_pool:
+            raise exc.HTTPNotFound()
+
+        return minion_pool_view.single(req, minion_pool)
+
+    def index(self, req):
+        context = req.environ["coriolis.context"]
+        context.can(pools_policies.get_minion_pools_policy_label("list"))
+        return minion_pool_view.collection(
+            req, self._minion_pool_api.get_minion_pools(context))
+
+    def _check_pool_retention_strategy(self, pool_retention_strategy):
+        if not pool_retention_strategy:
+            LOG.debug(
+                "Ignoring void minion pool retention strategy '%s'",
+                pool_retention_strategy)
+        valid_strats = [
+            constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE,
+            constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_POWEROFF]
+        if pool_retention_strategy not in valid_strats:
+            raise Exception(
+                "Invalid minion pool retention strategy '%s'. Must be one of "
+                "the following: %s" % (pool_retention_strategy, valid_strats))
+
+    def _check_pool_numeric_values(
+            self, minimum_minions, maximum_minions, minion_max_idle_time):
+        if minimum_minions is not None:
+            if minimum_minions <= 0:
+                raise Exception(
+                    "'minimum_minions' must be a strictly positive integer. "
+                    "Got: %s" % minimum_minions)
+        if maximum_minions is not None:
+            if maximum_minions <= 0:
+                raise Exception(
+                    "'maximum_minions' must be a strictly positive integer. "
+                    "Got: %s" % maximum_minions)
+            if maximum_minions < minimum_minions:
+                raise Exception(
+                    "'maximum_minions' value (%s) must be at least as large as"
+                    " the 'minimum_minions' value (%s)." % (
+                        maximum_minions, minimum_minions))
+        if minion_max_idle_time is not None:
+            if minion_max_idle_time <= 0:
+                raise Exception(
+                    "'minion_max_idle_time' must be a strictly positive "
+                    "integer. Got: %s" % maximum_minions)
+
+    def _validate_create_body(self, ctxt, body):
+        try:
+            minion_pool = body["minion_pool"]
+            name = minion_pool["pool_name"]
+            endpoint_id = minion_pool["endpoint_id"]
+            pool_os_type = minion_pool["pool_os_type"]
+            if pool_os_type not in constants.VALID_OS_TYPES:
+                raise Exception(
+                    "The provided pool OS type '%s' is invalid. Must be one "
+                    "of the following: %s" % (
+                        pool_os_type, constants.VALID_OS_TYPES))
+            pool_platform = minion_pool["pool_platform"]
+            supported_pool_platforms = [
+                constants.PROVIDER_PLATFORM_SOURCE,
+                constants.PROVIDER_PLATFORM_DESTINATION]
+            if pool_platform not in supported_pool_platforms:
+                raise Exception(
+                    "The provided pool platform ('%s') is invalid. Must be one"
+                    " of the following: %s" % (
+                        pool_platform, supported_pool_platforms))
+            if pool_platform == constants.PROVIDER_PLATFORM_SOURCE and (
+                    pool_os_type != constants.OS_TYPE_LINUX):
+                raise Exception(
+                    "Source Minion Pools are required to be of OS type "
+                    "'%s', not '%s'." % (
+                        constants.OS_TYPE_LINUX, pool_os_type))
+            environment_options = minion_pool["environment_options"]
+            if pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+                self._endpoints_api.validate_endpoint_source_minion_pool_options(
+                    ctxt, endpoint_id, environment_options)
+            elif pool_platform == constants.PROVIDER_PLATFORM_DESTINATION:
+                self._endpoints_api.validate_endpoint_destination_minion_pool_options(
+                    ctxt, endpoint_id, environment_options)
+
+            minimum_minions = minion_pool.get("minimum_minions", 1)
+            maximum_minions = minion_pool.get(
+                "maximum_minions", minimum_minions)
+            minion_max_idle_time = minion_pool.get(
+                "minion_max_idle_time", 1)
+            self._check_pool_numeric_values(
+                minimum_minions, maximum_minions, minion_max_idle_time)
+            minion_retention_strategy = minion_pool.get(
+                "minion_retention_strategy",
+                constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE)
+            self._check_pool_retention_strategy(
+                minion_retention_strategy)
+            notes = minion_pool.get("notes")
+            return (
+                name, endpoint_id, pool_platform, pool_os_type,
+                environment_options, minimum_minions, maximum_minions,
+                minion_max_idle_time, minion_retention_strategy, notes)
+        except Exception as ex:
+            LOG.exception(ex)
+            if hasattr(ex, "message"):
+                msg = ex.message
+            else:
+                msg = str(ex)
+            raise exception.InvalidInput(msg)
+
+    def create(self, req, body):
+        context = req.environ["coriolis.context"]
+        context.can(pools_policies.get_minion_pools_policy_label("create"))
+        (name, endpoint_id, pool_platform, pool_os_type, environment_options,
+         minimum_minions, maximum_minions, minion_max_idle_time,
+         minion_retention_strategy, notes) = (
+            self._validate_create_body(context, body))
+        return minion_pool_view.single(req, self._minion_pool_api.create(
+            context, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=notes))
+
+    def _validate_update_body(self, id, context, body):
+        try:
+            minion_pool = body["minion_pool"]
+            if 'endpoint_id' in minion_pool:
+                raise Exception(
+                    "The 'endpoint_id' of a minion pool cannot be updated.")
+            if 'pool_platform' in minion_pool:
+                raise Exception(
+                    "The 'pool_platform' of a minion pool cannot be updated.")
+            vals = {k: minion_pool[k] for k in minion_pool.keys() &
+                    {"name", "environment_options", "minimum_minions",
+                     "maximum_minions", "minion_max_idle_time",
+                     "minion_retention_strategy", "notes", "pool_os_type"}}
+            if 'minion_retention_strategy' in vals:
+                self._check_pool_retention_strategy(
+                    vals['minion_retention_strategy'])
+            if any([
+                    f in vals for f in [
+                        'environment_options', 'minimum_minions',
+                        'maximum_minions', 'minion_max_idle_time']]):
+                minion_pool = self._minion_pool_api.get_minion_pool(
+                    context, id)
+                self._check_pool_numeric_values(
+                    vals.get(
+                        'minimum_minions', minion_pool['minimum_minions']),
+                    vals.get(
+                        'maximum_minions', minion_pool['maximum_minions']),
+                    vals.get('minion_max_idle_time'))
+
+                if 'environment_options' in vals:
+                    if minion_pool['pool_platform'] == (
+                            constants.PROVIDER_PLATFORM_SOURCE):
+                        self._endpoints_api.validate_endpoint_source_minion_pool_options(
+                            # TODO(aznashwan): remove endpoint ID fields redundancy
+                            # once DB models are overhauled:
+                            context, minion_pool['origin_endpoint_id'],
+                            vals['environment_options'])
+                    elif minion_pool['pool_platform'] == (
+                            constants.PROVIDER_PLATFORM_DESTINATION):
+                        self._endpoints_api.validate_endpoint_destination_minion_pool_options(
+                            # TODO(aznashwan): remove endpoint ID fields redundancy
+                            # once DB models are overhauled:
+                            context, minion_pool['origin_endpoint_id'],
+                            vals['environment_options'])
+                    else:
+                        raise Exception(
+                            "Unknown pool platform: %s" % minion_pool[
+                                'pool_platform'])
+            return vals
+        except Exception as ex:
+            LOG.exception(ex)
+            if hasattr(ex, "message"):
+                msg = ex.message
+            else:
+                msg = str(ex)
+            raise exception.InvalidInput(msg)
+
+    def update(self, req, id, body):
+        context = req.environ["coriolis.context"]
+        context.can(pools_policies.get_minion_pools_policy_label("update"))
+        updated_values = self._validate_update_body(id, context, body)
+        return minion_pool_view.single(
+            req, self._minion_pool_api.update(
+                req.environ['coriolis.context'], id, updated_values))
+
+    def delete(self, req, id):
+        context = req.environ["coriolis.context"]
+        context.can(pools_policies.get_minion_pools_policy_label("delete"))
+        try:
+            self._minion_pool_api.delete(req.environ['coriolis.context'], id)
+            raise exc.HTTPNoContent()
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(MinionPoolController())

+ 5 - 2
coriolis/api/v1/replica_tasks_executions.py

@@ -36,10 +36,13 @@ class ReplicaTasksExecutionController(api_wsgi.Controller):
                 context, replica_id, include_tasks=False))
                 context, replica_id, include_tasks=False))
 
 
     def detail(self, req, replica_id):
     def detail(self, req, replica_id):
+        context = req.environ["coriolis.context"]
+        context.can(
+            executions_policies.get_replica_executions_policy_label("show"))
+
         return replica_tasks_execution_view.collection(
         return replica_tasks_execution_view.collection(
             req, self._replica_tasks_execution_api.get_executions(
             req, self._replica_tasks_execution_api.get_executions(
-                req.environ['coriolis.context'], replica_id,
-                include_tasks=True))
+                context, replica_id, include_tasks=True))
 
 
     def create(self, req, replica_id, body):
     def create(self, req, replica_id, body):
         context = req.environ["coriolis.context"]
         context = req.environ["coriolis.context"]

+ 26 - 4
coriolis/api/v1/replicas.py

@@ -71,6 +71,13 @@ class ReplicaController(api_wsgi.Controller):
             api_utils.validate_network_map(network_map)
             api_utils.validate_network_map(network_map)
             destination_environment['network_map'] = network_map
             destination_environment['network_map'] = network_map
 
 
+            origin_minion_pool_id = replica.get(
+                'origin_minion_pool_id')
+            destination_minion_pool_id = replica.get(
+                'destination_minion_pool_id')
+            instance_osmorphing_minion_pool_mappings = replica.get(
+                'instance_osmorphing_minion_pool_mappings', {})
+
             # NOTE(aznashwan): we validate the destination environment for the
             # NOTE(aznashwan): we validate the destination environment for the
             # import provider before appending the 'storage_mappings' parameter
             # import provider before appending the 'storage_mappings' parameter
             # for plugins with strict property name checks which do not yet
             # for plugins with strict property name checks which do not yet
@@ -88,7 +95,9 @@ class ReplicaController(api_wsgi.Controller):
 
 
             return (origin_endpoint_id, destination_endpoint_id,
             return (origin_endpoint_id, destination_endpoint_id,
                     source_environment, destination_environment, instances,
                     source_environment, destination_environment, instances,
-                    network_map, storage_mappings, notes)
+                    network_map, storage_mappings, notes,
+                    origin_minion_pool_id, destination_minion_pool_id,
+                    instance_osmorphing_minion_pool_mappings)
         except Exception as ex:
         except Exception as ex:
             LOG.exception(ex)
             LOG.exception(ex)
             msg = getattr(ex, "message", str(ex))
             msg = getattr(ex, "message", str(ex))
@@ -100,12 +109,17 @@ class ReplicaController(api_wsgi.Controller):
 
 
         (origin_endpoint_id, destination_endpoint_id,
         (origin_endpoint_id, destination_endpoint_id,
          source_environment, destination_environment, instances, network_map,
          source_environment, destination_environment, instances, network_map,
-         storage_mappings, notes) = self._validate_create_body(context, body)
+         storage_mappings, notes, origin_minion_pool_id,
+         destination_minion_pool_id,
+         instance_osmorphing_minion_pool_mappings) = (
+            self._validate_create_body(context, body))
 
 
         return replica_view.single(req, self._replica_api.create(
         return replica_view.single(req, self._replica_api.create(
             context, origin_endpoint_id, destination_endpoint_id,
             context, origin_endpoint_id, destination_endpoint_id,
-            source_environment, destination_environment, instances,
-            network_map, storage_mappings, notes))
+            origin_minion_pool_id, destination_minion_pool_id,
+            instance_osmorphing_minion_pool_mappings, source_environment,
+            destination_environment, instances, network_map,
+            storage_mappings, notes))
 
 
     def delete(self, req, id):
     def delete(self, req, id):
         context = req.environ["coriolis.context"]
         context = req.environ["coriolis.context"]
@@ -214,6 +228,14 @@ class ReplicaController(api_wsgi.Controller):
             final_values['destination_environment'][
             final_values['destination_environment'][
                 'network_map'] = final_network_map
                 'network_map'] = final_network_map
 
 
+        minion_pool_fields = [
+            "origin_minion_pool_id", "destination_minion_pool_id",
+            "instance_osmorphing_minion_pool_mappings"]
+        final_values.update({
+            mpf: updated_values[mpf]
+            for mpf in minion_pool_fields
+            if mpf in updated_values})
+
         return final_values
         return final_values
 
 
     def _validate_update_body(self, id, context, body):
     def _validate_update_body(self, id, context, body):

+ 56 - 0
coriolis/api/v1/router.py

@@ -6,14 +6,20 @@ from oslo_log import log as logging
 from coriolis import api
 from coriolis import api
 from coriolis.api.v1 import diagnostics
 from coriolis.api.v1 import diagnostics
 from coriolis.api.v1 import endpoint_actions
 from coriolis.api.v1 import endpoint_actions
+from coriolis.api.v1 import endpoint_destination_minion_pool_options
 from coriolis.api.v1 import endpoint_destination_options
 from coriolis.api.v1 import endpoint_destination_options
 from coriolis.api.v1 import endpoint_instances
 from coriolis.api.v1 import endpoint_instances
 from coriolis.api.v1 import endpoint_networks
 from coriolis.api.v1 import endpoint_networks
+from coriolis.api.v1 import endpoint_source_minion_pool_options
 from coriolis.api.v1 import endpoint_source_options
 from coriolis.api.v1 import endpoint_source_options
 from coriolis.api.v1 import endpoint_storage
 from coriolis.api.v1 import endpoint_storage
 from coriolis.api.v1 import endpoints
 from coriolis.api.v1 import endpoints
 from coriolis.api.v1 import migration_actions
 from coriolis.api.v1 import migration_actions
 from coriolis.api.v1 import migrations
 from coriolis.api.v1 import migrations
+from coriolis.api.v1 import minion_pools
+from coriolis.api.v1 import minion_pool_actions
+from coriolis.api.v1 import minion_pool_tasks_executions
+from coriolis.api.v1 import minion_pool_tasks_execution_actions
 from coriolis.api.v1 import provider_schemas
 from coriolis.api.v1 import provider_schemas
 from coriolis.api.v1 import providers
 from coriolis.api.v1 import providers
 from coriolis.api.v1 import regions
 from coriolis.api.v1 import regions
@@ -61,6 +67,56 @@ class APIRouter(api.APIRouter):
                         controller=self.resources['services'],
                         controller=self.resources['services'],
                         collection={'detail': 'GET'})
                         collection={'detail': 'GET'})
 
 
+        self.resources['minion_pools'] = minion_pools.create_resource()
+        mapper.resource('minion_pool', 'minion_pools',
+                        controller=self.resources['minion_pools'],
+                        collection={'detail': 'GET'})
+
+        minion_pool_actions_resource = minion_pool_actions.create_resource()
+        self.resources['minion_pool_actions'] = minion_pool_actions_resource
+        minion_pool_path = '/{project_id}/minion_pools/{id}'
+        mapper.connect('minion_pool_actions',
+                       minion_pool_path + '/actions',
+                       controller=self.resources['minion_pool_actions'],
+                       action='action',
+                       conditions={'method': 'POST'})
+
+        self.resources['minion_pool_tasks_executions'] = \
+            minion_pool_tasks_executions.create_resource()
+        mapper.resource('minion_pools', 'minion_pools/{minion_pool_id}/executions',
+                        controller=self.resources['minion_pool_tasks_executions'],
+                        collection={'detail': 'GET'},
+                        member={'action': 'POST'})
+
+        minion_pool_tasks_execution_actions_resource = \
+            minion_pool_tasks_execution_actions.create_resource()
+        self.resources['minion_pool_tasks_execution_actions'] = \
+            minion_pool_tasks_execution_actions_resource
+        pool_execution_path = (
+            '/{project_id}/minion_pools/{minion_pool_id}/executions/{id}')
+        mapper.connect('minion_pool_tasks_execution_actions',
+                       pool_execution_path + '/actions',
+                       controller=self.resources[
+                           'minion_pool_tasks_execution_actions'],
+                       action='action',
+                       conditions={'method': 'POST'})
+
+        self.resources['endpoint_source_minion_pool_options'] = \
+            endpoint_source_minion_pool_options.create_resource()
+        mapper.resource('minion_pool_options',
+                        'endpoints/{endpoint_id}/source-minion-pool-options',
+                        controller=(
+                            self.resources[
+                                'endpoint_source_minion_pool_options']))
+
+        self.resources['endpoint_destination_minion_pool_options'] = \
+            endpoint_destination_minion_pool_options.create_resource()
+        mapper.resource('minion_pool_options',
+                        'endpoints/{endpoint_id}/destination-minion-pool-options',
+                        controller=(
+                            self.resources[
+                                'endpoint_destination_minion_pool_options']))
+
         endpoint_actions_resource = endpoint_actions.create_resource()
         endpoint_actions_resource = endpoint_actions.create_resource()
         self.resources['endpoint_actions'] = endpoint_actions_resource
         self.resources['endpoint_actions'] = endpoint_actions_resource
         endpoint_path = '/{project_id}/endpoints/{id}'
         endpoint_path = '/{project_id}/endpoints/{id}'

+ 20 - 0
coriolis/api/v1/views/endpoint_destination_minion_pool_options_view.py

@@ -0,0 +1,20 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+
+def _format_dest_opt(req, destination_option, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    return dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in destination_option.items()))
+
+
+def collection(req, destination_pool_options):
+    formatted_opts = [
+        _format_dest_opt(req, opt) for opt in destination_pool_options]
+    return {'destination_minion_pool_options': formatted_opts}

+ 20 - 0
coriolis/api/v1/views/endpoint_source_minion_pool_options_view.py

@@ -0,0 +1,20 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+
+def _format_dest_opt(req, source_option, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    return dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in source_option.items()))
+
+
+def collection(req, source_pool_options):
+    formatted_opts = [
+        _format_dest_opt(req, opt) for opt in source_pool_options]
+    return {'source_minion_pool_options': formatted_opts}

+ 45 - 0
coriolis/api/v1/views/minion_pool_tasks_execution_view.py

@@ -0,0 +1,45 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+from coriolis import constants
+from coriolis import utils
+
+
+def _sort_tasks(tasks, filter_error_only_tasks=True):
+    """ Sorts the given list of dicts representing tasks.
+    Tasks are sorted primarily based on their index.
+    """
+    if filter_error_only_tasks:
+        tasks = [
+            t for t in tasks
+            if t['status'] != (
+                constants.TASK_STATUS_ON_ERROR_ONLY)]
+    return sorted(
+        tasks, key=lambda t: t.get('index', 0))
+
+
+def format_minion_pool_tasks_execution(req, execution, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    if "tasks" in execution:
+        execution["tasks"] = _sort_tasks(execution["tasks"])
+
+    execution_dict = dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in execution.items()))
+
+    return execution_dict
+
+
+def single(req, execution):
+    return {"execution": format_minion_pool_tasks_execution(req, execution)}
+
+
+def collection(req, executions):
+    formatted_executions = [
+        format_minion_pool_tasks_execution(req, m) for m in executions]
+    return {'executions': formatted_executions}

+ 54 - 0
coriolis/api/v1/views/minion_pool_view.py

@@ -0,0 +1,54 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+
+def _format_minion_pool(req, minion_pool, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    minion_pool_dict = dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in minion_pool.items()))
+
+    # TODO(aznashwan): remove these redundancies once the base
+    # DB action model hirearchy will be overhauled:
+    for key in ["origin_endpoint_id", "destination_endpoint_id"]:
+        if key in minion_pool_dict:
+            minion_pool_dict["endpoint_id"] = minion_pool_dict.pop(key)
+    for key in ["source_environment", "destination_environment"]:
+        if key in minion_pool_dict:
+            minion_pool_dict["environment_options"] = minion_pool_dict.pop(key)
+
+    def _hide_minion_creds(minion_conn):
+        if 'pkey' in minion_conn:
+            minion_conn['pkey'] = '***'
+        if 'password' in minion_conn:
+            minion_conn['password'] = '***'
+        if 'certificates' in minion_conn:
+            for key in minion_conn['certificates']:
+                minion_conn['certificates'][key] = '***'
+    if 'minion_machines' in minion_pool_dict:
+        for machine in minion_pool_dict['minion_machines']:
+            if 'connection_info' in machine:
+                _hide_minion_creds(machine['connection_info'])
+            if 'backup_writer_connection_info' in machine:
+                if 'connection_details' in machine[
+                        'backup_writer_connection_info']:
+                    _hide_minion_creds(
+                        machine['backup_writer_connection_info'][
+                            'connection_details'])
+
+    return minion_pool_dict
+
+
+def single(req, minion_pool):
+    return {"minion_pool": _format_minion_pool(req, minion_pool)}
+
+
+def collection(req, minion_pools):
+    formatted_minion_pools = [
+        _format_minion_pool(req, r) for r in minion_pools]
+    return {'minion_pools': formatted_minion_pools}

+ 126 - 6
coriolis/conductor/rpc/client.py

@@ -155,6 +155,9 @@ class ConductorClient(object):
 
 
     def create_instances_replica(self, ctxt, origin_endpoint_id,
     def create_instances_replica(self, ctxt, origin_endpoint_id,
                                  destination_endpoint_id,
                                  destination_endpoint_id,
+                                 origin_minion_pool_id,
+                                 destination_minion_pool_id,
+                                 instance_osmorphing_minion_pool_mappings,
                                  source_environment, destination_environment,
                                  source_environment, destination_environment,
                                  instances, network_map, storage_mappings,
                                  instances, network_map, storage_mappings,
                                  notes=None):
                                  notes=None):
@@ -162,6 +165,10 @@ class ConductorClient(object):
             ctxt, 'create_instances_replica',
             ctxt, 'create_instances_replica',
             origin_endpoint_id=origin_endpoint_id,
             origin_endpoint_id=origin_endpoint_id,
             destination_endpoint_id=destination_endpoint_id,
             destination_endpoint_id=destination_endpoint_id,
+            origin_minion_pool_id=origin_minion_pool_id,
+            destination_minion_pool_id=destination_minion_pool_id,
+            instance_osmorphing_minion_pool_mappings=(
+                instance_osmorphing_minion_pool_mappings),
             destination_environment=destination_environment,
             destination_environment=destination_environment,
             instances=instances,
             instances=instances,
             notes=notes,
             notes=notes,
@@ -197,15 +204,22 @@ class ConductorClient(object):
             ctxt, 'get_migration', migration_id=migration_id)
             ctxt, 'get_migration', migration_id=migration_id)
 
 
     def migrate_instances(self, ctxt, origin_endpoint_id,
     def migrate_instances(self, ctxt, origin_endpoint_id,
-                          destination_endpoint_id, source_environment,
-                          destination_environment, instances, network_map,
-                          storage_mappings, replication_count,
-                          shutdown_instances=False, notes=None,
-                          skip_os_morphing=False, user_scripts=None):
+                          destination_endpoint_id, origin_minion_pool_id,
+                          destination_minion_pool_id,
+                          instance_osmorphing_minion_pool_mappings,
+                          source_environment, destination_environment,
+                          instances, network_map, storage_mappings,
+                          replication_count, shutdown_instances=False,
+                          notes=None, skip_os_morphing=False,
+                          user_scripts=None):
         return self._client.call(
         return self._client.call(
             ctxt, 'migrate_instances',
             ctxt, 'migrate_instances',
             origin_endpoint_id=origin_endpoint_id,
             origin_endpoint_id=origin_endpoint_id,
             destination_endpoint_id=destination_endpoint_id,
             destination_endpoint_id=destination_endpoint_id,
+            origin_minion_pool_id=origin_minion_pool_id,
+            destination_minion_pool_id=destination_minion_pool_id,
+            instance_osmorphing_minion_pool_mappings=(
+                instance_osmorphing_minion_pool_mappings),
             destination_environment=destination_environment,
             destination_environment=destination_environment,
             instances=instances,
             instances=instances,
             notes=notes,
             notes=notes,
@@ -217,11 +231,15 @@ class ConductorClient(object):
             source_environment=source_environment,
             source_environment=source_environment,
             user_scripts=user_scripts)
             user_scripts=user_scripts)
 
 
-    def deploy_replica_instances(self, ctxt, replica_id, clone_disks=False,
+    def deploy_replica_instances(self, ctxt, replica_id,
+                                 instance_osmorphing_minion_pool_mappings=None,
+                                 clone_disks=False,
                                  force=False, skip_os_morphing=False,
                                  force=False, skip_os_morphing=False,
                                  user_scripts=None):
                                  user_scripts=None):
         return self._client.call(
         return self._client.call(
             ctxt, 'deploy_replica_instances', replica_id=replica_id,
             ctxt, 'deploy_replica_instances', replica_id=replica_id,
+            instance_osmorphing_minion_pool_mappings=(
+                instance_osmorphing_minion_pool_mappings),
             clone_disks=clone_disks, force=force,
             clone_disks=clone_disks, force=force,
             skip_os_morphing=skip_os_morphing,
             skip_os_morphing=skip_os_morphing,
             user_scripts=user_scripts)
             user_scripts=user_scripts)
@@ -372,3 +390,105 @@ class ConductorClient(object):
     def delete_service(self, ctxt, service_id):
     def delete_service(self, ctxt, service_id):
         return self._client.call(
         return self._client.call(
             ctxt, 'delete_service', service_id=service_id)
             ctxt, 'delete_service', service_id=service_id)
+
+    def create_minion_pool(
+            self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=None):
+        return self._client.call(
+            ctxt, 'create_minion_pool', name=name, endpoint_id=endpoint_id,
+            pool_platform=pool_platform, pool_os_type=pool_os_type,
+            environment_options=environment_options,
+            minimum_minions=minimum_minions,
+            maximum_minions=maximum_minions,
+            minion_max_idle_time=minion_max_idle_time,
+            minion_retention_strategy=minion_retention_strategy,
+            notes=notes)
+
+    def set_up_shared_minion_pool_resources(self, ctxt, minion_pool_id):
+        return self._client.call(
+            ctxt, "set_up_shared_minion_pool_resources",
+            minion_pool_id=minion_pool_id)
+
+    def tear_down_shared_minion_pool_resources(
+            self, ctxt, minion_pool_id, force=False):
+        return self._client.call(
+            ctxt, "tear_down_shared_minion_pool_resources",
+            minion_pool_id=minion_pool_id, force=force)
+
+    def allocate_minion_pool_machines(self, ctxt, minion_pool_id):
+        return self._client.call(
+            ctxt, "allocate_minion_pool_machines",
+            minion_pool_id=minion_pool_id)
+
+    def deallocate_minion_pool_machines(
+            self, ctxt, minion_pool_id, force=False):
+        return self._client.call(
+            ctxt, "deallocate_minion_pool_machines",
+            minion_pool_id=minion_pool_id,
+            force=force)
+
+    def get_minion_pools(self, ctxt):
+        return self._client.call(ctxt, 'get_minion_pools')
+
+    def get_minion_pool(self, ctxt, minion_pool_id):
+        return self._client.call(
+            ctxt, 'get_minion_pool', minion_pool_id=minion_pool_id)
+
+    def update_minion_pool(self, ctxt, minion_pool_id, updated_values):
+        return self._client.call(
+            ctxt, 'update_minion_pool',
+            minion_pool_id=minion_pool_id, updated_values=updated_values)
+
+    def delete_minion_pool(self, ctxt, minion_pool_id):
+        return self._client.call(
+            ctxt, 'delete_minion_pool', minion_pool_id=minion_pool_id)
+
+    def get_minion_pool_lifecycle_executions(
+            self, ctxt, minion_pool_id, include_tasks=False):
+        return self._client.call(
+            ctxt, 'get_minion_pool_lifecycle_executions',
+            minion_pool_id=minion_pool_id, include_tasks=include_tasks)
+
+    def get_minion_pool_lifecycle_execution(
+            self, ctxt, minion_pool_id, execution_id):
+        return self._client.call(
+            ctxt, 'get_minion_pool_lifecycle_execution',
+            minion_pool_id=minion_pool_id, execution_id=execution_id)
+
+    def delete_minion_pool_lifecycle_execution(
+            self, ctxt, minion_pool_id, execution_id):
+        return self._client.call(
+            ctxt, 'delete_minion_pool_lifecycle_execution',
+            minion_pool_id=minion_pool_id, execution_id=execution_id)
+
+    def cancel_minion_pool_lifecycle_execution(
+            self, ctxt, minion_pool_id, execution_id, force):
+        return self._client.call(
+            ctxt, 'cancel_minion_pool_lifecycle_execution',
+            minion_pool_id=minion_pool_id, execution_id=execution_id,
+            force=force)
+
+    def get_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, env, option_names):
+        return self._client.call(
+            ctxt, 'get_endpoint_source_minion_pool_options',
+            endpoint_id=endpoint_id, env=env, option_names=option_names)
+
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, env, option_names):
+        return self._client.call(
+            ctxt, 'get_endpoint_destination_minion_pool_options',
+            endpoint_id=endpoint_id, env=env, option_names=option_names)
+
+    def validate_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        return self._client.call(
+            ctxt, 'validate_endpoint_source_minion_pool_options',
+            endpoint_id=endpoint_id, pool_environment=pool_environment)
+
+    def validate_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        return self._client.call(
+            ctxt, 'validate_endpoint_destination_minion_pool_options',
+            endpoint_id=endpoint_id, pool_environment=pool_environment)

Разница между файлами не показана из-за своего большого размера
+ 732 - 70
coriolis/conductor/rpc/server.py


+ 104 - 3
coriolis/constants.py

@@ -33,6 +33,7 @@ TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_FORCE_CANCELED = "FORCE_CANCELED"
 TASK_STATUS_FORCE_CANCELED = "FORCE_CANCELED"
+TASK_STATUS_FAILED_TO_CANCEL = "FAILED_TO_CANCEL"
 TASK_STATUS_CANCELED = "CANCELED"
 TASK_STATUS_CANCELED = "CANCELED"
 TASK_STATUS_CANCELED_AFTER_COMPLETION = "CANCELED_AFTER_COMPLETION"
 TASK_STATUS_CANCELED_AFTER_COMPLETION = "CANCELED_AFTER_COMPLETION"
 TASK_STATUS_CANCELLING = "CANCELLING"
 TASK_STATUS_CANCELLING = "CANCELLING"
@@ -57,7 +58,8 @@ CANCELED_TASK_STATUSES = [
     TASK_STATUS_CANCELED_AFTER_COMPLETION,
     TASK_STATUS_CANCELED_AFTER_COMPLETION,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FROM_DEADLOCK,
     TASK_STATUS_CANCELED_FROM_DEADLOCK,
-    TASK_STATUS_FAILED_TO_SCHEDULE
+    TASK_STATUS_FAILED_TO_SCHEDULE,
+    TASK_STATUS_FAILED_TO_CANCEL
 ]
 ]
 
 
 FINALIZED_TASK_STATUSES = [
 FINALIZED_TASK_STATUSES = [
@@ -69,7 +71,8 @@ FINALIZED_TASK_STATUSES = [
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FROM_DEADLOCK,
     TASK_STATUS_CANCELED_FROM_DEADLOCK,
     TASK_STATUS_CANCELED_AFTER_COMPLETION,
     TASK_STATUS_CANCELED_AFTER_COMPLETION,
-    TASK_STATUS_FAILED_TO_SCHEDULE
+    TASK_STATUS_FAILED_TO_SCHEDULE,
+    TASK_STATUS_FAILED_TO_CANCEL
 ]
 ]
 
 
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (
@@ -130,6 +133,46 @@ TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS = (
 TASK_TYPE_UPDATE_SOURCE_REPLICA = "UPDATE_SOURCE_REPLICA"
 TASK_TYPE_UPDATE_SOURCE_REPLICA = "UPDATE_SOURCE_REPLICA"
 TASK_TYPE_UPDATE_DESTINATION_REPLICA = "UPDATE_DESTINATION_REPLICA"
 TASK_TYPE_UPDATE_DESTINATION_REPLICA = "UPDATE_DESTINATION_REPLICA"
 
 
+TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS = (
+    "VALIDATE_SOURCE_MINION_POOL_ENVIRONMENT_OPTIONS")
+TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS = (
+    "VALIDATE_DESTINATION_MINION_POOL_ENVIRONMENT_OPTIONS")
+TASK_TYPE_CREATE_SOURCE_MINION_MACHINE = "CREATE_SOURCE_MINION_MACHINE"
+TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE = (
+    "CREATE_DESTINATION_MINION_MACHINE")
+TASK_TYPE_DELETE_SOURCE_MINION_MACHINE = "DELETE_SOURCE_MINION_MACHINE"
+TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE = (
+    "DELETE_DESTINATION_MINION_MACHINE")
+TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES = (
+    "SET_UP_SOURCE_POOL_SHARED_RESOURCES")
+TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES = (
+    "SET_UP_DESTINATION_POOL_SHARED_RESOURCES")
+TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES = (
+    "TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES")
+TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES = (
+    "TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES")
+TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION = "ATTACH_VOLUMES_TO_SOURCE_MINION"
+TASK_TYPE_DETACH_VOLUMES_FROM_SOURCE_MINION = (
+    "DETACH_VOLUMES_FROM_SOURCE_MINION")
+TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION = (
+    "ATTACH_VOLUMES_TO_DESTINATION_MINION")
+TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION = (
+    "DETACH_VOLUMES_FROM_DESTINATION_MINION")
+TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION = (
+    "ATTACH_VOLUMES_TO_OSMORPHING_MINION")
+TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION = (
+    "DETACH_VOLUMES_FROM_OSMORPHING_MINION")
+TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY = (
+    "VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY")
+TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY = (
+    "VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY")
+TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY = (
+    "VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY")
+TASK_TYPE_RELEASE_SOURCE_MINION = "RELEASE_SOURCE_MINION"
+TASK_TYPE_RELEASE_DESTINATION_MINION = "RELEASE_DESTINATION_MINION"
+TASK_TYPE_RELEASE_OSMORPHING_MINION = "RELEASE_OSMORPHING_MINION"
+TASK_TYPE_COLLECT_OSMORPHING_INFO = "COLLECT_OS_MORPHING_INFO"
+
 TASK_PLATFORM_SOURCE = "source"
 TASK_PLATFORM_SOURCE = "source"
 TASK_PLATFORM_DESTINATION = "destination"
 TASK_PLATFORM_DESTINATION = "destination"
 TASK_PLATFORM_BILATERAL = "bilateral"
 TASK_PLATFORM_BILATERAL = "bilateral"
@@ -156,6 +199,8 @@ PROVIDER_TYPE_ENDPOINT_STORAGE = 32768
 PROVIDER_TYPE_SOURCE_REPLICA_UPDATE = 65536
 PROVIDER_TYPE_SOURCE_REPLICA_UPDATE = 65536
 PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS = 131072
 PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS = 131072
 PROVIDER_TYPE_DESTINATION_REPLICA_UPDATE = 262144
 PROVIDER_TYPE_DESTINATION_REPLICA_UPDATE = 262144
+PROVIDER_TYPE_SOURCE_MINION_POOL = 524288
+PROVIDER_TYPE_DESTINATION_MINION_POOL = 1048576
 
 
 DISK_FORMAT_VMDK = 'vmdk'
 DISK_FORMAT_VMDK = 'vmdk'
 DISK_FORMAT_RAW = 'raw'
 DISK_FORMAT_RAW = 'raw'
@@ -190,6 +235,9 @@ OS_TYPE_UNKNOWN = "unknown"
 
 
 DEFAULT_OS_TYPE = OS_TYPE_LINUX
 DEFAULT_OS_TYPE = OS_TYPE_LINUX
 
 
+VALID_OS_TYPES = [
+    OS_TYPE_BSD, OS_TYPE_LINUX, OS_TYPE_OS_X, OS_TYPE_SOLARIS, OS_TYPE_WINDOWS]
+
 TMP_DIRS_KEY = "__tmp_dirs"
 TMP_DIRS_KEY = "__tmp_dirs"
 
 
 COMPRESSION_FORMAT_GZIP = "gzip"
 COMPRESSION_FORMAT_GZIP = "gzip"
@@ -205,6 +253,23 @@ EXECUTION_TYPE_REPLICA_DISKS_DELETE = "replica_disks_delete"
 EXECUTION_TYPE_REPLICA_DEPLOY = "replica_deploy"
 EXECUTION_TYPE_REPLICA_DEPLOY = "replica_deploy"
 EXECUTION_TYPE_MIGRATION = "migration"
 EXECUTION_TYPE_MIGRATION = "migration"
 EXECUTION_TYPE_REPLICA_UPDATE = "replica_update"
 EXECUTION_TYPE_REPLICA_UPDATE = "replica_update"
+EXECUTION_TYPE_MINION_POOL_MAINTENANCE = "minion_pool_maintenance"
+EXECUTION_TYPE_MINION_POOL_UPDATE = "minion_pool_update"
+EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES = (
+    "minion_pool_set_up_shared_resources")
+EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES = (
+    "minion_pool_tear_down_shared_resources")
+EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS = "minion_pool_allocate_minions"
+EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS = (
+    "minion_pool_deallocate_minions")
+
+MINION_POOL_EXECUTION_TYPES = [
+    EXECUTION_TYPE_MINION_POOL_MAINTENANCE,
+    EXECUTION_TYPE_MINION_POOL_UPDATE,
+    EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES,
+    EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES,
+    EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS,
+    EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS]
 
 
 TASK_LOCK_NAME_FORMAT = "task-%s"
 TASK_LOCK_NAME_FORMAT = "task-%s"
 EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
 EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
@@ -214,13 +279,22 @@ REPLICA_LOCK_NAME_FORMAT = "replica-%s"
 SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s"
 SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s"
 REGION_LOCK_NAME_FORMAT = "region-%s"
 REGION_LOCK_NAME_FORMAT = "region-%s"
 SERVICE_LOCK_NAME_FORMAT = "service-%s"
 SERVICE_LOCK_NAME_FORMAT = "service-%s"
+MINION_POOL_LOCK_NAME_FORMAT = "minion-pool-%s"
 
 
 EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP = {
 EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP = {
     EXECUTION_TYPE_MIGRATION: MIGRATION_LOCK_NAME_FORMAT,
     EXECUTION_TYPE_MIGRATION: MIGRATION_LOCK_NAME_FORMAT,
     EXECUTION_TYPE_REPLICA_EXECUTION: REPLICA_LOCK_NAME_FORMAT,
     EXECUTION_TYPE_REPLICA_EXECUTION: REPLICA_LOCK_NAME_FORMAT,
     EXECUTION_TYPE_REPLICA_DEPLOY: REPLICA_LOCK_NAME_FORMAT,
     EXECUTION_TYPE_REPLICA_DEPLOY: REPLICA_LOCK_NAME_FORMAT,
     EXECUTION_TYPE_REPLICA_UPDATE: REPLICA_LOCK_NAME_FORMAT,
     EXECUTION_TYPE_REPLICA_UPDATE: REPLICA_LOCK_NAME_FORMAT,
-    EXECUTION_TYPE_REPLICA_DISKS_DELETE: REPLICA_LOCK_NAME_FORMAT
+    EXECUTION_TYPE_REPLICA_DISKS_DELETE: REPLICA_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_MINION_POOL_MAINTENANCE: MINION_POOL_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_MINION_POOL_UPDATE: MINION_POOL_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES: (
+        MINION_POOL_LOCK_NAME_FORMAT),
+    EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES: (
+        MINION_POOL_LOCK_NAME_FORMAT),
+    EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS: MINION_POOL_LOCK_NAME_FORMAT,
+    EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS: MINION_POOL_LOCK_NAME_FORMAT
 }
 }
 
 
 SERVICE_STATUS_UP = "UP"
 SERVICE_STATUS_UP = "UP"
@@ -232,3 +306,30 @@ CONDUCTOR_MAIN_MESSAGING_TOPIC = "coriolis_conductor"
 WORKER_MAIN_MESSAGING_TOPIC = "coriolis_worker"
 WORKER_MAIN_MESSAGING_TOPIC = "coriolis_worker"
 SCHEDULER_MAIN_MESSAGING_TOPIC = "coriolis_scheduler"
 SCHEDULER_MAIN_MESSAGING_TOPIC = "coriolis_scheduler"
 REPLICA_CRON_MAIN_MESSAGING_TOPIC = "coriolis_replica_cron_worker"
 REPLICA_CRON_MAIN_MESSAGING_TOPIC = "coriolis_replica_cron_worker"
+
+MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE = "delete"
+MINION_POOL_MACHINE_RETENTION_STRATEGY_POWEROFF = "poweroff"
+
+MINION_POOL_STATUS_UNKNOWN = "UNKNOWN"
+MINION_POOL_STATUS_ERROR = "ERROR"
+MINION_POOL_STATUS_UNINITIALIZED = "UNINITIALIZED"
+MINION_POOL_STATUS_UNINITIALIZING = "UNINITIALIZING"
+MINION_POOL_STATUS_INITIALIZING = "INITIALIZING"
+MINION_POOL_STATUS_DEALLOCATING = "DEALLOCATING"
+MINION_POOL_STATUS_DEALLOCATED = "DEALLOCATED"
+MINION_POOL_STATUS_ALLOCATING = "ALLOCATING"
+MINION_POOL_STATUS_ALLOCATED = "ALLOCATED"
+MINION_POOL_STATUS_RECONFIGURING = "RECONFIGURING"
+
+ACTIVE_MINION_POOL_STATUSES = [
+    MINION_POOL_STATUS_INITIALIZING,
+    MINION_POOL_STATUS_ALLOCATING,
+    MINION_POOL_STATUS_DEALLOCATING,
+    MINION_POOL_STATUS_UNINITIALIZING]
+
+MINION_MACHINE_IDENTIFIER_FORMAT = "coriolis-pool-%(pool_id)s-minion-%(minion_id)s"
+MINION_MACHINE_STATUS_UNKNOWN = "UNKNOWN"
+MINION_MACHINE_STATUS_UNINITIALIZED = "UNINITIALIZED"
+MINION_MACHINE_STATUS_RECONFIGURING = "RECONFIGURING"
+MINION_MACHINE_STATUS_AVAILABLE = "AVAILABLE"
+MINION_MACHINE_STATUS_ALLOCATED = "ALLOCATED"

+ 259 - 18
coriolis/db/api.py

@@ -574,6 +574,7 @@ def set_execution_status(
     if update_action_status:
     if update_action_status:
         set_action_last_execution_status(
         set_action_last_execution_status(
             context, execution.action_id, status)
             context, execution.action_id, status)
+    return execution
 
 
 
 
 @enginefacade.reader
 @enginefacade.reader
@@ -615,26 +616,28 @@ def update_transfer_action_info_for_instance(
 
 
     # Copy is needed, otherwise sqlalchemy won't save the changes
     # Copy is needed, otherwise sqlalchemy won't save the changes
     action_info = action.info.copy()
     action_info = action.info.copy()
+    instance_info_old = {}
     if instance in action_info:
     if instance in action_info:
         instance_info_old = action_info[instance]
         instance_info_old = action_info[instance]
-        old_keys = set(instance_info_old.keys())
-        new_keys = set(new_instance_info.keys())
-        overwritten_keys = old_keys.intersection(new_keys)
-        if overwritten_keys:
-            LOG.debug(
-                "Overwriting the values of the following keys for info of "
-                "instance '%s' of action with ID '%s': %s",
-                instance, action_id, overwritten_keys)
-        newly_added_keys = new_keys.difference(old_keys)
-        if newly_added_keys:
-            LOG.debug(
-                "The following new keys will be added for info of instance "
-                "'%s' in action with ID '%s': %s",
-                instance, action_id, newly_added_keys)
 
 
-        instance_info_old_copy = instance_info_old.copy()
-        instance_info_old_copy.update(new_instance_info)
-        action_info[instance] = instance_info_old_copy
+    old_keys = set(instance_info_old.keys())
+    new_keys = set(new_instance_info.keys())
+    overwritten_keys = old_keys.intersection(new_keys)
+    if overwritten_keys:
+        LOG.debug(
+            "Overwriting the values of the following keys for info of "
+            "instance '%s' of action with ID '%s': %s",
+            instance, action_id, overwritten_keys)
+    newly_added_keys = new_keys.difference(old_keys)
+    if newly_added_keys:
+        LOG.debug(
+            "The following new keys will be added for info of instance "
+            "'%s' in action with ID '%s': %s",
+            instance, action_id, newly_added_keys)
+
+    instance_info_old_copy = instance_info_old.copy()
+    instance_info_old_copy.update(new_instance_info)
+    action_info[instance] = instance_info_old_copy
     action.info = action_info
     action.info = action_info
 
 
     return action_info[instance]
     return action_info[instance]
@@ -744,7 +747,9 @@ def update_replica(context, replica_id, updated_values):
 
 
     updateable_fields = [
     updateable_fields = [
         "source_environment", "destination_environment", "notes",
         "source_environment", "destination_environment", "notes",
-        "network_map", "storage_mappings"]
+        "network_map", "storage_mappings",
+        "origin_minion_pool_id", "destination_minion_pool_id",
+        "instance_osmorphing_minion_pool_mappings"]
     for field in updateable_fields:
     for field in updateable_fields:
         if mapped_info_fields.get(field, field) in updated_values:
         if mapped_info_fields.get(field, field) in updated_values:
             LOG.debug(
             LOG.debug(
@@ -1087,3 +1092,239 @@ def get_mapped_services_for_region(context, region_id):
     q = q.filter(
     q = q.filter(
         models.ServiceRegionMapping.service_id == region_id)
         models.ServiceRegionMapping.service_id == region_id)
     return q.all()
     return q.all()
+
+
+@enginefacade.writer
+def add_minion_machine(context, minion_machine):
+    minion_machine.user_id = context.user
+    minion_machine.project_id = context.tenant
+    _session(context).add(minion_machine)
+
+
+@enginefacade.reader
+def get_minion_machines(context, allocated_action_id=None):
+    q = _soft_delete_aware_query(context, models.MinionMachine)
+    if allocated_action_id:
+        q = q.filter(
+            models.MinionMachine.allocated_action == allocated_action_id)
+    return q.all()
+
+
+@enginefacade.reader
+def get_minion_machine(context, minion_machine_id):
+    q = _soft_delete_aware_query(context, models.MinionMachine)
+    return q.filter(
+        models.MinionMachine.id == minion_machine_id).first()
+
+
+@enginefacade.writer
+def update_minion_machine(context, minion_machine_id, updated_values):
+    if not minion_machine_id:
+        raise exception.InvalidInput(
+            "No minion_machine ID specified for updating.")
+    minion_machine = get_minion_machine(context, minion_machine_id)
+    if not minion_machine:
+        raise exception.NotFound(
+            "MinionMachine with ID '%s' does not exist." % minion_machine_id)
+
+    updateable_fields = [
+        "connection_info", "provider_properties", "status",
+        "backup_writer_connection_info", "allocated_action"]
+    _update_sqlalchemy_object_fields(
+        minion_machine, updateable_fields, updated_values)
+
+
+@enginefacade.writer
+def set_minion_machines_allocation_statuses(
+        context, minion_machine_ids, action_id, allocation_status):
+    machines = get_minion_machines(context)
+    existing_machine_id_mappings = {
+        machine.id: machine for machine in machines}
+    missing = [
+        mid for mid in minion_machine_ids
+        if mid not in existing_machine_id_mappings]
+    if missing:
+        raise exception.NotFound(
+            "The following minion machines could not be found: %s" % (
+                missing))
+
+    for machine_id in minion_machine_ids:
+        machine = existing_machine_id_mappings[machine_id]
+        LOG.debug(
+            "Changing allocation status in DB for minion machine '%s' "
+            "from '%s' to '%s' and allocated action from '%s' to '%s'" % (
+                machine.id, machine.status, allocation_status,
+                machine.allocated_action, action_id))
+        machine.allocated_action = action_id
+        machine.status = allocation_status
+
+
+@enginefacade.writer
+def delete_minion_machine(context, minion_machine_id):
+    minion_machine = get_minion_machine(context, minion_machine_id)
+    # TODO(aznashwan): update models to be soft-delete-aware to
+    # avoid needing to hard-delete here:
+    count = _soft_delete_aware_query(context, models.MinionMachine).filter_by(
+        id=minion_machine_id).delete()
+    if count == 0:
+        raise exception.NotFound("0 MinionMachine entries were soft deleted")
+
+
+@enginefacade.writer
+def add_minion_pool_lifecycle(context, minion_pool_lifecycle):
+    minion_pool_lifecycle.user_id = context.user
+    minion_pool_lifecycle.project_id = context.tenant
+    _session(context).add(minion_pool_lifecycle)
+
+
+@enginefacade.writer
+def delete_minion_pool_lifecycle(context, minion_pool_id):
+    _delete_transfer_action(
+        context, models.MinionPoolLifecycle, minion_pool_id)
+
+
+@enginefacade.reader
+def get_minion_pool_lifecycle(
+        context, minion_pool_id, include_tasks_executions=True,
+        include_machines=True):
+    q = _soft_delete_aware_query(context, models.MinionPoolLifecycle)
+    if include_tasks_executions:
+        q = q.options(orm.joinedload(models.MinionPoolLifecycle.executions))
+    if include_machines:
+        q = q.options(orm.joinedload('minion_machines'))
+    if is_user_context(context):
+        q = q.filter(
+            models.MinionPoolLifecycle.project_id == context.tenant)
+    return q.filter(
+        models.MinionPoolLifecycle.id == minion_pool_id).first()
+
+
+@enginefacade.reader
+def get_minion_pool_lifecycles(
+        context, include_tasks_executions=False, include_info=False,
+        include_machines=False, to_dict=True):
+    q = _soft_delete_aware_query(context, models.MinionPoolLifecycle)
+    if include_tasks_executions:
+        q = q.options(orm.joinedload(models.MinionPoolLifecycle.executions))
+    if include_info is False:
+        q = q.options(orm.defer('info'))
+    q = q.filter()
+    if is_user_context(context):
+        q = q.filter(
+            models.Replica.project_id == context.tenant)
+    if include_machines:
+        q = q.options(orm.joinedload('minion_machines'))
+    db_result = q.all()
+    if to_dict:
+        return [i.to_dict(
+            include_info=include_info,
+            include_executions=include_tasks_executions,
+            include_machines=include_machines) for i in db_result]
+    return db_result
+
+
+@enginefacade.writer
+def add_minion_pool_lifecycle_execution(context, execution):
+    if is_user_context(context):
+        if execution.action.project_id != context.tenant:
+            raise exception.NotAuthorized()
+
+    # include deleted records
+    max_number = _model_query(
+        context, func.max(models.TasksExecution.number)).filter_by(
+            action_id=execution.action.id).first()[0] or 0
+    execution.number = max_number + 1
+
+    _session(context).add(execution)
+
+
+@enginefacade.writer
+def set_minion_pool_lifecycle_status(context, minion_pool_id, status):
+    pool = get_minion_pool_lifecycle(
+        context, minion_pool_id, include_tasks_executions=False,
+        include_machines=False)
+    LOG.debug(
+        "Transitioning minion pool '%s' from status '%s' to '%s'in DB",
+        minion_pool_id, pool.pool_status, status)
+    pool.pool_status = status
+    setattr(pool, 'updated_at', timeutils.utcnow())
+
+
+@enginefacade.writer
+def update_minion_pool_lifecycle(context, minion_pool_id, updated_values):
+    lifecycle = get_minion_pool_lifecycle(
+        context, minion_pool_id, include_tasks_executions=False,
+        include_machines=False)
+    if not lifecycle:
+        raise exception.NotFound(
+            "Minion pool '%s' not found" % minion_pool_id)
+
+    updateable_fields = [
+        "minimum_minions", "maximum_minions", "minion_max_idle_time",
+        "minion_retention_strategy", "environment_options",
+        "pool_shared_resources", "notes", "pool_name", "pool_os_type"]
+    # TODO(aznashwan): this should no longer be required when the
+    # transfer action class hirearchy is to be overhauled:
+    redundancies = {
+        "environment_options": [
+            "source_environment", "destination_environment"]}
+    for field in updateable_fields:
+        if field in updated_values:
+            if field in redundancies:
+                for old_key in redundancies[field]:
+                    LOG.debug(
+                        "Updating the '%s' field of Minion Pool '%s' to: '%s'",
+                        old_key, minion_pool_id, updated_values[field])
+                    setattr(lifecycle, old_key, updated_values[field])
+            else:
+                LOG.debug(
+                    "Updating the '%s' field of Minion Pool '%s' to: '%s'",
+                    field, minion_pool_id, updated_values[field])
+                setattr(lifecycle, field, updated_values[field])
+
+    non_updateable_fields = set(
+        updated_values.keys()).difference(updateable_fields)
+    if non_updateable_fields:
+        LOG.warn(
+            "The following Replica fields can NOT be updated: %s",
+            non_updateable_fields)
+
+    # the oslo_db library uses this method for both the `created_at` and
+    # `updated_at` fields
+    setattr(lifecycle, 'updated_at', timeutils.utcnow())
+
+@enginefacade.reader
+def get_minion_pool_lifecycle_executions(
+        context, lifecycle_id, include_tasks=True):
+    q = _soft_delete_aware_query(context, models.TasksExecution)
+    q = q.join(models.MinionPoolLifecycle)
+    if include_tasks:
+        q = _get_tasks_with_details_options(q)
+    if is_user_context(context):
+        q = q.filter(models.MinionPoolLifecycle.project_id == context.tenant)
+    return q.filter(
+        models.MinionPoolLifecycle.id == lifecycle_id).all()
+
+@enginefacade.reader
+def get_minion_pool_lifecycle_execution(context, lifecycle_id, execution_id):
+    q = _soft_delete_aware_query(context, models.TasksExecution).join(
+        models.MinionPoolLifecycle)
+    q = _get_tasks_with_details_options(q)
+    if is_user_context(context):
+        q = q.filter(models.MinionPoolLifecycle.project_id == context.tenant)
+    return q.filter(
+        models.MinionPoolLifecycle.id == lifecycle_id,
+        models.TasksExecution.id == execution_id).first()
+
+@enginefacade.writer
+def delete_minion_pool_lifecycle_execution(context, execution_id):
+    q = _soft_delete_aware_query(context, models.TasksExecution).filter(
+        models.TasksExecution.id == execution_id)
+    if is_user_context(context):
+        if not q.join(models.MinionPoolLifecycle).filter(
+                models.MinionPoolLifecycle.project_id == (
+                    context.tenant)).first():
+            raise exception.NotAuthorized()
+    count = q.soft_delete()
+    if count == 0:
+        raise exception.NotFound("0 entries were soft deleted")

+ 107 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -0,0 +1,107 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import uuid
+
+import sqlalchemy
+
+
+def upgrade(migrate_engine):
+    meta = sqlalchemy.MetaData()
+    meta.bind = migrate_engine
+
+    endpoint = sqlalchemy.Table(
+        'endpoint', meta, autoload=True)
+    base_transfer_action = sqlalchemy.Table(
+        'base_transfer_action', meta, autoload=True)
+
+    # add the pool option properties for the transfer:
+    origin_minion_pool_id = sqlalchemy.Column(
+        "origin_minion_pool_id", sqlalchemy.String(36), nullable=True)
+    destination_minion_pool_id = sqlalchemy.Column(
+        "destination_minion_pool_id", sqlalchemy.String(36), nullable=True)
+    instance_osmorphing_minion_pool_mappings = sqlalchemy.Column(
+        "instance_osmorphing_minion_pool_mappings", sqlalchemy.Text,
+        nullable=False, default='{}')
+    for col in [
+            origin_minion_pool_id, destination_minion_pool_id,
+            instance_osmorphing_minion_pool_mappings]:
+        base_transfer_action.create_column(col)
+
+    # extend tasks execution 'type' column:
+    tasks_execution = sqlalchemy.Table(
+        'tasks_execution', meta, autoload=True)
+    tasks_execution.c.type.alter(type=sqlalchemy.String(255))
+
+    tables = []
+
+    # add table for pool lifecycles:
+    tables.append(
+        sqlalchemy.Table(
+            'minion_pool_lifecycle',
+            meta,
+            sqlalchemy.Column(
+                "id", sqlalchemy.String(36),
+                sqlalchemy.ForeignKey('base_transfer_action.base_id'),
+                primary_key=True),
+            sqlalchemy.Column(
+                "pool_name", sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column(
+                "pool_os_type", sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column(
+                "pool_platform", sqlalchemy.String(255), nullable=True),
+            sqlalchemy.Column(
+                "pool_status", sqlalchemy.String(255), nullable=False,
+                default=lambda: "UNKNOWN"),
+            sqlalchemy.Column(
+                "pool_shared_resources", sqlalchemy.Text, nullable=True),
+            sqlalchemy.Column(
+                'minimum_minions', sqlalchemy.Integer, nullable=False),
+            sqlalchemy.Column(
+                'maximum_minions', sqlalchemy.Integer, nullable=False),
+            sqlalchemy.Column(
+                'minion_max_idle_time', sqlalchemy.Integer, nullable=False),
+            sqlalchemy.Column(
+                'minion_retention_strategy', sqlalchemy.String(255),
+                nullable=False)))
+
+    # declare minion machine table:
+    tables.append(
+        sqlalchemy.Table(
+            'minion_machine',
+            meta,
+            sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
+                              default=lambda: str(uuid.uuid4())),
+            sqlalchemy.Column(
+                "user_id", sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column(
+                "project_id", sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted', sqlalchemy.String(36)),
+            sqlalchemy.Column(
+                'pool_id', sqlalchemy.String(36),
+                sqlalchemy.ForeignKey('minion_pool_lifecycle.id'),
+                nullable=False),
+            sqlalchemy.Column(
+                'allocated_action', sqlalchemy.String(36), nullable=True),
+            sqlalchemy.Column(
+                'status', sqlalchemy.String(255), nullable=False,
+                default=lambda: "UNKNOWN"),
+            sqlalchemy.Column('connection_info', sqlalchemy.Text),
+            sqlalchemy.Column(
+                'backup_writer_connection_info', sqlalchemy.Text,
+                nullable=True),
+            sqlalchemy.Column(
+                'provider_properties', sqlalchemy.Text,
+                nullable=True)))
+
+    for index, table in enumerate(tables):
+        try:
+            table.create()
+        except Exception:
+            # If an error occurs, drop all tables created so far to return
+            # to the previously existing state.
+            meta.drop_all(tables=tables[:index])
+            raise

+ 136 - 1
coriolis/db/sqlalchemy/models.py

@@ -143,7 +143,7 @@ class TasksExecution(BASE, models.TimestampMixin, models.ModelBase,
                              backref=orm.backref('execution'))
                              backref=orm.backref('execution'))
     status = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     status = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     number = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
     number = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
-    type = sqlalchemy.Column(sqlalchemy.String(20))
+    type = sqlalchemy.Column(sqlalchemy.String(255))
 
 
     def to_dict(self):
     def to_dict(self):
         result = {
         result = {
@@ -196,6 +196,12 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
     network_map = sqlalchemy.Column(types.Json, nullable=True)
     network_map = sqlalchemy.Column(types.Json, nullable=True)
     storage_mappings = sqlalchemy.Column(types.Json, nullable=True)
     storage_mappings = sqlalchemy.Column(types.Json, nullable=True)
     source_environment = sqlalchemy.Column(types.Json, nullable=True)
     source_environment = sqlalchemy.Column(types.Json, nullable=True)
+    origin_minion_pool_id = sqlalchemy.Column(
+        sqlalchemy.String(36), nullable=True)
+    destination_minion_pool_id = sqlalchemy.Column(
+        sqlalchemy.String(36), nullable=True)
+    instance_osmorphing_minion_pool_mappings = sqlalchemy.Column(
+        types.Json, nullable=False, default=lambda: {})
 
 
     __mapper_args__ = {
     __mapper_args__ = {
         'polymorphic_identity': 'base_transfer_action',
         'polymorphic_identity': 'base_transfer_action',
@@ -224,6 +230,10 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
             "updated_at": self.updated_at,
             "updated_at": self.updated_at,
             "deleted_at": self.deleted_at,
             "deleted_at": self.deleted_at,
             "deleted": self.deleted,
             "deleted": self.deleted,
+            "origin_minion_pool_id": self.origin_minion_pool_id,
+            "destination_minion_pool_id": self.destination_minion_pool_id,
+            "instance_osmorphing_minion_pool_mappings":
+                self.instance_osmorphing_minion_pool_mappings
         }
         }
         if include_executions:
         if include_executions:
             for ex in self.executions:
             for ex in self.executions:
@@ -436,3 +446,128 @@ class ReplicaSchedule(BASE, models.TimestampMixin, models.ModelBase,
     shutdown_instance = sqlalchemy.Column(
     shutdown_instance = sqlalchemy.Column(
         sqlalchemy.Boolean, nullable=False, default=False)
         sqlalchemy.Boolean, nullable=False, default=False)
     trust_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
     trust_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
+
+
+class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
+                    models.SoftDeleteMixin):
+    __tablename__ = "minion_machine"
+
+    id = sqlalchemy.Column(sqlalchemy.String(36),
+                           default=lambda: str(uuid.uuid4()),
+                           primary_key=True)
+    user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
+    project_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
+
+    pool_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('minion_pool_lifecycle.id'),
+        nullable=False)
+
+    status = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False,
+        default=lambda: constants.MINION_MACHINE_STATUS_UNKNOWN)
+
+    allocated_action = sqlalchemy.Column(
+        sqlalchemy.String(36), nullable=True)
+
+    connection_info = sqlalchemy.Column(
+        types.Json, nullable=True)
+
+    backup_writer_connection_info = sqlalchemy.Column(
+        types.Json, nullable=True)
+
+    provider_properties = sqlalchemy.Column(
+        types.Json, nullable=True)
+
+    def to_dict(self):
+        result = {
+            "id": self.id,
+            "user_id": self.user_id,
+            "project_id": self.project_id,
+            "created_at": self.created_at,
+            "updated_at": self.updated_at,
+            "deleted_at": self.deleted_at,
+            "deleted": self.deleted,
+            "pool_id": self.pool_id,
+            "status": self.status,
+            "connection_info": self.connection_info,
+            "allocated_action": self.allocated_action,
+            "backup_writer_connection_info": (
+                self.backup_writer_connection_info),
+            "provider_properties": self.provider_properties
+        }
+        return result
+
+
+class MinionPoolLifecycle(BaseTransferAction):
+    # TODO(aznashwan): this class inherits numerous redundant fields from
+    # BaseTransferAction. Ideally, the upper hirearchy should be split into a
+    # BaseAction, and a separate inheriting BaseTransferAction.
+    __tablename__ = 'minion_pool_lifecycle'
+
+    id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey(
+            'base_transfer_action.base_id'),
+        primary_key=True)
+
+    pool_name = sqlalchemy.Column(
+        sqlalchemy.String(255),
+        nullable=False)
+    pool_os_type = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False)
+    pool_platform = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False)
+    pool_status = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False,
+        default=lambda: constants.MINION_POOL_STATUS_UNKNOWN)
+    pool_shared_resources = sqlalchemy.Column(
+        types.Json, nullable=True)
+    minimum_minions = sqlalchemy.Column(
+        sqlalchemy.Integer, nullable=False)
+    maximum_minions = sqlalchemy.Column(
+        sqlalchemy.Integer, nullable=False)
+    minion_max_idle_time = sqlalchemy.Column(
+        sqlalchemy.Integer, nullable=False)
+    minion_retention_strategy = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False)
+    minion_machines = orm.relationship(
+        MinionMachine, backref=orm.backref('minion_pool'),
+        primaryjoin="and_(MinionMachine.pool_id==MinionPoolLifecycle.id, "
+                    "MinionMachine.deleted=='0')")
+
+    __mapper_args__ = {
+        'polymorphic_identity': 'minion_pool_lifecycle'}
+
+    def to_dict(
+            self, include_info=True, include_machines=True,
+            include_executions=True):
+        base = super(MinionPoolLifecycle, self).to_dict(
+            include_info=include_info, include_executions=include_executions)
+        base.update({
+            "id": self.id,
+            "pool_name": self.pool_name,
+            "pool_os_type": self.pool_os_type,
+            "pool_platform": self.pool_platform,
+            "pool_shared_resources": self.pool_shared_resources,
+            "pool_status": self.pool_status,
+            "minimum_minions": self.minimum_minions,
+            "maximum_minions": self.maximum_minions,
+            "minion_max_idle_time": self.minion_max_idle_time,
+            "minion_retention_strategy": self.minion_retention_strategy})
+        base["minion_machines"] = []
+        if include_machines:
+            base["minion_machines"] = [
+                machine.to_dict() for machine in self.minion_machines]
+        # TODO(aznashwan): these nits should be avoided by splitting the
+        # BaseTransferAction class into a more specialized hireachy:
+        redundancies = {
+            "environment_options": [
+                "source_environment", "destination_environment"],
+            "endpoint_id": [
+                "origin_endpoint_id", "destination_endpoint_id"]}
+        for new_key, old_keys in redundancies.items():
+            for old_key in old_keys:
+                if old_key in base:
+                    base[new_key] = base.pop(old_key)
+        return base

+ 0 - 0
coriolis/endpoint_minion_pool_options/__init__.py


+ 19 - 0
coriolis/endpoint_minion_pool_options/api.py

@@ -0,0 +1,19 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis.conductor.rpc import client as rpc_client
+
+
+class API(object):
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+
+    def get_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, env=None, option_names=None):
+        return self._rpc_client.get_endpoint_source_minion_pool_options(
+            ctxt, endpoint_id, env, option_names)
+
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, env=None, option_names=None):
+        return self._rpc_client.get_endpoint_destination_minion_pool_options(
+            ctxt, endpoint_id, env, option_names)

+ 13 - 0
coriolis/endpoints/api.py

@@ -41,3 +41,16 @@ class API(object):
     def validate_source_environment(self, ctxt, endpoint_id, source_env):
     def validate_source_environment(self, ctxt, endpoint_id, source_env):
         return self._rpc_client.validate_endpoint_source_environment(
         return self._rpc_client.validate_endpoint_source_environment(
             ctxt, endpoint_id, source_env)
             ctxt, endpoint_id, source_env)
+
+    @utils.bad_request_on_error("Invalid source minion pool environment: %s")
+    def validate_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        return self._rpc_client.validate_endpoint_source_minion_pool_options(
+            ctxt, endpoint_id, pool_environment)
+
+    @utils.bad_request_on_error(
+        "Invalid destination minion pool environment: %s")
+    def validate_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        return self._rpc_client.validate_endpoint_destination_minion_pool_options(
+            ctxt, endpoint_id, pool_environment)

+ 13 - 0
coriolis/exception.py

@@ -149,6 +149,14 @@ class Invalid(CoriolisException):
     safe = True
     safe = True
 
 
 
 
+class InvalidMinionPoolSelection(Invalid):
+    message = _("The selected minion pool is incompatible.")
+
+
+class MinionMachineAllocationFailure(Invalid):
+    message = _("No minion machines were available for allocation")
+
+
 class InvalidCustomOSDetectTools(Invalid):
 class InvalidCustomOSDetectTools(Invalid):
     message = _("The provided custom OS detect tools are invalid.")
     message = _("The provided custom OS detect tools are invalid.")
 
 
@@ -202,6 +210,11 @@ class InvalidTaskState(Invalid):
         'Task "%(task_id)s" in in an invalid state: %(task_state)s')
         'Task "%(task_id)s" in in an invalid state: %(task_state)s')
 
 
 
 
+class InvalidMinionPoolState(Invalid):
+    message = _(
+        'Minion pool "%(pool_id)s" in in an invalid state: %(pool_state)s')
+
+
 class TaskIsCancelling(InvalidTaskState):
 class TaskIsCancelling(InvalidTaskState):
     message = _(TASK_ALREADY_CANCELLING_EXCEPTION_FMT)
     message = _(TASK_ALREADY_CANCELLING_EXCEPTION_FMT)
 
 

+ 19 - 10
coriolis/migrations/api.py

@@ -9,24 +9,33 @@ class API(object):
         self._rpc_client = rpc_client.ConductorClient()
         self._rpc_client = rpc_client.ConductorClient()
 
 
     def migrate_instances(self, ctxt, origin_endpoint_id,
     def migrate_instances(self, ctxt, origin_endpoint_id,
-                          destination_endpoint_id, source_environment,
-                          destination_environment, instances, network_map,
-                          storage_mappings, replication_count,
+                          destination_endpoint_id, origin_minion_pool_id,
+                          destination_minion_pool_id,
+                          instance_osmorphing_minion_pool_mappings,
+                          source_environment, destination_environment,
+                          instances, network_map, storage_mappings,
+                          replication_count,
                           shutdown_instances, notes=None,
                           shutdown_instances, notes=None,
                           skip_os_morphing=False, user_scripts=None):
                           skip_os_morphing=False, user_scripts=None):
         return self._rpc_client.migrate_instances(
         return self._rpc_client.migrate_instances(
             ctxt, origin_endpoint_id, destination_endpoint_id,
             ctxt, origin_endpoint_id, destination_endpoint_id,
-            source_environment, destination_environment, instances,
-            network_map, storage_mappings,
-            replication_count, shutdown_instances=shutdown_instances,
+            origin_minion_pool_id, destination_minion_pool_id,
+            instance_osmorphing_minion_pool_mappings, source_environment,
+            destination_environment, instances, network_map,
+            storage_mappings, replication_count,
+            shutdown_instances=shutdown_instances,
             notes=notes, skip_os_morphing=skip_os_morphing,
             notes=notes, skip_os_morphing=skip_os_morphing,
             user_scripts=user_scripts)
             user_scripts=user_scripts)
 
 
-    def deploy_replica_instances(self, ctxt, replica_id, clone_disks=False,
-                                 force=False, skip_os_morphing=False,
-                                 user_scripts=None):
+    def deploy_replica_instances(self, ctxt, replica_id,
+                                 instance_osmorphing_minion_pool_mappings,
+                                 clone_disks=False, force=False,
+                                 skip_os_morphing=False, user_scripts=None):
         return self._rpc_client.deploy_replica_instances(
         return self._rpc_client.deploy_replica_instances(
-            ctxt, replica_id, clone_disks, force, skip_os_morphing,
+            ctxt, replica_id, instance_osmorphing_minion_pool_mappings=(
+                instance_osmorphing_minion_pool_mappings),
+            clone_disks=clone_disks, force=force,
+            skip_os_morphing=skip_os_morphing,
             user_scripts=user_scripts)
             user_scripts=user_scripts)
 
 
     def delete(self, ctxt, migration_id):
     def delete(self, ctxt, migration_id):

+ 0 - 0
coriolis/minion_pool_tasks_executions/__init__.py


+ 26 - 0
coriolis/minion_pool_tasks_executions/api.py

@@ -0,0 +1,26 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis import utils
+from coriolis.conductor.rpc import client as rpc_client
+
+
+class API(object):
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+
+    def list(self, ctxt, minion_pool_id, include_tasks=False):
+        return self._rpc_client.get_minion_pool_lifecycle_executions(
+            ctxt, minion_pool_id, include_tasks=include_tasks)
+
+    def get(self, ctxt, minion_pool_id, execution_id):
+        return self._rpc_client.get_minion_pool_lifecycle_execution(
+            ctxt, minion_pool_id, execution_id)
+
+    def cancel(self, ctxt, minion_pool_id, execution_id, force):
+        return self._rpc_client.cancel_minion_pool_lifecycle_execution(
+            ctxt, minion_pool_id, execution_id, force)
+
+    def delete(self, ctxt, minion_pool_id, execution_id):
+        return self._rpc_client.delete_minion_pool_lifecycle_execution(
+            ctxt, minion_pool_id, execution_id)

+ 0 - 0
coriolis/minion_pools/__init__.py


+ 49 - 0
coriolis/minion_pools/api.py

@@ -0,0 +1,49 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis import utils
+from coriolis.conductor.rpc import client as rpc_client
+
+
+class API(object):
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+
+    def create(
+            self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=None):
+        return self._rpc_client.create_minion_pool(
+            ctxt, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=notes)
+
+    def update(self, ctxt, minion_pool_id, updated_values):
+        return self._rpc_client.update_minion_pool(
+            ctxt, minion_pool_id, updated_values=updated_values)
+
+    def delete(self, ctxt, minion_pool_id):
+        self._rpc_client.delete_minion_pool(ctxt, minion_pool_id)
+
+    def get_minion_pools(self, ctxt):
+        return self._rpc_client.get_minion_pools(ctxt)
+
+    def get_minion_pool(self, ctxt, minion_pool_id):
+        return self._rpc_client.get_minion_pool(ctxt, minion_pool_id)
+
+    def set_up_shared_pool_resources(self, ctxt, minion_pool_id):
+        return self._rpc_client.set_up_shared_minion_pool_resources(
+            ctxt, minion_pool_id)
+
+    def tear_down_shared_pool_resources(
+            self, ctxt, minion_pool_id, force=False):
+        return self._rpc_client.tear_down_shared_minion_pool_resources(
+            ctxt, minion_pool_id, force=force)
+
+    def allocate_machines(self, ctxt, minion_pool_id):
+        return self._rpc_client.allocate_minion_pool_machines(
+            ctxt, minion_pool_id)
+
+    def deallocate_machines(self, ctxt, minion_pool_id, force=False):
+        return self._rpc_client.deallocate_minion_pool_machines(
+            ctxt, minion_pool_id, force=force)

+ 24 - 1
coriolis/policies/endpoints.py

@@ -149,7 +149,30 @@ ENDPOINTS_POLICY_DEFAULT_RULES = [
                 "method": "GET"
                 "method": "GET"
             }
             }
         ]
         ]
-    )
+    ),
+    policy.DocumentedRuleDefault(
+        get_endpoints_policy_label('list_source_minion_pool_options'),
+        ENDPOINTS_POLICY_DEFAULT_RULE,
+        "List available source minion pool options for endpoint",
+        [
+            {
+                "path": "/endpoint/{endpoint_id}/source-minion-pool-options",
+                "method": "GET"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_endpoints_policy_label('list_destination_minion_pool_options'),
+        ENDPOINTS_POLICY_DEFAULT_RULE,
+        "List available destination pool options for endpoint",
+        [
+            {
+                "path": (
+                    "/endpoint/{endpoint_id}/destination-minion-pool-options"),
+                "method": "GET"
+            }
+        ]
+    ),
 ]
 ]
 
 
 
 

+ 83 - 0
coriolis/policies/minion_pool_tasks_executions.py

@@ -0,0 +1,83 @@
+# Copyright 2018 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_policy import policy
+
+from coriolis.policies import base
+
+
+MINION_POOL_EXECUTIONS_POLICY_PREFIX = "%s:minion_pool_executions" % (
+    base.CORIOLIS_POLICIES_PREFIX)
+MINION_POOL_EXECUTIONS_POLICY_DEFAULT_RULE = "rule:admin_or_owner"
+
+
+def get_minion_pool_executions_policy_label(rule_label):
+    return "%s:%s" % (
+        MINION_POOL_EXECUTIONS_POLICY_PREFIX, rule_label)
+
+
+MINION_POOL_EXECUTIONS_POLICY_DEFAULT_RULES = [
+    policy.DocumentedRuleDefault(
+        get_minion_pool_executions_policy_label('create'),
+        MINION_POOL_EXECUTIONS_POLICY_DEFAULT_RULE,
+        "Create a new execution for a given Minion Pool",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}/executions",
+                "method": "POST"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pool_executions_policy_label('list'),
+        MINION_POOL_EXECUTIONS_POLICY_DEFAULT_RULE,
+        "List Executions for a given Minion Pool",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}/executions",
+                "method": "GET"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pool_executions_policy_label('show'),
+        MINION_POOL_EXECUTIONS_POLICY_DEFAULT_RULE,
+        "Show details for Minion Pool execution",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}/executions/{execution_id}",
+                "method": "GET"
+            }
+        ]
+    ),
+    # TODO(aznashwan): minion pool execution actions should ideally be
+    # declared in a separate module
+    policy.DocumentedRuleDefault(
+        get_minion_pool_executions_policy_label('cancel'),
+        MINION_POOL_EXECUTIONS_POLICY_DEFAULT_RULE,
+        "Cancel a Minion Pool execution",
+        [
+            {
+                "path": (
+                    "/minion_pools/{minion_pool_id}/executions/"
+                    "{execution_id}/actions"),
+                "method": "POST"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pool_executions_policy_label('delete'),
+        MINION_POOL_EXECUTIONS_POLICY_DEFAULT_RULE,
+        "Delete an execution for a given Minion Pool",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}/executions/{execution_id}",
+                "method": "DELETE"
+            }
+        ]
+    )
+]
+
+
+def list_rules():
+    return MINION_POOL_EXECUTIONS_POLICY_DEFAULT_RULES

+ 123 - 0
coriolis/policies/minion_pools.py

@@ -0,0 +1,123 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+
+from oslo_policy import policy
+
+from coriolis.policies import base
+
+
+MINION_POOLS_POLICY_PREFIX = "%s:minion_pools" % base.CORIOLIS_POLICIES_PREFIX
+MINION_POOLS_DEFAULT_RULE = "rule:admin_or_owner"
+
+
+def get_minion_pools_policy_label(rule_label):
+    return "%s:%s" % (
+        MINION_POOLS_POLICY_PREFIX, rule_label)
+
+
+MINION_POOLS_DEFAULT_RULES = [
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('create'),
+        MINION_POOLS_DEFAULT_RULE,
+        "Create a minion pool",
+        [
+            {
+                "path": "/minion_pools",
+                "method": "POST"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('list'),
+        MINION_POOLS_DEFAULT_RULE,
+        "List minion_pools",
+        [
+            {
+                "path": "/minion_pools",
+                "method": "GET"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('show'),
+        MINION_POOLS_DEFAULT_RULE,
+        "Show details for minion pool",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}",
+                "method": "GET"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('update'),
+        MINION_POOLS_DEFAULT_RULE,
+        "Update details for minion pool",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}",
+                "method": "PUT"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('delete'),
+        MINION_POOLS_DEFAULT_RULE,
+        "Delete minion pool",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}",
+                "method": "DELETE"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('set_up_shared_resources'),
+        MINION_POOLS_DEFAULT_RULE,
+        "Set up shared minion pool resources",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}/actions",
+                "method": "POST"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('tear_down_shared_resources'),
+        MINION_POOLS_DEFAULT_RULE,
+        "Tear down shared minion pool resources",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}/actions",
+                "method": "POST"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('allocate_machines'),
+        MINION_POOLS_DEFAULT_RULE,
+        "Allocate Minion Pool machines",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}/actions",
+                "method": "POST"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('deallocate_machines'),
+        MINION_POOLS_DEFAULT_RULE,
+        "Deallocate Minion Pool machines",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}/actions",
+                "method": "POST"
+            }
+        ]
+    )
+]
+
+
+def list_rules():
+    return MINION_POOLS_DEFAULT_RULES

+ 4 - 1
coriolis/policy.py

@@ -14,6 +14,8 @@ from coriolis.policies import diagnostics
 from coriolis.policies import endpoints
 from coriolis.policies import endpoints
 from coriolis.policies import general
 from coriolis.policies import general
 from coriolis.policies import migrations
 from coriolis.policies import migrations
+from coriolis.policies import minion_pools
+from coriolis.policies import minion_pool_tasks_executions
 from coriolis.policies import regions
 from coriolis.policies import regions
 from coriolis.policies import replicas
 from coriolis.policies import replicas
 from coriolis.policies import replica_schedules
 from coriolis.policies import replica_schedules
@@ -28,7 +30,8 @@ _ENFORCER = None
 
 
 DEFAULT_POLICIES_MODULES = [
 DEFAULT_POLICIES_MODULES = [
     base, endpoints, general, migrations, replicas, replica_schedules,
     base, endpoints, general, migrations, replicas, replica_schedules,
-    replica_tasks_executions, diagnostics, regions, services]
+    replica_tasks_executions, diagnostics, regions, services, minion_pools,
+    minion_pool_tasks_executions]
 
 
 
 
 def reset():
 def reset():

+ 109 - 0
coriolis/providers/base.py

@@ -532,3 +532,112 @@ class BaseUpdateDestinationReplicaProvider(
         having been executed or the replica disks having been deleted, this
         having been executed or the replica disks having been deleted, this
         method should simply return the empty `volumes_info` it was given.
         method should simply return the empty `volumes_info` it was given.
         """
         """
+
+
+class _BaseMinionPoolProvider(
+        object, with_metaclass(abc.ABCMeta)):
+    """ Class for providers which offer Minion Pool management functionality.
+    """
+
+    @abc.abstractmethod
+    def get_minion_pool_environment_schema(self):
+        """ Returns the schema for the minion pool options. """
+        pass
+
+    @abc.abstractmethod
+    def get_minion_pool_options(
+            self, ctxt, connection_info, env=None, option_names=None):
+        """ Returns possible environment options for minion pools. """
+        pass
+
+    @abc.abstractmethod
+    def validate_minion_compatibility_for_transfer(
+            self, ctxt, connection_info, export_info, environment_options,
+            minion_properties):
+        """ Validates compatibility between the pool's options and the options
+        selected for a given transfer. Should raise if any options related to
+        the minions in the pool might be deemed incompatible with the desited
+        transfer options.
+        """
+        pass
+
+    @abc.abstractmethod
+    def validate_minion_pool_environment_options(
+            self, ctxt, connection_info, environment_options):
+        """ Validates the provided pool options. """
+        pass
+
+    @abc.abstractmethod
+    def set_up_pool_shared_resources(
+            self, ctxt, connection_info, environment_options, pool_identifier):
+        """ Sets up supporting resources which can be re-used amongst the
+        machines which will be spawned within the pool (e.g. a shared network)
+        """
+        pass
+
+    @abc.abstractmethod
+    def tear_down_pool_shared_resources(
+            self, ctxt, connection_info, environment_options,
+            pool_shared_resources):
+        """ Tears down all pool supporting resources. """
+        pass
+
+    @abc.abstractmethod
+    def create_minion(
+            self, ctxt, connection_info, environment_options,
+            pool_identifier, pool_os_type, pool_shared_resources,
+            new_minion_identifier):
+        pass
+
+    @abc.abstractmethod
+    def delete_minion(
+            self, ctxt, connection_info, minion_properties):
+        pass
+
+    @abc.abstractmethod
+    def shutdown_minion(
+            self, ctxt, connection_info, minion_properties):
+        pass
+
+    @abc.abstractmethod
+    def start_minion(
+            self, ctxt, connection_info, minion_properties):
+        pass
+
+    @abc.abstractmethod
+    def attach_volumes_to_minion(
+            self, ctxt, connection_info, minion_properties, volumes_info):
+        pass
+
+    @abc.abstractmethod
+    def detach_volumes_from_minion(
+            self, ctxt, connection_info, minion_properties, volumes_info):
+        pass
+
+
+class BaseSourceMinionPoolProvider(_BaseMinionPoolProvider):
+
+    pass
+
+
+class BaseDestinationMinionPoolProvider(_BaseMinionPoolProvider):
+
+    @abc.abstractmethod
+    def validate_osmorphing_minion_compatibility_for_transfer(
+            self, ctxt, connection_info, export_info, environment_options,
+            minion_properties):
+        """ Validates compatibility between the OSMorphing pool's options and
+        the options selected for a given transfer. Should raise if any options
+        of the minions in the pool might be deemed incompatible with the
+        desired transfer options.
+        """
+        pass
+
+    @abc.abstractmethod
+    def get_additional_os_morphing_info(
+            self, ctxt, connection_info, target_environment,
+            instance_deployment_info):
+        """ This method should return any additional 'osmorphing_info'
+        as defined in coriolis.schemas.CORIOLIS_OS_MORPHING_RESOURCES_SCHEMA
+        """
+        pass

+ 5 - 1
coriolis/providers/factory.py

@@ -49,7 +49,11 @@ PROVIDER_TYPE_MAP = {
     constants.PROVIDER_TYPE_DESTINATION_REPLICA_UPDATE: (
     constants.PROVIDER_TYPE_DESTINATION_REPLICA_UPDATE: (
         base.BaseUpdateDestinationReplicaProvider),
         base.BaseUpdateDestinationReplicaProvider),
     constants.PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS: (
     constants.PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS: (
-        base.BaseEndpointSourceOptionsProvider)
+        base.BaseEndpointSourceOptionsProvider),
+    constants.PROVIDER_TYPE_SOURCE_MINION_POOL: (
+        base.BaseSourceMinionPoolProvider),
+    constants.PROVIDER_TYPE_DESTINATION_MINION_POOL: (
+        base.BaseDestinationMinionPoolProvider)
 }
 }
 
 
 
 

+ 4 - 0
coriolis/replicas/api.py

@@ -9,10 +9,14 @@ class API(object):
         self._rpc_client = rpc_client.ConductorClient()
         self._rpc_client = rpc_client.ConductorClient()
 
 
     def create(self, ctxt, origin_endpoint_id, destination_endpoint_id,
     def create(self, ctxt, origin_endpoint_id, destination_endpoint_id,
+               origin_minion_pool_id, destination_minion_pool_id,
+               instance_osmorphing_minion_pool_mappings,
                source_environment, destination_environment, instances,
                source_environment, destination_environment, instances,
                network_map, storage_mappings, notes=None):
                network_map, storage_mappings, notes=None):
         return self._rpc_client.create_instances_replica(
         return self._rpc_client.create_instances_replica(
             ctxt, origin_endpoint_id, destination_endpoint_id,
             ctxt, origin_endpoint_id, destination_endpoint_id,
+            origin_minion_pool_id, destination_minion_pool_id,
+            instance_osmorphing_minion_pool_mappings,
             source_environment, destination_environment, instances,
             source_environment, destination_environment, instances,
             network_map, storage_mappings, notes)
             network_map, storage_mappings, notes)
 
 

+ 5 - 1
coriolis/schemas.py

@@ -10,7 +10,7 @@ import jsonschema
 from oslo_log import log as logging
 from oslo_log import log as logging
 
 
 from coriolis import exception
 from coriolis import exception
-from coriolis import utils 
+from coriolis import utils
 
 
 
 
 LOG = logging.getLogger(__name__)
 LOG = logging.getLogger(__name__)
@@ -22,6 +22,10 @@ PROVIDER_CONNECTION_INFO_SCHEMA_NAME = "connection_info_schema.json"
 
 
 PROVIDER_TARGET_ENVIRONMENT_SCHEMA_NAME = "target_environment_schema.json"
 PROVIDER_TARGET_ENVIRONMENT_SCHEMA_NAME = "target_environment_schema.json"
 PROVIDER_SOURCE_ENVIRONMENT_SCHEMA_NAME = "source_environment_schema.json"
 PROVIDER_SOURCE_ENVIRONMENT_SCHEMA_NAME = "source_environment_schema.json"
+PROVIDER_SOURCE_MINION_POOL_ENVIRONMENT_SCHEMA_NAME = (
+    "source_minion_pool_environment_schema.json")
+PROVIDER_DESTINATION_MINION_POOL_ENVIRONMENT_SCHEMA_NAME = (
+    "destination_minion_pool_environment_schema.json")
 
 
 _CORIOLIS_VM_EXPORT_INFO_SCHEMA_NAME = "vm_export_info_schema.json"
 _CORIOLIS_VM_EXPORT_INFO_SCHEMA_NAME = "vm_export_info_schema.json"
 _CORIOLIS_VM_INSTANCE_INFO_SCHEMA_NAME = "vm_instance_info_schema.json"
 _CORIOLIS_VM_INSTANCE_INFO_SCHEMA_NAME = "vm_instance_info_schema.json"

+ 15 - 5
coriolis/tasks/base.py

@@ -56,14 +56,18 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
         """ Returns a list of the string fields which are required
         """ Returns a list of the string fields which are required
         to be present during the tasks' run method. """
         to be present during the tasks' run method. """
-        pass
+        raise NotImplementedError(
+            "No required task info properties specified for task class of "
+            "type '%s'." % cls)
 
 
     @abc.abstractclassmethod
     @abc.abstractclassmethod
     def get_returned_task_info_properties(cls):
     def get_returned_task_info_properties(cls):
         """ Returns a list of the string fields which are returned by the
         """ Returns a list of the string fields which are returned by the
         tasks' run method to be added to the task info.
         tasks' run method to be added to the task info.
         """
         """
-        pass
+        raise NotImplementedError(
+            "No returned task info properties specified for task class of "
+            "type '%s'." % cls)
 
 
     @abc.abstractclassmethod
     @abc.abstractclassmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
@@ -71,14 +75,18 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
         of all the provider types (constants.PROVIDER_TYPE_*) required for the
         of all the provider types (constants.PROVIDER_TYPE_*) required for the
         task.
         task.
         """
         """
-        pass
+        raise NotImplementedError(
+            "No required provider types specified for task class of "
+            "type '%s'." % cls)
 
 
     @abc.abstractclassmethod
     @abc.abstractclassmethod
     def get_required_platform(cls):
     def get_required_platform(cls):
         """ Returns whether the task operates on the source platform, the
         """ Returns whether the task operates on the source platform, the
         destination, or both. (constants.TASK_PLATFORM_*)
         destination, or both. (constants.TASK_PLATFORM_*)
         """
         """
-        pass
+        raise NotImplementedError(
+            "No required platform specified for task class of "
+            "type '%s'." % cls)
 
 
     @abc.abstractmethod
     @abc.abstractmethod
     def _run(self, ctxt, instance, origin, destination, task_info,
     def _run(self, ctxt, instance, origin, destination, task_info,
@@ -88,7 +96,9 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
         'self.get_returned_task_info_properties'.
         'self.get_returned_task_info_properties'.
         Must be implemented in all child classes.
         Must be implemented in all child classes.
         """
         """
-        pass
+        raise NotImplementedError(
+            "No base run method implemented for task class of type '%s'." % (
+                self.__class__))
 
 
     def run(self, ctxt, instance, origin, destination, task_info,
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
             event_handler):

+ 48 - 1
coriolis/tasks/factory.py

@@ -4,6 +4,7 @@
 from coriolis import constants
 from coriolis import constants
 from coriolis import exception
 from coriolis import exception
 from coriolis.tasks import migration_tasks
 from coriolis.tasks import migration_tasks
+from coriolis.tasks import minion_pool_tasks
 from coriolis.tasks import osmorphing_tasks
 from coriolis.tasks import osmorphing_tasks
 from coriolis.tasks import replica_tasks
 from coriolis.tasks import replica_tasks
 
 
@@ -81,7 +82,53 @@ _TASKS_MAP = {
     constants.TASK_TYPE_UPDATE_SOURCE_REPLICA:
     constants.TASK_TYPE_UPDATE_SOURCE_REPLICA:
         replica_tasks.UpdateSourceReplicaTask,
         replica_tasks.UpdateSourceReplicaTask,
     constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
     constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
-        replica_tasks.UpdateDestinationReplicaTask
+        replica_tasks.UpdateDestinationReplicaTask,
+    constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS:
+        minion_pool_tasks.ValidateSourceMinionPoolOptionsTask,
+    constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS:
+        minion_pool_tasks.ValidateDestinationMinionPoolOptionsTask,
+    constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE:
+        minion_pool_tasks.CreateSourceMinionMachineTask,
+    constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE:
+        minion_pool_tasks.CreateDestinationMinionMachineTask,
+    constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE:
+        minion_pool_tasks.DeleteSourceMinionMachineTask,
+    constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE:
+        minion_pool_tasks.DeleteDestinationMinionMachineTask,
+    constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES:
+        minion_pool_tasks.SetUpSourcePoolSupportingResourcesTask,
+    constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES:
+        minion_pool_tasks.SetUpDestinationPoolSupportingResources,
+    constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES:
+        minion_pool_tasks.TearDownSourcePoolSupportingResourcesTask,
+    constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES:
+        minion_pool_tasks.TearDownDestinationPoolSupportingResources,
+    constants.TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION:
+        minion_pool_tasks.AttachVolumesToSourceMinionTask,
+    constants.TASK_TYPE_DETACH_VOLUMES_FROM_SOURCE_MINION:
+        minion_pool_tasks.DetachVolumesFromSourceMinionTask,
+    constants.TASK_TYPE_ATTACH_VOLUMES_TO_DESTINATION_MINION:
+        minion_pool_tasks.AttachVolumesToDestinationMinionTask,
+    constants.TASK_TYPE_DETACH_VOLUMES_FROM_DESTINATION_MINION:
+        minion_pool_tasks.DetachVolumesFromDestinationMinionTask,
+    constants.TASK_TYPE_ATTACH_VOLUMES_TO_OSMORPHING_MINION:
+        minion_pool_tasks.AttachVolumesToOSMorphingMinionTask,
+    constants.TASK_TYPE_DETACH_VOLUMES_FROM_OSMORPHING_MINION:
+        minion_pool_tasks.DetachVolumesFromOSMorphingMinionTask,
+    constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_COMPATIBILITY:
+        minion_pool_tasks.ValidateSourceMinionCompatibilityTask,
+    constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_COMPATIBILITY:
+        minion_pool_tasks.ValidateDestinationMinionCompatibilityTask,
+    constants.TASK_TYPE_VALIDATE_OSMORPHING_MINION_POOL_COMPATIBILITY:
+        minion_pool_tasks.ValidateOSMorphingMinionCompatibilityTask,
+    constants.TASK_TYPE_RELEASE_SOURCE_MINION:
+        minion_pool_tasks.ReleaseSourceMinionTask,
+    constants.TASK_TYPE_RELEASE_DESTINATION_MINION:
+        minion_pool_tasks.ReleaseDestinationMinionTask,
+    constants.TASK_TYPE_RELEASE_OSMORPHING_MINION:
+        minion_pool_tasks.ReleaseOSMorphingMinionTask,
+    constants.TASK_TYPE_COLLECT_OSMORPHING_INFO:
+        minion_pool_tasks.CollectOSMorphingInfoTask
 }
 }
 
 
 
 

+ 800 - 0
coriolis/tasks/minion_pool_tasks.py

@@ -0,0 +1,800 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_log import log as logging
+
+from coriolis import constants
+from coriolis import events
+from coriolis import exception
+from coriolis.providers import factory as providers_factory
+from coriolis.tasks import base
+
+
+LOG = logging.getLogger(__name__)
+
+
+SOURCE_MINION_TASK_INFO_FIELD_MAPPINGS = {
+    # NOTE: these redundancies are in place so as to have the
+    # 'Release*' task classes clear these fields after they run:
+    "source_minion_machine_id": "source_minion_machine_id",
+    "source_minion_provider_properties": "source_resources",
+    "source_minion_connection_info": "source_resources_connection_info"}
+TARGET_MINION_TASK_INFO_FIELD_MAPPINGS = {
+    "target_minion_machine_id": "target_minion_machine_id",
+    "target_minion_provider_properties": "target_resources",
+    "target_minion_backup_writer_connection_info": (
+        "target_resources_connection_info")}
+OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS = {
+    "osmorphing_minion_machine_id": "osmorphing_minion_machine_id",
+    "osmorphing_minion_provider_properties": "os_morphing_resources",
+    "osmorphing_minion_connection_info": "osmorphing_connection_info"}
+
+
+def _get_required_minion_pool_provider_types_for_platform(
+        platform_type):
+    provider_type = None
+    if platform_type == constants.PROVIDER_PLATFORM_SOURCE:
+        provider_type = constants.PROVIDER_TYPE_SOURCE_MINION_POOL
+    elif platform_type == constants.PROVIDER_PLATFORM_DESTINATION:
+        provider_type = constants.PROVIDER_TYPE_DESTINATION_MINION_POOL
+    else:
+        raise NotImplementedError(
+            "Cannot determine required minion pool provider type for "
+            "platform of type '%s'" % platform_type)
+    return {
+        platform_type: [provider_type]}
+
+
+class _BaseValidateMinionPoolOptionsTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No Minion pool options validation platform specified.")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        return ["pool_environment_options"]
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return []
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    def _run(self, ctxt, minion_pool_machine_id, origin, destination,
+             task_info, event_handler):
+
+        # NOTE: both origin or target endpoints would work:
+        connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
+        provider = providers_factory.get_provider(
+            destination["type"], provider_type, event_handler)
+
+        environment_options = task_info['pool_environment_options']
+        provider.validate_minion_pool_environment_options(
+            ctxt, connection_info, environment_options)
+
+        return {}
+
+
+class ValidateSourceMinionPoolOptionsTask(_BaseValidateMinionPoolOptionsTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class ValidateDestinationMinionPoolOptionsTask(
+        _BaseValidateMinionPoolOptionsTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
+class _BaseCreateMinionMachineTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No minion pool creation required platform specified.")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        return [
+            "pool_environment_options", "pool_shared_resources",
+            "pool_identifier", "pool_os_type"]
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return [
+            "minion_provider_properties", "minion_connection_info",
+            "minion_backup_writer_connection_info"]
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    def _run(self, ctxt, minion_pool_machine_id, origin, destination,
+             task_info, event_handler):
+
+        # NOTE: both origin or target endpoints would work:
+        connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
+        provider = providers_factory.get_provider(
+            destination["type"], provider_type, event_handler)
+
+        pool_identifier = task_info['pool_identifier']
+        environment_options = task_info['pool_environment_options']
+        pool_shared_resources = task_info['pool_shared_resources']
+        pool_os_type = task_info["pool_os_type"]
+        minion_properties = provider.create_minion(
+            ctxt, connection_info, environment_options, pool_identifier,
+            pool_os_type, pool_shared_resources, minion_pool_machine_id)
+
+        missing = [
+            key for key in [
+                "connection_info", "minion_provider_properties",
+                "backup_writer_connection_info"]
+            if key not in minion_properties]
+        if missing:
+            LOG.warn(
+                "Provider of type '%s' failed to return the following minion "
+                "property keys: %s. Allowing run to completion for later "
+                "cleanup.")
+
+        minion_connection_info = {}
+        if 'connection_info' in minion_properties:
+            minion_connection_info = base.marshal_migr_conn_info(
+                minion_properties['connection_info'])
+        minion_backup_writer_conn = {}
+        if 'backup_writer_connection_info' in minion_properties:
+            minion_backup_writer_conn = minion_properties[
+                'backup_writer_connection_info']
+            if 'connection_details' in minion_backup_writer_conn:
+                minion_backup_writer_conn['connection_details'] = (
+                    base.marshal_migr_conn_info(
+                        minion_backup_writer_conn['connection_details']))
+
+        return {
+            "minion_connection_info": minion_connection_info,
+            "minion_backup_writer_connection_info": (
+                minion_backup_writer_conn),
+            "minion_provider_properties": minion_properties.get(
+                "minion_provider_properties")}
+
+
+class CreateSourceMinionMachineTask(_BaseCreateMinionMachineTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class CreateDestinationMinionMachineTask(_BaseCreateMinionMachineTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
+class _BaseDeleteMinionMachineTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No minion deletion required platform specified.")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        return ["minion_provider_properties"]
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return []
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    def _run(self, ctxt, minion_pool_machine_id, origin, destination,
+             task_info, event_handler):
+
+        # NOTE: both origin or target endpoints would work:
+        connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
+        provider = providers_factory.get_provider(
+            destination["type"], provider_type, event_handler)
+
+        minion_provider_properties = task_info['minion_provider_properties']
+        provider.delete_minion(
+            ctxt, connection_info, minion_provider_properties)
+
+        return {}
+
+
+class DeleteSourceMinionMachineTask(_BaseDeleteMinionMachineTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class DeleteDestinationMinionMachineTask(_BaseDeleteMinionMachineTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
+class _BaseSetUpPoolSupportingResourcesTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No pool shared resource setup required platform specified.")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        return ["pool_environment_options", "pool_identifier"]
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return ["pool_shared_resources"]
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    def _run(self, ctxt, minion_pool_machine_id, origin, destination,
+             task_info, event_handler):
+
+        # NOTE: both origin or target endpoints would work:
+        connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
+        provider = providers_factory.get_provider(
+            destination["type"], provider_type, event_handler)
+
+        pool_identifier = task_info['pool_identifier']
+        environment_options = task_info['pool_environment_options']
+        pool_shared_resources = provider.set_up_pool_shared_resources(
+            ctxt, connection_info, environment_options, pool_identifier)
+
+        return {"pool_shared_resources": pool_shared_resources}
+
+
+class SetUpSourcePoolSupportingResourcesTask(
+        _BaseSetUpPoolSupportingResourcesTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class SetUpDestinationPoolSupportingResources(
+        _BaseSetUpPoolSupportingResourcesTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
+class _BaseTearDownPoolSupportingResourcesTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No pool tear down shared resoures required platform specified.")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        return ["pool_environment_options", "pool_shared_resources"]
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return ["pool_shared_resources"]
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    def _run(self, ctxt, minion_pool_machine_id, origin, destination,
+             task_info, event_handler):
+
+        # NOTE: both origin or target endpoints would work:
+        connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
+        provider = providers_factory.get_provider(
+            destination["type"], provider_type, event_handler)
+
+        environment_options = task_info['pool_environment_options']
+        pool_shared_resources = task_info['pool_shared_resources']
+        provider.tear_down_pool_shared_resources(
+            ctxt, connection_info, environment_options,
+            pool_shared_resources)
+
+        return {"pool_shared_resources": None}
+
+
+class TearDownSourcePoolSupportingResourcesTask(
+        _BaseTearDownPoolSupportingResourcesTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class TearDownDestinationPoolSupportingResources(
+        _BaseTearDownPoolSupportingResourcesTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
+class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
+    """ The purposes of the volume attachment tasks are to:
+    1) attach the volumes of the minions
+    2) return any updated properties for the minions if needed
+    """
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No minion disk attachment platform specified")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        fields = list(cls._get_minion_task_info_field_mappings().keys())
+        fields.append("volumes_info")
+        return fields
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        fields = list(cls._get_minion_task_info_field_mappings().values())
+        fields.append(cls._get_minion_properties_task_info_field())
+        fields.append("volumes_info")
+        return fields
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    @classmethod
+    def _get_minion_properties_task_info_field(cls):
+        raise NotImplementedError(
+            "No minion disk attachment task info field specified.")
+
+    @classmethod
+    def _get_provider_disk_operation(cls, provider):
+        raise NotImplementedError(
+            "No minion disk attachment provider operation specified.")
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        raise NotImplementedError(
+            "No minion task info field mappings provided.")
+
+    def _run(self, ctxt, instance, origin, destination,
+             task_info, event_handler):
+
+        platform_to_target = None
+        required_platform = self.get_required_platform()
+        if required_platform == constants.TASK_PLATFORM_SOURCE:
+            platform_to_target = origin
+        elif required_platform == constants.TASK_PLATFORM_DESTINATION:
+            platform_to_target = destination
+        else:
+            raise NotImplementedError(
+                "Unknown minion pool disk operation platform '%s'" % (
+                    required_platform))
+
+        connection_info = base.get_connection_info(ctxt, platform_to_target)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
+        provider = providers_factory.get_provider(
+            platform_to_target["type"], provider_type, event_handler)
+
+        volumes_info = task_info["volumes_info"]
+        minion_properties = task_info[
+            self._get_minion_properties_task_info_field()]
+        res = self._get_provider_disk_operation(provider)(
+            ctxt, connection_info, minion_properties, volumes_info)
+
+        missing_result_props = [
+            prop for prop in ["volumes_info", "minion_properties"]
+            if prop not in res]
+        if missing_result_props:
+            raise exception.CoriolisException(
+                "The following properties were missing from minion disk "
+                "operation '%s' from platform '%s'." % (
+                    self._get_provider_disk_operation.__name__,
+                    platform_to_target))
+
+        field_name_map = self._get_minion_task_info_field_mappings()
+        result = {
+            "volumes_info": res['volumes_info'],
+            self._get_minion_properties_task_info_field(): res[
+                "minion_properties"],
+            field_name_map[
+                self._get_minion_properties_task_info_field()]: res[
+                    "minion_properties"]}
+
+        result.update({
+            field_name_map[field]: task_info[field]
+            for field in field_name_map
+            if field_name_map[field] not in result})
+
+        return result
+
+
+class AttachVolumesToSourceMinionTask(_BaseVolumesMinionMachineAttachmentTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+    @classmethod
+    def _get_minion_properties_task_info_field(cls):
+        return "source_minion_provider_properties"
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return SOURCE_MINION_TASK_INFO_FIELD_MAPPINGS
+
+    @classmethod
+    def _get_provider_disk_operation(cls, provider):
+        return provider.attach_volumes_to_minion
+
+
+class DetachVolumesFromSourceMinionTask(AttachVolumesToSourceMinionTask):
+
+    @classmethod
+    def _get_provider_disk_operation(cls, provider):
+        return provider.detach_volumes_from_minion
+
+
+class AttachVolumesToDestinationMinionTask(_BaseVolumesMinionMachineAttachmentTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def _get_minion_properties_task_info_field(cls):
+        return "target_minion_provider_properties"
+
+    @classmethod
+    def _get_provider_disk_operation(cls, provider):
+        return provider.attach_volumes_to_minion
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return TARGET_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class DetachVolumesFromDestinationMinionTask(AttachVolumesToDestinationMinionTask):
+
+    @classmethod
+    def _get_provider_disk_operation(cls, provider):
+        return provider.detach_volumes_from_minion
+
+
+class AttachVolumesToOSMorphingMinionTask(
+        _BaseVolumesMinionMachineAttachmentTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def _get_minion_properties_task_info_field(cls):
+        return "osmorphing_minion_provider_properties"
+
+    @classmethod
+    def _get_provider_disk_operation(cls, provider):
+        return provider.attach_volumes_to_minion
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS
+
+    @classmethod
+    def _clear_mapped_minion_task_info_field(cls):
+        return False
+
+
+class DetachVolumesFromOSMorphingMinionTask(
+        AttachVolumesToOSMorphingMinionTask):
+
+    @classmethod
+    def _get_provider_disk_operation(cls, provider):
+        return provider.detach_volumes_from_minion
+
+    @classmethod
+    def _clear_mapped_minion_task_info_field(cls):
+        return True
+
+
+class _BaseValidateMinionCompatibilityTask(base.TaskRunner):
+    """ The purposes of the minion validation tasks are to:
+    1) run the afferent validation method on the provider
+    2) "translate" the fields related to the minion into fields
+    which are to be consumed by the other tasks
+    (e.g. REPLICATE_DISKS or OS_MORPHING)
+    """
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No minion validation platform specified")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        base_props = set([
+            "export_info",
+            cls._get_transfer_properties_task_info_field(),
+            cls._get_minion_properties_task_info_field()])
+        base_props.union(set(
+            cls._get_minion_task_info_field_mappings().keys()))
+        return list(base_props)
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return list(
+            cls._get_minion_task_info_field_mappings().values())
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    @classmethod
+    def _get_provider_pool_validation_operation(cls, provider):
+        raise NotImplementedError(
+            "No minion pool provider validation method was specified.")
+
+    @classmethod
+    def _get_transfer_properties_task_info_field(cls):
+        platform = cls.get_required_platform()
+        if platform == constants.PROVIDER_PLATFORM_SOURCE:
+            return "source_environment"
+        elif platform == constants.PROVIDER_PLATFORM_DESTINATION:
+            return "target_environment"
+        raise exception.CoriolisException(
+            "Unknown minion pool validation operation platform '%s'" % (
+                platform))
+
+    @classmethod
+    def _get_minion_properties_task_info_field(cls):
+        raise NotImplementedError(
+            "No minion validation task info field specified.")
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        raise NotImplementedError(
+            "No minion task info field mappings provided.")
+
+    def _run(self, ctxt, instance, origin, destination,
+             task_info, event_handler):
+
+        platform_to_target = None
+        required_platform = self.get_required_platform()
+        if required_platform == constants.TASK_PLATFORM_SOURCE:
+            platform_to_target = origin
+        elif required_platform == constants.TASK_PLATFORM_DESTINATION:
+            platform_to_target = destination
+        else:
+            raise NotImplementedError(
+                "Unknown minion pool validation operation platform '%s'" % (
+                    required_platform))
+
+        connection_info = base.get_connection_info(ctxt, platform_to_target)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
+        provider = providers_factory.get_provider(
+            platform_to_target["type"], provider_type, event_handler)
+
+        export_info = task_info["export_info"]
+        minion_properties = task_info[
+            self._get_minion_properties_task_info_field()]
+        transfer_properties = task_info[
+            self._get_transfer_properties_task_info_field()]
+        validation_op = self._get_provider_pool_validation_operation(provider)
+        validation_op(
+            ctxt, connection_info, export_info, transfer_properties,
+            minion_properties)
+
+        field_mappings = self._get_minion_task_info_field_mappings()
+        return {
+            field_mappings[field]: task_info[field]
+            for field in field_mappings}
+
+
+class ValidateSourceMinionCompatibilityTask(
+        _BaseValidateMinionCompatibilityTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+    @classmethod
+    def _get_minion_properties_task_info_field(cls):
+        return "source_minion_provider_properties"
+
+    @classmethod
+    def _get_provider_pool_validation_operation(cls, provider):
+        return provider.validate_minion_compatibility_for_transfer
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return SOURCE_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class ValidateDestinationMinionCompatibilityTask(
+        _BaseValidateMinionCompatibilityTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+    @classmethod
+    def _get_minion_properties_task_info_field(cls):
+        return "target_minion_provider_properties"
+
+    @classmethod
+    def _get_provider_pool_validation_operation(cls, provider):
+        return provider.validate_minion_compatibility_for_transfer
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return TARGET_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class ValidateOSMorphingMinionCompatibilityTask(
+        _BaseValidateMinionCompatibilityTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+    @classmethod
+    def _get_minion_properties_task_info_field(cls):
+        return "osmorphing_minion_provider_properties"
+
+    @classmethod
+    def _get_provider_pool_validation_operation(cls, provider):
+        return provider.validate_osmorphing_minion_compatibility_for_transfer
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return  OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class _BaseReleaseMinionTask(base.TaskRunner):
+    """ The purpose of releasal tasks is to clear (set to None) all of the
+    fields afferent to the minion for the respective task type.
+    """
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No minion releasing platform specified")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        prop_mappings = cls._get_minion_task_info_field_mappings()
+        return list(
+            set(prop_mappings.keys()).union(
+                prop_mappings.values()))
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return cls.get_required_task_info_properties()
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        raise NotImplementedError(
+            "No minion task info field mappings provided.")
+
+    def _run(self, ctxt, instance, origin, destination,
+             task_info, event_handler):
+        event_manager = events.EventManager(event_handler)
+        event_manager.progress_update("Releasing minion machine")
+        return {
+            field: None
+            for field in self.get_returned_task_info_properties()}
+
+
+class ReleaseSourceMinionTask(_BaseReleaseMinionTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return SOURCE_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class ReleaseDestinationMinionTask(_BaseReleaseMinionTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return TARGET_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class ReleaseOSMorphingMinionTask(_BaseReleaseMinionTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+    @classmethod
+    def _get_minion_task_info_field_mappings(cls):
+        return OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS
+
+
+class CollectOSMorphingInfoTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        return ["target_environment", "instance_deployment_info"]
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return ["osmorphing_info"]
+
+    def _run(self, ctxt, instance, origin, destination, task_info,
+             event_handler):
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
+        provider = providers_factory.get_provider(
+            destination["type"], provider_type, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+        target_environment = task_info["target_environment"]
+        instance_deployment_info = task_info["instance_deployment_info"]
+
+        result = provider.get_additional_os_morphing_info(
+            ctxt, connection_info, target_environment,
+            instance_deployment_info)
+
+        if not isinstance(result, dict) or 'osmorphing_info' not in result:
+            raise exception.CoriolisException(
+                "'get_additional_os_morphing_info' method for provider of type"
+                " '%s' failed to return OSMorphing info.")
+
+        return {
+            "osmorphing_info": result["osmorphing_info"]}

+ 26 - 0
coriolis/worker/rpc/client.py

@@ -130,3 +130,29 @@ class WorkerClient(object):
 
 
     def get_service_status(self, ctxt):
     def get_service_status(self, ctxt):
         return self._client.call(ctxt, 'get_service_status')
         return self._client.call(ctxt, 'get_service_status')
+
+    def get_endpoint_source_minion_pool_options(
+            self, ctxt, platform_name, connection_info, env, option_names):
+        return self._client.call(
+            ctxt, 'get_endpoint_source_minion_pool_options',
+            platform_name=platform_name, connection_info=connection_info,
+            env=env, option_names=option_names)
+
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, platform_name, connection_info, env, option_names):
+        return self._client.call(
+            ctxt, 'get_endpoint_destination_minion_pool_options',
+            platform_name=platform_name, connection_info=connection_info,
+            env=env, option_names=option_names)
+
+    def validate_endpoint_source_minion_pool_options(
+            self, ctxt, platform_name, pool_environment):
+        return self._client.call(
+            ctxt, 'validate_endpoint_source_minion_pool_options',
+            platform_name=platform_name, pool_environment=pool_environment)
+
+    def validate_endpoint_destination_minion_pool_options(
+            self, ctxt, platform_name, pool_environment):
+        return self._client.call(
+            ctxt, 'validate_endpoint_destination_minion_pool_options',
+            platform_name=platform_name, pool_environment=pool_environment)

+ 90 - 0
coriolis/worker/rpc/server.py

@@ -372,6 +372,55 @@ class WorkerServerEndpoint(object):
 
 
         return options
         return options
 
 
+    def get_endpoint_source_minion_pool_options(
+            self, ctxt, platform_name, connection_info, env, option_names):
+        provider = providers_factory.get_provider(
+            platform_name,
+            constants.PROVIDER_TYPE_SOURCE_MINION_POOL,
+            None, raise_if_not_found=False)
+        if not provider:
+            raise exception.InvalidInput(
+                "Provider plugin for platform '%s' does not support source "
+                "minion pool creation or management." % platform_name)
+
+        secret_connection_info = utils.get_secret_connection_info(
+            ctxt, connection_info)
+
+        options = provider.get_minion_pool_options(
+            ctxt, secret_connection_info, env=env,
+            option_names=option_names)
+
+        # NOTE: the structure of option values is the same for minion pools:
+        schemas.validate_value(
+            options, schemas.CORIOLIS_DESTINATION_ENVIRONMENT_OPTIONS_SCHEMA)
+
+        return options
+
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, platform_name, connection_info, env, option_names):
+        provider = providers_factory.get_provider(
+            platform_name,
+            constants.PROVIDER_TYPE_DESTINATION_MINION_POOL,
+            None, raise_if_not_found=False)
+        if not provider:
+            raise exception.InvalidInput(
+                "Provider plugin for platform '%s' does not support "
+                "destination minion pool creation or management." % (
+                    platform_name))
+
+        secret_connection_info = utils.get_secret_connection_info(
+            ctxt, connection_info)
+
+        options = provider.get_minion_pool_options(
+            ctxt, secret_connection_info, env=env,
+            option_names=option_names)
+
+        # NOTE: the structure of option values is the same for minion pools:
+        schemas.validate_value(
+            options, schemas.CORIOLIS_DESTINATION_ENVIRONMENT_OPTIONS_SCHEMA)
+
+        return options
+
     def get_endpoint_source_options(
     def get_endpoint_source_options(
             self, ctxt, platform_name, connection_info, env, option_names):
             self, ctxt, platform_name, connection_info, env, option_names):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
@@ -459,6 +508,39 @@ class WorkerServerEndpoint(object):
 
 
         return (is_valid, message)
         return (is_valid, message)
 
 
+    def validate_endpoint_source_minion_pool_options(
+            self, ctxt, platform_name, pool_environment):
+        provider = providers_factory.get_provider(
+            platform_name, constants.PROVIDER_TYPE_SOURCE_MINION_POOL, None)
+        pool_options_schema = provider.get_minion_pool_environment_schema()
+
+        is_valid = True
+        message = None
+        try:
+            schemas.validate_value(pool_environment, pool_options_schema)
+        except exception.SchemaValidationException as ex:
+            is_valid = False
+            message = str(ex)
+
+        return (is_valid, message)
+
+    def validate_endpoint_destination_minion_pool_options(
+            self, ctxt, platform_name, pool_environment):
+        provider = providers_factory.get_provider(
+            platform_name, constants.PROVIDER_TYPE_DESTINATION_MINION_POOL,
+            None)
+        pool_options_schema = provider.get_minion_pool_environment_schema()
+
+        is_valid = True
+        message = None
+        try:
+            schemas.validate_value(pool_environment, pool_options_schema)
+        except exception.SchemaValidationException as ex:
+            is_valid = False
+            message = str(ex)
+
+        return (is_valid, message)
+
     def validate_endpoint_connection(self, ctxt, platform_name,
     def validate_endpoint_connection(self, ctxt, platform_name,
                                      connection_info):
                                      connection_info):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
@@ -511,6 +593,14 @@ class WorkerServerEndpoint(object):
             schema = provider.get_source_environment_schema()
             schema = provider.get_source_environment_schema()
             schemas["source_environment_schema"] = schema
             schemas["source_environment_schema"] = schema
 
 
+        if provider_type == constants.PROVIDER_TYPE_SOURCE_MINION_POOL:
+            schema = provider.get_minion_pool_environment_schema()
+            schemas["source_minion_pool_environment_schema"] = schema
+
+        if provider_type == constants.PROVIDER_TYPE_DESTINATION_MINION_POOL:
+            schema = provider.get_minion_pool_environment_schema()
+            schemas["destination_minion_pool_environment_schema"] = schema
+
         return schemas
         return schemas
 
 
     def get_diagnostics(self, ctxt):
     def get_diagnostics(self, ctxt):

Некоторые файлы не были показаны из-за большого количества измененных файлов