Просмотр исходного кода

Add logical separation between source and target minion pool providers.

Nashwan Azhari 5 лет назад
Родитель
Сommit
9dda21f27f

+ 46 - 0
coriolis/api/v1/endpoint_destination_minion_pool_options.py

@@ -0,0 +1,46 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_log import log as logging
+
+from coriolis import utils
+from coriolis.api.v1.views import (
+    endpoint_destination_minion_pool_options_view)
+from coriolis.api import wsgi as api_wsgi
+from coriolis.endpoint_minion_pool_options import api
+from coriolis.policies import endpoints as endpoint_policies
+
+
+LOG = logging.getLogger(__name__)
+
+
+class EndpointDestinationMinionPoolOptionsController(api_wsgi.Controller):
+    def __init__(self):
+        self._minion_pool_options_api = api.API()
+        super(EndpointDestinationMinionPoolOptionsController, self).__init__()
+
+    def index(self, req, endpoint_id):
+        context = req.environ['coriolis.context']
+        context.can("%s:list_destination_minion_pool_options" % (
+            endpoint_policies.ENDPOINTS_POLICY_PREFIX))
+
+        env = req.GET.get("env")
+        if env is not None:
+            env = utils.decode_base64_param(env, is_json=True)
+        else:
+            env = {}
+
+        options = req.GET.get("options")
+        if options is not None:
+            options = utils.decode_base64_param(options, is_json=True)
+        else:
+            options = {}
+
+        return endpoint_destination_minion_pool_options_view.collection(
+            req,
+            self._minion_pool_options_api.get_endpoint_destination_minion_pool_options(
+                context, endpoint_id, env=env, option_names=options))
+
+
+def create_resource():
+    return api_wsgi.Resource(EndpointDestinationMinionPoolOptionsController())

+ 9 - 8
coriolis/api/v1/endpoint_minion_pool_options.py → coriolis/api/v1/endpoint_source_minion_pool_options.py

@@ -4,23 +4,24 @@
 from oslo_log import log as logging
 from oslo_log import log as logging
 
 
 from coriolis import utils
 from coriolis import utils
-from coriolis.api.v1.views import endpoint_minion_pool_options_view
+from coriolis.api.v1.views import (
+        endpoint_source_minion_pool_options_view)
 from coriolis.api import wsgi as api_wsgi
 from coriolis.api import wsgi as api_wsgi
-from coriolis.endpoint_minion_options import api
+from coriolis.endpoint_minion_pool_options import api
 from coriolis.policies import endpoints as endpoint_policies
 from coriolis.policies import endpoints as endpoint_policies
 
 
 
 
 LOG = logging.getLogger(__name__)
 LOG = logging.getLogger(__name__)
 
 
 
 
-class EndpointMinionPoolOptionsController(api_wsgi.Controller):
+class EndpointSourceMinionPoolOptionsController(api_wsgi.Controller):
     def __init__(self):
     def __init__(self):
         self._minion_pool_options_api = api.API()
         self._minion_pool_options_api = api.API()
-        super(EndpointMinionPoolOptionsController, self).__init__()
+        super(EndpointSourceMinionPoolOptionsController, self).__init__()
 
 
     def index(self, req, endpoint_id):
     def index(self, req, endpoint_id):
         context = req.environ['coriolis.context']
         context = req.environ['coriolis.context']
-        context.can("%s:list_minion_pool_options" % (
+        context.can("%s:list_source_minion_pool_options" % (
             endpoint_policies.ENDPOINTS_POLICY_PREFIX))
             endpoint_policies.ENDPOINTS_POLICY_PREFIX))
 
 
         env = req.GET.get("env")
         env = req.GET.get("env")
@@ -35,11 +36,11 @@ class EndpointMinionPoolOptionsController(api_wsgi.Controller):
         else:
         else:
             options = {}
             options = {}
 
 
-        return endpoint_minion_pool_options_view.collection(
+        return endpoint_source_minion_pool_options_view.collection(
             req,
             req,
-            self._minion_pool_options_api.get_endpoint_minion_pool_options(
+            self._minion_pool_options_api.get_endpoint_source_minion_pool_options(
                 context, endpoint_id, env=env, option_names=options))
                 context, endpoint_id, env=env, option_names=options))
 
 
 
 
 def create_resource():
 def create_resource():
-    return api_wsgi.Resource(EndpointMinionPoolOptionsController())
+    return api_wsgi.Resource(EndpointSourceMinionPoolOptionsController())

+ 108 - 28
coriolis/api/v1/minion_pools.py

@@ -37,6 +37,42 @@ class MinionPoolController(api_wsgi.Controller):
         return minion_pool_view.collection(
         return minion_pool_view.collection(
             req, self._minion_pool_api.get_minion_pools(context))
             req, self._minion_pool_api.get_minion_pools(context))
 
 
+    def _check_pool_retention_strategy(self, pool_retention_strategy):
+        if not pool_retention_strategy:
+            LOG.debug(
+                "Ignoring void minion pool retention strategy '%s'",
+                pool_retention_strategy)
+        valid_strats = [
+            constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE,
+            constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_POWEROFF]
+        if pool_retention_strategy not in valid_strats:
+            raise Exception(
+                "Invalid minion pool retention strategy '%s'. Must be one of "
+                "the following: %s" % (pool_retention_strategy, valid_strats))
+
+    def _check_pool_numeric_values(
+            self, minimum_minions, maximum_minions, minion_max_idle_time):
+        if minimum_minions is not None:
+            if minimum_minions <= 0:
+                raise Exception(
+                    "'minimum_minions' must be a strictly positive integer. "
+                    "Got: %s" % minimum_minions)
+        if maximum_minions is not None:
+            if maximum_minions <= 0:
+                raise Exception(
+                    "'maximum_minions' must be a strictly positive integer. "
+                    "Got: %s" % maximum_minions)
+            if maximum_minions < minimum_minions:
+                raise Exception(
+                    "'maximum_minions' value (%s) must be at least as large as"
+                    " the 'minimum_minions' value (%s)." % (
+                        maximum_minions, minimum_minions))
+        if minion_max_idle_time is not None:
+            if minion_max_idle_time <= 0:
+                raise Exception(
+                    "'minion_max_idle_time' must be a strictly positive "
+                    "integer. Got: %s" % maximum_minions)
+
     def _validate_create_body(self, ctxt, body):
     def _validate_create_body(self, ctxt, body):
         try:
         try:
             minion_pool = body["minion_pool"]
             minion_pool = body["minion_pool"]
@@ -48,31 +84,46 @@ class MinionPoolController(api_wsgi.Controller):
                     "The provided pool OS type '%s' is invalid. Must be one "
                     "The provided pool OS type '%s' is invalid. Must be one "
                     "of the following: %s" % (
                     "of the following: %s" % (
                         pool_os_type, constants.VALID_OS_TYPES))
                         pool_os_type, constants.VALID_OS_TYPES))
+            pool_platform = minion_pool["pool_platform"]
+            supported_pool_platforms = [
+                constants.PROVIDER_PLATFORM_SOURCE,
+                constants.PROVIDER_PLATFORM_DESTINATION]
+            if pool_platform not in supported_pool_platforms:
+                raise Exception(
+                    "The provided pool platform ('%s') is invalid. Must be one"
+                    " of the following: %s" % (
+                        pool_platform, supported_pool_platforms))
+            if pool_platform == constants.PROVIDER_PLATFORM_SOURCE and (
+                    pool_os_type != constants.OS_TYPE_LINUX):
+                raise Exception(
+                    "Source Minion Pools are required to be of OS type "
+                    "'%s', not '%s'." % (
+                        constants.OS_TYPE_LINUX, pool_os_type))
             environment_options = minion_pool["environment_options"]
             environment_options = minion_pool["environment_options"]
-            self._endpoints_api.validate_endpoint_minion_pool_options(
-                ctxt, endpoint_id, environment_options)
+            if pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+                self._endpoints_api.validate_endpoint_source_minion_pool_options(
+                    ctxt, endpoint_id, environment_options)
+            elif pool_platform == constants.PROVIDER_PLATFORM_DESTINATION:
+                self._endpoints_api.validate_endpoint_destination_minion_pool_options(
+                    ctxt, endpoint_id, environment_options)
 
 
             minimum_minions = minion_pool.get("minimum_minions", 1)
             minimum_minions = minion_pool.get("minimum_minions", 1)
             maximum_minions = minion_pool.get(
             maximum_minions = minion_pool.get(
                 "maximum_minions", minimum_minions)
                 "maximum_minions", minimum_minions)
             minion_max_idle_time = minion_pool.get(
             minion_max_idle_time = minion_pool.get(
                 "minion_max_idle_time", 1)
                 "minion_max_idle_time", 1)
+            self._check_pool_numeric_values(
+                minimum_minions, maximum_minions, minion_max_idle_time)
             minion_retention_strategy = minion_pool.get(
             minion_retention_strategy = minion_pool.get(
                 "minion_retention_strategy",
                 "minion_retention_strategy",
                 constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE)
                 constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE)
-            suppoted_retention_strategies = [
-                constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE]
-            if minion_retention_strategy not in suppoted_retention_strategies:
-                raise Exception(
-                    "Unsupported minion retention strategy '%s'. Must be "
-                    "one of: %s" % (
-                        minion_retention_strategy,
-                        suppoted_retention_strategies))
+            self._check_pool_retention_strategy(
+                minion_retention_strategy)
             notes = minion_pool.get("notes")
             notes = minion_pool.get("notes")
             return (
             return (
-                name, endpoint_id, pool_os_type, environment_options,
-                minimum_minions, maximum_minions, minion_max_idle_time,
-                minion_retention_strategy, notes)
+                name, endpoint_id, pool_platform, pool_os_type,
+                environment_options, minimum_minions, maximum_minions,
+                minion_max_idle_time, minion_retention_strategy, notes)
         except Exception as ex:
         except Exception as ex:
             LOG.exception(ex)
             LOG.exception(ex)
             if hasattr(ex, "message"):
             if hasattr(ex, "message"):
@@ -84,34 +135,63 @@ class MinionPoolController(api_wsgi.Controller):
     def create(self, req, body):
     def create(self, req, body):
         context = req.environ["coriolis.context"]
         context = req.environ["coriolis.context"]
         context.can(pools_policies.get_minion_pools_policy_label("create"))
         context.can(pools_policies.get_minion_pools_policy_label("create"))
-        (name, endpoint_id, pool_os_type, environment_options, minimum_minions,
-         maximum_minions, minion_max_idle_time, minion_retention_strategy,
-         notes) = (
+        (name, endpoint_id, pool_platform, pool_os_type, environment_options,
+         minimum_minions, maximum_minions, minion_max_idle_time,
+         minion_retention_strategy, notes) = (
             self._validate_create_body(context, body))
             self._validate_create_body(context, body))
         return minion_pool_view.single(req, self._minion_pool_api.create(
         return minion_pool_view.single(req, self._minion_pool_api.create(
-            context, name, endpoint_id, pool_os_type, environment_options,
-            minimum_minions, maximum_minions, minion_max_idle_time,
-            minion_retention_strategy, notes=notes))
+            context, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=notes))
 
 
     def _validate_update_body(self, id, context, body):
     def _validate_update_body(self, id, context, body):
         try:
         try:
             minion_pool = body["minion_pool"]
             minion_pool = body["minion_pool"]
             if 'endpoint_id' in minion_pool:
             if 'endpoint_id' in minion_pool:
-                raise exception.InvalidInput(
-                    "The 'endpoint_id' of a minion pool cannot be "
-                    "updated.")
+                raise Exception(
+                    "The 'endpoint_id' of a minion pool cannot be updated.")
+            if 'pool_platform' in minion_pool:
+                raise Exception(
+                    "The 'pool_platform' of a minion pool cannot be updated.")
             vals = {k: minion_pool[k] for k in minion_pool.keys() &
             vals = {k: minion_pool[k] for k in minion_pool.keys() &
                     {"name", "environment_options", "minimum_minions",
                     {"name", "environment_options", "minimum_minions",
                      "maximum_minions", "minion_max_idle_time",
                      "maximum_minions", "minion_max_idle_time",
                      "minion_retention_strategy", "notes", "pool_os_type"}}
                      "minion_retention_strategy", "notes", "pool_os_type"}}
-            if 'environment_options' in vals:
+            if 'minion_retention_strategy' in vals:
+                self._check_pool_retention_strategy(
+                    vals['minion_retention_strategy'])
+            if any([
+                    f in vals for f in [
+                        'environment_options', 'minimum_minions',
+                        'maximum_minions', 'minion_max_idle_time']]):
                 minion_pool = self._minion_pool_api.get_minion_pool(
                 minion_pool = self._minion_pool_api.get_minion_pool(
                     context, id)
                     context, id)
-                self._endpoints_api.validate_endpoint_minion_pool_options(
-                    # TODO(aznashwan): remove endpoint ID fields reduncancy
-                    # once DB models are overhauled:
-                    context, minion_pool['origin_endpoint_id'],
-                    vals['environment_options'])
+                self._check_pool_numeric_values(
+                    vals.get(
+                        'minimum_minions', minion_pool['minimum_minions']),
+                    vals.get(
+                        'maximum_minions', minion_pool['maximum_minions']),
+                    vals.get('minion_max_idle_time'))
+
+                if 'environment_options' in vals:
+                    if minion_pool['pool_platform'] == (
+                            constants.PROVIDER_PLATFORM_SOURCE):
+                        self._endpoints_api.validate_endpoint_source_minion_pool_options(
+                            # TODO(aznashwan): remove endpoint ID fields redundancy
+                            # once DB models are overhauled:
+                            context, minion_pool['origin_endpoint_id'],
+                            vals['environment_options'])
+                    elif minion_pool['pool_platform'] == (
+                            constants.PROVIDER_PLATFORM_DESTINATION):
+                        self._endpoints_api.validate_endpoint_destination_minion_pool_options(
+                            # TODO(aznashwan): remove endpoint ID fields redundancy
+                            # once DB models are overhauled:
+                            context, minion_pool['origin_endpoint_id'],
+                            vals['environment_options'])
+                    else:
+                        raise Exception(
+                            "Unknown pool platform: %s" % minion_pool[
+                                'pool_platform'])
             return vals
             return vals
         except Exception as ex:
         except Exception as ex:
             LOG.exception(ex)
             LOG.exception(ex)

+ 15 - 5
coriolis/api/v1/router.py

@@ -6,10 +6,11 @@ from oslo_log import log as logging
 from coriolis import api
 from coriolis import api
 from coriolis.api.v1 import diagnostics
 from coriolis.api.v1 import diagnostics
 from coriolis.api.v1 import endpoint_actions
 from coriolis.api.v1 import endpoint_actions
+from coriolis.api.v1 import endpoint_destination_minion_pool_options
 from coriolis.api.v1 import endpoint_destination_options
 from coriolis.api.v1 import endpoint_destination_options
 from coriolis.api.v1 import endpoint_instances
 from coriolis.api.v1 import endpoint_instances
-from coriolis.api.v1 import endpoint_minion_pool_options
 from coriolis.api.v1 import endpoint_networks
 from coriolis.api.v1 import endpoint_networks
+from coriolis.api.v1 import endpoint_source_minion_pool_options
 from coriolis.api.v1 import endpoint_source_options
 from coriolis.api.v1 import endpoint_source_options
 from coriolis.api.v1 import endpoint_storage
 from coriolis.api.v1 import endpoint_storage
 from coriolis.api.v1 import endpoints
 from coriolis.api.v1 import endpoints
@@ -100,12 +101,21 @@ class APIRouter(api.APIRouter):
                        action='action',
                        action='action',
                        conditions={'method': 'POST'})
                        conditions={'method': 'POST'})
 
 
-        self.resources['endpoint_minion_pool_options'] = \
-            endpoint_minion_pool_options.create_resource()
+        self.resources['endpoint_source_minion_pool_options'] = \
+            endpoint_source_minion_pool_options.create_resource()
         mapper.resource('minion_pool_options',
         mapper.resource('minion_pool_options',
-                        'endpoints/{endpoint_id}/minion-pool-options',
+                        'endpoints/{endpoint_id}/source-minion-pool-options',
                         controller=(
                         controller=(
-                            self.resources['endpoint_minion_pool_options']))
+                            self.resources[
+                                'endpoint_source_minion_pool_options']))
+
+        self.resources['endpoint_destination_minion_pool_options'] = \
+            endpoint_destination_minion_pool_options.create_resource()
+        mapper.resource('minion_pool_options',
+                        'endpoints/{endpoint_id}/destination-minion-pool-options',
+                        controller=(
+                            self.resources[
+                                'endpoint_destination_minion_pool_options']))
 
 
         endpoint_actions_resource = endpoint_actions.create_resource()
         endpoint_actions_resource = endpoint_actions.create_resource()
         self.resources['endpoint_actions'] = endpoint_actions_resource
         self.resources['endpoint_actions'] = endpoint_actions_resource

+ 3 - 3
coriolis/api/v1/views/endpoint_minion_pool_options_view.py → coriolis/api/v1/views/endpoint_destination_minion_pool_options_view.py

@@ -14,7 +14,7 @@ def _format_dest_opt(req, destination_option, keys=None):
         transform(k, v) for k, v in destination_option.items()))
         transform(k, v) for k, v in destination_option.items()))
 
 
 
 
-def collection(req, destination_options):
+def collection(req, destination_pool_options):
     formatted_opts = [
     formatted_opts = [
-        _format_dest_opt(req, opt) for opt in destination_options]
-    return {'minion_pool_options': formatted_opts}
+        _format_dest_opt(req, opt) for opt in destination_pool_options]
+    return {'destination_minion_pool_options': formatted_opts}

+ 20 - 0
coriolis/api/v1/views/endpoint_source_minion_pool_options_view.py

@@ -0,0 +1,20 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+
+def _format_dest_opt(req, source_option, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    return dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in source_option.items()))
+
+
+def collection(req, source_pool_options):
+    formatted_opts = [
+        _format_dest_opt(req, opt) for opt in source_pool_options]
+    return {'source_minion_pool_options': formatted_opts}

+ 24 - 10
coriolis/conductor/rpc/client.py

@@ -392,13 +392,15 @@ class ConductorClient(object):
             ctxt, 'delete_service', service_id=service_id)
             ctxt, 'delete_service', service_id=service_id)
 
 
     def create_minion_pool(
     def create_minion_pool(
-            self, ctxt, name, endpoint_id, pool_os_type, environment_options,
-            minimum_minions, maximum_minions, minion_max_idle_time,
-            minion_retention_strategy, notes=None):
+            self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=None):
         return self._client.call(
         return self._client.call(
             ctxt, 'create_minion_pool', name=name, endpoint_id=endpoint_id,
             ctxt, 'create_minion_pool', name=name, endpoint_id=endpoint_id,
-            pool_os_type=pool_os_type, environment_options=environment_options,
-            minimum_minions=minimum_minions, maximum_minions=maximum_minions,
+            pool_platform=pool_platform, pool_os_type=pool_os_type,
+            environment_options=environment_options,
+            minimum_minions=minimum_minions,
+            maximum_minions=maximum_minions,
             minion_max_idle_time=minion_max_idle_time,
             minion_max_idle_time=minion_max_idle_time,
             minion_retention_strategy=minion_retention_strategy,
             minion_retention_strategy=minion_retention_strategy,
             notes=notes)
             notes=notes)
@@ -464,14 +466,26 @@ class ConductorClient(object):
             minion_pool_id=minion_pool_id, execution_id=execution_id,
             minion_pool_id=minion_pool_id, execution_id=execution_id,
             force=force)
             force=force)
 
 
-    def get_endpoint_minion_pool_options(
+    def get_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, env, option_names):
             self, ctxt, endpoint_id, env, option_names):
         return self._client.call(
         return self._client.call(
-            ctxt, 'get_endpoint_minion_pool_options', endpoint_id=endpoint_id,
-            env=env, option_names=option_names)
+            ctxt, 'get_endpoint_source_minion_pool_options',
+            endpoint_id=endpoint_id, env=env, option_names=option_names)
+
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, env, option_names):
+        return self._client.call(
+            ctxt, 'get_endpoint_destination_minion_pool_options',
+            endpoint_id=endpoint_id, env=env, option_names=option_names)
+
+    def validate_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        return self._client.call(
+            ctxt, 'validate_endpoint_source_minion_pool_options',
+            endpoint_id=endpoint_id, pool_environment=pool_environment)
 
 
-    def validate_endpoint_minion_pool_options(
+    def validate_endpoint_destination_minion_pool_options(
             self, ctxt, endpoint_id, pool_environment):
             self, ctxt, endpoint_id, pool_environment):
         return self._client.call(
         return self._client.call(
-            ctxt, 'validate_endpoint_minion_pool_options',
+            ctxt, 'validate_endpoint_destination_minion_pool_options',
             endpoint_id=endpoint_id, pool_environment=pool_environment)
             endpoint_id=endpoint_id, pool_environment=pool_environment)

+ 145 - 35
coriolis/conductor/rpc/server.py

@@ -461,7 +461,7 @@ class ConductorServerEndpoint(object):
         return worker_rpc.get_endpoint_destination_options(
         return worker_rpc.get_endpoint_destination_options(
             ctxt, endpoint.type, endpoint.connection_info, env, option_names)
             ctxt, endpoint.type, endpoint.connection_info, env, option_names)
 
 
-    def get_endpoint_minion_pool_options(
+    def get_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, env, option_names):
             self, ctxt, endpoint_id, env, option_names):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
 
@@ -470,8 +470,21 @@ class ConductorServerEndpoint(object):
             region_sets=[[reg.id for reg in endpoint.mapped_regions]],
             region_sets=[[reg.id for reg in endpoint.mapped_regions]],
             provider_requirements={
             provider_requirements={
                 endpoint.type: [
                 endpoint.type: [
-                    constants.PROVIDER_TYPE_MINION_POOL]})
-        return worker_rpc.get_endpoint_minion_pool_options(
+                    constants.PROVIDER_TYPE_SOURCE_MINION_POOL]})
+        return worker_rpc.get_endpoint_source_minion_pool_options(
+            ctxt, endpoint.type, endpoint.connection_info, env, option_names)
+
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, env, option_names):
+        endpoint = self.get_endpoint(ctxt, endpoint_id)
+
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_sets=[[reg.id for reg in endpoint.mapped_regions]],
+            provider_requirements={
+                endpoint.type: [
+                    constants.PROVIDER_TYPE_DESTINATION_MINION_POOL]})
+        return worker_rpc.get_endpoint_destination_minion_pool_options(
             ctxt, endpoint.type, endpoint.connection_info, env, option_names)
             ctxt, endpoint.type, endpoint.connection_info, env, option_names)
 
 
     def get_endpoint_networks(self, ctxt, endpoint_id, env):
     def get_endpoint_networks(self, ctxt, endpoint_id, env):
@@ -535,7 +548,7 @@ class ConductorServerEndpoint(object):
         return worker_rpc.validate_endpoint_source_environment(
         return worker_rpc.validate_endpoint_source_environment(
             ctxt, endpoint.type, source_env)
             ctxt, endpoint.type, source_env)
 
 
-    def validate_endpoint_minion_pool_options(
+    def validate_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, pool_environment):
             self, ctxt, endpoint_id, pool_environment):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
 
@@ -543,9 +556,24 @@ class ConductorServerEndpoint(object):
             ctxt, enabled=True,
             ctxt, enabled=True,
             region_sets=[[reg.id for reg in endpoint.mapped_regions]],
             region_sets=[[reg.id for reg in endpoint.mapped_regions]],
             provider_requirements={
             provider_requirements={
-                endpoint.type: [constants.PROVIDER_TYPE_MINION_POOL]})
+                endpoint.type: [
+                    constants.PROVIDER_TYPE_SOURCE_MINION_POOL]})
 
 
-        return worker_rpc.validate_endpoint_minion_pool_options(
+        return worker_rpc.validate_endpoint_source_minion_pool_options(
+            ctxt, endpoint.type, pool_environment)
+
+    def validate_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        endpoint = self.get_endpoint(ctxt, endpoint_id)
+
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_sets=[[reg.id for reg in endpoint.mapped_regions]],
+            provider_requirements={
+                endpoint.type: [
+                    constants.PROVIDER_TYPE_DESTINATION_MINION_POOL]})
+
+        return worker_rpc.validate_endpoint_destination_minion_pool_options(
             ctxt, endpoint.type, pool_environment)
             ctxt, endpoint.type, pool_environment)
 
 
     def get_available_providers(self, ctxt):
     def get_available_providers(self, ctxt):
@@ -1261,6 +1289,14 @@ class ConductorServerEndpoint(object):
                         action.origin_minion_pool_id,
                         action.origin_minion_pool_id,
                         origin_pool.origin_endpoint_id,
                         origin_pool.origin_endpoint_id,
                         action.origin_endpoint_id))
                         action.origin_endpoint_id))
+            if origin_pool.pool_platform != constants.PROVIDER_PLATFORM_SOURCE:
+                raise exception.InvalidMinionPoolSelection(
+                    "The selected origin minion pool ('%s') is configured as a"
+                    " '%s' pool. The pool must be of type %s to be used for "
+                    "data exports." % (
+                        action.origin_minion_pool_id,
+                        origin_pool.pool_platform,
+                        constants.PROVIDER_PLATFORM_SOURCE))
             if origin_pool.pool_os_type != constants.OS_TYPE_LINUX:
             if origin_pool.pool_os_type != constants.OS_TYPE_LINUX:
                 raise exception.InvalidMinionPoolSelection(
                 raise exception.InvalidMinionPoolSelection(
                     "The selected origin minion pool ('%s') is of OS type '%s'"
                     "The selected origin minion pool ('%s') is of OS type '%s'"
@@ -1280,6 +1316,15 @@ class ConductorServerEndpoint(object):
                         action.destination_minion_pool_id,
                         action.destination_minion_pool_id,
                         destination_pool.origin_endpoint_id,
                         destination_pool.origin_endpoint_id,
                         action.destination_endpoint_id))
                         action.destination_endpoint_id))
+            if destination_pool.pool_platform != (
+                    constants.PROVIDER_PLATFORM_DESTINATION):
+                raise exception.InvalidMinionPoolSelection(
+                    "The selected destination minion pool ('%s') is configured"
+                    " as a '%s'. The pool must be of type %s to be used for "
+                    "data imports." % (
+                        action.destination_minion_pool_id,
+                        destination_pool.pool_platform,
+                        constants.PROVIDER_PLATFORM_DESTINATION))
             if destination_pool.pool_os_type != constants.OS_TYPE_LINUX:
             if destination_pool.pool_os_type != constants.OS_TYPE_LINUX:
                 raise exception.InvalidMinionPoolSelection(
                 raise exception.InvalidMinionPoolSelection(
                     "The selected destination minion pool ('%s') is of OS type"
                     "The selected destination minion pool ('%s') is of OS type"
@@ -1301,6 +1346,15 @@ class ConductorServerEndpoint(object):
                             instance, pool_id,
                             instance, pool_id,
                             osmorphing_pool.origin_endpoint_id,
                             osmorphing_pool.origin_endpoint_id,
                             action.destination_endpoint_id))
                             action.destination_endpoint_id))
+                if osmorphing_pool.pool_platform != (
+                        constants.PROVIDER_PLATFORM_DESTINATION):
+                    raise exception.InvalidMinionPoolSelection(
+                        "The selected OSMorphing minion pool for instance '%s'"
+                        "  ('%s') is configured as a '%s' pool. The pool must "
+                        "be of type %s to be used for OSMorphing." % (
+                            instance, pool_id,
+                            osmorphing_pool.pool_platform,
+                            constants.PROVIDER_PLATFORM_DESTINATION))
 
 
     def create_instances_replica(self, ctxt, origin_endpoint_id,
     def create_instances_replica(self, ctxt, origin_endpoint_id,
                                  destination_endpoint_id,
                                  destination_endpoint_id,
@@ -1728,7 +1782,8 @@ class ConductorServerEndpoint(object):
                 ctxt, include_tasks_executions=False, include_info=False,
                 ctxt, include_tasks_executions=False, include_info=False,
                 include_machines=True, to_dict=False)
                 include_machines=True, to_dict=False)
             minion_pool_id_mappings = {
             minion_pool_id_mappings = {
-                pool.id: pool for pool in minion_pools}
+                pool.id: pool for pool in minion_pools
+                if pool.id in minion_pool_ids}
 
 
             missing_pools = [
             missing_pools = [
                 pool_id for pool_id in minion_pool_ids
                 pool_id for pool_id in minion_pool_ids
@@ -1739,7 +1794,8 @@ class ConductorServerEndpoint(object):
                         missing_pools))
                         missing_pools))
 
 
             unallocated_pools = {
             unallocated_pools = {
-                pool.id: pool.pool_status for pool in minion_pools
+                pool_id: pool.pool_status
+                for (pool_id, pool) in minion_pool_id_mappings.items()
                 if pool.pool_status != constants.MINION_POOL_STATUS_ALLOCATED}
                 if pool.pool_status != constants.MINION_POOL_STATUS_ALLOCATED}
             if unallocated_pools:
             if unallocated_pools:
                 raise exception.InvalidMinionPoolSelection(
                 raise exception.InvalidMinionPoolSelection(
@@ -1792,11 +1848,12 @@ class ConductorServerEndpoint(object):
                         osmorphing_pool_id = osmorphing_pool_map[instance]
                         osmorphing_pool_id = osmorphing_pool_map[instance]
                         # if the selected target and OSMorphing pools
                         # if the selected target and OSMorphing pools
                         # are the same, reuse the same worker:
                         # are the same, reuse the same worker:
+                        ima = instance_machine_allocations[instance]
                         if osmorphing_pool_id == (
                         if osmorphing_pool_id == (
-                                action.destination_minion_pool_id):
-                            allocated_target_machine = (
-                                instance_machine_allocations[
-                                    instance].get('target_minion'))
+                                action.destination_minion_pool_id) and (
+                                    'target_minion' in ima):
+                            allocated_target_machine = ima[
+                                'target_minion']
                             LOG.debug(
                             LOG.debug(
                                 "Reusing disk sync minion '%s' for the "
                                 "Reusing disk sync minion '%s' for the "
                                 "OSMorphing of instance '%s' as port of "
                                 "OSMorphing of instance '%s' as port of "
@@ -3104,7 +3161,9 @@ class ConductorServerEndpoint(object):
                     db_api.update_replica(
                     db_api.update_replica(
                         ctxt, execution.action_id, task_info)
                         ctxt, execution.action_id, task_info)
 
 
-        elif task_type == constants.TASK_TYPE_SET_UP_SHARED_POOL_RESOURCES:
+        elif task_type in (
+                constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES,
+                constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES):
             still_running = _check_other_tasks_running(execution, task)
             still_running = _check_other_tasks_running(execution, task)
             if not still_running:
             if not still_running:
                 LOG.info(
                 LOG.info(
@@ -3116,7 +3175,9 @@ class ConductorServerEndpoint(object):
                         "pool_shared_resources": task_info.get(
                         "pool_shared_resources": task_info.get(
                             "pool_shared_resources", {})})
                             "pool_shared_resources", {})})
 
 
-        elif task_type == constants.TASK_TYPE_TEAR_DOWN_SHARED_POOL_RESOURCES:
+        elif task_type in (
+                constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES,
+                constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES):
             still_running = _check_other_tasks_running(execution, task)
             still_running = _check_other_tasks_running(execution, task)
             if not still_running:
             if not still_running:
                 LOG.info(
                 LOG.info(
@@ -3127,7 +3188,9 @@ class ConductorServerEndpoint(object):
                     ctxt, execution.action_id, {
                     ctxt, execution.action_id, {
                         "pool_shared_resources": {}})
                         "pool_shared_resources": {}})
 
 
-        elif task_type == constants.TASK_TYPE_CREATE_MINION:
+        elif task_type in (
+                constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE,
+                constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE):
             LOG.info(
             LOG.info(
                 "Adding DB entry for Minion Machine '%s' of pool %s "
                 "Adding DB entry for Minion Machine '%s' of pool %s "
                 "following completion of task '%s' (type %s).",
                 "following completion of task '%s' (type %s).",
@@ -3145,11 +3208,13 @@ class ConductorServerEndpoint(object):
                 "minion_backup_writer_connection_info"]
                 "minion_backup_writer_connection_info"]
             db_api.add_minion_machine(ctxt, minion_machine)
             db_api.add_minion_machine(ctxt, minion_machine)
 
 
-        elif task_type == constants.TASK_TYPE_DELETE_MINION:
+        elif task_type in (
+                constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE,
+                constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE):
             LOG.info(
             LOG.info(
                 "%s task for Minon Machine '%s' has completed successfully. "
                 "%s task for Minon Machine '%s' has completed successfully. "
                 "Deleting minion machine from DB.",
                 "Deleting minion machine from DB.",
-                constants.TASK_TYPE_DELETE_MINION, task.instance)
+                task_type, task.instance)
             db_api.delete_minion_machine(ctxt, task.instance)
             db_api.delete_minion_machine(ctxt, task.instance)
 
 
         elif task_type in (
         elif task_type in (
@@ -3836,15 +3901,16 @@ class ConductorServerEndpoint(object):
         db_api.delete_service(ctxt, service_id)
         db_api.delete_service(ctxt, service_id)
 
 
     def create_minion_pool(
     def create_minion_pool(
-            self, ctxt, name, endpoint_id, pool_os_type, environment_options,
-            minimum_minions, maximum_minions, minion_max_idle_time,
-            minion_retention_strategy, notes=None):
+            self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=None):
         endpoint = db_api.get_endpoint(ctxt, endpoint_id)
         endpoint = db_api.get_endpoint(ctxt, endpoint_id)
 
 
         minion_pool = models.MinionPoolLifecycle()
         minion_pool = models.MinionPoolLifecycle()
         minion_pool.id = str(uuid.uuid4())
         minion_pool.id = str(uuid.uuid4())
         minion_pool.pool_name = name
         minion_pool.pool_name = name
         minion_pool.notes = notes
         minion_pool.notes = notes
+        minion_pool.pool_platform = pool_platform
         minion_pool.pool_os_type = pool_os_type
         minion_pool.pool_os_type = pool_os_type
         minion_pool.pool_status = constants.MINION_POOL_STATUS_UNINITIALIZED
         minion_pool.pool_status = constants.MINION_POOL_STATUS_UNINITIALIZED
         minion_pool.minimum_minions = minimum_minions
         minion_pool.minimum_minions = minimum_minions
@@ -3909,14 +3975,22 @@ class ConductorServerEndpoint(object):
             # action DB models have been overhauled:
             # action DB models have been overhauled:
             "pool_environment_options": minion_pool.source_environment}
             "pool_environment_options": minion_pool.source_environment}
 
 
+        validate_task_type = (
+            constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS)
+        set_up_task_type = (
+            constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES)
+        if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+            validate_task_type = (
+                constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS)
+            set_up_task_type = (
+                constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES)
+
         validate_pool_options_task = self._create_task(
         validate_pool_options_task = self._create_task(
-            minion_pool.id,
-            constants.TASK_TYPE_VALIDATE_MINION_POOL_OPTIONS,
-            execution)
+            minion_pool.id, validate_task_type, execution)
 
 
         setup_pool_resources_task = self._create_task(
         setup_pool_resources_task = self._create_task(
             minion_pool.id,
             minion_pool.id,
-            constants.TASK_TYPE_SET_UP_SHARED_POOL_RESOURCES,
+            set_up_task_type,
             execution,
             execution,
             depends_on=[validate_pool_options_task.id])
             depends_on=[validate_pool_options_task.id])
 
 
@@ -3962,10 +4036,14 @@ class ConductorServerEndpoint(object):
         execution.type = (
         execution.type = (
             constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES)
             constants.EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES)
 
 
+        tear_down_task_type = (
+            constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES)
+        if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+            tear_down_task_type = (
+                constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES)
+
         self._create_task(
         self._create_task(
-            minion_pool.id,
-            constants.TASK_TYPE_TEAR_DOWN_SHARED_POOL_RESOURCES,
-            execution)
+            minion_pool.id, tear_down_task_type, execution)
 
 
         self._check_execution_tasks_sanity(execution, minion_pool.info)
         self._check_execution_tasks_sanity(execution, minion_pool.info)
 
 
@@ -4009,6 +4087,16 @@ class ConductorServerEndpoint(object):
         new_minion_machine_ids = [
         new_minion_machine_ids = [
             str(uuid.uuid4()) for _ in range(minion_pool.minimum_minions)]
             str(uuid.uuid4()) for _ in range(minion_pool.minimum_minions)]
 
 
+        create_minion_task_type = (
+            constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE)
+        delete_minion_task_type = (
+            constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
+        if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+            create_minion_task_type = (
+                constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE)
+            delete_minion_task_type = (
+                constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
+
         for minion_machine_id in new_minion_machine_ids:
         for minion_machine_id in new_minion_machine_ids:
             minion_pool.info[minion_machine_id] = {
             minion_pool.info[minion_machine_id] = {
                 "pool_identifier": minion_pool_id,
                 "pool_identifier": minion_pool_id,
@@ -4021,13 +4109,11 @@ class ConductorServerEndpoint(object):
                 "minion_provider_properties": {}}
                 "minion_provider_properties": {}}
 
 
             create_minion_task = self._create_task(
             create_minion_task = self._create_task(
-                minion_machine_id,
-                constants.TASK_TYPE_CREATE_MINION,
-                execution)
+                minion_machine_id, create_minion_task_type, execution)
 
 
             self._create_task(
             self._create_task(
                 minion_machine_id,
                 minion_machine_id,
-                constants.TASK_TYPE_DELETE_MINION,
+                delete_minion_task_type,
                 execution, on_error_only=True,
                 execution, on_error_only=True,
                 depends_on=[create_minion_task.id])
                 depends_on=[create_minion_task.id])
 
 
@@ -4050,6 +4136,25 @@ class ConductorServerEndpoint(object):
         return self._get_minion_pool_lifecycle_execution(
         return self._get_minion_pool_lifecycle_execution(
             ctxt, minion_pool_id, execution.id).to_dict()
             ctxt, minion_pool_id, execution.id).to_dict()
 
 
+    def _check_all_pool_minion_machines_available(self, minion_pool):
+        if not minion_pool.minion_machines:
+            LOG.debug(
+                "Minion pool '%s' does not have any allocated machines.",
+                minion_pool.id)
+            return
+
+        allocated_machine_statuses = {
+            machine.id: machine.status
+            for machine in minion_pool.minion_machines
+            if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE}
+
+        if allocated_machine_statuses:
+            raise exception.InvalidMinionPoolState(
+                "Minion pool with ID '%s' has one or machines which are in-use "
+                "or otherwise unmodifiable: %s" % (
+                    minion_pool.id,
+                    allocated_machine_statuses))
+
     @minion_pool_synchronized
     @minion_pool_synchronized
     def deallocate_minion_pool_machines(self, ctxt, minion_pool_id):
     def deallocate_minion_pool_machines(self, ctxt, minion_pool_id):
         LOG.info("Attempting to deallocate Minion Pool '%s'.", minion_pool_id)
         LOG.info("Attempting to deallocate Minion Pool '%s'.", minion_pool_id)
@@ -4064,8 +4169,7 @@ class ConductorServerEndpoint(object):
                     minion_pool_id, minion_pool.pool_status,
                     minion_pool_id, minion_pool.pool_status,
                     constants.MINION_POOL_STATUS_ALLOCATED))
                     constants.MINION_POOL_STATUS_ALLOCATED))
 
 
-        # TODO(aznashwan): check minion pool running
-        # executions/allocated machines
+        self._check_all_pool_minion_machines_available(minion_pool)
 
 
         execution = models.TasksExecution()
         execution = models.TasksExecution()
         execution.id = str(uuid.uuid4())
         execution.id = str(uuid.uuid4())
@@ -4074,6 +4178,12 @@ class ConductorServerEndpoint(object):
         execution.type = (
         execution.type = (
             constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS)
             constants.EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS)
 
 
+        delete_minion_task_type = (
+            constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
+        if minion_pool.pool_platform == constants.PROVIDER_PLATFORM_SOURCE:
+            delete_minion_task_type = (
+                constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
+
         for minion_machine in minion_pool.minion_machines:
         for minion_machine in minion_pool.minion_machines:
             minion_machine_id = minion_machine.id
             minion_machine_id = minion_machine.id
             minion_pool.info[minion_machine_id] = {
             minion_pool.info[minion_machine_id] = {
@@ -4081,7 +4191,7 @@ class ConductorServerEndpoint(object):
                 "minion_provider_properties": (
                 "minion_provider_properties": (
                     minion_machine.provider_properties)}
                     minion_machine.provider_properties)}
             self._create_task(
             self._create_task(
-                minion_machine_id, constants.TASK_TYPE_DELETE_MINION,
+                minion_machine_id, delete_minion_task_type,
                 # NOTE: we set 'on_error=True' to allow for the completion of
                 # NOTE: we set 'on_error=True' to allow for the completion of
                 # already running deletion tasks to prevent partial deletes:
                 # already running deletion tasks to prevent partial deletes:
                 execution, on_error=True)
                 execution, on_error=True)
@@ -4136,7 +4246,7 @@ class ConductorServerEndpoint(object):
     def delete_minion_pool(self, ctxt, minion_pool_id):
     def delete_minion_pool(self, ctxt, minion_pool_id):
         minion_pool = self._get_minion_pool(
         minion_pool = self._get_minion_pool(
             ctxt, minion_pool_id, include_tasks_executions=False,
             ctxt, minion_pool_id, include_tasks_executions=False,
-            include_machines=False)
+            include_machines=True)
         acceptable_deletion_statuses = [
         acceptable_deletion_statuses = [
             constants.MINION_POOL_STATUS_UNINITIALIZED,
             constants.MINION_POOL_STATUS_UNINITIALIZED,
             constants.MINION_POOL_STATUS_ERROR]
             constants.MINION_POOL_STATUS_ERROR]

+ 20 - 10
coriolis/constants.py

@@ -130,15 +130,24 @@ TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS = (
 TASK_TYPE_UPDATE_SOURCE_REPLICA = "UPDATE_SOURCE_REPLICA"
 TASK_TYPE_UPDATE_SOURCE_REPLICA = "UPDATE_SOURCE_REPLICA"
 TASK_TYPE_UPDATE_DESTINATION_REPLICA = "UPDATE_DESTINATION_REPLICA"
 TASK_TYPE_UPDATE_DESTINATION_REPLICA = "UPDATE_DESTINATION_REPLICA"
 
 
-TASK_TYPE_VALIDATE_MINION_POOL_OPTIONS = (
-    "VALIDATE_MINION_POOL_ENVIRONMENT_OPTIONS")
-TASK_TYPE_CREATE_MINION = "CREATE_MINION"
-TASK_TYPE_DELETE_MINION = "DELETE_MINION"
-TASK_TYPE_STOP_MINION = "STOP_MINION"
-TASK_TYPE_START_MINION = "START_MINION"
-TASK_TYPE_SET_UP_SHARED_POOL_RESOURCES = "SET_UP_POOL_RESOURCES"
-TASK_TYPE_TEAR_DOWN_SHARED_POOL_RESOURCES = (
-    "TEAR_DOWN_SHARED_POOL_RESOURCES")
+TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS = (
+    "VALIDATE_SOURCE_MINION_POOL_ENVIRONMENT_OPTIONS")
+TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS = (
+    "VALIDATE_DESTINATION_MINION_POOL_ENVIRONMENT_OPTIONS")
+TASK_TYPE_CREATE_SOURCE_MINION_MACHINE = "CREATE_SOURCE_MINION_MACHINE"
+TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE = (
+    "CREATE_DESTINATION_MINION_MACHINE")
+TASK_TYPE_DELETE_SOURCE_MINION_MACHINE = "DELETE_SOURCE_MINION_MACHINE"
+TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE = (
+    "DELETE_DESTINATION_MINION_MACHINE")
+TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES = (
+    "SET_UP_SOURCE_POOL_SHARED_RESOURCES")
+TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES = (
+    "SET_UP_DESTINATION_POOL_SHARED_RESOURCES")
+TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES = (
+    "TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES")
+TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES = (
+    "TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES")
 TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION = "ATTACH_VOLUMES_TO_SOURCE_MINION"
 TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION = "ATTACH_VOLUMES_TO_SOURCE_MINION"
 TASK_TYPE_DETACH_VOLUMES_FROM_SOURCE_MINION = (
 TASK_TYPE_DETACH_VOLUMES_FROM_SOURCE_MINION = (
     "DETACH_VOLUMES_FROM_SOURCE_MINION")
     "DETACH_VOLUMES_FROM_SOURCE_MINION")
@@ -187,7 +196,8 @@ PROVIDER_TYPE_ENDPOINT_STORAGE = 32768
 PROVIDER_TYPE_SOURCE_REPLICA_UPDATE = 65536
 PROVIDER_TYPE_SOURCE_REPLICA_UPDATE = 65536
 PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS = 131072
 PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS = 131072
 PROVIDER_TYPE_DESTINATION_REPLICA_UPDATE = 262144
 PROVIDER_TYPE_DESTINATION_REPLICA_UPDATE = 262144
-PROVIDER_TYPE_MINION_POOL = 524288
+PROVIDER_TYPE_SOURCE_MINION_POOL = 524288
+PROVIDER_TYPE_DESTINATION_MINION_POOL = 1048576
 
 
 DISK_FORMAT_VMDK = 'vmdk'
 DISK_FORMAT_VMDK = 'vmdk'
 DISK_FORMAT_RAW = 'raw'
 DISK_FORMAT_RAW = 'raw'

+ 2 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -48,6 +48,8 @@ def upgrade(migrate_engine):
                 "pool_name", sqlalchemy.String(255), nullable=False),
                 "pool_name", sqlalchemy.String(255), nullable=False),
             sqlalchemy.Column(
             sqlalchemy.Column(
                 "pool_os_type", sqlalchemy.String(255), nullable=False),
                 "pool_os_type", sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column(
+                "pool_platform", sqlalchemy.String(255), nullable=True),
             sqlalchemy.Column(
             sqlalchemy.Column(
                 "pool_status", sqlalchemy.String(255), nullable=False,
                 "pool_status", sqlalchemy.String(255), nullable=False,
                 default=lambda: "UNKNOWN"),
                 default=lambda: "UNKNOWN"),

+ 3 - 0
coriolis/db/sqlalchemy/models.py

@@ -516,6 +516,8 @@ class MinionPoolLifecycle(BaseTransferAction):
         nullable=False)
         nullable=False)
     pool_os_type = sqlalchemy.Column(
     pool_os_type = sqlalchemy.Column(
         sqlalchemy.String(255), nullable=False)
         sqlalchemy.String(255), nullable=False)
+    pool_platform = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False)
     pool_status = sqlalchemy.Column(
     pool_status = sqlalchemy.Column(
         sqlalchemy.String(255), nullable=False,
         sqlalchemy.String(255), nullable=False,
         default=lambda: constants.MINION_POOL_STATUS_UNKNOWN)
         default=lambda: constants.MINION_POOL_STATUS_UNKNOWN)
@@ -546,6 +548,7 @@ class MinionPoolLifecycle(BaseTransferAction):
             "id": self.id,
             "id": self.id,
             "pool_name": self.pool_name,
             "pool_name": self.pool_name,
             "pool_os_type": self.pool_os_type,
             "pool_os_type": self.pool_os_type,
+            "pool_platform": self.pool_platform,
             "pool_shared_resources": self.pool_shared_resources,
             "pool_shared_resources": self.pool_shared_resources,
             "pool_status": self.pool_status,
             "pool_status": self.pool_status,
             "minimum_minions": self.minimum_minions,
             "minimum_minions": self.minimum_minions,

+ 0 - 14
coriolis/endpoint_minion_options/api.py

@@ -1,14 +0,0 @@
-# Copyright 2016 Cloudbase Solutions Srl
-# All Rights Reserved.
-
-from coriolis.conductor.rpc import client as rpc_client
-
-
-class API(object):
-    def __init__(self):
-        self._rpc_client = rpc_client.ConductorClient()
-
-    def get_endpoint_minion_pool_options(
-            self, ctxt, endpoint_id, env=None, option_names=None):
-        return self._rpc_client.get_endpoint_minion_pool_options(
-            ctxt, endpoint_id, env, option_names)

+ 0 - 0
coriolis/endpoint_minion_options/__init__.py → coriolis/endpoint_minion_pool_options/__init__.py


+ 19 - 0
coriolis/endpoint_minion_pool_options/api.py

@@ -0,0 +1,19 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis.conductor.rpc import client as rpc_client
+
+
+class API(object):
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+
+    def get_endpoint_source_minion_pool_options(
+            self, ctxt, endpoint_id, env=None, option_names=None):
+        return self._rpc_client.get_endpoint_source_minion_pool_options(
+            ctxt, endpoint_id, env, option_names)
+
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, env=None, option_names=None):
+        return self._rpc_client.get_endpoint_destination_minion_pool_options(
+            ctxt, endpoint_id, env, option_names)

+ 9 - 4
coriolis/endpoints/api.py

@@ -42,10 +42,15 @@ class API(object):
         return self._rpc_client.validate_endpoint_source_environment(
         return self._rpc_client.validate_endpoint_source_environment(
             ctxt, endpoint_id, source_env)
             ctxt, endpoint_id, source_env)
 
 
-    @utils.bad_request_on_error("Invalid minion pool environment: %s")
-    def validate_endpoint_minion_pool_options(
+    @utils.bad_request_on_error("Invalid source minion pool environment: %s")
+    def validate_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, pool_environment):
             self, ctxt, endpoint_id, pool_environment):
-        return self._rpc_client.validate_endpoint_minion_pool_options(
+        return self._rpc_client.validate_endpoint_source_minion_pool_options(
             ctxt, endpoint_id, pool_environment)
             ctxt, endpoint_id, pool_environment)
 
 
-
+    @utils.bad_request_on_error(
+        "Invalid destination minion pool environment: %s")
+    def validate_endpoint_destination_minion_pool_options(
+            self, ctxt, endpoint_id, pool_environment):
+        return self._rpc_client.validate_endpoint_destination_minion_pool_options(
+            ctxt, endpoint_id, pool_environment)

+ 6 - 6
coriolis/minion_pools/api.py

@@ -10,13 +10,13 @@ class API(object):
         self._rpc_client = rpc_client.ConductorClient()
         self._rpc_client = rpc_client.ConductorClient()
 
 
     def create(
     def create(
-            self, ctxt, name, endpoint_id, pool_os_type, environment_options,
-            minimum_minions, maximum_minions, minion_max_idle_time,
-            minion_retention_strategy, notes=None):
+            self, ctxt, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=None):
         return self._rpc_client.create_minion_pool(
         return self._rpc_client.create_minion_pool(
-            ctxt, name, endpoint_id, pool_os_type, environment_options,
-            minimum_minions, maximum_minions, minion_max_idle_time,
-            minion_retention_strategy, notes=notes)
+            ctxt, name, endpoint_id, pool_platform, pool_os_type,
+            environment_options, minimum_minions, maximum_minions,
+            minion_max_idle_time, minion_retention_strategy, notes=notes)
 
 
     def update(self, ctxt, minion_pool_id, updated_values):
     def update(self, ctxt, minion_pool_id, updated_values):
         return self._rpc_client.update_minion_pool(
         return self._rpc_client.update_minion_pool(

+ 16 - 5
coriolis/policies/endpoints.py

@@ -151,17 +151,28 @@ ENDPOINTS_POLICY_DEFAULT_RULES = [
         ]
         ]
     ),
     ),
     policy.DocumentedRuleDefault(
     policy.DocumentedRuleDefault(
-        get_endpoints_policy_label('list_minion_pool_options'),
+        get_endpoints_policy_label('list_source_minion_pool_options'),
         ENDPOINTS_POLICY_DEFAULT_RULE,
         ENDPOINTS_POLICY_DEFAULT_RULE,
-        "List available minion pool options for endpoint",
+        "List available source minion pool options for endpoint",
         [
         [
             {
             {
-                "path": "/endpoint/{endpoint_id}/minion-pool-option",
+                "path": "/endpoint/{endpoint_id}/source-minion-pool-options",
                 "method": "GET"
                 "method": "GET"
             }
             }
         ]
         ]
-    )
-
+    ),
+    policy.DocumentedRuleDefault(
+        get_endpoints_policy_label('list_destination_minion_pool_options'),
+        ENDPOINTS_POLICY_DEFAULT_RULE,
+        "List available destination pool options for endpoint",
+        [
+            {
+                "path": (
+                    "/endpoint/{endpoint_id}/destination-minion-pool-options"),
+                "method": "GET"
+            }
+        ]
+    ),
 ]
 ]
 
 
 
 

+ 21 - 16
coriolis/providers/base.py

@@ -534,7 +534,7 @@ class BaseUpdateDestinationReplicaProvider(
         """
         """
 
 
 
 
-class BaseMinionPoolProvider(
+class _BaseMinionPoolProvider(
         object, with_metaclass(abc.ABCMeta)):
         object, with_metaclass(abc.ABCMeta)):
     """ Class for providers which offer Minion Pool management functionality.
     """ Class for providers which offer Minion Pool management functionality.
     """
     """
@@ -562,18 +562,7 @@ class BaseMinionPoolProvider(
         pass
         pass
 
 
     @abc.abstractmethod
     @abc.abstractmethod
-    def validate_osmorphing_minion_compatibility_for_transfer(
-            self, ctxt, connection_info, export_info, environment_options,
-            minion_properties):
-        """ Validates compatibility between the OSMorphing pool's options and
-        the options selected for a given transfer. Should raise if any options
-        of the minions in the pool might be deemed incompatible with the
-        desired transfer options.
-        """
-        pass
-
-    @abc.abstractmethod
-    def validate_minion_pool_options(
+    def validate_minion_pool_environment_options(
             self, ctxt, connection_info, environment_options):
             self, ctxt, connection_info, environment_options):
         """ Validates the provided pool options. """
         """ Validates the provided pool options. """
         pass
         pass
@@ -625,14 +614,30 @@ class BaseMinionPoolProvider(
             self, ctxt, connection_info, minion_properties, volumes_info):
             self, ctxt, connection_info, minion_properties, volumes_info):
         pass
         pass
 
 
+
+class BaseSourceMinionPoolProvider(_BaseMinionPoolProvider):
+
+    pass
+
+
+class BaseDestinationMinionPoolProvider(_BaseMinionPoolProvider):
+
+    @abc.abstractmethod
+    def validate_osmorphing_minion_compatibility_for_transfer(
+            self, ctxt, connection_info, export_info, environment_options,
+            minion_properties):
+        """ Validates compatibility between the OSMorphing pool's options and
+        the options selected for a given transfer. Should raise if any options
+        of the minions in the pool might be deemed incompatible with the
+        desired transfer options.
+        """
+        pass
+
     @abc.abstractmethod
     @abc.abstractmethod
     def get_additional_os_morphing_info(
     def get_additional_os_morphing_info(
             self, ctxt, connection_info, target_environment,
             self, ctxt, connection_info, target_environment,
             instance_deployment_info):
             instance_deployment_info):
         """ This method should return any additional 'osmorphing_info'
         """ This method should return any additional 'osmorphing_info'
         as defined in coriolis.schemas.CORIOLIS_OS_MORPHING_RESOURCES_SCHEMA
         as defined in coriolis.schemas.CORIOLIS_OS_MORPHING_RESOURCES_SCHEMA
-        Source-only providers can safely implement a stub method which returns
-        nothing, as this will only ever be called during OSMorphing for a
-        target plugin.
         """
         """
         pass
         pass

+ 4 - 2
coriolis/providers/factory.py

@@ -50,8 +50,10 @@ PROVIDER_TYPE_MAP = {
         base.BaseUpdateDestinationReplicaProvider),
         base.BaseUpdateDestinationReplicaProvider),
     constants.PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS: (
     constants.PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS: (
         base.BaseEndpointSourceOptionsProvider),
         base.BaseEndpointSourceOptionsProvider),
-    constants.PROVIDER_TYPE_MINION_POOL: (
-        base.BaseMinionPoolProvider)
+    constants.PROVIDER_TYPE_SOURCE_MINION_POOL: (
+        base.BaseSourceMinionPoolProvider),
+    constants.PROVIDER_TYPE_DESTINATION_MINION_POOL: (
+        base.BaseDestinationMinionPoolProvider)
 }
 }
 
 
 
 

+ 4 - 2
coriolis/schemas.py

@@ -22,8 +22,10 @@ PROVIDER_CONNECTION_INFO_SCHEMA_NAME = "connection_info_schema.json"
 
 
 PROVIDER_TARGET_ENVIRONMENT_SCHEMA_NAME = "target_environment_schema.json"
 PROVIDER_TARGET_ENVIRONMENT_SCHEMA_NAME = "target_environment_schema.json"
 PROVIDER_SOURCE_ENVIRONMENT_SCHEMA_NAME = "source_environment_schema.json"
 PROVIDER_SOURCE_ENVIRONMENT_SCHEMA_NAME = "source_environment_schema.json"
-PROVIDER_MINION_POOL_ENVIRONMENT_SCHEMA_NAME = (
-    "minion_pool_environment_schema.json")
+PROVIDER_SOURCE_MINION_POOL_ENVIRONMENT_SCHEMA_NAME = (
+    "source_minion_pool_environment_schema.json")
+PROVIDER_DESTINATION_MINION_POOL_ENVIRONMENT_SCHEMA_NAME = (
+    "destination_minion_pool_environment_schema.json")
 
 
 _CORIOLIS_VM_EXPORT_INFO_SCHEMA_NAME = "vm_export_info_schema.json"
 _CORIOLIS_VM_EXPORT_INFO_SCHEMA_NAME = "vm_export_info_schema.json"
 _CORIOLIS_VM_INSTANCE_INFO_SCHEMA_NAME = "vm_instance_info_schema.json"
 _CORIOLIS_VM_INSTANCE_INFO_SCHEMA_NAME = "vm_instance_info_schema.json"

+ 15 - 5
coriolis/tasks/base.py

@@ -56,14 +56,18 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
         """ Returns a list of the string fields which are required
         """ Returns a list of the string fields which are required
         to be present during the tasks' run method. """
         to be present during the tasks' run method. """
-        pass
+        raise NotImplementedError(
+            "No required task info properties specified for task class of "
+            "type '%s'." % cls)
 
 
     @abc.abstractclassmethod
     @abc.abstractclassmethod
     def get_returned_task_info_properties(cls):
     def get_returned_task_info_properties(cls):
         """ Returns a list of the string fields which are returned by the
         """ Returns a list of the string fields which are returned by the
         tasks' run method to be added to the task info.
         tasks' run method to be added to the task info.
         """
         """
-        pass
+        raise NotImplementedError(
+            "No returned task info properties specified for task class of "
+            "type '%s'." % cls)
 
 
     @abc.abstractclassmethod
     @abc.abstractclassmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
@@ -71,14 +75,18 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
         of all the provider types (constants.PROVIDER_TYPE_*) required for the
         of all the provider types (constants.PROVIDER_TYPE_*) required for the
         task.
         task.
         """
         """
-        pass
+        raise NotImplementedError(
+            "No required provider types specified for task class of "
+            "type '%s'." % cls)
 
 
     @abc.abstractclassmethod
     @abc.abstractclassmethod
     def get_required_platform(cls):
     def get_required_platform(cls):
         """ Returns whether the task operates on the source platform, the
         """ Returns whether the task operates on the source platform, the
         destination, or both. (constants.TASK_PLATFORM_*)
         destination, or both. (constants.TASK_PLATFORM_*)
         """
         """
-        pass
+        raise NotImplementedError(
+            "No required platform specified for task class of "
+            "type '%s'." % cls)
 
 
     @abc.abstractmethod
     @abc.abstractmethod
     def _run(self, ctxt, instance, origin, destination, task_info,
     def _run(self, ctxt, instance, origin, destination, task_info,
@@ -88,7 +96,9 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
         'self.get_returned_task_info_properties'.
         'self.get_returned_task_info_properties'.
         Must be implemented in all child classes.
         Must be implemented in all child classes.
         """
         """
-        pass
+        raise NotImplementedError(
+            "No base run method implemented for task class of type '%s'." % (
+                self.__class__))
 
 
     def run(self, ctxt, instance, origin, destination, task_info,
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
             event_handler):

+ 20 - 10
coriolis/tasks/factory.py

@@ -83,16 +83,26 @@ _TASKS_MAP = {
         replica_tasks.UpdateSourceReplicaTask,
         replica_tasks.UpdateSourceReplicaTask,
     constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
     constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
         replica_tasks.UpdateDestinationReplicaTask,
         replica_tasks.UpdateDestinationReplicaTask,
-    constants.TASK_TYPE_VALIDATE_MINION_POOL_OPTIONS:
-        minion_pool_tasks.ValidateMinionPoolOptionsTask,
-    constants.TASK_TYPE_CREATE_MINION:
-        minion_pool_tasks.CreateMinionTask,
-    constants.TASK_TYPE_DELETE_MINION:
-        minion_pool_tasks.DeleteMinionTask,
-    constants.TASK_TYPE_SET_UP_SHARED_POOL_RESOURCES:
-        minion_pool_tasks.SetUpPoolSupportingResourcesTask,
-    constants.TASK_TYPE_TEAR_DOWN_SHARED_POOL_RESOURCES:
-        minion_pool_tasks.TearDownPoolSupportingResourcesTask,
+    constants.TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS:
+        minion_pool_tasks.ValidateSourceMinionPoolOptionsTask,
+    constants.TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS:
+        minion_pool_tasks.ValidateDestinationMinionPoolOptionsTask,
+    constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE:
+        minion_pool_tasks.CreateSourceMinionMachineTask,
+    constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE:
+        minion_pool_tasks.CreateDestinationMinionMachineTask,
+    constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE:
+        minion_pool_tasks.DeleteSourceMinionMachineTask,
+    constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE:
+        minion_pool_tasks.DeleteDestinationMinionMachineTask,
+    constants.TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES:
+        minion_pool_tasks.SetUpSourcePoolSupportingResourcesTask,
+    constants.TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES:
+        minion_pool_tasks.SetUpDestinationPoolSupportingResources,
+    constants.TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES:
+        minion_pool_tasks.TearDownSourcePoolSupportingResourcesTask,
+    constants.TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES:
+        minion_pool_tasks.TearDownDestinationPoolSupportingResources,
     constants.TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION:
     constants.TASK_TYPE_ATTACH_VOLUMES_TO_SOURCE_MINION:
         minion_pool_tasks.AttachVolumesToSourceMinionTask,
         minion_pool_tasks.AttachVolumesToSourceMinionTask,
     constants.TASK_TYPE_DETACH_VOLUMES_FROM_SOURCE_MINION:
     constants.TASK_TYPE_DETACH_VOLUMES_FROM_SOURCE_MINION:

+ 154 - 99
coriolis/tasks/minion_pool_tasks.py

@@ -4,6 +4,7 @@
 from oslo_log import log as logging
 from oslo_log import log as logging
 
 
 from coriolis import constants
 from coriolis import constants
+from coriolis import events
 from coriolis import exception
 from coriolis import exception
 from coriolis.providers import factory as providers_factory
 from coriolis.providers import factory as providers_factory
 from coriolis.tasks import base
 from coriolis.tasks import base
@@ -29,15 +30,27 @@ OSMOPRHING_MINION_TASK_INFO_FIELD_MAPPINGS = {
     "osmorphing_minion_connection_info": "osmorphing_connection_info"}
     "osmorphing_minion_connection_info": "osmorphing_connection_info"}
 
 
 
 
-class ValidateMinionPoolOptionsTask(base.TaskRunner):
+def _get_required_minion_pool_provider_types_for_platform(
+        platform_type):
+    provider_type = None
+    if platform_type == constants.PROVIDER_PLATFORM_SOURCE:
+        provider_type = constants.PROVIDER_TYPE_SOURCE_MINION_POOL
+    elif platform_type == constants.PROVIDER_PLATFORM_DESTINATION:
+        provider_type = constants.PROVIDER_TYPE_DESTINATION_MINION_POOL
+    else:
+        raise NotImplementedError(
+            "Cannot determine required minion pool provider type for "
+            "platform of type '%s'" % platform_type)
+    return {
+        platform_type: [provider_type]}
+
+
+class _BaseValidateMinionPoolOptionsTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_platform(cls):
     def get_required_platform(cls):
-        # TODO(aznashwan): this is only used to determined the Worker Service
-        # region of which endpoint to aim the Scheduler towards during normal
-        # transfer actions. Once the DB model hirearchy for transfer actions
-        # gets overhauled and this will be redundant, it should be removed.
-        return constants.TASK_PLATFORM_DESTINATION
+        raise NotImplementedError(
+            "No Minion pool options validation platform specified.")
 
 
     @classmethod
     @classmethod
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
@@ -49,40 +62,47 @@ class ValidateMinionPoolOptionsTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
-        return {
-            # TODO(aznashwan): remove redundant doubling after
-            # transfer action DB model overhaul:
-            constants.PROVIDER_PLATFORM_SOURCE: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-            constants.PROVIDER_PLATFORM_DESTINATION: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-        }
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
 
 
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
              task_info, event_handler):
              task_info, event_handler):
 
 
         # NOTE: both origin or target endpoints would work:
         # NOTE: both origin or target endpoints would work:
         connection_info = base.get_connection_info(ctxt, destination)
         connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
-            destination["type"], constants.PROVIDER_TYPE_MINION_POOL,
-            event_handler)
+            destination["type"], provider_type, event_handler)
 
 
         environment_options = task_info['pool_environment_options']
         environment_options = task_info['pool_environment_options']
-        provider.validate_minion_pool_options(
+        provider.validate_minion_pool_environment_options(
             ctxt, connection_info, environment_options)
             ctxt, connection_info, environment_options)
 
 
         return {}
         return {}
 
 
 
 
-class CreateMinionTask(base.TaskRunner):
+class ValidateSourceMinionPoolOptionsTask(_BaseValidateMinionPoolOptionsTask):
 
 
     @classmethod
     @classmethod
     def get_required_platform(cls):
     def get_required_platform(cls):
-        # TODO(aznashwan): this is only used to determined the Worker Service
-        # region of which endpoint to aim the Scheduler towards during normal
-        # transfer actions. Once the DB model hirearchy for transfer actions
-        # gets overhauled and this will be redundant, it should be removed.
-        return constants.TASK_PLATFORM_DESTINATION
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class ValidateDestinationMinionPoolOptionsTask(
+        _BaseValidateMinionPoolOptionsTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
+class _BaseCreateMinionMachineTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No minion pool creation required platform specified.")
 
 
     @classmethod
     @classmethod
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
@@ -98,23 +118,18 @@ class CreateMinionTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
-        return {
-            # TODO(aznashwan): remove redundant doubling after
-            # transfer action DB model overhaul:
-            constants.PROVIDER_PLATFORM_SOURCE: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-            constants.PROVIDER_PLATFORM_DESTINATION: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-        }
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
 
 
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
              task_info, event_handler):
              task_info, event_handler):
 
 
         # NOTE: both origin or target endpoints would work:
         # NOTE: both origin or target endpoints would work:
         connection_info = base.get_connection_info(ctxt, destination)
         connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
-            destination["type"], constants.PROVIDER_TYPE_MINION_POOL,
-            event_handler)
+            destination["type"], provider_type, event_handler)
 
 
         pool_identifier = task_info['pool_identifier']
         pool_identifier = task_info['pool_identifier']
         environment_options = task_info['pool_environment_options']
         environment_options = task_info['pool_environment_options']
@@ -156,15 +171,26 @@ class CreateMinionTask(base.TaskRunner):
                 "minion_provider_properties")}
                 "minion_provider_properties")}
 
 
 
 
-class DeleteMinionTask(base.TaskRunner):
+class CreateSourceMinionMachineTask(_BaseCreateMinionMachineTask):
 
 
     @classmethod
     @classmethod
     def get_required_platform(cls):
     def get_required_platform(cls):
-        # TODO(aznashwan): this is only used to determined the Worker Service
-        # region of which endpoint to aim the Scheduler towards during normal
-        # transfer actions. Once the DB model hirearchy for transfer actions
-        # gets overhauled and this will be redundant, it should be removed.
-        return constants.TASK_PLATFORM_DESTINATION
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class CreateDestinationMinionMachineTask(_BaseCreateMinionMachineTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
+class _BaseDeleteMinionMachineTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No minion deletion required platform specified.")
 
 
     @classmethod
     @classmethod
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
@@ -176,23 +202,18 @@ class DeleteMinionTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
-        return {
-            # TODO(aznashwan): remove redundant doubling after
-            # transfer action DB model overhaul:
-            constants.PROVIDER_PLATFORM_SOURCE: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-            constants.PROVIDER_PLATFORM_DESTINATION: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-        }
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
 
 
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
              task_info, event_handler):
              task_info, event_handler):
 
 
         # NOTE: both origin or target endpoints would work:
         # NOTE: both origin or target endpoints would work:
         connection_info = base.get_connection_info(ctxt, destination)
         connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
-            destination["type"], constants.PROVIDER_TYPE_MINION_POOL,
-            event_handler)
+            destination["type"], provider_type, event_handler)
 
 
         minion_provider_properties = task_info['minion_provider_properties']
         minion_provider_properties = task_info['minion_provider_properties']
         provider.delete_minion(
         provider.delete_minion(
@@ -201,15 +222,26 @@ class DeleteMinionTask(base.TaskRunner):
         return {}
         return {}
 
 
 
 
-class SetUpPoolSupportingResourcesTask(base.TaskRunner):
+class DeleteSourceMinionMachineTask(_BaseDeleteMinionMachineTask):
 
 
     @classmethod
     @classmethod
     def get_required_platform(cls):
     def get_required_platform(cls):
-        # TODO(aznashwan): this is only used to determined the Worker Service
-        # region of which endpoint to aim the Scheduler towards during normal
-        # transfer actions. Once the DB model hirearchy for transfer actions
-        # gets overhauled and this will be redundant, it should be removed.
-        return constants.TASK_PLATFORM_DESTINATION
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class DeleteDestinationMinionMachineTask(_BaseDeleteMinionMachineTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
+class _BaseSetUpPoolSupportingResourcesTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No pool shared resource setup required platform specified.")
 
 
     @classmethod
     @classmethod
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
@@ -221,23 +253,18 @@ class SetUpPoolSupportingResourcesTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
-        return {
-            # TODO(aznashwan): remove redundant doubling after
-            # transfer action DB model overhaul:
-            constants.PROVIDER_PLATFORM_SOURCE: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-            constants.PROVIDER_PLATFORM_DESTINATION: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-        }
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
 
 
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
              task_info, event_handler):
              task_info, event_handler):
 
 
         # NOTE: both origin or target endpoints would work:
         # NOTE: both origin or target endpoints would work:
         connection_info = base.get_connection_info(ctxt, destination)
         connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
-            destination["type"], constants.PROVIDER_TYPE_MINION_POOL,
-            event_handler)
+            destination["type"], provider_type, event_handler)
 
 
         pool_identifier = task_info['pool_identifier']
         pool_identifier = task_info['pool_identifier']
         environment_options = task_info['pool_environment_options']
         environment_options = task_info['pool_environment_options']
@@ -247,15 +274,28 @@ class SetUpPoolSupportingResourcesTask(base.TaskRunner):
         return {"pool_shared_resources": pool_shared_resources}
         return {"pool_shared_resources": pool_shared_resources}
 
 
 
 
-class TearDownPoolSupportingResourcesTask(base.TaskRunner):
+class SetUpSourcePoolSupportingResourcesTask(
+        _BaseSetUpPoolSupportingResourcesTask):
 
 
     @classmethod
     @classmethod
     def get_required_platform(cls):
     def get_required_platform(cls):
-        # TODO(aznashwan): this is only used to determined the Worker Service
-        # region of which endpoint to aim the Scheduler towards during normal
-        # transfer actions. Once the DB model hirearchy for transfer actions
-        # gets overhauled and this will be redundant, it should be removed.
-        return constants.TASK_PLATFORM_DESTINATION
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class SetUpDestinationPoolSupportingResources(
+        _BaseSetUpPoolSupportingResourcesTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
+class _BaseTearDownPoolSupportingResourcesTask(base.TaskRunner):
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No pool tear down shared resoures required platform specified.")
 
 
     @classmethod
     @classmethod
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
@@ -267,23 +307,18 @@ class TearDownPoolSupportingResourcesTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
-        return {
-            # TODO(aznashwan): remove redundant doubling after
-            # transfer action DB model overhaul:
-            constants.PROVIDER_PLATFORM_SOURCE: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-            constants.PROVIDER_PLATFORM_DESTINATION: [
-                constants.PROVIDER_TYPE_MINION_POOL],
-        }
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
 
 
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
     def _run(self, ctxt, minion_pool_machine_id, origin, destination,
              task_info, event_handler):
              task_info, event_handler):
 
 
         # NOTE: both origin or target endpoints would work:
         # NOTE: both origin or target endpoints would work:
         connection_info = base.get_connection_info(ctxt, destination)
         connection_info = base.get_connection_info(ctxt, destination)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
-            destination["type"], constants.PROVIDER_TYPE_MINION_POOL,
-            event_handler)
+            destination["type"], provider_type, event_handler)
 
 
         environment_options = task_info['pool_environment_options']
         environment_options = task_info['pool_environment_options']
         pool_shared_resources = task_info['pool_shared_resources']
         pool_shared_resources = task_info['pool_shared_resources']
@@ -294,6 +329,22 @@ class TearDownPoolSupportingResourcesTask(base.TaskRunner):
         return {"pool_shared_resources": None}
         return {"pool_shared_resources": None}
 
 
 
 
+class TearDownSourcePoolSupportingResourcesTask(
+        _BaseTearDownPoolSupportingResourcesTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_SOURCE
+
+
+class TearDownDestinationPoolSupportingResources(
+        _BaseTearDownPoolSupportingResourcesTask):
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.PROVIDER_PLATFORM_DESTINATION
+
+
 class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
 class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
     """ The purposes of the volume attachment tasks are to:
     """ The purposes of the volume attachment tasks are to:
     1) attach the volumes of the minions
     1) attach the volumes of the minions
@@ -320,8 +371,8 @@ class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
-        return {
-            cls.get_required_platform(): [constants.PROVIDER_TYPE_MINION_POOL]}
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
 
 
     @classmethod
     @classmethod
     def _get_minion_properties_task_info_field(cls):
     def _get_minion_properties_task_info_field(cls):
@@ -353,9 +404,10 @@ class _BaseVolumesMinionMachineAttachmentTask(base.TaskRunner):
                     required_platform))
                     required_platform))
 
 
         connection_info = base.get_connection_info(ctxt, platform_to_target)
         connection_info = base.get_connection_info(ctxt, platform_to_target)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
-            platform_to_target["type"], constants.PROVIDER_TYPE_MINION_POOL,
-            event_handler)
+            platform_to_target["type"], provider_type, event_handler)
 
 
         volumes_info = task_info["volumes_info"]
         volumes_info = task_info["volumes_info"]
         minion_properties = task_info[
         minion_properties = task_info[
@@ -508,8 +560,8 @@ class _BaseValidateMinionCompatibilityTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
-        return {
-            cls.get_required_platform(): [constants.PROVIDER_TYPE_MINION_POOL]}
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
 
 
     @classmethod
     @classmethod
     def _get_provider_pool_validation_operation(cls, provider):
     def _get_provider_pool_validation_operation(cls, provider):
@@ -552,9 +604,10 @@ class _BaseValidateMinionCompatibilityTask(base.TaskRunner):
                     required_platform))
                     required_platform))
 
 
         connection_info = base.get_connection_info(ctxt, platform_to_target)
         connection_info = base.get_connection_info(ctxt, platform_to_target)
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
-            platform_to_target["type"], constants.PROVIDER_TYPE_MINION_POOL,
-            event_handler)
+            platform_to_target["type"], provider_type, event_handler)
 
 
         export_info = task_info["export_info"]
         export_info = task_info["export_info"]
         minion_properties = task_info[
         minion_properties = task_info[
@@ -655,8 +708,8 @@ class _BaseReleaseMinionTask(base.TaskRunner):
 
 
     @classmethod
     @classmethod
     def get_required_provider_types(cls):
     def get_required_provider_types(cls):
-        return {
-            cls.get_required_platform(): [constants.PROVIDER_TYPE_MINION_POOL]}
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
 
 
     @classmethod
     @classmethod
     def _get_minion_task_info_field_mappings(cls):
     def _get_minion_task_info_field_mappings(cls):
@@ -665,6 +718,8 @@ class _BaseReleaseMinionTask(base.TaskRunner):
 
 
     def _run(self, ctxt, instance, origin, destination,
     def _run(self, ctxt, instance, origin, destination,
              task_info, event_handler):
              task_info, event_handler):
+        event_manager = events.EventManager(event_handler)
+        event_manager.progress_update("Releasing minion machine")
         return {
         return {
             field: None
             field: None
             for field in self.get_returned_task_info_properties()}
             for field in self.get_returned_task_info_properties()}
@@ -709,6 +764,11 @@ class CollectOSMorphingInfoTask(base.TaskRunner):
     def get_required_platform(cls):
     def get_required_platform(cls):
         return constants.TASK_PLATFORM_DESTINATION
         return constants.TASK_PLATFORM_DESTINATION
 
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
     @classmethod
     @classmethod
     def get_required_task_info_properties(cls):
     def get_required_task_info_properties(cls):
         return ["target_environment", "instance_deployment_info"]
         return ["target_environment", "instance_deployment_info"]
@@ -717,17 +777,12 @@ class CollectOSMorphingInfoTask(base.TaskRunner):
     def get_returned_task_info_properties(cls):
     def get_returned_task_info_properties(cls):
         return ["osmorphing_info"]
         return ["osmorphing_info"]
 
 
-    @classmethod
-    def get_required_provider_types(cls):
-        return {
-            constants.PROVIDER_PLATFORM_DESTINATION: [
-                constants.PROVIDER_TYPE_MINION_POOL]}
-
     def _run(self, ctxt, instance, origin, destination, task_info,
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
              event_handler):
+        provider_type = self.get_required_provider_types()[
+            self.get_required_platform()][0]
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
-            destination["type"], constants.PROVIDER_TYPE_MINION_POOL,
-            event_handler)
+            destination["type"], provider_type, event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
         connection_info = base.get_connection_info(ctxt, destination)
         target_environment = task_info["target_environment"]
         target_environment = task_info["target_environment"]
         instance_deployment_info = task_info["instance_deployment_info"]
         instance_deployment_info = task_info["instance_deployment_info"]

+ 17 - 4
coriolis/worker/rpc/client.py

@@ -131,15 +131,28 @@ class WorkerClient(object):
     def get_service_status(self, ctxt):
     def get_service_status(self, ctxt):
         return self._client.call(ctxt, 'get_service_status')
         return self._client.call(ctxt, 'get_service_status')
 
 
-    def get_endpoint_minion_pool_options(
+    def get_endpoint_source_minion_pool_options(
             self, ctxt, platform_name, connection_info, env, option_names):
             self, ctxt, platform_name, connection_info, env, option_names):
         return self._client.call(
         return self._client.call(
-            ctxt, 'get_endpoint_minion_pool_options',
+            ctxt, 'get_endpoint_source_minion_pool_options',
             platform_name=platform_name, connection_info=connection_info,
             platform_name=platform_name, connection_info=connection_info,
             env=env, option_names=option_names)
             env=env, option_names=option_names)
 
 
-    def validate_endpoint_minion_pool_options(
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, platform_name, connection_info, env, option_names):
+        return self._client.call(
+            ctxt, 'get_endpoint_destination_minion_pool_options',
+            platform_name=platform_name, connection_info=connection_info,
+            env=env, option_names=option_names)
+
+    def validate_endpoint_source_minion_pool_options(
+            self, ctxt, platform_name, pool_environment):
+        return self._client.call(
+            ctxt, 'validate_endpoint_source_minion_pool_options',
+            platform_name=platform_name, pool_environment=pool_environment)
+
+    def validate_endpoint_destination_minion_pool_options(
             self, ctxt, platform_name, pool_environment):
             self, ctxt, platform_name, pool_environment):
         return self._client.call(
         return self._client.call(
-            ctxt, 'validate_endpoint_minion_pool_options',
+            ctxt, 'validate_endpoint_destination_minion_pool_options',
             platform_name=platform_name, pool_environment=pool_environment)
             platform_name=platform_name, pool_environment=pool_environment)

+ 53 - 7
coriolis/worker/rpc/server.py

@@ -372,15 +372,15 @@ class WorkerServerEndpoint(object):
 
 
         return options
         return options
 
 
-    def get_endpoint_minion_pool_options(
+    def get_endpoint_source_minion_pool_options(
             self, ctxt, platform_name, connection_info, env, option_names):
             self, ctxt, platform_name, connection_info, env, option_names):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
             platform_name,
             platform_name,
-            constants.PROVIDER_TYPE_MINION_POOL,
+            constants.PROVIDER_TYPE_SOURCE_MINION_POOL,
             None, raise_if_not_found=False)
             None, raise_if_not_found=False)
         if not provider:
         if not provider:
             raise exception.InvalidInput(
             raise exception.InvalidInput(
-                "Provider plugin for platform '%s' does not support listing "
+                "Provider plugin for platform '%s' does not support source "
                 "minion pool creation or management." % platform_name)
                 "minion pool creation or management." % platform_name)
 
 
         secret_connection_info = utils.get_secret_connection_info(
         secret_connection_info = utils.get_secret_connection_info(
@@ -396,6 +396,31 @@ class WorkerServerEndpoint(object):
 
 
         return options
         return options
 
 
+    def get_endpoint_destination_minion_pool_options(
+            self, ctxt, platform_name, connection_info, env, option_names):
+        provider = providers_factory.get_provider(
+            platform_name,
+            constants.PROVIDER_TYPE_DESTINATION_MINION_POOL,
+            None, raise_if_not_found=False)
+        if not provider:
+            raise exception.InvalidInput(
+                "Provider plugin for platform '%s' does not support "
+                "destination minion pool creation or management." % (
+                    platform_name))
+
+        secret_connection_info = utils.get_secret_connection_info(
+            ctxt, connection_info)
+
+        options = provider.get_minion_pool_options(
+            ctxt, secret_connection_info, env=env,
+            option_names=option_names)
+
+        # NOTE: the structure of option values is the same for minion pools:
+        schemas.validate_value(
+            options, schemas.CORIOLIS_DESTINATION_ENVIRONMENT_OPTIONS_SCHEMA)
+
+        return options
+
     def get_endpoint_source_options(
     def get_endpoint_source_options(
             self, ctxt, platform_name, connection_info, env, option_names):
             self, ctxt, platform_name, connection_info, env, option_names):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
@@ -483,10 +508,27 @@ class WorkerServerEndpoint(object):
 
 
         return (is_valid, message)
         return (is_valid, message)
 
 
-    def validate_endpoint_minion_pool_options(
+    def validate_endpoint_source_minion_pool_options(
             self, ctxt, platform_name, pool_environment):
             self, ctxt, platform_name, pool_environment):
         provider = providers_factory.get_provider(
         provider = providers_factory.get_provider(
-            platform_name, constants.PROVIDER_TYPE_MINION_POOL, None)
+            platform_name, constants.PROVIDER_TYPE_SOURCE_MINION_POOL, None)
+        pool_options_schema = provider.get_minion_pool_environment_schema()
+
+        is_valid = True
+        message = None
+        try:
+            schemas.validate_value(pool_environment, pool_options_schema)
+        except exception.SchemaValidationException as ex:
+            is_valid = False
+            message = str(ex)
+
+        return (is_valid, message)
+
+    def validate_endpoint_destination_minion_pool_options(
+            self, ctxt, platform_name, pool_environment):
+        provider = providers_factory.get_provider(
+            platform_name, constants.PROVIDER_TYPE_DESTINATION_MINION_POOL,
+            None)
         pool_options_schema = provider.get_minion_pool_environment_schema()
         pool_options_schema = provider.get_minion_pool_environment_schema()
 
 
         is_valid = True
         is_valid = True
@@ -546,9 +588,13 @@ class WorkerServerEndpoint(object):
             schema = provider.get_source_environment_schema()
             schema = provider.get_source_environment_schema()
             schemas["source_environment_schema"] = schema
             schemas["source_environment_schema"] = schema
 
 
-        if provider_type == constants.PROVIDER_TYPE_MINION_POOL:
+        if provider_type == constants.PROVIDER_TYPE_SOURCE_MINION_POOL:
+            schema = provider.get_minion_pool_environment_schema()
+            schemas["source_minion_pool_environment_schema"] = schema
+
+        if provider_type == constants.PROVIDER_TYPE_DESTINATION_MINION_POOL:
             schema = provider.get_minion_pool_environment_schema()
             schema = provider.get_minion_pool_environment_schema()
-            schemas["minion_pool_environment_schema"] = schema
+            schemas["destination_minion_pool_environment_schema"] = schema
 
 
         return schemas
         return schemas