2
0
Эх сурвалжийг харах

Add models and API for Coriolis regions.

Nashwan Azhari 5 жил өмнө
parent
commit
a923728e05

+ 12 - 5
coriolis/api/v1/endpoints.py

@@ -40,7 +40,10 @@ class EndpointController(api_wsgi.Controller):
             description = endpoint.get("description")
             endpoint_type = endpoint["type"]
             connection_info = endpoint["connection_info"]
-            return name, endpoint_type, description, connection_info
+            mapped_regions = endpoint.get("mapped_regions", [])
+            return (
+                name, endpoint_type, description, connection_info,
+                mapped_regions)
         except Exception as ex:
             LOG.exception(ex)
             if hasattr(ex, "message"):
@@ -53,15 +56,19 @@ class EndpointController(api_wsgi.Controller):
         context = req.environ["coriolis.context"]
         context.can(endpoint_policies.get_endpoints_policy_label("create"))
         (name, endpoint_type, description,
-         connection_info) = self._validate_create_body(body)
+         connection_info, mapped_regions) = self._validate_create_body(body)
         return endpoint_view.single(req, self._endpoint_api.create(
-            context, name, endpoint_type, description, connection_info))
+            context, name, endpoint_type, description, connection_info,
+            mapped_regions))
 
     def _validate_update_body(self, body):
         try:
             endpoint = body["endpoint"]
-            return {k: endpoint[k] for k in endpoint.keys() &
-                    {"name", "description", "connection_info"}}
+            return {
+                k: endpoint[k]
+                for k in endpoint.keys() & {
+                    "name", "description", "connection_info",
+                    "mapped_regions"}}
         except Exception as ex:
             LOG.exception(ex)
             if hasattr(ex, "message"):

+ 90 - 0
coriolis/api/v1/regions.py

@@ -0,0 +1,90 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_log import log as logging
+from webob import exc
+
+from coriolis import exception
+from coriolis.api.v1.views import region_view
+from coriolis.api import wsgi as api_wsgi
+from coriolis.policies import regions as region_policies
+from coriolis.regions import api
+
+LOG = logging.getLogger(__name__)
+
+
+class RegionController(api_wsgi.Controller):
+    def __init__(self):
+        self._region_api = api.API()
+        super(RegionController, self).__init__()
+
+    def show(self, req, id):
+        context = req.environ["coriolis.context"]
+        context.can(region_policies.get_regions_policy_label("show"))
+        region = self._region_api.get_region(context, id)
+        if not region:
+            raise exc.HTTPNotFound()
+
+        return region_view.single(req, region)
+
+    def index(self, req):
+        context = req.environ["coriolis.context"]
+        context.can(region_policies.get_regions_policy_label("list"))
+        return region_view.collection(
+            req, self._region_api.get_regions(context))
+
+    def _validate_create_body(self, body):
+        try:
+            region = body["region"]
+            name = region["name"]
+            description = region.get("description", "")
+            enabled = region.get("enabled", True)
+            return name, description, enabled
+        except Exception as ex:
+            LOG.exception(ex)
+            if hasattr(ex, "message"):
+                msg = ex.message
+            else:
+                msg = str(ex)
+            raise exception.InvalidInput(msg)
+
+    def create(self, req, body):
+        context = req.environ["coriolis.context"]
+        context.can(region_policies.get_regions_policy_label("create"))
+        (name, description, enabled) = self._validate_create_body(body)
+        return region_view.single(req, self._region_api.create(
+            context, region_name=name, description=description,
+            enabled=enabled))
+
+    def _validate_update_body(self, body):
+        try:
+            region = body["region"]
+            return {k: region[k] for k in region.keys() &
+                    {"name", "description", "enabled"}}
+        except Exception as ex:
+            LOG.exception(ex)
+            if hasattr(ex, "message"):
+                msg = ex.message
+            else:
+                msg = str(ex)
+            raise exception.InvalidInput(msg)
+
+    def update(self, req, id, body):
+        context = req.environ["coriolis.context"]
+        context.can(region_policies.get_regions_policy_label("update"))
+        updated_values = self._validate_update_body(body)
+        return region_view.single(req, self._region_api.update(
+            req.environ['coriolis.context'], id, updated_values))
+
+    def delete(self, req, id):
+        context = req.environ["coriolis.context"]
+        context.can(region_policies.get_regions_policy_label("delete"))
+        try:
+            self._region_api.delete(req.environ['coriolis.context'], id)
+            raise exc.HTTPNoContent()
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(RegionController())

+ 12 - 0
coriolis/api/v1/router.py

@@ -16,11 +16,13 @@ from coriolis.api.v1 import migration_actions
 from coriolis.api.v1 import migrations
 from coriolis.api.v1 import provider_schemas
 from coriolis.api.v1 import providers
+from coriolis.api.v1 import regions
 from coriolis.api.v1 import replica_actions
 from coriolis.api.v1 import replica_schedules
 from coriolis.api.v1 import replica_tasks_execution_actions
 from coriolis.api.v1 import replica_tasks_executions
 from coriolis.api.v1 import replicas
+from coriolis.api.v1 import services
 
 LOG = logging.getLogger(__name__)
 
@@ -43,12 +45,22 @@ class APIRouter(api.APIRouter):
         mapper.resource('provider', 'providers',
                         controller=self.resources['providers'])
 
+        self.resources['regions'] = regions.create_resource()
+        mapper.resource('region', 'regions',
+                        controller=self.resources['regions'],
+                        collection={'detail': 'GET'})
+
         self.resources['endpoints'] = endpoints.create_resource()
         mapper.resource('endpoint', 'endpoints',
                         controller=self.resources['endpoints'],
                         collection={'detail': 'GET'},
                         member={'action': 'POST'})
 
+        self.resources['services'] = services.create_resource()
+        mapper.resource('service', 'services',
+                        controller=self.resources['services'],
+                        collection={'detail': 'GET'})
+
         endpoint_actions_resource = endpoint_actions.create_resource()
         self.resources['endpoint_actions'] = endpoint_actions_resource
         endpoint_path = '/{project_id}/endpoints/{id}'

+ 95 - 0
coriolis/api/v1/services.py

@@ -0,0 +1,95 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_log import log as logging
+from webob import exc
+
+from coriolis import exception
+from coriolis.api.v1.views import service_view
+from coriolis.api import wsgi as api_wsgi
+from coriolis.policies import services as service_policies
+from coriolis.services import api
+
+LOG = logging.getLogger(__name__)
+
+
+class ServiceController(api_wsgi.Controller):
+    def __init__(self):
+        self._service_api = api.API()
+        super(ServiceController, self).__init__()
+
+    def show(self, req, id):
+        context = req.environ["coriolis.context"]
+        context.can(service_policies.get_services_policy_label("show"))
+        service = self._service_api.get_service(context, id)
+        if not service:
+            raise exc.HTTPNotFound()
+
+        return service_view.single(req, service)
+
+    def index(self, req):
+        context = req.environ["coriolis.context"]
+        context.can(service_policies.get_services_policy_label("list"))
+        return service_view.collection(
+            req, self._service_api.get_services(context))
+
+    def _validate_create_body(self, body):
+        try:
+            service = body["service"]
+            host = service["host"]
+            binary = service["binary"]
+            topic = service.get("topic")
+            mapped_regions = service.get('mapped_regions', [])
+            enabled = service.get("enabled", True)
+            return host, binary, topic, mapped_regions, enabled
+        except Exception as ex:
+            LOG.exception(ex)
+            if hasattr(ex, "message"):
+                msg = ex.message
+            else:
+                msg = str(ex)
+            raise exception.InvalidInput(msg)
+
+    def create(self, req, body):
+        context = req.environ["coriolis.context"]
+        context.can(service_policies.get_services_policy_label("create"))
+        (host, binary, topic, mapped_regions, enabled) = (
+            self._validate_create_body(body))
+        return service_view.single(req, self._service_api.create(
+            context, host=host, binary=binary, topic=topic,
+            mapped_regions=mapped_regions, enabled=enabled,
+            # NOTE: providers and specs should be auto-discovered later:
+            providers={}, specs={}))
+
+    def _validate_update_body(self, body):
+        try:
+            service = body["service"]
+            return {k: service[k] for k in service.keys() & {
+                "enabled", "mapped_regions"}}
+        except Exception as ex:
+            LOG.exception(ex)
+            if hasattr(ex, "message"):
+                msg = ex.message
+            else:
+                msg = str(ex)
+            raise exception.InvalidInput(msg)
+
+    def update(self, req, id, body):
+        context = req.environ["coriolis.context"]
+        context.can(service_policies.get_services_policy_label("update"))
+        updated_values = self._validate_update_body(body)
+        return service_view.single(req, self._service_api.update(
+            req.environ['coriolis.context'], id, updated_values))
+
+    def delete(self, req, id):
+        context = req.environ["coriolis.context"]
+        context.can(service_policies.get_services_policy_label("delete"))
+        try:
+            self._service_api.delete(req.environ['coriolis.context'], id)
+            raise exc.HTTPNoContent()
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
+
+def create_resource():
+    return api_wsgi.Resource(ServiceController())

+ 6 - 1
coriolis/api/v1/views/endpoint_view.py

@@ -10,8 +10,13 @@ def _format_endpoint(req, endpoint, keys=None):
             return
         yield (key, value)
 
-    return dict(itertools.chain.from_iterable(
+    endpoint_dict = dict(itertools.chain.from_iterable(
         transform(k, v) for k, v in endpoint.items()))
+    mapped_regions = endpoint_dict.get('mapped_regions', [])
+    endpoint_dict['mapped_regions'] = [
+        reg['region_id'] for reg in mapped_regions]
+
+    return endpoint_dict
 
 
 def single(req, endpoint):

+ 30 - 0
coriolis/api/v1/views/region_view.py

@@ -0,0 +1,30 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+
+def _format_region(req, region, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    region_dict = dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in region.items()))
+
+    mapped_endpoints = region_dict.get('mapped_endpoints', [])
+    region_dict['mapped_endpoints'] = [
+        endp['endpoint_id'] for endp in mapped_endpoints]
+
+    return region_dict
+
+
+def single(req, region):
+    return {"region": _format_region(req, region)}
+
+
+def collection(req, regions):
+    formatted_regions = [
+        _format_region(req, r) for r in regions]
+    return {'regions': formatted_regions}

+ 30 - 0
coriolis/api/v1/views/service_view.py

@@ -0,0 +1,30 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import itertools
+
+
+def _format_service(req, service, keys=None):
+    def transform(key, value):
+        if keys and key not in keys:
+            return
+        yield (key, value)
+
+    service_dict = dict(itertools.chain.from_iterable(
+        transform(k, v) for k, v in service.items()))
+
+    mapped_regions = service_dict.get('mapped_regions', [])
+    service_dict['mapped_regions'] = [
+        mapping['region_id'] for mapping in mapped_regions]
+
+    return service_dict
+
+
+def single(req, service):
+    return {"service": _format_service(req, service)}
+
+
+def collection(req, services):
+    formatted_services = [
+        _format_service(req, r) for r in services]
+    return {'services': formatted_services}

+ 9 - 1
coriolis/cmd/worker.py

@@ -5,6 +5,7 @@ import sys
 
 from oslo_config import cfg
 
+from coriolis import constants
 from coriolis import service
 from coriolis import utils
 from coriolis.worker.rpc import server as rpc_server
@@ -17,8 +18,15 @@ def main():
          version="1.0.0")
     utils.setup_logging()
 
+    worker_topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % {
+        "host": utils.get_hostname(),
+        "binary": utils.get_binary_name()}
+
+    # TODO(aznashwan): find way to update the messaging topic being
+    # listened on by oslo_messaging.service.Service so as to not have to
+    # "hardcode" the worker topic from the binary level:
     server = service.MessagingService(
-        'coriolis_worker', [rpc_server.WorkerServerEndpoint()],
+        worker_topic, [rpc_server.WorkerServerEndpoint()],
         rpc_server.VERSION)
     launcher = service.service.launch(
         CONF, server, workers=server.get_workers_count())

+ 52 - 2
coriolis/conductor/rpc/client.py

@@ -26,10 +26,11 @@ class ConductorClient(object):
         self._client = rpc.get_client(target, timeout=timeout)
 
     def create_endpoint(self, ctxt, name, endpoint_type, description,
-                        connection_info):
+                        connection_info, mapped_regions):
         return self._client.call(
             ctxt, 'create_endpoint', name=name, endpoint_type=endpoint_type,
-            description=description, connection_info=connection_info)
+            description=description, connection_info=connection_info,
+            mapped_regions=mapped_regions)
 
     def update_endpoint(self, ctxt, endpoint_id, updated_values):
         return self._client.call(
@@ -308,3 +309,52 @@ class ConductorClient(object):
 
     def get_all_diagnostics(self, ctxt):
         return self._client.call(ctxt, 'get_all_diagnostics')
+
+    def create_region(
+            self, ctxt, region_name, description="", enabled=True):
+        return self._client.call(
+            ctxt, 'create_region',
+            region_name=region_name,
+            description=description,
+            enabled=enabled)
+
+    def get_regions(self, ctxt):
+        return self._client.call(ctxt, 'get_regions')
+
+    def get_region(self, ctxt, region_id):
+        return self._client.call(
+            ctxt, 'get_region', region_id=region_id)
+
+    def update_region(self, ctxt, region_id, updated_values):
+        return self._client.call(
+            ctxt, 'update_region',
+            region_id=region_id,
+            updated_values=updated_values)
+
+    def delete_region(self, ctxt, region_id):
+        return self._client.call(
+            ctxt, 'delete_region', region_id=region_id)
+
+    def register_service(
+            self, ctxt, host, binary, topic, enabled, providers,
+            specs, mapped_regions):
+        return self._client.call(
+            ctxt, 'register_service', host=host, binary=binary,
+            topic=topic, enabled=enabled, providers=providers, specs=specs,
+            mapped_regions=mapped_regions)
+
+    def get_services(self, ctxt):
+        return self._client.call(ctxt, 'get_services')
+
+    def get_service(self, ctxt, service_id):
+        return self._client.call(
+            ctxt, 'get_service', service_id=service_id)
+
+    def update_service(self, ctxt, service_id, updated_values):
+        return self._client.call(
+            ctxt, 'update_service', service_id=service_id,
+            updated_values=updated_values)
+
+    def delete_service(self, ctxt, service_id):
+        return self._client.call(
+            ctxt, 'delete_service', service_id=service_id)

+ 220 - 19
coriolis/conductor/rpc/server.py

@@ -3,6 +3,7 @@
 
 import copy
 import functools
+import random
 import uuid
 
 from oslo_concurrency import lockutils
@@ -132,6 +133,30 @@ def tasks_execution_synchronized(func):
     return wrapper
 
 
+def region_synchronized(func):
+    @functools.wraps(func)
+    def wrapper(self, ctxt, region_id, *args, **kwargs):
+        @lockutils.synchronized(
+            constants.REGION_LOCK_NAME_FORMAT % region_id,
+            external=True)
+        def inner():
+            return func(self, ctxt, region_id, *args, **kwargs)
+        return inner()
+    return wrapper
+
+
+def service_synchronized(func):
+    @functools.wraps(func)
+    def wrapper(self, ctxt, service_id, *args, **kwargs):
+        @lockutils.synchronized(
+            constants.SERVICE_LOCK_NAME_FORMAT % service_id,
+            external=True)
+        def inner():
+            return func(self, ctxt, service_id, *args, **kwargs)
+        return inner()
+    return wrapper
+
+
 class ConductorServerEndpoint(object):
     def __init__(self):
         self._licensing_client = licensing_client.LicensingClient.from_env()
@@ -141,12 +166,33 @@ class ConductorServerEndpoint(object):
     def get_all_diagnostics(self, ctxt):
         conductor = self.get_diagnostics(ctxt)
         cron = self._replica_cron_client.get_diagnostics(ctxt)
-        worker = self._rpc_worker_client.get_diagnostics(ctxt)
+        # TODO(aznashwan): include diagnostics for every worker service:
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        worker = worker_rpc.get_diagnostics(ctxt)
         return [
             conductor,
             cron,
-            worker,
-        ]
+            worker]
+
+    def _get_rpc_for_worker_service(self, service):
+         return rpc_worker_client.WorkerClient(topic=service.topic)
+
+    def _get_any_worker_service(self, ctxt):
+        services = db_api.get_services(ctxt)
+        if not services:
+            raise exception.CoriolisException("No services.")
+        return random.choice(services)
+
+    def _run_operation_on_worker_service(
+            self, ctxt, service, operation_name, *args, **kwargs):
+        service_rpc_client = self._get_rpc_for_worker_service(service)
+        operation = getattr(service_rpc_client, operation_name, None)
+        if not operation:
+            raise exception.CoriolisException(
+                "Invalid operation name '%s' for RPC client '%s'" % (
+                    operation_name, service_rpc_client))
+        return operation(*args, **kwargs)
 
     def _check_delete_reservation_for_transfer(self, transfer_action):
         action_id = transfer_action.base_id
@@ -204,8 +250,9 @@ class ConductorServerEndpoint(object):
                 "all reservation licensing checks.", action_id)
 
     def create_endpoint(self, ctxt, name, endpoint_type, description,
-                        connection_info):
+                        connection_info, mapped_regions=None):
         endpoint = models.Endpoint()
+        endpoint.id = str(uuid.uuid4())
         endpoint.name = name
         endpoint.type = endpoint_type
         endpoint.description = description
@@ -213,12 +260,31 @@ class ConductorServerEndpoint(object):
 
         db_api.add_endpoint(ctxt, endpoint)
         LOG.info("Endpoint created: %s", endpoint.id)
+
+        # add region associations:
+        if mapped_regions:
+            try:
+                db_api.update_endpoint(
+                    ctxt, endpoint.id, {
+                        "mapped_regions": mapped_regions})
+            except Exception as ex:
+                LOG.warn(
+                    "Error adding region mappings during new endpoint creation "
+                    "(name: %s), cleaning up endpoint and all created "
+                    "mappings for regions: %s", endpoint.name, mapped_regions)
+                db_api.delete_endpoint(ctxt, endpoint.id)
+                raise
+
         return self.get_endpoint(ctxt, endpoint.id)
 
+    @endpoint_synchronized
     def update_endpoint(self, ctxt, endpoint_id, updated_values):
+        LOG.info(
+            "Attempting to update endpoint '%s' with payload: %s",
+            endpoint_id, updated_values)
         db_api.update_endpoint(ctxt, endpoint_id, updated_values)
         LOG.info("Endpoint updated: %s", endpoint_id)
-        return self.get_endpoint(ctxt, endpoint_id)
+        return db_api.get_endpoint(ctxt, endpoint_id)
 
     def get_endpoints(self, ctxt):
         return db_api.get_endpoints(ctxt)
@@ -244,7 +310,9 @@ class ConductorServerEndpoint(object):
                                marker, limit, instance_name_pattern):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        return self._rpc_worker_client.get_endpoint_instances(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.get_endpoint_instances(
             ctxt, endpoint.type, endpoint.connection_info,
             source_environment, marker, limit, instance_name_pattern)
 
@@ -252,7 +320,9 @@ class ConductorServerEndpoint(object):
             self, ctxt, endpoint_id, source_environment, instance_name):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        return self._rpc_worker_client.get_endpoint_instance(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.get_endpoint_instance(
             ctxt, endpoint.type, endpoint.connection_info,
             source_environment, instance_name)
 
@@ -260,50 +330,68 @@ class ConductorServerEndpoint(object):
             self, ctxt, endpoint_id, env, option_names):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        return self._rpc_worker_client.get_endpoint_source_options(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.get_endpoint_source_options(
             ctxt, endpoint.type, endpoint.connection_info, env, option_names)
 
     def get_endpoint_destination_options(
             self, ctxt, endpoint_id, env, option_names):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        return self._rpc_worker_client.get_endpoint_destination_options(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.get_endpoint_destination_options(
             ctxt, endpoint.type, endpoint.connection_info, env, option_names)
 
     def get_endpoint_networks(self, ctxt, endpoint_id, env):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        return self._rpc_worker_client.get_endpoint_networks(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.get_endpoint_networks(
             ctxt, endpoint.type, endpoint.connection_info, env)
 
     def get_endpoint_storage(self, ctxt, endpoint_id, env):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
 
-        return self._rpc_worker_client.get_endpoint_storage(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.get_endpoint_storage(
             ctxt, endpoint.type, endpoint.connection_info, env)
 
     def validate_endpoint_connection(self, ctxt, endpoint_id):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
-        return self._rpc_worker_client.validate_endpoint_connection(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.validate_endpoint_connection(
             ctxt, endpoint.type, endpoint.connection_info)
 
     def validate_endpoint_target_environment(
             self, ctxt, endpoint_id, target_env):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
-        return self._rpc_worker_client.validate_endpoint_target_environment(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.validate_endpoint_target_environment(
             ctxt, endpoint.type, target_env)
 
     def validate_endpoint_source_environment(
             self, ctxt, endpoint_id, source_env):
         endpoint = self.get_endpoint(ctxt, endpoint_id)
-        return self._rpc_worker_client.validate_endpoint_source_environment(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.validate_endpoint_source_environment(
             ctxt, endpoint.type, source_env)
 
     def get_available_providers(self, ctxt):
-        return self._rpc_worker_client.get_available_providers(ctxt)
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.get_available_providers(ctxt)
 
     def get_provider_schemas(self, ctxt, platform_name, provider_type):
-        return self._rpc_worker_client.get_provider_schemas(
+        worker_rpc = self._get_rpc_for_worker_service(
+            self._get_any_worker_service(ctxt))
+        return worker_rpc.get_provider_schemas(
             ctxt, platform_name, provider_type)
 
     @staticmethod
@@ -378,7 +466,9 @@ class ConductorServerEndpoint(object):
                     task.id, execution.id)
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_PENDING)
-                self._rpc_worker_client.begin_task(
+                worker_rpc = self._get_rpc_for_worker_service(
+                        self._get_any_worker_service(ctxt))
+                worker_rpc.begin_task(
                     ctxt, server=None,
                     task_id=task.id,
                     task_type=task.task_type,
@@ -1328,7 +1418,10 @@ class ConductorServerEndpoint(object):
                         task.status, task.id, execution.id)
                     db_api.set_task_status(
                         ctxt, task.id, constants.TASK_STATUS_CANCELLING)
-                    self._rpc_worker_client.cancel_task(
+                    worker_rpc = self._get_rpc_for_worker_service(
+                        self._get_any_worker_service(ctxt))
+                    # TODO(aznashwan): cancel on right worker:
+                    worker_rpc.cancel_task(
                         ctxt, task.host, task.id, task.process_id, force)
                 # let any on-error tasks run to completion but mark
                 # them as CANCELLING_AFTER_COMPLETION so they will
@@ -1594,7 +1687,9 @@ class ConductorServerEndpoint(object):
                 task_info = action.info[task.instance]
             db_api.set_task_status(
                 ctxt, task.id, constants.TASK_STATUS_PENDING)
-            self._rpc_worker_client.begin_task(
+            worker_rpc = self._get_rpc_for_worker_service(
+                self._get_any_worker_service(ctxt))
+            worker_rpc.begin_task(
                 ctxt, server=None,
                 task_id=task.id,
                 task_type=task.task_type,
@@ -2377,3 +2472,109 @@ class ConductorServerEndpoint(object):
 
     def get_diagnostics(self, ctxt):
         return utils.get_diagnostics_info()
+
+    def create_region(self, ctxt, region_name, description="", enabled=True):
+        region = models.Region()
+        region.id = str(uuid.uuid4())
+        region.name = region_name
+        region.description = description
+        region.enabled = enabled
+        db_api.add_region(ctxt, region)
+        return self.get_region(ctxt, region.id)
+
+    def get_regions(self, ctxt):
+        return db_api.get_regions(ctxt)
+
+    @region_synchronized
+    def get_region(self, ctxt, region_id):
+        region = db_api.get_region(ctxt, region_id)
+        if not region:
+            raise exception.NotFound(
+                "Region with ID '%s' not found." % region_id)
+        return region
+
+    @region_synchronized
+    def update_region(self, ctxt, region_id, updated_values):
+        LOG.info(
+            "Attempting to update region '%s' with payload: %s",
+            region_id, updated_values)
+        db_api.update_region(ctxt, region_id, updated_values)
+        LOG.info("Region '%s' successfully updated", region_id)
+        return db_api.get_region(ctxt, region_id)
+
+    @region_synchronized
+    def delete_region(self, ctxt, region_id):
+        # TODO(aznashwan): add checks for endpoints/services
+        # associated to the region before deletion:
+        db_api.delete_region(ctxt, region_id)
+
+    def register_service(
+            self, ctxt, host, binary, topic, enabled, mapped_regions=None):
+        exists = db_api.find_service(ctxt, host, binary, topic=topic)
+        if exists:
+            raise exception.Conflict(
+                "A Service with the specified parameters (host %s, binary %s, "
+                "topic %s) has already been registered under ID: %s" % (
+                    host, binary, topic, exists.id))
+
+        service = models.Service()
+        service.id = str(uuid.uuid4())
+        service.host = host
+        service.binary = binary
+        service.enabled = enabled
+        service.topic = topic
+        if not service.topic:
+            service.topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % {
+                "host": service.host, "binary": service.binary}
+
+        worker_rpc = self._get_rpc_for_worker_service(service)
+        status = worker_rpc.get_service_status(ctxt)
+
+        service.providers = status["providers"]
+        service.specs = status["specs"]
+
+        # create the service:
+        db_api.add_service(ctxt, service)
+        LOG.debug(
+            "Added new service to DB: %s", service.id)
+
+        # add region associations:
+        if mapped_regions:
+            try:
+                db_api.update_service(
+                    ctxt, service.id, {
+                        "mapped_regions": mapped_regions})
+            except Exception as ex:
+                LOG.warn(
+                    "Error adding region mappings during new service "
+                    "registration (host: %s), cleaning up endpoint and "
+                    "all created mappings for regions: %s",
+                    service.host, mapped_regions)
+                db_api.delete_service(ctxt, service.id)
+                raise
+
+        return self.get_service(ctxt, service.id)
+
+    def get_services(self, ctxt):
+        return db_api.get_services(ctxt)
+
+    @service_synchronized
+    def get_service(self, ctxt, service_id):
+        service = db_api.get_service(ctxt, service_id)
+        if not service:
+            raise exception.NotFound(
+                "Service with ID '%s' not found." % service_id)
+        return service
+
+    @service_synchronized
+    def update_service(self, ctxt, service_id, updated_values):
+        LOG.info(
+            "Attempting to update service '%s' with payload: %s",
+            service_id, updated_values)
+        db_api.update_service(ctxt, service_id, updated_values)
+        LOG.info("Successfully updated service '%s'", service_id)
+        return db_api.get_service(ctxt, service_id)
+
+    @service_synchronized
+    def delete_service(self, ctxt, service_id):
+        db_api.delete_service(ctxt, service_id)

+ 13 - 0
coriolis/constants.py

@@ -1,6 +1,8 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+DEFAULT_CORIOLIS_REGION_NAME = "Default Region"
+
 EXECUTION_STATUS_RUNNING = "RUNNING"
 EXECUTION_STATUS_COMPLETED = "COMPLETED"
 EXECUTION_STATUS_ERROR = "ERROR"
@@ -122,6 +124,9 @@ TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS = (
 TASK_TYPE_UPDATE_SOURCE_REPLICA = "UPDATE_SOURCE_REPLICA"
 TASK_TYPE_UPDATE_DESTINATION_REPLICA = "UPDATE_DESTINATION_REPLICA"
 
+TASK_PLATFORM_SOURCE = "SOURCE_PLATFORM"
+TASK_PLATFORM_DESTINATION = "DESTINATION_PLATFORM"
+
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_EXPORT = 2
 PROVIDER_TYPE_REPLICA_IMPORT = 4
@@ -197,6 +202,8 @@ ENDPOINT_LOCK_NAME_FORMAT = "endpoint-%s"
 MIGRATION_LOCK_NAME_FORMAT = "migration-%s"
 REPLICA_LOCK_NAME_FORMAT = "replica-%s"
 SCHEDULE_LOCK_NAME_FORMAT = "schedule-%s"
+REGION_LOCK_NAME_FORMAT = "region-%s"
+SERVICE_LOCK_NAME_FORMAT = "service-%s"
 
 EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP = {
     EXECUTION_TYPE_MIGRATION: MIGRATION_LOCK_NAME_FORMAT,
@@ -205,3 +212,9 @@ EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP = {
     EXECUTION_TYPE_REPLICA_UPDATE: REPLICA_LOCK_NAME_FORMAT,
     EXECUTION_TYPE_REPLICA_DISKS_DELETE: REPLICA_LOCK_NAME_FORMAT
 }
+
+SERVICE_STATUS_UP = "UP"
+SERVICE_STATUS_DOWN = "DOWN"
+SERVICE_STATUS_UNKNOWN = "UNKNOWN"
+
+SERVICE_MESSAGING_TOPIC_FORMAT = "%(host)s-%(binary)s"

+ 432 - 12
coriolis/db/api.py

@@ -14,6 +14,7 @@ from sqlalchemy.sql import null
 
 from coriolis.db.sqlalchemy import models
 from coriolis import exception
+from coriolis import utils
 
 CONF = cfg.CONF
 db_options.set_defaults(CONF)
@@ -62,6 +63,38 @@ def _model_query(context, *args):
     return session.query(*args)
 
 
+def _update_sqlalchemy_object_fields(obj, updateable_fields, values_to_update):
+    """ Updates the given 'values_to_update' on the provided sqlalchemy object
+    as long as they are included as 'updateable_fields'.
+    :param obj: object: sqlalchemy object
+    :param updateable_fields: list(str): list of fields which are updateable
+    :param values_to_update: dict: dict with the key/vals to update
+    """
+    if not isinstance(values_to_update, dict):
+        raise exception.InvalidInput(
+            "Properties to update for DB object of type '%s' must be a dict, "
+            "got the following (type %s): %s" % (
+                type(obj), type(values_to_update), values_to_update))
+
+    non_updateable_fields = set(
+        values_to_update.keys()).difference(
+            set(updateable_fields))
+    if non_updateable_fields:
+        raise exception.Conflict(
+            "Fields %s for '%s' database cannot be updated. "
+            "Only updateable fields are: %s" % (
+                non_updateable_fields, type(obj), updateable_fields))
+
+    for field_name, field_val in values_to_update.items():
+        if not hasattr(obj, field_name):
+            raise exception.InvalidInput(
+                "No region field named '%s' to update." % field_name)
+        setattr(obj, field_name, field_val)
+    LOG.debug(
+        "Successfully updated the following fields on DB object "
+        "of type '%s': %s" % (type(obj), values_to_update.keys()))
+
+
 def _get_replica_schedules_filter(context, replica_id=None,
                                   schedule_id=None, expired=True):
     now = timeutils.utcnow()
@@ -100,7 +133,8 @@ def _soft_delete_aware_query(context, *args, **kwargs):
 
 @enginefacade.reader
 def get_endpoints(context):
-    q = _soft_delete_aware_query(context, models.Endpoint)
+    q = _soft_delete_aware_query(context, models.Endpoint).options(
+        orm.joinedload('mapped_regions'))
     if is_user_context(context):
         q = q.filter(
             models.Endpoint.project_id == context.tenant)
@@ -109,7 +143,8 @@ def get_endpoints(context):
 
 @enginefacade.reader
 def get_endpoint(context, endpoint_id):
-    q = _soft_delete_aware_query(context, models.Endpoint)
+    q = _soft_delete_aware_query(context, models.Endpoint).options(
+        orm.joinedload('mapped_regions'))
     if is_user_context(context):
         q = q.filter(
             models.Endpoint.project_id == context.tenant)
@@ -128,10 +163,94 @@ def add_endpoint(context, endpoint):
 def update_endpoint(context, endpoint_id, updated_values):
     endpoint = get_endpoint(context, endpoint_id)
     if not endpoint:
-        raise exception.NotFound("Endpoint not found")
-    for n in ["name", "description", "connection_info"]:
-        if n in updated_values:
-            setattr(endpoint, n, updated_values[n])
+        raise exception.NotFound("Endpoint with ID '%s' found" % endpoint_id)
+
+
+    if not isinstance(updated_values, dict):
+        raise exception.InvalidInput(
+            "Update payload for endpoints must be a dict. Got the following "
+            "(type: %s): %s" % (type(updated_values), updated_values))
+
+    def _try_unmap_regions(region_ids):
+         for region_to_unmap in region_ids:
+            try:
+                LOG.debug(
+                    "Attempting to unmap region '%s' from endpoint '%s'",
+                    region_to_unmap, endpoint_id)
+                delete_endpoint_region_mapping(
+                    context, endpoint_id, region_to_unmap)
+            except Exception as ex:
+                LOG.warn(
+                    "Exception occurred while attempting to unmap region '%s' "
+                    "from endpoint '%s'. Ignoring. Error was: %s",
+                    region_to_unmap, endpoint_id,
+                    utils.get_exception_details())
+
+    newly_mapped_regions = []
+    regions_to_unmap = []
+    # NOTE: `.pop()` required for  `_update_sqlalchemy_object_fields` call:
+    desired_region_mappings = updated_values.pop('mapped_regions', None)
+    if desired_region_mappings is not None:
+        # ensure all requested regions exist:
+        for region_id in desired_region_mappings:
+            region = get_region(context, region_id)
+            if not region:
+                raise exception.NotFound(
+                    "Could not find region with ID '%s' for associating "
+                    "with endpoint '%s' during update process." % (
+                        region_id, endpoint_id))
+
+        # get all existing mappings:
+        existing_region_mappings = [
+            mapping.id
+            for mapping in get_region_mappings_for_endpoint(
+                context, endpoint_id)]
+
+        # check and add new mappings:
+        to_map = set(
+            desired_region_mappings).difference(set(existing_region_mappings))
+        regions_to_unmap = set(
+            existing_region_mappings).difference(set(desired_region_mappings))
+
+        LOG.debug(
+            "Remapping regions for endpoint '%s' from %s to %s",
+            endpoint_id, existing_region_mappings, desired_region_mappings)
+
+        region_id = None
+        try:
+            for region_id in to_map:
+                mapping = models.EndpointRegionMapping()
+                mapping.region_id = region_id
+                mapping.endpoint_id = endpoint_id
+                add_endpoint_region_mapping(context, mapping)
+                newly_mapped_regions.append(region_id)
+        except Exception as ex:
+            LOG.warn(
+                "Exception occurred while adding region mapping for '%s' to "
+                "endpoint '%s'. Cleaning up created mappings (%s). Error was: "
+                "%s", region_id, endpoint_id, newly_mapped_regions,
+                utils.get_exception_details())
+            _try_unmap_regions(newly_mapped_regions)
+            raise
+
+
+    updateable_fields = ["name", "description", "connection_info"]
+    try:
+        _update_sqlalchemy_object_fields(
+            endpoint, updateable_fields, updated_values)
+    except Exception as ex:
+        LOG.warn(
+            "Exception occurred while updating fields of endpoint '%s'. "
+            "Cleaning ""up created mappings (%s). Error was: %s",
+            endpoint_id, newly_mapped_regions, utils.get_exception_details())
+        _try_unmap_regions(newly_mapped_regions)
+        raise
+
+    # remove all of the old region mappings:
+    LOG.debug(
+        "Unmapping the following regions during update of endpoint '%s': %s",
+        endpoint_id, regions_to_unmap)
+    _try_unmap_regions(regions_to_unmap)
 
 
 @enginefacade.writer
@@ -142,7 +261,7 @@ def delete_endpoint(context, endpoint_id):
     count = _soft_delete_aware_query(context, models.Endpoint).filter_by(
         **args).soft_delete()
     if count == 0:
-        raise exception.NotFound("0 entries were soft deleted")
+        raise exception.NotFound("0 Endpoint entries were soft deleted")
 
 
 @enginefacade.reader
@@ -280,7 +399,8 @@ def _get_replica_with_tasks_executions_options(q):
 @enginefacade.reader
 def get_replicas(context,
                  include_tasks_executions=False,
-                 include_info=False):
+                 include_info=False,
+                 to_dict=True):
     q = _soft_delete_aware_query(context, models.Replica)
     if include_tasks_executions:
         q = _get_replica_with_tasks_executions_options(q)
@@ -291,7 +411,9 @@ def get_replicas(context,
         q = q.filter(
             models.Replica.project_id == context.tenant)
     db_result = q.all()
-    return [i.to_dict(include_info=include_info) for i in db_result]
+    if to_dict:
+        return [i.to_dict(include_info=include_info) for i in db_result]
+    return db_result
 
 
 @enginefacade.reader
@@ -358,7 +480,7 @@ def get_replica_migrations(context, replica_id):
 
 @enginefacade.reader
 def get_migrations(context, include_tasks=False,
-                   include_info=False):
+                   include_info=False, to_dict=True):
     q = _soft_delete_aware_query(context, models.Migration)
     if include_tasks:
         q = _get_migration_task_query_options(q)
@@ -371,8 +493,9 @@ def get_migrations(context, include_tasks=False,
     if is_user_context(context):
         args["project_id"] = context.tenant
     result = q.filter_by(**args).all()
-    to_dict = [i.to_dict(include_info=include_info) for i in result]
-    return to_dict
+    if to_dict:
+        return [i.to_dict(include_info=include_info) for i in result]
+    return result
 
 
 def _get_tasks_with_details_options(query):
@@ -617,3 +740,300 @@ def update_replica(context, replica_id, updated_values):
     # the oslo_db library uses this method for both the `created_at` and
     # `updated_at` fields
     setattr(replica, 'updated_at', timeutils.utcnow())
+
+
+@enginefacade.writer
+def add_region(context, region):
+    context.session.add(region)
+
+
+@enginefacade.reader
+def get_regions(context):
+    q = _soft_delete_aware_query(context, models.Region).options(
+        orm.joinedload('mapped_endpoints'))
+    return q.all()
+
+
+@enginefacade.reader
+def get_region(context, region_id):
+    q = _soft_delete_aware_query(context, models.Region).options(
+        orm.joinedload('mapped_endpoints'))
+    return q.filter(
+        models.Region.id == region_id).first()
+
+
+@enginefacade.writer
+def update_region(context, region_id, updated_values):
+    if not region_id:
+        raise exception.InvalidInput(
+            "No region ID specified for updating.")
+    region = get_region(context, region_id)
+    if not region:
+        raise exception.NotFound(
+            "Region with ID '%s' does not exist." % region_id)
+
+    updateable_fields = ["name", "description", "enabled"]
+    _update_sqlalchemy_object_fields(
+        region, updateable_fields, updated_values)
+
+
+@enginefacade.writer
+def delete_region(context, region_id):
+    count = _soft_delete_aware_query(context, models.Region).filter_by(
+        id=region_id).soft_delete()
+    if count == 0:
+        raise exception.NotFound("0 region entries were soft deleted")
+
+
+@enginefacade.writer
+def add_endpoint_region_mapping(context, endpoint_region_mapping):
+    region_id = endpoint_region_mapping.region_id
+    endpoint_id = endpoint_region_mapping.endpoint_id
+
+    if None in [region_id, endpoint_id]:
+        raise exception.InvalidInput(
+            "Provided endpoint region mapping params for the region ID "
+            "('%s') and the endpoint ID ('%s') must both be non-null." % (
+                region_id, endpoint_id))
+
+    context.session.add(endpoint_region_mapping)
+
+
+@enginefacade.reader
+def get_endpoint_region_mapping(context, endpoint_id, region_id):
+    q = _soft_delete_aware_query(context, models.EndpointRegionMapping)
+    q = q.filter(
+        models.EndpointRegionMapping.region == region_id)
+    q = q.filter(
+        models.EndpointRegionMapping.endpoint_id == endpoint_id)
+    return q.all()
+
+
+@enginefacade.writer
+def delete_endpoint_region_mapping(context, endpoint_id, region_id):
+    args = {"endpoint_id": endpoint_id, "region_id": region_id}
+    count = _soft_delete_aware_query(
+        context, models.EndpointRegionMapping).filter_by(
+            **args).soft_delete()
+    if count == 0:
+        raise exception.NotFound(
+            "There is no mapping between endpoint '%s' and region '%s'." % (
+                endpoint_id, region_id))
+    LOG.debug(
+        "Deleted mapping between endpoint '%s' and region '%s' from DB",
+        endpoint_id, region_id)
+
+
+@enginefacade.reader
+def get_region_mappings_for_endpoint(
+        context, endpoint_id, enabled_regions_only=False):
+    q = _soft_delete_aware_query(context, models.EndpointRegionMapping)
+    q = q.join(models.Region)
+    q = q.filter(
+        models.EndpointRegionMapping.endpoint_id == endpoint_id)
+    if enabled_regions_only:
+        q = q.filter(
+            models.Region.enabled == True)
+    return q.all()
+
+
+@enginefacade.reader
+def get_mapped_endpoints_for_region(context, region_id):
+    q = _soft_delete_aware_query(context, models.Endpoint)
+    q = q.join(models.EndpointRegionMapping)
+    q = q.filter(
+        models.EndpointRegionMapping.endpoint_id == region_id)
+    return q.all()
+
+
+@enginefacade.writer
+def add_service(context, service):
+    context.session.add(service)
+
+
+@enginefacade.reader
+def get_services(context):
+    q = _soft_delete_aware_query(context, models.Service).options(
+        orm.joinedload('mapped_regions'))
+    return q.all()
+
+
+@enginefacade.reader
+def get_service(context, service_id):
+    q = _soft_delete_aware_query(context, models.Service).options(
+        orm.joinedload('mapped_regions'))
+    return q.filter(
+        models.Service.id == service_id).first()
+
+
+@enginefacade.reader
+def find_service(context, host, binary, topic=None):
+    args = {"host": host, "binary": binary}
+    if topic:
+        args["topic"] = topic
+    q = _soft_delete_aware_query(context, models.Service).options(
+         orm.joinedload('mapped_regions')).filter_by(**args)
+    return q.first()
+
+
+@enginefacade.writer
+def update_service(context, service_id, updated_values):
+    if not service_id:
+        raise exception.InvalidInput(
+            "No service ID specified for updating.")
+    service = get_service(context, service_id)
+    if not service:
+        raise exception.NotFound(
+            "Service with ID '%s' does not exist." % service_id)
+
+    if not isinstance(updated_values, dict):
+        raise exception.InvalidInput(
+            "Update payload for services must be a dict. Got the following "
+            "(type: %s): %s" % (type(updated_values), updated_values))
+
+    def _try_unmap_regions(region_ids):
+         for region_to_unmap in region_ids:
+            try:
+                LOG.debug(
+                    "Attempting to unmap region '%s' from service '%s'",
+                    region_to_unmap, service_id)
+                delete_service_region_mapping(
+                    context, service_id, region_to_unmap)
+            except Exception as ex:
+                LOG.warn(
+                    "Exception occurred while attempting to unmap region '%s' "
+                    "from service '%s'. Ignoring. Error was: %s",
+                    region_to_unmap, service_id,
+                    utils.get_exception_details())
+
+    newly_mapped_regions = []
+    regions_to_unmap = []
+    # NOTE: `.pop()` required for  `_update_sqlalchemy_object_fields` call:
+    desired_region_mappings = updated_values.pop('mapped_regions', None)
+    if desired_region_mappings is not None:
+        # ensure all requested regions exist:
+        for region_id in desired_region_mappings:
+            region = get_region(context, region_id)
+            if not region:
+                raise exception.NotFound(
+                    "Could not find region with ID '%s' for associating "
+                    "with serce '%s' during update process." % (
+                        region_id, service_id))
+
+        # get all existing mappings:
+        existing_region_mappings = [
+            mapping.region_id
+            for mapping in get_region_mappings_for_service(
+                context, service_id)]
+
+        # check and add new mappings:
+        to_map = set(
+            desired_region_mappings).difference(set(existing_region_mappings))
+        regions_to_unmap = set(
+            existing_region_mappings).difference(set(desired_region_mappings))
+
+        LOG.debug(
+            "Remapping regions for service '%s' from %s to %s",
+            service_id, existing_region_mappings, desired_region_mappings)
+
+        region_id = None
+        try:
+            for region_id in to_map:
+                mapping = models.ServiceRegionMapping()
+                mapping.region_id = region_id
+                mapping.service_id = service_id
+                add_service_region_mapping(context, mapping)
+                newly_mapped_regions.append(region_id)
+        except Exception as ex:
+            LOG.warn(
+                "Exception occurred while adding region mapping for '%s' to "
+                "service '%s'. Cleaning up created mappings (%s). Error was: "
+                "%s", region_id, service_id, newly_mapped_regions,
+                utils.get_exception_details())
+            _try_unmap_regions(newly_mapped_regions)
+            raise
+
+
+    updateable_fields = ["enabled", "status", "providers", "specs"]
+    try:
+        _update_sqlalchemy_object_fields(
+            service, updateable_fields, updated_values)
+    except Exception as ex:
+        LOG.warn(
+            "Exception occurred while updating fields of service '%s'. "
+            "Cleaning ""up created mappings (%s). Error was: %s",
+            service_id, newly_mapped_regions, utils.get_exception_details())
+        _try_unmap_regions(newly_mapped_regions)
+        raise
+
+    # remove all of the old region mappings:
+    LOG.debug(
+        "Unmapping the following regions during update of service '%s': %s",
+        service_id, regions_to_unmap)
+    _try_unmap_regions(regions_to_unmap)
+
+
+@enginefacade.writer
+def delete_service(context, service_id):
+    count = _soft_delete_aware_query(context, models.Service).filter_by(
+        id=service_id).soft_delete()
+    if count == 0:
+        raise exception.NotFound("0 service entries were soft deleted")
+
+
+@enginefacade.writer
+def add_service_region_mapping(context, service_region_mapping):
+    region_id = service_region_mapping.region_id
+    service_id = service_region_mapping.service_id
+
+    if None in [region_id, service_id]:
+        raise exception.InvalidInput(
+            "Provided service region mapping params for the region ID "
+            "('%s') and the service ID ('%s') must both be non-null." % (
+                region_id, service_id))
+
+    context.session.add(service_region_mapping)
+
+
+@enginefacade.reader
+def get_service_region_mapping(context, service_id, region_id):
+    q = _soft_delete_aware_query(context, models.ServiceRegionMapping)
+    q = q.filter(
+        models.ServiceRegionMapping.region == region_id)
+    q = q.filter(
+        models.ServiceRegionMapping.service_id == service_id)
+    return q.all()
+
+
+@enginefacade.writer
+def delete_service_region_mapping(context, service_id, region_id):
+    args = {"service_id": service_id, "region_id": region_id}
+    count = _soft_delete_aware_query(
+        context, models.ServiceRegionMapping).filter_by(
+            **args).soft_delete()
+    if count == 0:
+        raise exception.NotFound(
+            "There is no mapping between service '%s' and region '%s'." % (
+                service_id, region_id))
+
+
+@enginefacade.reader
+def get_region_mappings_for_service(
+        context, service_id, enabled_regions_only=False):
+    q = _soft_delete_aware_query(context, models.ServiceRegionMapping)
+    q = q.join(models.Region)
+    q = q.filter(
+        models.ServiceRegionMapping.service_id == service_id)
+    if enabled_regions_only:
+        q = q.filter(
+            models.Region.enabled == True)
+    return q.all()
+
+
+@enginefacade.reader
+def get_mapped_services_for_region(context, region_id):
+    q = _soft_delete_aware_query(context, models.Service)
+    q = q.join(models.ServiceRegionMapping)
+    q = q.filter(
+        models.ServiceRegionMapping.service_id == region_id)
+    return q.all()

+ 124 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/014_adds_worker_service_regions.py

@@ -0,0 +1,124 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import uuid
+
+import sqlalchemy
+
+
+def upgrade(migrate_engine):
+    meta = sqlalchemy.MetaData()
+    meta.bind = migrate_engine
+
+    sqlalchemy.Table(
+        'endpoint', meta, autoload=True)
+
+    tables = []
+
+    # declare region table:
+    tables.append(
+        sqlalchemy.Table(
+            'region',
+            meta,
+            sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
+                              default=lambda: str(uuid.uuid4())),
+            sqlalchemy.Column('name', sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column(
+                'description', sqlalchemy.String(1024), nullable=True),
+            sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted', sqlalchemy.String(36)),
+            sqlalchemy.Column(
+                'enabled', sqlalchemy.Boolean, nullable=True,
+                default=lambda: False)))
+
+    # declare endpoint-region-mapping table:
+    tables.append(
+        sqlalchemy.Table(
+            'endpoint_region_mapping',
+            meta,
+            sqlalchemy.Column(
+                'id',
+                sqlalchemy.String(36),
+                primary_key=True,
+                default=lambda: str(uuid.uuid4())),
+            sqlalchemy.Column(
+                'endpoint_id',
+                sqlalchemy.String(36),
+                sqlalchemy.ForeignKey('endpoint.id'),
+                nullable=False),
+            sqlalchemy.Column(
+                'region_id',
+                sqlalchemy.String(36),
+                sqlalchemy.ForeignKey('region.id'),
+                nullable=False),
+            sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted', sqlalchemy.String(36))))
+
+    # declare service table:
+    tables.append(
+        sqlalchemy.Table(
+            'service',
+            meta,
+            sqlalchemy.Column(
+                'id',
+                sqlalchemy.String(36),
+                primary_key=True,
+                default=lambda: str(uuid.uuid4())),
+            sqlalchemy.Column(
+                'enabled', sqlalchemy.Boolean, nullable=True,
+                default=lambda: False),
+            sqlalchemy.Column(
+                'host', sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column(
+                'binary', sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column(
+                'topic', sqlalchemy.String(255), nullable=False),
+            sqlalchemy.Column(
+                'status', sqlalchemy.String(255), nullable=False,
+                default=lambda: "UNKNOWN"),
+            sqlalchemy.Column(
+                'providers', sqlalchemy.Text(), nullable=False),
+            sqlalchemy.Column(
+                'specs', sqlalchemy.Text(), nullable=False),
+            sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted', sqlalchemy.String(36))))
+
+    # declare service-region mappings table:
+    tables.append(
+        sqlalchemy.Table(
+            'service_region_mapping',
+            meta,
+            sqlalchemy.Column(
+                'id',
+                sqlalchemy.String(36),
+                primary_key=True,
+                default=lambda: str(uuid.uuid4())),
+            sqlalchemy.Column(
+                'service_id',
+                sqlalchemy.String(36),
+                sqlalchemy.ForeignKey('service.id'),
+                nullable=False),
+            sqlalchemy.Column(
+                'region_id',
+                sqlalchemy.String(36),
+                sqlalchemy.ForeignKey('region.id'),
+                nullable=False),
+            sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+            sqlalchemy.Column('deleted', sqlalchemy.String(36))))
+
+    for index, table in enumerate(tables):
+        try:
+            table.create()
+        except Exception:
+            # If an error occurs, drop all tables created so far to return
+            # to the previously existing state.
+            meta.drop_all(tables=tables[:index])
+            raise

+ 120 - 3
coriolis/db/sqlalchemy/models.py

@@ -7,7 +7,9 @@ from oslo_db.sqlalchemy import models
 import sqlalchemy
 from sqlalchemy.ext import declarative
 from sqlalchemy import orm
+from sqlalchemy import schema
 
+from coriolis import constants
 from coriolis.db.sqlalchemy import types
 
 BASE = declarative.declarative_base()
@@ -277,6 +279,116 @@ class Migration(BaseTransferAction):
         })
         return base
 
+class ServiceRegionMapping(
+        BASE, models.TimestampMixin, models.ModelBase, models.SoftDeleteMixin):
+    __tablename__ = "service_region_mapping"
+
+    id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        default=lambda: str(uuid.uuid4()),
+        nullable=False,
+        primary_key=True)
+
+    service_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('service.id'),
+        nullable=False)
+
+    region_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('region.id'),
+        nullable=False)
+
+
+class Service(BASE, models.TimestampMixin, models.ModelBase,
+              models.SoftDeleteMixin):
+    __tablename__ = "service"
+    __table_args__ = (
+        schema.UniqueConstraint("host", "topic", "deleted",
+                                name="uniq_services0host0topic0deleted"),
+        schema.UniqueConstraint("host", "binary", "deleted",
+                                name="uniq_services0host0binary0deleted"))
+
+    id = sqlalchemy.Column(
+        sqlalchemy.String(36), default=lambda: str(uuid.uuid4()),
+        primary_key=True)
+
+    host = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False)
+    binary = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False)
+    topic = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=True, default=None)
+    enabled = sqlalchemy.Column(
+        sqlalchemy.Boolean, nullable=False, default=lambda: False)
+    status = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False,
+        default=lambda: constants.SERVICE_STATUS_UNKNOWN)
+    providers = sqlalchemy.Column(types.Json(), nullable=True)
+    specs = sqlalchemy.Column(types.Json(), nullable=True)
+    mapped_regions = orm.relationship(
+        ServiceRegionMapping, backref=orm.backref('service'),
+        cascade="all,delete",
+        primaryjoin="and_(ServiceRegionMapping.service_id==Service.id, "
+                    "ServiceRegionMapping.deleted=='0')")
+
+
+class EndpointRegionMapping(
+        BASE, models.TimestampMixin, models.ModelBase, models.SoftDeleteMixin):
+    __tablename__ = "endpoint_region_mapping"
+
+    id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        default=lambda: str(uuid.uuid4()),
+        nullable=False,
+        primary_key=True)
+
+    endpoint_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('endpoint.id'),
+        nullable=False)
+
+    region_id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        sqlalchemy.ForeignKey('region.id'),
+        nullable=False)
+
+
+class Region(
+        BASE, models.TimestampMixin, models.ModelBase, models.SoftDeleteMixin):
+    __tablename__ = "region"
+
+    id = sqlalchemy.Column(
+        sqlalchemy.String(36),
+        default=lambda: str(uuid.uuid4()),
+        nullable=False,
+        primary_key=True)
+
+    name = sqlalchemy.Column(
+        sqlalchemy.String(255),
+        nullable=False)
+
+    description = sqlalchemy.Column(
+        sqlalchemy.String(1024),
+        nullable=True)
+
+    enabled = sqlalchemy.Column(
+        sqlalchemy.Boolean,
+        default=lambda: False,
+        nullable=False)
+
+    mapped_endpoints = orm.relationship(
+        EndpointRegionMapping, backref=orm.backref('region'),
+        cascade="all,delete",
+        primaryjoin="and_(EndpointRegionMapping.region_id==Region.id, "
+                    "EndpointRegionMapping.deleted=='0')")
+
+    mapped_services = orm.relationship(
+        ServiceRegionMapping, backref=orm.backref('region'),
+        cascade="all,delete",
+        primaryjoin="and_(ServiceRegionMapping.region_id==Region.id, "
+                    "ServiceRegionMapping.deleted=='0')")
+
 
 class Endpoint(BASE, models.TimestampMixin, models.ModelBase,
                models.SoftDeleteMixin):
@@ -294,11 +406,16 @@ class Endpoint(BASE, models.TimestampMixin, models.ModelBase,
     origin_actions = orm.relationship(
         BaseTransferAction, backref=orm.backref('origin_endpoint'),
         primaryjoin="and_(BaseTransferAction.origin_endpoint_id==Endpoint.id, "
-        "BaseTransferAction.deleted=='0')")
+                    "BaseTransferAction.deleted=='0')")
     destination_actions = orm.relationship(
         BaseTransferAction, backref=orm.backref('destination_endpoint'),
         primaryjoin="and_(BaseTransferAction.destination_endpoint_id=="
-        "Endpoint.id, BaseTransferAction.deleted=='0')")
+                    "Endpoint.id, BaseTransferAction.deleted=='0')")
+    mapped_regions = orm.relationship(
+        EndpointRegionMapping, backref=orm.backref('endpoint'),
+        cascade="all,delete",
+        primaryjoin="and_(EndpointRegionMapping.endpoint_id==Endpoint.id, "
+                    "EndpointRegionMapping.deleted=='0')")
 
 
 class ReplicaSchedule(BASE, models.TimestampMixin, models.ModelBase,
@@ -317,7 +434,7 @@ class ReplicaSchedule(BASE, models.TimestampMixin, models.ModelBase,
     expiration_date = sqlalchemy.Column(
         sqlalchemy.types.DateTime, nullable=True)
     enabled = sqlalchemy.Column(
-        sqlalchemy.Boolean, nullable=False, default=True)
+        sqlalchemy.Boolean, nullable=False, default=lambda: False)
     shutdown_instance = sqlalchemy.Column(
         sqlalchemy.Boolean, nullable=False, default=False)
     trust_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)

+ 3 - 2
coriolis/endpoints/api.py

@@ -10,9 +10,10 @@ class API(object):
         self._rpc_client = rpc_client.ConductorClient()
 
     def create(self, ctxt, name, endpoint_type, description,
-               connection_info):
+               connection_info, mapped_regions):
         return self._rpc_client.create_endpoint(
-            ctxt, name, endpoint_type, description, connection_info)
+            ctxt, name, endpoint_type, description, connection_info,
+            mapped_regions)
 
     def update(self, ctxt, endpoint_id, properties):
         return self._rpc_client.update_endpoint(

+ 79 - 0
coriolis/policies/regions.py

@@ -0,0 +1,79 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+
+from oslo_policy import policy
+
+from coriolis.policies import base
+
+
+REGIONS_POLICY_PREFIX = "%s:regions" % base.CORIOLIS_POLICIES_PREFIX
+REGIONS_POLICY_DEFAULT_RULE = "rule:admin_or_owner"
+
+
+def get_regions_policy_label(rule_label):
+    return "%s:%s" % (
+        REGIONS_POLICY_PREFIX, rule_label)
+
+
+REGIONS_POLICY_DEFAULT_RULES = [
+    policy.DocumentedRuleDefault(
+        get_regions_policy_label('create'),
+        REGIONS_POLICY_DEFAULT_RULE,
+        "Create a region",
+        [
+            {
+                "path": "/regions",
+                "method": "POST"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_regions_policy_label('list'),
+        REGIONS_POLICY_DEFAULT_RULE,
+        "List regions",
+        [
+            {
+                "path": "/regions",
+                "method": "GET"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_regions_policy_label('show'),
+        REGIONS_POLICY_DEFAULT_RULE,
+        "Show details for region",
+        [
+            {
+                "path": "/regions/{region_id}",
+                "method": "GET"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_regions_policy_label('update'),
+        REGIONS_POLICY_DEFAULT_RULE,
+        "Update details for region",
+        [
+            {
+                "path": "/regions/{region_id}",
+                "method": "PUT"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_regions_policy_label('delete'),
+        REGIONS_POLICY_DEFAULT_RULE,
+        "Delete region",
+        [
+            {
+                "path": "/regions/{region_id}",
+                "method": "DELETE"
+            }
+        ]
+    )
+]
+
+
+def list_rules():
+    return REGIONS_POLICY_DEFAULT_RULES

+ 79 - 0
coriolis/policies/services.py

@@ -0,0 +1,79 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+
+from oslo_policy import policy
+
+from coriolis.policies import base
+
+
+SERVICES_POLICY_PREFIX = "%s:services" % base.CORIOLIS_POLICIES_PREFIX
+SERVICES_POLICY_DEFAULT_RULE = "rule:admin_or_owner"
+
+
+def get_services_policy_label(rule_label):
+    return "%s:%s" % (
+        SERVICES_POLICY_PREFIX, rule_label)
+
+
+SERVICES_POLICY_DEFAULT_RULES = [
+    policy.DocumentedRuleDefault(
+        get_services_policy_label('create'),
+        SERVICES_POLICY_DEFAULT_RULE,
+        "Create a service",
+        [
+            {
+                "path": "/services",
+                "method": "POST"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_services_policy_label('list'),
+        SERVICES_POLICY_DEFAULT_RULE,
+        "List services",
+        [
+            {
+                "path": "/services",
+                "method": "GET"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_services_policy_label('show'),
+        SERVICES_POLICY_DEFAULT_RULE,
+        "Show details for service",
+        [
+            {
+                "path": "/services/{service_id}",
+                "method": "GET"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_services_policy_label('update'),
+        SERVICES_POLICY_DEFAULT_RULE,
+        "Update details for service",
+        [
+            {
+                "path": "/services/{service_id}",
+                "method": "PUT"
+            }
+        ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_services_policy_label('delete'),
+        SERVICES_POLICY_DEFAULT_RULE,
+        "Delete service",
+        [
+            {
+                "path": "/services/{service_id}",
+                "method": "DELETE"
+            }
+        ]
+    )
+]
+
+
+def list_rules():
+    return SERVICES_POLICY_DEFAULT_RULES

+ 4 - 2
coriolis/policy.py

@@ -14,9 +14,11 @@ from coriolis.policies import diagnostics
 from coriolis.policies import endpoints
 from coriolis.policies import general
 from coriolis.policies import migrations
+from coriolis.policies import regions
 from coriolis.policies import replicas
 from coriolis.policies import replica_schedules
 from coriolis.policies import replica_tasks_executions
+from coriolis.policies import services
 
 
 LOG = logging.getLogger(__name__)
@@ -26,7 +28,7 @@ _ENFORCER = None
 
 DEFAULT_POLICIES_MODULES = [
     base, endpoints, general, migrations, replicas, replica_schedules,
-    replica_tasks_executions, diagnostics]
+    replica_tasks_executions, diagnostics, regions, services]
 
 
 def reset():
@@ -61,7 +63,7 @@ def check_policy_for_context(
     """ Checks the validity of the given action of the given target based on
     set policies.
     On success, returns a value where bool(val) == True.
-    On failure and if `do_raise` if False, returns False.
+    On failure and if `do_raise` is False, returns False.
     Raises `exception.PolicyNotAuthorized` or `exc` if the policy is
     not authorized.
     """

+ 0 - 0
coriolis/regions/__init__.py


+ 27 - 0
coriolis/regions/api.py

@@ -0,0 +1,27 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis import utils
+from coriolis.conductor.rpc import client as rpc_client
+
+
+class API(object):
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+
+    def create(self, ctxt, region_name, description, enabled=True):
+        return self._rpc_client.create_region(
+            ctxt, region_name, description=description, enabled=enabled)
+
+    def update(self, ctxt, region_id, updated_values):
+        return self._rpc_client.update_region(
+            ctxt, region_id, updated_values=updated_values)
+
+    def delete(self, ctxt, region_id):
+        self._rpc_client.delete_region(ctxt, region_id)
+
+    def get_regions(self, ctxt):
+        return self._rpc_client.get_regions(ctxt)
+
+    def get_region(self, ctxt, region_id):
+        return self._rpc_client.get_region(ctxt, region_id)

+ 0 - 0
coriolis/services/__init__.py


+ 30 - 0
coriolis/services/api.py

@@ -0,0 +1,30 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from coriolis import utils
+from coriolis.conductor.rpc import client as rpc_client
+
+
+class API(object):
+    def __init__(self):
+        self._rpc_client = rpc_client.ConductorClient()
+
+    def create(
+            self, ctxt, host, binary, topic, mapped_regions,
+            enabled, providers, specs):
+        return self._rpc_client.register_service(
+            ctxt, host, binary, topic, enabled,
+            providers, specs, mapped_regions)
+
+    def update(self, ctxt, service_id, updated_values):
+        return self._rpc_client.update_service(
+            ctxt, service_id, updated_values)
+
+    def delete(self, ctxt, region_id):
+        self._rpc_client.delete_service(ctxt, region_id)
+
+    def get_services(self, ctxt):
+        return self._rpc_client.get_services(ctxt)
+
+    def get_service(self, ctxt, service_id):
+        return self._rpc_client.get_service(ctxt, service_id)

+ 124 - 79
coriolis/tasks/factory.py

@@ -8,86 +8,131 @@ from coriolis.tasks import osmorphing_tasks
 from coriolis.tasks import replica_tasks
 
 _TASKS_MAP = {
-    constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES:
-        migration_tasks.DeployMigrationSourceResourcesTask,
-    constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES:
-        migration_tasks.DeployMigrationTargetResourcesTask,
-    constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES:
-        migration_tasks.DeleteMigrationSourceResourcesTask,
-    constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES:
-        migration_tasks.DeleteMigrationTargetResourcesTask,
-    constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES:
-        migration_tasks.DeployInstanceResourcesTask,
-    constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT:
-        migration_tasks.FinalizeInstanceDeploymentTask,
-    constants.TASK_TYPE_CREATE_INSTANCE_DISKS:
-        migration_tasks.CreateInstanceDisksTask,
-    constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT:
-        migration_tasks.CleanupFailedInstanceDeploymentTask,
-    constants.TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE:
-        migration_tasks.CleanupInstanceTargetStorageTask,
-    constants.TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE:
-        migration_tasks.CleanupInstanceSourceStorageTask,
-    constants.TASK_TYPE_GET_OPTIMAL_FLAVOR:
-        migration_tasks.GetOptimalFlavorTask,
-    constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS:
-        migration_tasks.ValidateMigrationSourceInputsTask,
-    constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS:
-        migration_tasks.ValidateMigrationDestinationInputsTask,
-    constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES:
-        osmorphing_tasks.DeployOSMorphingResourcesTask,
-    constants.TASK_TYPE_OS_MORPHING:
-        osmorphing_tasks.OSMorphingTask,
-    constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES:
-        osmorphing_tasks.DeleteOSMorphingResourcesTask,
-    constants.TASK_TYPE_GET_INSTANCE_INFO:
-        replica_tasks.GetInstanceInfoTask,
-    constants.TASK_TYPE_REPLICATE_DISKS:
-        replica_tasks.ReplicateDisksTask,
-    constants.TASK_TYPE_SHUTDOWN_INSTANCE:
-        replica_tasks.ShutdownInstanceTask,
-    constants.TASK_TYPE_DEPLOY_REPLICA_DISKS:
-        replica_tasks.DeployReplicaDisksTask,
-    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS:
-        replica_tasks.DeleteReplicaSourceDiskSnapshotsTask,
-    constants.TASK_TYPE_DELETE_REPLICA_DISKS:
-        replica_tasks.DeleteReplicaDisksTask,
-    constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES:
-        replica_tasks.DeployReplicaTargetResourcesTask,
-    constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES:
-        replica_tasks.DeleteReplicaTargetResourcesTask,
-    constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES:
-        replica_tasks.DeployReplicaSourceResourcesTask,
-    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES:
-        replica_tasks.DeleteReplicaSourceResourcesTask,
-    constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES:
-        replica_tasks.DeployReplicaInstanceResourcesTask,
-    constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT:
-        replica_tasks.FinalizeReplicaInstanceDeploymentTask,
-    constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT:
-        replica_tasks.CleanupFailedReplicaInstanceDeploymentTask,
-    constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS:
-        replica_tasks.CreateReplicaDiskSnapshotsTask,
-    constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS:
-        replica_tasks.DeleteReplicaTargetDiskSnapshotsTask,
-    constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
-        replica_tasks.RestoreReplicaDiskSnapshotsTask,
-    constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS:
-        replica_tasks.ValidateReplicaExecutionSourceInputsTask,
-    constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS:
-        replica_tasks.ValidateReplicaExecutionDestinationInputsTask,
-    constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS:
-        replica_tasks.ValidateReplicaDeploymentParametersTask,
-    constants.TASK_TYPE_UPDATE_SOURCE_REPLICA:
-        replica_tasks.UpdateSourceReplicaTask,
-    constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
-        replica_tasks.UpdateDestinationReplicaTask
+    constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": migration_tasks.DeployMigrationSourceResourcesTask},
+    constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": migration_tasks.DeployMigrationTargetResourcesTask},
+    constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": migration_tasks.DeleteMigrationSourceResourcesTask},
+    constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": migration_tasks.DeleteMigrationTargetResourcesTask},
+    constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": migration_tasks.DeployInstanceResourcesTask},
+    constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": migration_tasks.FinalizeInstanceDeploymentTask},
+    constants.TASK_TYPE_CREATE_INSTANCE_DISKS: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": migration_tasks.CreateInstanceDisksTask},
+    constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": migration_tasks.CleanupFailedInstanceDeploymentTask},
+    constants.TASK_TYPE_CLEANUP_INSTANCE_TARGET_STORAGE: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": migration_tasks.CleanupInstanceTargetStorageTask},
+    constants.TASK_TYPE_CLEANUP_INSTANCE_SOURCE_STORAGE: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": migration_tasks.CleanupInstanceSourceStorageTask},
+    constants.TASK_TYPE_GET_OPTIMAL_FLAVOR: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": migration_tasks.GetOptimalFlavorTask},
+    constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": migration_tasks.ValidateMigrationSourceInputsTask},
+    constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": migration_tasks.ValidateMigrationDestinationInputsTask},
+    constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": osmorphing_tasks.DeployOSMorphingResourcesTask},
+    constants.TASK_TYPE_OS_MORPHING: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": osmorphing_tasks.OSMorphingTask},
+    constants.TASK_TYPE_DELETE_OS_MORPHING_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": osmorphing_tasks.DeleteOSMorphingResourcesTask},
+    constants.TASK_TYPE_GET_INSTANCE_INFO: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": replica_tasks.GetInstanceInfoTask},
+    constants.TASK_TYPE_REPLICATE_DISKS: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": replica_tasks.ReplicateDisksTask},
+    constants.TASK_TYPE_SHUTDOWN_INSTANCE: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": replica_tasks.ShutdownInstanceTask},
+    constants.TASK_TYPE_DEPLOY_REPLICA_DISKS: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.DeployReplicaDisksTask},
+    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": replica_tasks.DeleteReplicaSourceDiskSnapshotsTask},
+    constants.TASK_TYPE_DELETE_REPLICA_DISKS: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.DeleteReplicaDisksTask},
+    constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.DeployReplicaTargetResourcesTask},
+    constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.DeleteReplicaTargetResourcesTask},
+    constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": replica_tasks.DeployReplicaSourceResourcesTask},
+    constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": replica_tasks.DeleteReplicaSourceResourcesTask},
+    constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE_RESOURCES: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.DeployReplicaInstanceResourcesTask},
+    constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.FinalizeReplicaInstanceDeploymentTask},
+    constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.CleanupFailedReplicaInstanceDeploymentTask},
+    constants.TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.CreateReplicaDiskSnapshotsTask},
+    constants.TASK_TYPE_DELETE_REPLICA_TARGET_DISK_SNAPSHOTS: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.DeleteReplicaTargetDiskSnapshotsTask},
+    constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.RestoreReplicaDiskSnapshotsTask},
+    constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": replica_tasks.ValidateReplicaExecutionSourceInputsTask},
+    constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.ValidateReplicaExecutionDestinationInputsTask},
+    constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.ValidateReplicaDeploymentParametersTask},
+    constants.TASK_TYPE_UPDATE_SOURCE_REPLICA: {
+        "task_platform": constants.TASK_PLATFORM_SOURCE,
+        "task_runner_class": replica_tasks.UpdateSourceReplicaTask},
+    constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA: {
+        "task_platform": constants.TASK_PLATFORM_DESTINATION,
+        "task_runner_class": replica_tasks.UpdateDestinationReplicaTask}
 }
 
 
-def get_task_runner_class(task_type):
-    cls = _TASKS_MAP.get(task_type)
-    if not cls:
+def _get_task_class_info(task_type):
+    match = _TASKS_MAP.get(task_type)
+    if not match:
         raise exception.NotFound(
-            "TaskRunner not found for task type: %s" % task_type)
-    return cls
+            "TaskRunner info not found for task type: %s" % task_type)
+    return match
+
+
+def get_task_runner_class(task_type):
+    return _get_task_class_info(task_type)["task_runner_class"]
+
+
+def get_task_platform(task_type):
+    return _get_task_class_info(task_type)["task_platform"]

+ 7 - 2
coriolis/utils.py

@@ -16,6 +16,7 @@ import re
 import socket
 import string
 import subprocess
+import sys
 import time
 import traceback
 import uuid
@@ -122,10 +123,10 @@ def get_diagnostics_info():
     # diagnostics.
     packages = list(freeze.freeze())
     return {
-        "application": os.path.basename(main.__file__),
+        "binary": get_binary_name(),
         "packages": packages,
         "os_info": _get_host_os_info(),
-        "hostname": platform.node(),
+        "hostname": get_hostname(),
         "ip_addresses": _get_local_ips(),
     }
 
@@ -428,6 +429,10 @@ def get_hostname():
     return socket.gethostname()
 
 
+def get_binary_name():
+    return os.path.basename(sys.argv[0])
+
+
 def get_exception_details():
     return traceback.format_exc()
 

+ 5 - 5
coriolis/worker/rpc/client.py

@@ -19,8 +19,8 @@ CONF.register_opts(worker_opts, 'worker')
 
 
 class WorkerClient(object):
-    def __init__(self, timeout=None):
-        target = messaging.Target(topic='coriolis_worker', version=VERSION)
+    def __init__(self, timeout=None, topic='coriolis_worker'):
+        target = messaging.Target(topic=topic, version=VERSION)
         if timeout is None:
             timeout = CONF.worker.worker_rpc_timeout
         self._client = rpc.get_client(target, timeout=timeout)
@@ -39,9 +39,6 @@ class WorkerClient(object):
         cctxt.call(ctxt, 'cancel_task', task_id=task_id, process_id=process_id,
                    force=force)
 
-    def update_migration_status(self, ctxt, task_id, status):
-        self._client.call(ctxt, "update_migration_status", status=status)
-
     def get_endpoint_instances(self, ctxt, platform_name, connection_info,
                                source_environment, marker=None, limit=None,
                                instance_name_pattern=None):
@@ -128,3 +125,6 @@ class WorkerClient(object):
 
     def get_diagnostics(self, ctxt):
         return self._client.call(ctxt, 'get_diagnostics')
+
+    def get_service_status(self, ctxt):
+        return self._client.call(ctxt, 'get_service_status')

+ 16 - 0
coriolis/worker/rpc/server.py

@@ -73,6 +73,22 @@ class WorkerServerEndpoint(object):
             # Ignore the exception
             LOG.exception(ex)
 
+    def get_service_status(self, ctxt):
+        diagnostics = self.get_diagnostics(ctxt)
+        status = {
+            "host": diagnostics["hostname"],
+            "binary": diagnostics["binary"],
+            "providers": self.get_available_providers(ctxt),
+            "specs": diagnostics
+        }
+        # TODO(aznashwan): have the topic be updated automatically in
+        # oslo_messaging.Service instead of relying on "hardcoding" it:
+        status["topic"] = constants.SERVICE_MESSAGING_TOPIC_FORMAT % {
+            "host": status["host"],
+            "binary": diagnostics["application"]}
+
+        return status
+
     def cancel_task(self, ctxt, task_id, process_id, force):
         if not force and os.name == "nt":
             LOG.warn("Windows does not support SIGINT, performing a "