Ver Fonte

Add Scheduler service.

Nashwan Azhari há 5 anos atrás
pai
commit
c45e1ddbd5

+ 2 - 4
coriolis/api/v1/services.py

@@ -38,7 +38,7 @@ class ServiceController(api_wsgi.Controller):
             service = body["service"]
             host = service["host"]
             binary = service["binary"]
-            topic = service.get("topic")
+            topic = service["topic"]
             mapped_regions = service.get('mapped_regions', [])
             enabled = service.get("enabled", True)
             return host, binary, topic, mapped_regions, enabled
@@ -57,9 +57,7 @@ class ServiceController(api_wsgi.Controller):
             self._validate_create_body(body))
         return service_view.single(req, self._service_api.create(
             context, host=host, binary=binary, topic=topic,
-            mapped_regions=mapped_regions, enabled=enabled,
-            # NOTE: providers and specs should be auto-discovered later:
-            providers={}, specs={}))
+            mapped_regions=mapped_regions, enabled=enabled))
 
     def _validate_update_body(self, body):
         try:

+ 3 - 1
coriolis/cmd/conductor.py

@@ -5,6 +5,7 @@ import sys
 
 from oslo_config import cfg
 
+from coriolis import constants
 from coriolis.conductor.rpc import server as rpc_server
 from coriolis import service
 from coriolis import utils
@@ -19,7 +20,8 @@ def main():
     service.check_locks_dir_empty()
 
     server = service.MessagingService(
-        'coriolis_conductor', [rpc_server.ConductorServerEndpoint()],
+        constants.CONDUCTOR_MAIN_MESSAGING_TOPIC,
+        [rpc_server.ConductorServerEndpoint()],
         rpc_server.VERSION)
     launcher = service.service.launch(
         CONF, server, workers=server.get_workers_count())

+ 3 - 2
coriolis/cmd/replica_cron.py

@@ -5,9 +5,10 @@ import sys
 
 from oslo_config import cfg
 
-from coriolis.replica_cron.rpc import server as rpc_server
+from coriolis import constants
 from coriolis import service
 from coriolis import utils
+from coriolis.replica_cron.rpc import server as rpc_server
 
 CONF = cfg.CONF
 
@@ -18,7 +19,7 @@ def main():
     utils.setup_logging()
 
     server = service.MessagingService(
-        'coriolis_replica_cron_worker',
+        constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC,
         [rpc_server.ReplicaCronServerEndpoint()],
         rpc_server.VERSION, worker_count=1)
     launcher = service.service.launch(

+ 31 - 0
coriolis/cmd/scheduler.py

@@ -0,0 +1,31 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import sys
+
+from oslo_config import cfg
+
+from coriolis import constants
+from coriolis import service
+from coriolis import utils
+from coriolis.scheduler.rpc import server as rpc_server
+
+CONF = cfg.CONF
+
+
+def main():
+    CONF(sys.argv[1:], project='coriolis',
+         version="1.0.0")
+    utils.setup_logging()
+
+    server = service.MessagingService(
+        constants.SCHEDULER_MAIN_MESSAGING_TOPIC,
+        [rpc_server.SchedulerServerEndpoint()],
+        rpc_server.VERSION, worker_count=1)
+    launcher = service.service.launch(
+        CONF, server, workers=server.get_workers_count())
+    launcher.wait()
+
+
+if __name__ == "__main__":
+    main()

+ 2 - 8
coriolis/cmd/worker.py

@@ -18,15 +18,9 @@ def main():
          version="1.0.0")
     utils.setup_logging()
 
-    worker_topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % {
-        "host": utils.get_hostname(),
-        "binary": utils.get_binary_name()}
-
-    # TODO(aznashwan): find way to update the messaging topic being
-    # listened on by oslo_messaging.service.Service so as to not have to
-    # "hardcode" the worker topic from the binary level:
     server = service.MessagingService(
-        worker_topic, [rpc_server.WorkerServerEndpoint()],
+        constants.WORKER_MAIN_MESSAGING_TOPIC,
+        [rpc_server.WorkerServerEndpoint()],
         rpc_server.VERSION)
     launcher = service.service.launch(
         CONF, server, workers=server.get_workers_count())

+ 6 - 6
coriolis/conductor/rpc/client.py

@@ -4,6 +4,7 @@
 from oslo_config import cfg
 import oslo_messaging as messaging
 
+from coriolis import constants
 from coriolis import rpc
 
 VERSION = "1.0"
@@ -19,8 +20,9 @@ CONF.register_opts(conductor_opts, 'conductor')
 
 
 class ConductorClient(object):
-    def __init__(self, timeout=None):
-        target = messaging.Target(topic='coriolis_conductor', version=VERSION)
+    def __init__(self, timeout=None,
+                 topic=constants.CONDUCTOR_MAIN_MESSAGING_TOPIC):
+        target = messaging.Target(topic=topic, version=VERSION)
         if timeout is None:
             timeout = CONF.conductor.conductor_rpc_timeout
         self._client = rpc.get_client(target, timeout=timeout)
@@ -336,12 +338,10 @@ class ConductorClient(object):
             ctxt, 'delete_region', region_id=region_id)
 
     def register_service(
-            self, ctxt, host, binary, topic, enabled, providers,
-            specs, mapped_regions):
+            self, ctxt, host, binary, topic, enabled, mapped_regions):
         return self._client.call(
             ctxt, 'register_service', host=host, binary=binary,
-            topic=topic, enabled=enabled, providers=providers, specs=specs,
-            mapped_regions=mapped_regions)
+            topic=topic, enabled=enabled, mapped_regions=mapped_regions)
 
     def get_services(self, ctxt):
         return self._client.call(ctxt, 'get_services')

+ 246 - 75
coriolis/conductor/rpc/server.py

@@ -18,6 +18,7 @@ from coriolis import exception
 from coriolis import keystone
 from coriolis.licensing import client as licensing_client
 from coriolis.replica_cron.rpc import client as rpc_cron_client
+from coriolis.scheduler.rpc import client as rpc_scheduler_client
 from coriolis import schemas
 from coriolis.tasks import factory as tasks_factory
 from coriolis import utils
@@ -44,6 +45,14 @@ TASK_DEADLOCK_ERROR_MESSAGE = (
     "A fatal deadlock has occurred. Further debugging is required. "
     "Please review the Conductor logs and contact support for assistance.")
 
+RPC_TOPIC_TO_CLIENT_CLASS_MAP = {
+    constants.WORKER_MAIN_MESSAGING_TOPIC: rpc_worker_client.WorkerClient,
+    constants.SCHEDULER_MAIN_MESSAGING_TOPIC: (
+        rpc_scheduler_client.SchedulerClient),
+    constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC: (
+        rpc_cron_client.ReplicaCronClient)
+}
+
 
 def endpoint_synchronized(func):
     @functools.wraps(func)
@@ -162,37 +171,67 @@ class ConductorServerEndpoint(object):
         self._licensing_client = licensing_client.LicensingClient.from_env()
         self._rpc_worker_client = rpc_worker_client.WorkerClient()
         self._replica_cron_client = rpc_cron_client.ReplicaCronClient()
+        self._scheduler_client = rpc_scheduler_client.SchedulerClient()
 
     def get_all_diagnostics(self, ctxt):
-        conductor = self.get_diagnostics(ctxt)
-        cron = self._replica_cron_client.get_diagnostics(ctxt)
-        # TODO(aznashwan): include diagnostics for every worker service:
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
-        worker = worker_rpc.get_diagnostics(ctxt)
-        return [
-            conductor,
-            cron,
-            worker]
+        diagnostics = [
+            self.get_diagnostics(ctxt),
+            self._replica_cron_client.get_diagnostics(ctxt)]
+        for worker_service in self._scheduler_client.get_workers_for_specs(
+                ctxt):
+            worker_rpc = self._get_rpc_client_for_service(worker_service)
+            diagnostics.append(worker_rpc.get_diagnostics(ctxt))
+
+        return diagnostics
+
+    def _get_rpc_client_for_service(self, service, *client_args, **client_kwargs):
+        rpc_client_class = RPC_TOPIC_TO_CLIENT_CLASS_MAP.get(service.topic)
+        if not rpc_client_class:
+            raise exception.NotFound(
+                "No RPC client class for service with topic '%s'." % (
+                    service.topic))
+
+        topic = service.topic
+        if service.topic == constants.WORKER_MAIN_MESSAGING_TOPIC:
+            # NOTE: coriolis.service.MessagingService-type services (such
+            # as the worker), always have a dedicated per-host queue
+            # which can be used to target the service:
+            topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
+                "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
+                "host": service.host})
+
+        return rpc_client_class(*client_args, topic=topic, **client_kwargs)
+
+    def _get_any_worker_service(self, ctxt, random_choice=False, raw_dict=False):
+        services = self._scheduler_client.get_workers_for_specs(ctxt)
+        if not services:
+            raise exception.NoWorkerServiceError()
+        service = services[0]
+        if random_choice:
+            service = random.choice(services)
+        if raw_dict:
+            return service
+        return db_api.get_service(ctxt, service['id'])
+
+    def _get_worker_service_rpc_for_specs(
+            self, ctxt, provider_requirements=None, region_ids=None,
+            enabled=True, random_choice=False, raise_on_no_matches=True):
+        services = self._scheduler_client.get_workers_for_specs(
+            ctxt, provider_requirements=provider_requirements,
+            region_ids=region_ids, enabled=enabled)
+        if not services:
+            if raise_on_no_matches:
+                raise exception.NoSuitableWorkerServiceError()
+            return None
 
-    def _get_rpc_for_worker_service(self, service):
-         return rpc_worker_client.WorkerClient(topic=service.topic)
+        selected_service = services[0]
+        if random_choice:
+            selected_service = random.choice(services)
+        service = db_api.get_service(ctxt, selected_service["id"])
 
-    def _get_any_worker_service(self, ctxt):
-        services = db_api.get_services(ctxt)
-        if not services:
-            raise exception.CoriolisException("No services.")
-        return random.choice(services)
-
-    def _run_operation_on_worker_service(
-            self, ctxt, service, operation_name, *args, **kwargs):
-        service_rpc_client = self._get_rpc_for_worker_service(service)
-        operation = getattr(service_rpc_client, operation_name, None)
-        if not operation:
-            raise exception.CoriolisException(
-                "Invalid operation name '%s' for RPC client '%s'" % (
-                    operation_name, service_rpc_client))
-        return operation(*args, **kwargs)
+        return self._get_rpc_client_for_service(service)
 
     def _check_delete_reservation_for_transfer(self, transfer_action):
         action_id = transfer_action.base_id
@@ -310,8 +349,11 @@ class ConductorServerEndpoint(object):
                                marker, limit, instance_name_pattern):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_ids=[m['region_id'] for m in endpoint.mapped_regions],
+            provider_requirements={
+                endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_INSTANCES]})
         return worker_rpc.get_endpoint_instances(
             ctxt, endpoint.type, endpoint.connection_info,
             source_environment, marker, limit, instance_name_pattern)
@@ -320,8 +362,12 @@ class ConductorServerEndpoint(object):
             self, ctxt, endpoint_id, source_environment, instance_name):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_ids=[m['region_id'] for m in endpoint.mapped_regions],
+            provider_requirements={
+                endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_INSTANCES]})
+
         return worker_rpc.get_endpoint_instance(
             ctxt, endpoint.type, endpoint.connection_info,
             source_environment, instance_name)
@@ -330,8 +376,13 @@ class ConductorServerEndpoint(object):
             self, ctxt, endpoint_id, env, option_names):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_ids=[m['region_id'] for m in endpoint.mapped_regions],
+            provider_requirements={
+                endpoint.type: [
+                    constants.PROVIDER_TYPE_SOURCE_ENDPOINT_OPTIONS]})
+
         return worker_rpc.get_endpoint_source_options(
             ctxt, endpoint.type, endpoint.connection_info, env, option_names)
 
@@ -339,57 +390,86 @@ class ConductorServerEndpoint(object):
             self, ctxt, endpoint_id, env, option_names):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_ids=[m['region_id'] for m in endpoint.mapped_regions],
+            provider_requirements={
+                endpoint.type: [
+                    constants.PROVIDER_TYPE_DESTINATION_ENDPOINT_OPTIONS]})
         return worker_rpc.get_endpoint_destination_options(
             ctxt, endpoint.type, endpoint.connection_info, env, option_names)
 
     def get_endpoint_networks(self, ctxt, endpoint_id, env):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_ids=[m['region_id'] for m in endpoint.mapped_regions],
+            provider_requirements={
+                endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_NETWORKS]})
+
         return worker_rpc.get_endpoint_networks(
             ctxt, endpoint.type, endpoint.connection_info, env)
 
     def get_endpoint_storage(self, ctxt, endpoint_id, env):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_ids=[m['region_id'] for m in endpoint.mapped_regions],
+            provider_requirements={
+                endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT_STORAGE]})
+
         return worker_rpc.get_endpoint_storage(
             ctxt, endpoint.type, endpoint.connection_info, env)
 
     def validate_endpoint_connection(self, ctxt, endpoint_id):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
+
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_ids=[m['region_id'] for m in endpoint.mapped_regions],
+            provider_requirements={
+                endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT]})
+
         return worker_rpc.validate_endpoint_connection(
             ctxt, endpoint.type, endpoint.connection_info)
 
     def validate_endpoint_target_environment(
             self, ctxt, endpoint_id, target_env):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_ids=[m['region_id'] for m in endpoint.mapped_regions],
+            provider_requirements={
+                endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT]})
+
         return worker_rpc.validate_endpoint_target_environment(
             ctxt, endpoint.type, target_env)
 
     def validate_endpoint_source_environment(
             self, ctxt, endpoint_id, source_env):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
-        worker_rpc = self._get_rpc_for_worker_service(
-            self._get_any_worker_service(ctxt))
+
+        worker_rpc = self._get_worker_service_rpc_for_specs(
+            ctxt, enabled=True,
+            region_ids=[m['region_id'] for m in endpoint.mapped_regions],
+            provider_requirements={
+                endpoint.type: [constants.PROVIDER_TYPE_ENDPOINT]})
+
         return worker_rpc.validate_endpoint_source_environment(
             ctxt, endpoint.type, source_env)
 
     def get_available_providers(self, ctxt):
-        worker_rpc = self._get_rpc_for_worker_service(
+        # TODO(aznashwan): merge list of all providers from all
+        # worker services:
+        worker_rpc = self._get_rpc_client_for_service(
             self._get_any_worker_service(ctxt))
         return worker_rpc.get_available_providers(ctxt)
 
     def get_provider_schemas(self, ctxt, platform_name, provider_type):
-        worker_rpc = self._get_rpc_for_worker_service(
+        # TODO(aznashwan): merge or version/namespace schemas for each worker?
+        worker_rpc = self._get_rpc_client_for_service(
             self._get_any_worker_service(ctxt))
         return worker_rpc.get_provider_schemas(
             ctxt, platform_name, provider_type)
@@ -448,6 +528,71 @@ class ConductorServerEndpoint(object):
             "target_environment": action.destination_environment
         }
 
+    def _get_worker_service_rpc_for_task(
+            self, ctxt, task, origin_endpoint, destination_endpoint):
+        LOG.debug(
+            "Requesting Worker Service for task with ID '%s' (type) '%s' "
+            "from endpoints '%s' to '%s'", task.id, task.task_type,
+            origin_endpoint.id, destination_endpoint.id)
+        task_cls = tasks_factory.get_task_runner_class(task.task_type)
+
+        # determine required Coriolis regions based on the endpoints:
+        required_regions = []
+        required_platform = task_cls.get_required_platform()
+        origin_endpoint_region_ids = [
+            m.region_id for m in origin_endpoint.mapped_regions]
+        destination_endpoint_region_ids = [
+            m.region_id for m in destination_endpoint.mapped_regions]
+
+        if required_platform == constants.TASK_PLATFORM_SOURCE:
+            required_regions = origin_endpoint_region_ids
+        if required_platform == constants.TASK_PLATFORM_DESTINATION:
+            required_regions = destination_endpoint_region_ids
+        if required_platform == constants.TASK_PLATFORM_BILATERAL:
+            # NOTE: backwards-compatibility for endpoints with
+            # no associated regions:
+            if not origin_endpoint_region_ids and (
+                    not destination_endpoint_region_ids):
+                required_regions = []
+            else:
+                required_regions = list(
+                    set(origin_endpoint_region_ids).intersection(
+                        set(destination_endpoint_region_ids)))
+
+        # determine provider requirements:
+        provider_requirements = {}
+        required_provider_types = task_cls.get_required_provider_types()
+        if constants.PROVIDER_PLATFORM_SOURCE in required_provider_types:
+            provider_requirements[origin_endpoint.type] = (
+                required_provider_types[
+                    constants.PROVIDER_PLATFORM_SOURCE])
+        if constants.PROVIDER_PLATFORM_DESTINATION in required_provider_types:
+            provider_requirements[destination_endpoint.type] = (
+                required_provider_types[
+                    constants.PROVIDER_PLATFORM_DESTINATION])
+
+        worker_rpc = None
+        try:
+            worker_rpc = self._get_worker_service_rpc_for_specs(
+                ctxt, provider_requirements=provider_requirements,
+                region_ids=required_regions, enabled=True)
+        except Exception as ex:
+            LOG.warn(
+                "Failed to schedule task with ID '%s'. Marking as such.",
+                task.id)
+            message = (
+                "Failed to schedule task. This may indicate that there are no "
+                "Coriolis Worker services able to perform the task on the "
+                "platforms and in the Coriolis Regions required by the selected"
+                " source/destination Coriolis Endpoints. Please review the "
+                "scheduler logs for more exact details. "
+                "Error message was: %s" % str(ex))
+            db_api.set_task_status(
+                ctxt, task.id, constants.TASK_STATUS_FAILED_TO_SCHEDULE,
+                exception_details=message)
+            raise
+
+        return worker_rpc
+
     def _begin_tasks(self, ctxt, execution, task_info={}):
         """ Starts all non-error-only tasks which have no depencies. """
         if not ctxt.trust_id:
@@ -456,6 +601,10 @@ class ConductorServerEndpoint(object):
 
         origin = self._get_task_origin(ctxt, execution.action)
         destination = self._get_task_destination(ctxt, execution.action)
+        origin_endpoint = db_api.get_endpoint(
+            ctxt, execution.action.origin_endpoint_id)
+        destination_endpoint = db_api.get_endpoint(
+            ctxt, execution.action.destination_endpoint_id)
 
         newly_started_tasks = []
         for task in execution.tasks:
@@ -466,16 +615,26 @@ class ConductorServerEndpoint(object):
                     task.id, execution.id)
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_PENDING)
-                worker_rpc = self._get_rpc_for_worker_service(
-                        self._get_any_worker_service(ctxt))
-                worker_rpc.begin_task(
-                    ctxt, server=None,
-                    task_id=task.id,
-                    task_type=task.task_type,
-                    origin=origin,
-                    destination=destination,
-                    instance=task.instance,
-                    task_info=task_info.get(task.instance, {}))
+                try:
+                    worker_rpc = self._get_worker_service_rpc_for_task(
+                        ctxt, task, origin_endpoint, destination_endpoint)
+                    worker_rpc.begin_task(
+                        ctxt, server=None,
+                        task_id=task.id,
+                        task_type=task.task_type,
+                        origin=origin,
+                        destination=destination,
+                        instance=task.instance,
+                        task_info=task_info.get(task.instance, {}))
+                except Exception as ex:
+                    msg = (
+                        "Error occurred while starting new task '%s'. "
+                        "Cancelling execution '%s'. Error was: %s" % (
+                            task.id, execution.id,
+                            utils.get_exception_details()))
+                    self._cancel_tasks_execution(
+                        ctxt, execution, requery=True)
+                    raise exception.CoriolisException(msg) from ex
                 newly_started_tasks.append(task.id)
 
         # NOTE: this should never happen if _check_execution_tasks_sanity
@@ -499,9 +658,9 @@ class ConductorServerEndpoint(object):
             for instance in all_instances_in_tasks}
 
         def _check_task_cls_param_requirements(task, instance_task_info_keys):
-            task_cls = tasks_factory.get_task_runner_class(task.task_type)()
+            task_cls = tasks_factory.get_task_runner_class(task.task_type)
             missing_params = [
-                p for p in task_cls.required_task_info_properties
+                p for p in task_cls.get_required_task_info_properties()
                 if p not in instance_task_info_keys]
             if missing_params:
                 raise exception.CoriolisException(
@@ -510,7 +669,7 @@ class ConductorServerEndpoint(object):
                     "type '%s': %s" % (
                         task.instance, task.id, task.task_type,
                         missing_params))
-            return task_cls.returned_task_info_properties
+            return task_cls.get_returned_task_info_properties()
 
         for instance, instance_tasks in instances_tasks_mapping.items():
             task_info_keys = set(initial_task_info.get(
@@ -1418,7 +1577,7 @@ class ConductorServerEndpoint(object):
                         task.status, task.id, execution.id)
                     db_api.set_task_status(
                         ctxt, task.id, constants.TASK_STATUS_CANCELLING)
-                    worker_rpc = self._get_rpc_for_worker_service(
+                    worker_rpc = self._get_rpc_client_for_service(
                         self._get_any_worker_service(ctxt))
                     # TODO(aznashwan): cancel on right worker:
                     worker_rpc.cancel_task(
@@ -1670,6 +1829,10 @@ class ConductorServerEndpoint(object):
         origin = self._get_task_origin(ctxt, execution.action)
         destination = self._get_task_destination(ctxt, execution.action)
         action = db_api.get_action(ctxt, execution.action_id)
+        origin_endpoint = db_api.get_endpoint(
+            ctxt, execution.action.origin_endpoint_id)
+        destination_endpoint = db_api.get_endpoint(
+            ctxt, execution.action.destination_endpoint_id)
 
         started_tasks = []
 
@@ -1687,16 +1850,27 @@ class ConductorServerEndpoint(object):
                 task_info = action.info[task.instance]
             db_api.set_task_status(
                 ctxt, task.id, constants.TASK_STATUS_PENDING)
-            worker_rpc = self._get_rpc_for_worker_service(
-                self._get_any_worker_service(ctxt))
-            worker_rpc.begin_task(
-                ctxt, server=None,
-                task_id=task.id,
-                task_type=task.task_type,
-                origin=origin,
-                destination=destination,
-                instance=task.instance,
-                task_info=task_info)
+            try:
+                worker_rpc = self._get_worker_service_rpc_for_task(
+                    ctxt, task, origin_endpoint, destination_endpoint)
+                worker_rpc.begin_task(
+                    ctxt, server=None,
+                    task_id=task.id,
+                    task_type=task.task_type,
+                    origin=origin,
+                    destination=destination,
+                    instance=task.instance,
+                    task_info=task_info)
+            except Exception as ex:
+                msg = (
+                    "Error occurred while starting new task '%s'. "
+                    "Cancelling execution '%s'. Error was: %s" % (
+                        task.id, execution.id,
+                        utils.get_exception_details()))
+                self._cancel_tasks_execution(
+                    ctxt, execution, requery=True)
+                raise exception.CoriolisException(msg) from ex
+
             started_tasks.append(task.id)
 
         # aggregate all tasks and statuses:
@@ -2523,11 +2697,8 @@ class ConductorServerEndpoint(object):
         service.binary = binary
         service.enabled = enabled
         service.topic = topic
-        if not service.topic:
-            service.topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % {
-                "host": service.host, "binary": service.binary}
 
-        worker_rpc = self._get_rpc_for_worker_service(service)
+        worker_rpc = self._get_rpc_client_for_service(service)
         status = worker_rpc.get_service_status(ctxt)
 
         service.providers = status["providers"]

+ 16 - 5
coriolis/constants.py

@@ -38,6 +38,7 @@ TASK_STATUS_CANCELLING_AFTER_COMPLETION = "CANCELLING_AFTER_COMPLETION"
 TASK_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
 TASK_STATUS_CANCELED_FROM_DEADLOCK = "STRANDED_AFTER_DEADLOCK"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
+TASK_STATUS_FAILED_TO_SCHEDULE = "FAILED_TO_SCHEDULE"
 
 ACTIVE_TASK_STATUSES = [
     TASK_STATUS_PENDING,
@@ -52,7 +53,8 @@ CANCELED_TASK_STATUSES = [
     TASK_STATUS_FORCE_CANCELED,
     TASK_STATUS_CANCELED_AFTER_COMPLETION,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
-    TASK_STATUS_CANCELED_FROM_DEADLOCK
+    TASK_STATUS_CANCELED_FROM_DEADLOCK,
+    TASK_STATUS_FAILED_TO_SCHEDULE
 ]
 
 FINALIZED_TASK_STATUSES = [
@@ -63,7 +65,8 @@ FINALIZED_TASK_STATUSES = [
     TASK_STATUS_FORCE_CANCELED,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FROM_DEADLOCK,
-    TASK_STATUS_CANCELED_AFTER_COMPLETION
+    TASK_STATUS_CANCELED_AFTER_COMPLETION,
+    TASK_STATUS_FAILED_TO_SCHEDULE
 ]
 
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (
@@ -124,8 +127,12 @@ TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS = (
 TASK_TYPE_UPDATE_SOURCE_REPLICA = "UPDATE_SOURCE_REPLICA"
 TASK_TYPE_UPDATE_DESTINATION_REPLICA = "UPDATE_DESTINATION_REPLICA"
 
-TASK_PLATFORM_SOURCE = "SOURCE_PLATFORM"
-TASK_PLATFORM_DESTINATION = "DESTINATION_PLATFORM"
+TASK_PLATFORM_SOURCE = "source"
+TASK_PLATFORM_DESTINATION = "destination"
+TASK_PLATFORM_BILATERAL = "bilateral"
+
+PROVIDER_PLATFORM_SOURCE = "source"
+PROVIDER_PLATFORM_DESTINATION = "destination"
 
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_EXPORT = 2
@@ -217,4 +224,8 @@ SERVICE_STATUS_UP = "UP"
 SERVICE_STATUS_DOWN = "DOWN"
 SERVICE_STATUS_UNKNOWN = "UNKNOWN"
 
-SERVICE_MESSAGING_TOPIC_FORMAT = "%(host)s-%(binary)s"
+SERVICE_MESSAGING_TOPIC_FORMAT = "%(main_topic)s.%(host)s"
+CONDUCTOR_MAIN_MESSAGING_TOPIC = "coriolis_conductor"
+WORKER_MAIN_MESSAGING_TOPIC = "coriolis_worker"
+SCHEDULER_MAIN_MESSAGING_TOPIC = "coriolis_scheduler"
+REPLICA_CRON_MAIN_MESSAGING_TOPIC = "coriolis_replica_cron_worker"

+ 1 - 1
coriolis/db/api.py

@@ -202,7 +202,7 @@ def update_endpoint(context, endpoint_id, updated_values):
 
         # get all existing mappings:
         existing_region_mappings = [
-            mapping.id
+            mapping.region_id
             for mapping in get_region_mappings_for_endpoint(
                 context, endpoint_id)]
 

+ 20 - 0
coriolis/exception.py

@@ -388,3 +388,23 @@ class UnrecognizedWorkerInitSystem(CoriolisException):
         "Could not determine init system for temporary worker VM. The image "
         "used for the worker VM must use systemd as an init system for "
         "Coriolis to be able to use it for data Replication.")
+
+
+class NoServiceError(CoriolisException):
+    safe = True
+    code = 503
+    message = _(
+        "No service is available to process this request at this time.")
+
+
+class NoWorkerServiceError(NoServiceError):
+    message = _(
+        "No Coriolis Worker Service(s) were found. Please ensure that "
+        "at least one Coriolis Worker Service is registered "
+        "within the Coriolis installation.")
+
+
+class NoSuitableWorkerServiceError(NoServiceError):
+    message = _(
+        "No suitable Coriolis Worker service was found which fits the "
+        "criteria for the required operation.")

+ 1 - 1
coriolis/providers/factory.py

@@ -61,7 +61,7 @@ def get_available_providers():
             provider_data = providers.get(cls.platform, {})
 
             provider_types = provider_data.get("types", [])
-            if (provider_class in cls.__bases__ and
+            if (provider_class in cls.__mro__ and
                     provider_type not in provider_types):
                 provider_types.append(provider_type)
 

+ 3 - 2
coriolis/replica_cron/rpc/client.py

@@ -3,15 +3,16 @@
 
 import oslo_messaging as messaging
 
+from coriolis import constants
 from coriolis import rpc
 
 VERSION = "1.0"
 
 
 class ReplicaCronClient(object):
-    def __init__(self):
+    def __init__(self, topic=constants.REPLICA_CRON_MAIN_MESSAGING_TOPIC):
         target = messaging.Target(
-            topic='coriolis_replica_cron_worker', version=VERSION)
+            topic=topic, version=VERSION)
         self._client = rpc.get_client(target)
 
     def register(self, ctxt, schedule):

+ 0 - 0
coriolis/scheduler/__init__.py


+ 0 - 0
coriolis/scheduler/filters/__init__.py


+ 22 - 0
coriolis/scheduler/filters/base.py

@@ -0,0 +1,22 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import abc
+
+from six import with_metaclass
+
+
+class BaseServiceFilter(with_metaclass(abc.ABCMeta)):
+
+    def is_service_acceptable(self, service):
+        return self.rate_service(service) > 0
+
+    def filter_services(self, services):
+        return [
+            service for service in services
+            if self.is_service_acceptable(service)]
+
+    @abc.abstractmethod
+    def rate_service(self, service):
+        """ Returns a rating out of 100 for the service. """
+        pass

+ 102 - 0
coriolis/scheduler/filters/trivial_filters.py

@@ -0,0 +1,102 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_log import log as logging
+
+from coriolis import constants
+from coriolis.scheduler.filters import base
+
+
+LOG = logging.getLogger(__name__)
+
+
+class RegionsFilter(base.BaseServiceFilter):
+
+    def __init__(self, regions):
+        self._regions = regions
+
+    def __repr__(self):
+        return "<%s(regions=%s)>" % (
+            self.__class__.__name__, self._regions)
+
+    def rate_service(self, service):
+        service_regions = [
+            mapping["region_id"] for mapping in service.mapped_regions]
+        missing_regions = [
+            region
+            for region in self._regions
+            if region not in service_regions]
+
+        if missing_regions:
+            LOG.debug(
+                "The following required regions are missing from service "
+                "with ID '%s': %s", service.id, missing_regions)
+            return 0
+
+        return 100
+
+
+class TopicFilter(base.BaseServiceFilter):
+
+    def __init__(self, topic):
+        self._topic = topic
+
+    def __repr__(self):
+        return "<%s(topic=%s)>" % (
+            self.__class__.__name__, self._topic)
+
+    def rate_service(self, service):
+        if service.topic == self._topic:
+            return 100
+        return 0
+
+
+class EnabledFilter(base.BaseServiceFilter):
+
+    def __init__(self, enabled=True):
+        self._enabled = enabled
+
+    def __repr__(self):
+        return "<%s(enabled=%s)>" % (
+            self.__class__.__name__, self._enabled)
+
+    def rate_service(self, service):
+        if service.enabled == self._enabled:
+            return 100
+        return 0
+
+
+class ProviderTypesFilter(base.BaseServiceFilter):
+
+    def __init__(self, provider_requirements):
+        """ Filters based on requested provider capabilities.
+        :param provider_requirements: dict of the form {
+            "<platform_type>": [constants.PROVIDER_TYPE_*, ...]}
+        """
+        self._provider_requirements = provider_requirements
+
+    def __repr__(self):
+        return "<%s(provider_requirements=%s)>" % (
+            self.__class__.__name__, self._provider_requirements)
+
+    def rate_service(self, service):
+        for platform_type in self._provider_requirements:
+            if platform_type not in service.providers:
+                LOG.debug(
+                    "Service with ID '%s' does not have a provider for platform "
+                    "type '%s'", service.id, platform_type)
+                return 0
+
+            available_types = service.providers[
+                platform_type].get('types', [])
+            missing_types = [
+                typ for typ in self._provider_requirements[platform_type]
+                if typ not in available_types]
+            if missing_types:
+                LOG.debug(
+                    "Service with ID '%s' is missing the following required "
+                    "provider types for platform '%s': %s",
+                    service.id, platform_type, missing_types)
+                return 0
+
+        return 100

+ 0 - 0
coriolis/scheduler/rpc/__init__.py


+ 55 - 0
coriolis/scheduler/rpc/client.py

@@ -0,0 +1,55 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_config import cfg
+import oslo_messaging as messaging
+
+from coriolis import rpc
+
+VERSION = "1.0"
+
+scheduler_opts = [
+    cfg.IntOpt("scheduler_rpc_timeout",
+               help="Number of seconds until RPC calls to the "
+                    "scheduler timeout.")
+]
+
+CONF = cfg.CONF
+CONF.register_opts(scheduler_opts, 'scheduler')
+
+
+class SchedulerClient(object):
+    def __init__(self, timeout=None):
+        target = messaging.Target(topic='coriolis_scheduler', version=VERSION)
+        if timeout is None:
+            timeout = CONF.scheduler.scheduler_rpc_timeout
+        self._client = rpc.get_client(target, timeout=timeout)
+
+    def get_diagnostics(self, ctxt):
+        return self._client.call(ctxt, 'get_diagnostics')
+
+    def get_workers_for_specs(
+            self, ctxt, provider_requirements=None,
+            region_ids=None, enabled=None):
+        return self._client.call(
+            ctxt, 'get_workers_for_specs', region_ids=region_ids,
+            enabled=enabled, provider_requirements=provider_requirements)
+
+    '''
+    def get_workers_for_action(
+            self, ctxt, endpoint_type, provider_type, region_ids=None):
+        return self._client.call(
+            ctxt, 'get_workers_for_action', endpoint_type=endpoint_type,
+            provider_type=provider_type, region_ids=region_ids)
+
+    def get_workers_for_task(
+            self, ctxt, task_type, source_endpoint_type,
+            destination_endpoint_type, source_region_ids=None,
+            destination_region_ids=None):
+        return self._client.call(
+            ctxt, 'get_workers_for_task', task_type=task_type,
+            source_endpoint_type=source_endpoint_type,
+            destination_endpoint_type=destination_endpoint_type,
+            source_region_ids=source_region_ids,
+            destination_region_ids=destination_region_ids)
+    '''

+ 159 - 0
coriolis/scheduler/rpc/server.py

@@ -0,0 +1,159 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import copy
+import functools
+import random
+import uuid
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from coriolis import constants
+from coriolis import exception
+from coriolis import utils
+from coriolis.conductor.rpc import client as rpc_conductor_client
+from coriolis.scheduler.filters import trivial_filters
+from coriolis.db import api as db_api
+
+
+VERSION = "1.0"
+
+LOG = logging.getLogger(__name__)
+
+
+SCHEDULER_OPTS = []
+
+CONF = cfg.CONF
+CONF.register_opts(SCHEDULER_OPTS, 'scheduler')
+
+
+class SchedulerServerEndpoint(object):
+    def __init__(self):
+        self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
+
+    def get_diagnostics(self, ctxt):
+        return utils.get_diagnostics_info()
+
+    def _get_all_worker_services(self, ctxt):
+        services = db_api.get_services(ctxt)
+        services = trivial_filters.TopicFilter(
+            constants.WORKER_MAIN_MESSAGING_TOPIC).filter_services(
+                services)
+        if not services:
+            raise exception.NoWorkerServiceError()
+
+        return services
+
+    def _get_weighted_filtered_services(
+            self, services, filters, minimum_per_filter_rating=1):
+        """ Returns list of services and their scores for the given filters.
+        Services which are rejected by any filter will be excluded.
+        """
+        if not filters:
+            LOG.warn(
+                "No filters provided. Presuming all services acceptable.")
+            return [(service, 100) for service in services]
+
+        scores = []
+
+
+        service_ids = [service.id for service in services]
+        LOG.debug(
+            "Running following filters on worker services '%s': %s",
+            service_ids, filters)
+        for service in services:
+            total_score = 0
+
+            acceptable = True
+            flt = None
+            for flt in filters:
+                rating = flt.rate_service(service)
+                if rating < minimum_per_filter_rating:
+                    acceptable = False
+                    break
+                total_score = total_score + rating
+            if not acceptable:
+                LOG.debug(
+                    "Service with ID '%s' was rejected by filter %r",
+                    service.id, flt)
+                continue
+
+            scores.append((service, total_score))
+
+        if not scores:
+            message = (
+                "None of the inspected Coriolis Worker services (IDs %s) "
+                "matched the requested filtering criteria (minimum score %d) "
+                "for the following required filters: %s" % (
+                    [s.id for s in services],
+                    minimum_per_filter_rating, filters))
+            raise exception.NoSuitableWorkerServiceError(message)
+
+        LOG.debug(
+            "Determined following scores for services based on filters '%s': "
+            "%s", filters, scores)
+
+        return sorted(
+            scores, key=lambda s: s[1], reverse=True)
+
+    def get_workers_for_specs(
+            self, ctxt, provider_requirements=None,
+            region_ids=None, enabled=None):
+        """ Returns a list of enabled Worker Services with the specified
+        parameters.
+        :param provider_requirements: dict of the form {
+            "<platform_type>": [constants.PROVIDER_TYPE_*, ...]}
+        """
+        filters = []
+        worker_services = self._get_all_worker_services(ctxt)
+
+        LOG.debug(
+            "Searching for Worker Services with specs: %s" % {
+                "provider_requirements": provider_requirements,
+                "region_ids": region_ids, "enabled": enabled})
+
+        if enabled is not None:
+            filters.append(trivial_filters.EnabledFilter(enabled=enabled))
+        if region_ids:
+            filters.append(trivial_filters.RegionsFilter(region_ids))
+        if provider_requirements:
+            filters.append(
+                trivial_filters.ProviderTypesFilter(provider_requirements))
+
+        filtered_services = self._get_weighted_filtered_services(
+            worker_services, filters)
+        LOG.info(
+            "Found Worker Services %s for specs: %s" % (
+                filtered_services, {
+                    "provider_requirements": provider_requirements,
+                    "region_ids": region_ids, "enabled": enabled}))
+
+        return [s[0] for s in filtered_services]
+
+    '''
+    def get_workers_for_action(
+            self, ctxt, endpoint_type, provider_type, region_ids=None):
+        """ Returns a list of worker services which would be able to
+        perform the required provider-related action.
+
+        :param endpoint_type: the type of the platform of the endpoint
+        :param provider_type: the type of the required plugin as defined
+                              in constants.PROVIDER_TYPE_*
+        :param region_ids: list of region IDs to carry out the operation in.
+                           Will be carried out at random if not provided.
+        """
+        # TODO
+        return self._get_all_worker_services(ctxt)
+
+    def get_workers_for_task(
+            self, ctxt, task_type, source_endpoint_type,
+            destination_endpoint_type,
+            source_region_ids=None,
+            destination_region_ids=None):
+        """ Returns a list of worker services which would be
+        able and willing to accomplish the given task.
+        """
+        # TODO
+        return self._get_all_worker_services(ctxt)
+    '''

+ 2 - 3
coriolis/services/api.py

@@ -11,10 +11,9 @@ class API(object):
 
     def create(
             self, ctxt, host, binary, topic, mapped_regions,
-            enabled, providers, specs):
+            enabled):
         return self._rpc_client.register_service(
-            ctxt, host, binary, topic, enabled,
-            providers, specs, mapped_regions)
+            ctxt, host, binary, topic, enabled, mapped_regions)
 
     def update(self, ctxt, service_id, updated_values):
         return self._rpc_client.update_service(

+ 23 - 10
coriolis/tasks/base.py

@@ -52,27 +52,40 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
 
         return required_libs
 
-    @property
-    @abc.abstractmethod
-    def required_task_info_properties(self):
+    @abc.abstractclassmethod
+    def get_required_task_info_properties(cls):
         """ Returns a list of the string fields which are required
         to be present during the tasks' run method. """
         pass
 
-    @property
-    @abc.abstractmethod
-    def returned_task_info_properties(self):
+    @abc.abstractclassmethod
+    def get_returned_task_info_properties(cls):
         """ Returns a list of the string fields which are returned by the
         tasks' run method to be added to the task info.
         """
         pass
 
+    @abc.abstractclassmethod
+    def get_required_provider_types(cls):
+        """ Returns a dict with 'source/destination' as keys containing a list
+        of all the provider types (constants.PROVIDER_TYPE_*) required for the
+        task.
+        """
+        pass
+
+    @abc.abstractclassmethod
+    def get_required_platform(cls):
+        """ Returns whether the task operates on the source platform, the
+        destination, or both. (constants.TASK_PLATFORM_*)
+        """
+        pass
+
     @abc.abstractmethod
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         """ The actual logic run by the task.
         Should return a dict with all the fields declared by
-        'self.returned_task_info_properties'.
+        'self.get_returned_task_info_properties'.
         Must be implemented in all child classes.
         """
         pass
@@ -84,7 +97,7 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
         NOTE: This should NOT modify the existing task_info in any way.
         """
         missing_info_props = [
-            prop for prop in self.required_task_info_properties
+            prop for prop in self.get_required_task_info_properties()
             if prop not in task_info]
         if missing_info_props:
             raise exception.CoriolisException(
@@ -102,7 +115,7 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
                     self.__class__, type(result), result))
 
         missing_returns = [
-            prop for prop in self.returned_task_info_properties
+            prop for prop in self.get_returned_task_info_properties()
             if prop not in result.keys()]
         if missing_returns:
             raise exception.CoriolisException(
@@ -114,7 +127,7 @@ class TaskRunner(with_metaclass(abc.ABCMeta)):
 
         undeclared_returns = [
             prop for prop in result.keys()
-            if prop not in self.returned_task_info_properties]
+            if prop not in self.get_returned_task_info_properties()]
         if undeclared_returns:
             raise exception.CoriolisException(
                 "Task type '%s' returned the following undeclared "

+ 79 - 124
coriolis/tasks/factory.py

@@ -8,131 +8,86 @@ from coriolis.tasks import osmorphing_tasks
 from coriolis.tasks import replica_tasks
 
 _TASKS_MAP = {
-    constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": migration_tasks.DeployMigrationSourceResourcesTask},
-    constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": migration_tasks.DeployMigrationTargetResourcesTask},
-    constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": migration_tasks.DeleteMigrationSourceResourcesTask},
-    constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": migration_tasks.DeleteMigrationTargetResourcesTask},
-    constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": migration_tasks.DeployInstanceResourcesTask},
-    constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": migration_tasks.FinalizeInstanceDeploymentTask},
-    constants.TASK_TYPE_CREATE_INSTANCE_DISKS: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": migration_tasks.CreateInstanceDisksTask},
-    constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": migration_tasks.CleanupFailedInstanceDeploymentTask},
-    constants.TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": migration_tasks.CleanupInstanceTargetStorageTask},
-    constants.TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": migration_tasks.CleanupInstanceSourceStorageTask},
-    constants.TASK_TYPE_GET_OPTIMAL_FLAVOR: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": migration_tasks.GetOptimalFlavorTask},
-    constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": migration_tasks.ValidateMigrationSourceInputsTask},
-    constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": migration_tasks.ValidateMigrationDestinationInputsTask},
-    constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": osmorphing_tasks.DeployOSMorphingResourcesTask},
-    constants.TASK_TYPE_OS_MORPHING: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": osmorphing_tasks.OSMorphingTask},
-    constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": osmorphing_tasks.DeleteOSMorphingResourcesTask},
-    constants.TASK_TYPE_GET_INSTANCE_INFO: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": replica_tasks.GetInstanceInfoTask},
-    constants.TASK_TYPE_REPLICATE_DISKS: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": replica_tasks.ReplicateDisksTask},
-    constants.TASK_TYPE_SHUTDOWN_INSTANCE: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": replica_tasks.ShutdownInstanceTask},
-    constants.TASK_TYPE_DEPLOY_REPLICA_DISKS: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.DeployReplicaDisksTask},
-    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": replica_tasks.DeleteReplicaSourceDiskSnapshotsTask},
-    constants.TASK_TYPE_DELETE_REPLICA_DISKS: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.DeleteReplicaDisksTask},
-    constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.DeployReplicaTargetResourcesTask},
-    constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.DeleteReplicaTargetResourcesTask},
-    constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": replica_tasks.DeployReplicaSourceResourcesTask},
-    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": replica_tasks.DeleteReplicaSourceResourcesTask},
-    constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.DeployReplicaInstanceResourcesTask},
-    constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.FinalizeReplicaInstanceDeploymentTask},
-    constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.CleanupFailedReplicaInstanceDeploymentTask},
-    constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.CreateReplicaDiskSnapshotsTask},
-    constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.DeleteReplicaTargetDiskSnapshotsTask},
-    constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.RestoreReplicaDiskSnapshotsTask},
-    constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": replica_tasks.ValidateReplicaExecutionSourceInputsTask},
-    constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.ValidateReplicaExecutionDestinationInputsTask},
-    constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.ValidateReplicaDeploymentParametersTask},
-    constants.TASK_TYPE_UPDATE_SOURCE_REPLICA: {
-        "task_platform": constants.TASK_PLATFORM_SOURCE,
-        "task_runner_class": replica_tasks.UpdateSourceReplicaTask},
-    constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA: {
-        "task_platform": constants.TASK_PLATFORM_DESTINATION,
-        "task_runner_class": replica_tasks.UpdateDestinationReplicaTask}
+    constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES:
+        migration_tasks.DeployMigrationSourceResourcesTask,
+    constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES:
+        migration_tasks.DeployMigrationTargetResourcesTask,
+    constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES:
+        migration_tasks.DeleteMigrationSourceResourcesTask,
+    constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES:
+        migration_tasks.DeleteMigrationTargetResourcesTask,
+    constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES:
+        migration_tasks.DeployInstanceResourcesTask,
+    constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT:
+        migration_tasks.FinalizeInstanceDeploymentTask,
+    constants.TASK_TYPE_CREATE_INSTANCE_DISKS:
+        migration_tasks.CreateInstanceDisksTask,
+    constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT:
+        migration_tasks.CleanupFailedInstanceDeploymentTask,
+    constants.TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE:
+        migration_tasks.CleanupInstanceTargetStorageTask,
+    constants.TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE:
+        migration_tasks.CleanupInstanceSourceStorageTask,
+    constants.TASK_TYPE_GET_OPTIMAL_FLAVOR:
+        migration_tasks.GetOptimalFlavorTask,
+    constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS:
+        migration_tasks.ValidateMigrationSourceInputsTask,
+    constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS:
+        migration_tasks.ValidateMigrationDestinationInputsTask,
+    constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES:
+        osmorphing_tasks.DeployOSMorphingResourcesTask,
+    constants.TASK_TYPE_OS_MORPHING:
+        osmorphing_tasks.OSMorphingTask,
+    constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES:
+        osmorphing_tasks.DeleteOSMorphingResourcesTask,
+    constants.TASK_TYPE_GET_INSTANCE_INFO:
+        replica_tasks.GetInstanceInfoTask,
+    constants.TASK_TYPE_REPLICATE_DISKS:
+        replica_tasks.ReplicateDisksTask,
+    constants.TASK_TYPE_SHUTDOWN_INSTANCE:
+        replica_tasks.ShutdownInstanceTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_DISKS:
+        replica_tasks.DeployReplicaDisksTask,
+    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS:
+        replica_tasks.DeleteReplicaSourceDiskSnapshotsTask,
+    constants.TASK_TYPE_DELETE_REPLICA_DISKS:
+        replica_tasks.DeleteReplicaDisksTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES:
+        replica_tasks.DeployReplicaTargetResourcesTask,
+    constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES:
+        replica_tasks.DeleteReplicaTargetResourcesTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES:
+        replica_tasks.DeployReplicaSourceResourcesTask,
+    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES:
+        replica_tasks.DeleteReplicaSourceResourcesTask,
+    constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES:
+        replica_tasks.DeployReplicaInstanceResourcesTask,
+    constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT:
+        replica_tasks.FinalizeReplicaInstanceDeploymentTask,
+    constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT:
+        replica_tasks.CleanupFailedReplicaInstanceDeploymentTask,
+    constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS:
+        replica_tasks.CreateReplicaDiskSnapshotsTask,
+    constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS:
+        replica_tasks.DeleteReplicaTargetDiskSnapshotsTask,
+    constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
+        replica_tasks.RestoreReplicaDiskSnapshotsTask,
+    constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS:
+        replica_tasks.ValidateReplicaExecutionSourceInputsTask,
+    constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS:
+        replica_tasks.ValidateReplicaExecutionDestinationInputsTask,
+    constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS:
+        replica_tasks.ValidateReplicaDeploymentParametersTask,
+    constants.TASK_TYPE_UPDATE_SOURCE_REPLICA:
+        replica_tasks.UpdateSourceReplicaTask,
+    constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
+        replica_tasks.UpdateDestinationReplicaTask
 }
 
 
-def _get_task_class_info(task_type):
-    match = _TASKS_MAP.get(task_type)
-    if not match:
-        raise exception.NotFound(
-            "TaskRunner info not found for task type: %s" % task_type)
-    return match
-
-
 def get_task_runner_class(task_type):
-    return _get_task_class_info(task_type)["task_runner_class"]
-
-
-def get_task_platform(task_type):
-    return _get_task_class_info(task_type)["task_platform"]
+    cls = _TASKS_MAP.get(task_type)
+    if not cls:
+        raise exception.NotFound(
+            "TaskRunner not found for task type: %s" % task_type)
+    return cls

+ 15 - 4
coriolis/tasks/migration_tasks.py

@@ -14,14 +14,25 @@ LOG = logging.getLogger(__name__)
 
 class GetOptimalFlavorTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["export_info", "target_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["instance_deployment_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_INSTANCE_FLAVOR]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(

+ 47 - 12
coriolis/tasks/osmorphing_tasks.py

@@ -16,16 +16,29 @@ LOG = logging.getLogger(__name__)
 
 class OSMorphingTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return [
             "osmorphing_info", "osmorphing_connection_info",
             "user_scripts"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return []
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_SOURCE: [
+                constants.PROVIDER_TYPE_REPLICA_EXPORT],
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT],
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
 
@@ -64,16 +77,27 @@ class OSMorphingTask(base.TaskRunner):
 
 class DeployOSMorphingResourcesTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["target_environment", "instance_deployment_info"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return [
             "os_morphing_resources", "osmorphing_info",
             "osmorphing_connection_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_OS_MORPHING]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -124,14 +148,25 @@ class DeployOSMorphingResourcesTask(base.TaskRunner):
 
 class DeleteOSMorphingResourcesTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["target_environment", "os_morphing_resources"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["os_morphing_resources", "osmorphing_connection_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_OS_MORPHING]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(

+ 317 - 84
coriolis/tasks/replica_tasks.py

@@ -66,14 +66,25 @@ def _check_ensure_volumes_info_ordering(export_info, volumes_info):
 class GetInstanceInfoTask(base.TaskRunner):
     """ Task which gathers the export info for a VM.  """
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["source_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["export_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_SOURCE: [
+                constants.PROVIDER_TYPE_REPLICA_EXPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -96,14 +107,25 @@ class GetInstanceInfoTask(base.TaskRunner):
 class ShutdownInstanceTask(base.TaskRunner):
     """ Task which shuts down a VM. """
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["source_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return []
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_SOURCE: [
+                constants.PROVIDER_TYPE_REPLICA_EXPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -119,18 +141,32 @@ class ShutdownInstanceTask(base.TaskRunner):
 
 class ReplicateDisksTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        # NOTE: considering Replication reads from one end (be it PMR minion
+        # or otherwise) to the disk writer minion on the destination,
+        # replicate_disks would need access to both:
+        return constants.TASK_PLATFORM_BILATERAL
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return [
             "export_info", "volumes_info", "source_environment",
             "source_resources",
             "source_resources_connection_info",
             "target_resources_connection_info"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["volumes_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_SOURCE: [
+                constants.PROVIDER_TYPE_REPLICA_EXPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -181,15 +217,26 @@ class ReplicateDisksTask(base.TaskRunner):
 
 class DeployReplicaDisksTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return [
             "export_info", "volumes_info", "target_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["volumes_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         target_environment = task_info['target_environment']
@@ -216,15 +263,25 @@ class DeployReplicaDisksTask(base.TaskRunner):
 
 class DeleteReplicaSourceDiskSnapshotsTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return [
             "volumes_info", "source_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["volumes_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_SOURCE: [
+                constants.PROVIDER_TYPE_REPLICA_EXPORT]
+        }
+
 
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
@@ -253,15 +310,26 @@ class DeleteReplicaSourceDiskSnapshotsTask(base.TaskRunner):
 
 class DeleteReplicaDisksTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return [
             "volumes_info", "target_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["volumes_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         event_manager = events.EventManager(event_handler)
@@ -295,14 +363,25 @@ class DeleteReplicaDisksTask(base.TaskRunner):
 
 class DeployReplicaSourceResourcesTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["source_environment", "export_info"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["source_resources", "source_resources_connection_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_SOURCE: [
+                constants.PROVIDER_TYPE_REPLICA_EXPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -355,14 +434,25 @@ class DeployReplicaSourceResourcesTask(base.TaskRunner):
 
 class DeleteReplicaSourceResourcesTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["source_environment", "source_resources"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["source_resources", "source_resources_connection_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_SOURCE: [
+                constants.PROVIDER_TYPE_REPLICA_EXPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -384,16 +474,27 @@ class DeleteReplicaSourceResourcesTask(base.TaskRunner):
 
 class DeployReplicaTargetResourcesTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["export_info", "volumes_info", "target_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return [
             "volumes_info", "target_resources",
             "target_resources_connection_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         target_environment = task_info["target_environment"]
@@ -470,15 +571,26 @@ class DeployReplicaTargetResourcesTask(base.TaskRunner):
 
 class DeleteReplicaTargetResourcesTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["target_resources", "target_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return [
             "target_resources", "target_resources_connection_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -500,14 +612,25 @@ class DeleteReplicaTargetResourcesTask(base.TaskRunner):
 
 class DeployReplicaInstanceResourcesTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["export_info", "target_environment", "clone_disks"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["instance_deployment_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         target_environment = task_info["target_environment"]
@@ -533,14 +656,25 @@ class DeployReplicaInstanceResourcesTask(base.TaskRunner):
 
 class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["target_environment", "instance_deployment_info"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["transfer_result"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -564,14 +698,25 @@ class FinalizeReplicaInstanceDeploymentTask(base.TaskRunner):
 
 class CleanupFailedReplicaInstanceDeploymentTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["target_environment", "instance_deployment_info"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["instance_deployment_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -591,14 +736,25 @@ class CleanupFailedReplicaInstanceDeploymentTask(base.TaskRunner):
 
 class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["target_environment", "export_info", "volumes_info"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["volumes_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -624,14 +780,25 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
 
 class DeleteReplicaTargetDiskSnapshotsTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["target_environment", "export_info", "volumes_info"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["volumes_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         export_info = task_info['export_info']
@@ -657,14 +824,25 @@ class DeleteReplicaTargetDiskSnapshotsTask(base.TaskRunner):
 
 class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["target_environment", "export_info", "volumes_info"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["volumes_info"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         provider = providers_factory.get_provider(
@@ -690,14 +868,25 @@ class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
 
 class ValidateReplicaExecutionSourceInputsTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["source_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return []
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_SOURCE: [
+                constants.PROVIDER_TYPE_VALIDATE_REPLICA_EXPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         event_manager = events.EventManager(event_handler)
@@ -720,14 +909,25 @@ class ValidateReplicaExecutionSourceInputsTask(base.TaskRunner):
 
 class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["export_info", "target_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return []
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_VALIDATE_REPLICA_IMPORT]
+        }
+
     def _validate_provider_replica_import_input(
             self, provider, ctxt, conn_info, target_environment, export_info):
         provider.validate_replica_import_input(
@@ -769,14 +969,25 @@ class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
 
 class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["export_info", "target_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return []
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_VALIDATE_REPLICA_IMPORT]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         event_manager = events.EventManager(event_handler)
@@ -811,14 +1022,25 @@ class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
 
 class UpdateSourceReplicaTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["volumes_info", "source_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["volumes_info", "source_environment"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_SOURCE: [
+                constants.PROVIDER_TYPE_SOURCE_REPLICA_UPDATE]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         event_manager = events.EventManager(event_handler)
@@ -868,14 +1090,25 @@ class UpdateSourceReplicaTask(base.TaskRunner):
 
 class UpdateDestinationReplicaTask(base.TaskRunner):
 
-    @property
-    def required_task_info_properties(self):
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+    @classmethod
+    def get_required_task_info_properties(cls):
         return ["export_info", "volumes_info", "target_environment"]
 
-    @property
-    def returned_task_info_properties(self):
+    @classmethod
+    def get_returned_task_info_properties(cls):
         return ["volumes_info", "target_environment"]
 
+    @classmethod
+    def get_required_provider_types(cls):
+        return {
+            constants.PROVIDER_PLATFORM_DESTINATION: [
+                constants.PROVIDER_TYPE_DESTINATION_REPLICA_UPDATE]
+        }
+
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
         event_manager = events.EventManager(event_handler)

+ 1 - 1
coriolis/utils.py

@@ -123,7 +123,7 @@ def get_diagnostics_info():
     # diagnostics.
     packages = list(freeze.freeze())
     return {
-        "binary": get_binary_name(),
+        "application": get_binary_name(),
         "packages": packages,
         "os_info": _get_host_os_info(),
         "hostname": get_hostname(),

+ 3 - 1
coriolis/worker/rpc/client.py

@@ -4,6 +4,7 @@
 from oslo_config import cfg
 import oslo_messaging as messaging
 
+from coriolis import constants
 from coriolis import rpc
 
 VERSION = "1.0"
@@ -19,7 +20,8 @@ CONF.register_opts(worker_opts, 'worker')
 
 
 class WorkerClient(object):
-    def __init__(self, timeout=None, topic='coriolis_worker'):
+    def __init__(
+            self, timeout=None, topic=constants.WORKER_MAIN_MESSAGING_TOPIC):
         target = messaging.Target(topic=topic, version=VERSION)
         if timeout is None:
             timeout = CONF.worker.worker_rpc_timeout

+ 2 - 6
coriolis/worker/rpc/server.py

@@ -77,15 +77,11 @@ class WorkerServerEndpoint(object):
         diagnostics = self.get_diagnostics(ctxt)
         status = {
             "host": diagnostics["hostname"],
-            "binary": diagnostics["binary"],
+            "binary": diagnostics["application"],
+            "topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
             "providers": self.get_available_providers(ctxt),
             "specs": diagnostics
         }
-        # TODO(aznashwan): have the topic be updated automatically in
-        # oslo_messaging.Service instead of relying on "hardcoding" it:
-        status["topic"] = constants.SERVICE_MESSAGING_TOPIC_FORMAT % {
-            "host": status["host"],
-            "binary": diagnostics["application"]}
 
         return status
 

+ 1 - 0
setup.cfg

@@ -29,6 +29,7 @@ console_scripts =
     coriolis-conductor = coriolis.cmd.conductor:main
     coriolis-worker = coriolis.cmd.worker:main
     coriolis-replica-cron = coriolis.cmd.replica_cron:main
+    coriolis-scheduler = coriolis.cmd.scheduler:main
     coriolis-dbsync = coriolis.cmd.db_sync:main
 
 [wheel]