Przeglądaj źródła

Merge pull request #11 from atoaca/update-replica

Add support for updating replicas.
Nashwan Azhari 7 lat temu
rodzic
commit
beff2d516c

+ 151 - 0
coriolis/api/v1/replicas.py

@@ -7,6 +7,7 @@ from webob import exc
 from coriolis import exception
 from coriolis.api.v1 import utils as api_utils
 from coriolis.api.v1.views import replica_view
+from coriolis.api.v1.views import replica_tasks_execution_view
 from coriolis.api import wsgi as api_wsgi
 from coriolis.endpoints import api as endpoints_api
 from coriolis.policies import replicas as replica_policies
@@ -109,6 +110,156 @@ class ReplicaController(api_wsgi.Controller):
         except exception.NotFound as ex:
             raise exc.HTTPNotFound(explanation=ex.msg)
 
+    @staticmethod
+    def _update_storage_mappings(original_storage_mappings,
+                                 new_storage_mappings):
+
+        backend_mappings = original_storage_mappings.get(
+            'backend_mappings', [])
+        new_backend_mappings = new_storage_mappings.get('backend_mappings', [])
+        new_backend_mapping_sources = [mapping['source'] for mapping in
+                                       new_backend_mappings]
+
+        disk_mappings = original_storage_mappings.get('disk_mappings', [])
+        new_disk_mappings = new_storage_mappings.get('disk_mappings', [])
+        new_disk_mappings_disk_ids = [mapping['disk_id'] for mapping in
+                                      new_disk_mappings]
+
+        non_duplicates_backend_mapping = []
+        for mapping in backend_mappings:
+            if mapping['source'] not in new_backend_mapping_sources:
+                non_duplicates_backend_mapping.append(mapping)
+            else:
+                LOG.info("Storage Backend Mapping %s will be overwritten." %
+                         mapping)
+
+        non_duplicates_disk_mappings = []
+        for mapping in disk_mappings:
+            if mapping['disk_id'] not in new_disk_mappings_disk_ids:
+                non_duplicates_disk_mappings.append(mapping)
+            else:
+                LOG.info("Storage Disk Mapping %s will be overwritten" %
+                         mapping)
+
+        non_duplicates_backend_mapping.extend(new_backend_mappings)
+        non_duplicates_disk_mappings.extend(new_disk_mappings)
+        storage_mappings = {
+            'backend_mappings': non_duplicates_backend_mapping,
+            'disk_mappings': non_duplicates_disk_mappings}
+
+        default_storage_backend = (
+            new_storage_mappings.get('default', None) or
+            original_storage_mappings.get('default', None))
+        if default_storage_backend:
+            storage_mappings['default'] = default_storage_backend
+
+        return storage_mappings
+
+    def _get_merged_replica_values(self, replica, updated_values):
+        final_values = {}
+        # NOTE: this just replaces options at the top-level and does not do
+        # merging of container types (ex: lists, dicts)
+        for option in [
+                "source_environment", "destination_environment",
+                "network_map"]:
+            original_values = replica.get(option, {})
+
+            original_values.update(updated_values.get(
+                option, {}))
+            final_values[option] = original_values
+
+        original_storage_mappings = replica.get('storage_mappings', {})
+        new_storage_mappings = updated_values.get('storage_mappings', {})
+        final_values['storage_mappings'] = self._update_storage_mappings(
+            original_storage_mappings, new_storage_mappings)
+
+        if 'notes' in updated_values:
+            final_values['notes'] = updated_values.get('notes', '')
+        else:
+            final_values['notes'] = replica.get('notes', '')
+
+        # until the provider plugin interface is updated
+        # to have separate 'network_map' and 'storage_mappings' fields,
+        # we add them as part of the destination environment:
+        final_storage_mappings = final_values['storage_mappings']
+        final_network_map = final_values['network_map']
+        if not final_values.get('destination_environment'):
+            final_values['destination_environment'] = {}
+        if final_storage_mappings:
+            final_values[
+                'destination_environment'][
+                'storage_mappings'] = final_storage_mappings
+        if final_network_map:
+            final_values[
+                'destination_environment'][
+                'network_map'] = final_network_map
+
+        return final_values
+
+    def _validate_update_body(self, id, context, body):
+
+        replica = self._replica_api.get_replica(context, id)
+        try:
+            merged_body = self._get_merged_replica_values(
+                replica, body['replica'])
+
+            origin_endpoint_id = replica["origin_endpoint_id"]
+            destination_endpoint_id = replica["destination_endpoint_id"]
+
+            source_environment = merged_body.get("source_environment", {})
+            self._endpoints_api.validate_source_environment(
+                context, origin_endpoint_id, source_environment)
+
+            network_map = merged_body.get("network_map", {})
+            api_utils.validate_network_map(network_map)
+
+            destination_environment = merged_body.get(
+                "destination_environment", {})
+            self._endpoints_api.validate_target_environment(
+                context, destination_endpoint_id, destination_environment)
+
+            storage_mappings = merged_body.get("storage_mappings", {})
+            api_utils.validate_storage_mappings(storage_mappings)
+
+            return merged_body
+
+        except Exception as ex:
+            LOG.exception(ex)
+            if hasattr(ex, "message"):
+                msg = ex.message
+            else:
+                msg = str(ex)
+            raise exception.InvalidInput(msg)
+
+    def update(self, req, id, body):
+        context = req.environ["coriolis.context"]
+        context.can(replica_policies.get_replicas_policy_label("update"))
+        origin_endpoint_id = body['replica'].get('origin_endpoint_id', None)
+        destination_endpoint_id = body['replica'].get(
+            'destination_endpoint_id', None)
+        instances = body['replica'].get('instances', None)
+        if origin_endpoint_id or destination_endpoint_id:
+            raise exc.HTTPBadRequest(
+                explanation="The source or destination endpoints for a "
+                            "Coriolis Replica cannot be updated after its "
+                            "creation. If the credentials of any of the "
+                            "Replica's endpoints need updating, please update "
+                            "the endpoints themselves.")
+        if instances:
+            raise exc.HTTPBadRequest(
+                explanation="The list of instances of a Replica cannot be "
+                            "updated")
+
+        updated_values = self._validate_update_body(id, context, body)
+        try:
+            return replica_tasks_execution_view.single(
+                req, self._replica_api.update(req.environ['coriolis.context'],
+                                              id, updated_values))
+        except exception.NotFound as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+        except exception.InvalidParameterValue as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
 
def create_resource():
    """ Return the WSGI resource wrapping the ReplicaController. """
    return api_wsgi.Resource(ReplicaController())

+ 6 - 0
coriolis/conductor/rpc/client.py

@@ -272,3 +272,9 @@ class ConductorClient(object):
             replica_id=replica_id,
             schedule_id=schedule_id,
             expired=expired)
+
    def update_replica(self, ctxt, replica_id, properties):
        # Synchronously call the conductor's 'update_replica' endpoint over
        # RPC and return its result (the newly-created tasks execution).
        return self._client.call(
            ctxt, 'update_replica',
            replica_id=replica_id,
            properties=properties)

+ 24 - 0
coriolis/conductor/rpc/server.py

@@ -861,6 +861,9 @@ class ConductorServerEndpoint(object):
                 LOG.debug(
                     "No 'transfer_result' was returned for task type '%s' "
                     "for transfer action '%s'", task_type, execution.action_id)
+        elif task_type == constants.TASK_TYPE_UPDATE_REPLICA:
+            # NOTE: perform the actual db update
+            db_api.update_replica(ctxt, execution.action_id, task_info)
 
         if updated_task_info:
             self._update_replica_volumes_info(
@@ -999,3 +1002,24 @@ class ConductorServerEndpoint(object):
         if not schedule:
             raise exception.NotFound("Schedule not found")
         return schedule
+
+    @replica_synchronized
+    def update_replica(
+        self, ctxt, replica_id, properties):
+        replica = self._get_replica(ctxt, replica_id)
+        self._check_replica_running_executions(ctxt, replica)
+        execution = models.TasksExecution()
+        execution.id = str(uuid.uuid4())
+        execution.status = constants.EXECUTION_STATUS_RUNNING
+        execution.action = replica
+
+        for instance in execution.action.instances:
+            replica.info[instance] = properties
+            self._create_task(
+                instance, constants.TASK_TYPE_UPDATE_REPLICA,
+                execution)
+        db_api.add_replica_tasks_execution(ctxt, execution)
+        LOG.debug("Execution for Replica update tasks created: %s",
+                  execution.id)
+        self._begin_tasks(ctxt, execution, replica.info)
+        return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)

+ 2 - 0
coriolis/constants.py

@@ -49,6 +49,7 @@ TASK_TYPE_VALIDATE_MIGRATION_INPUTS = "VALIDATE_MIGRATION_INPUTS"
 TASK_TYPE_VALIDATE_REPLICA_INPUTS = "VALIDATE_REPLICA_INPUTS"
 TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS = (
     "VALIDATE_REPLICA_DEPLOYMENT_INPUTS")
+TASK_TYPE_UPDATE_REPLICA = "UPDATE_REPLICA"
 
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_EXPORT = 2
@@ -66,6 +67,7 @@ PROVIDER_TYPE_VALIDATE_REPLICA_EXPORT = 4096
 PROVIDER_TYPE_VALIDATE_MIGRATION_IMPORT = 8192
 PROVIDER_TYPE_VALIDATE_REPLICA_IMPORT = 16384
 PROVIDER_TYPE_ENDPOINT_STORAGE = 32768
+PROVIDER_TYPE_REPLICA_UPDATE = 65536
 
 DISK_FORMAT_VMDK = 'vmdk'
 DISK_FORMAT_RAW = 'raw'

+ 15 - 0
coriolis/db/api.py

@@ -543,3 +543,18 @@ def add_task_progress_update(context, task_id, current_step, total_steps,
     task_progress_update.current_step = current_step
     task_progress_update.total_steps = total_steps
     task_progress_update.message = message
+
+
@enginefacade.writer
def update_replica(context, replica_id, updated_values):
    """ Persist the given updated values on the Replica with the given ID. """
    replica = get_replica(context, replica_id)
    if not replica:
        raise exception.NotFound("Replica not found")

    updatable_fields = (
        "source_environment", "destination_environment", "notes",
        "network_map", "storage_mappings")
    for field in updatable_fields:
        if field in updated_values:
            setattr(replica, field, updated_values[field])

    # the oslo_db library uses this method for both the `created_at` and
    # `updated_at` fields
    replica.updated_at = timeutils.utcnow()

+ 12 - 0
coriolis/policies/replicas.py

@@ -83,7 +83,19 @@ REPLICAS_POLICY_DEFAULT_RULES = [
                 "method": "DELETE"
             }
         ]
+    ),
+    policy.DocumentedRuleDefault(
+        get_replicas_policy_label('update'),
+        REPLICAS_POLICY_DEFAULT_RULE,
+        "Update Replica",
+        [
+            {
+                "path": "/replicas/{replica_id}",
+                "method": "POST"
+            }
+        ]
     )
+
 ]
 
 

+ 24 - 0
coriolis/providers/base.py

@@ -387,3 +387,27 @@ class BaseEndpointStorageProvider(object, with_metaclass(abc.ABCMeta)):
     def get_storage(self, ctxt, connection_info):
         """ Returns all the storage options available"""
         pass
+
+
class BaseReplicaUpdateableProvider(object, with_metaclass(abc.ABCMeta)):
    """ Class for replica providers (export and import) which offer the
    functionality of updating the parameters for a replica.
    """
    @abc.abstractmethod
    def check_update_environment_params(
            self, ctxt, connection_info, export_info, volumes_info, old_params,
            new_params):
        """ Checks that any existing replica resources for the VM given by its
        `export_info` which were replicated using the `old_params` are
        compatible with the `new_params`. Depending on whether it's an Import
        or Export provider, the `old_params` and `new_params` refer to either
        the `destination_environment` or `source_environment` replica fields.
        This method should raise an error given incompatible `new_params` and
        also perform the necessary update procedures if they are compatible.

        return on success: updated `volumes_info`

        NOTE: if there is no `volumes_info` present due to the replica never
        having been executed or the replica disks having been deleted, this
        method should simply return the empty `volumes_info` it was given.
        """

+ 3 - 1
coriolis/providers/factory.py

@@ -41,7 +41,9 @@ PROVIDER_TYPE_MAP = {
     constants.PROVIDER_TYPE_VALIDATE_MIGRATION_IMPORT: (
         base.BaseMigrationImportValidationProvider),
     constants.PROVIDER_TYPE_VALIDATE_REPLICA_IMPORT: (
-        base.BaseReplicaImportValidationProvider)
+        base.BaseReplicaImportValidationProvider),
+    constants.PROVIDER_TYPE_REPLICA_UPDATE: (
+        base.BaseReplicaUpdateableProvider)
 }
 
 

+ 22 - 0
coriolis/providers/provider_utils.py

@@ -107,3 +107,25 @@ def get_storage_mapping_for_disk(
         disk_info, mapped_backend)
 
     return mapped_backend
+
+
def check_changed_storage_mappings(volumes_info, old_storage_mappings,
                                   new_storage_mappings):
    """ Raise if the new storage mappings differ from the old ones.

    Modifying a Replica's storage mappings is not supported, so any change
    in either the backend mappings or the disk mappings raises.

    :param volumes_info: currently unused; kept for interface parity with
        the other checker utilities in this module.
    :raises CoriolisException: if the mappings were modified.
    """
    # NOTE: the body was previously over-indented (8 spaces); normalized to
    # the standard 4-space function-body indentation.
    old_backend_mappings = old_storage_mappings.get('backend_mappings', [])
    old_disk_mappings = old_storage_mappings.get('disk_mappings', [])
    new_backend_mappings = new_storage_mappings.get('backend_mappings', [])
    new_disk_mappings = new_storage_mappings.get('disk_mappings', [])

    # NOTE: mappings are compared as lists of value-tuples, so both the
    # ordering of the mapping lists and of each mapping's values matters:
    old_backend_mappings_set = [
        tuple(mapping.values()) for mapping in old_backend_mappings]
    old_disk_mappings_set = [
        tuple(mapping.values()) for mapping in old_disk_mappings]
    new_backend_mappings_set = [
        tuple(mapping.values()) for mapping in new_backend_mappings]
    new_disk_mappings_set = [
        tuple(mapping.values()) for mapping in new_disk_mappings]

    if (old_backend_mappings_set != new_backend_mappings_set or
            old_disk_mappings_set != new_disk_mappings_set):
        raise exception.CoriolisException("Modifying storage mappings is "
                                          "not supported.")

+ 4 - 0
coriolis/replicas/api.py

@@ -16,6 +16,10 @@ class API(object):
             source_environment, destination_environment, instances,
             network_map, storage_mappings, notes)
 
    def update(self, ctxt, replica_id, properties):
        # Forward the update to the conductor over RPC; returns the tasks
        # execution created for applying the update.
        return self._rpc_client.update_replica(
            ctxt, replica_id, properties)
+
    def delete(self, ctxt, replica_id):
        # Fire the RPC call to delete the Replica; no return value.
        self._rpc_client.delete_replica(ctxt, replica_id)
 

+ 3 - 1
coriolis/tasks/factory.py

@@ -65,7 +65,9 @@ _TASKS_MAP = {
     constants.TASK_TYPE_VALIDATE_REPLICA_INPUTS:
         replica_tasks.ValidateReplicaExecutionParametersTask,
     constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS:
-        replica_tasks.ValidateReplicaDeploymentParametersTask
+        replica_tasks.ValidateReplicaDeploymentParametersTask,
+    constants.TASK_TYPE_UPDATE_REPLICA:
+        replica_tasks.UpdateReplicaTask
 }
 
 

+ 55 - 0
coriolis/tasks/replica_tasks.py

@@ -434,3 +434,58 @@ class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
             ctxt, destination_connection_info, target_environment, export_info)
 
         return task_info
+
+
class UpdateReplicaTask(base.TaskRunner):
    """ Task which checks and applies updated environment options on the
    existing resources of a Replica via the source/destination providers.
    """
    def run(self, ctxt, instance, origin, destination, task_info,
            event_handler):
        destination_provider = None
        source_provider = None
        new_source_environment = task_info.get('source_environment')
        new_destination_environment = task_info.get('destination_environment')

        if new_source_environment:
            source_provider = providers_factory.get_provider(
                origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
                event_handler, raise_if_not_found=False)
            if not source_provider:
                raise exception.CoriolisException(
                    "Replica source provider plugin for '%s' does not support"
                    " updating Replicas." % origin["type"])

        if new_destination_environment:
            # NOTE: pass raise_if_not_found=False so a missing plugin is
            # reported with the explicit error below (mirroring the source
            # provider check above); previously the factory would raise a
            # generic error before the check could run:
            destination_provider = providers_factory.get_provider(
                destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
                event_handler, raise_if_not_found=False)
            if not destination_provider:
                # NOTE: report the destination's platform type here (the
                # message previously showed the origin's type):
                raise exception.CoriolisException(
                    "Replica destination provider plugin for '%s' does not "
                    "support updating Replicas." % destination["type"])

        connection_info = base.get_connection_info(ctxt, destination)
        export_info = task_info.get("export_info", {})
        volumes_info = task_info.get("volumes_info", {})

        if source_provider:
            old_source_environment = origin.get('source_environment', {})
            new_source_environment = task_info.get('source_environment', {})
            LOG.info("Checking source provider environment params")
            # NOTE(review): the destination's connection info is passed to
            # the source provider as well -- confirm whether the source
            # endpoint's own connection info should be used instead:
            source_provider.check_update_environment_params(
                ctxt, connection_info, export_info, volumes_info,
                old_source_environment, new_source_environment)

        if destination_provider:
            LOG.info("Checking destination provider environment params")
            old_destination_environment = destination.get(
                'target_environment', {})
            new_destination_environment = task_info.get(
                'destination_environment', {})

            volumes_info = (
                destination_provider.check_update_environment_params(
                    ctxt, connection_info, export_info, volumes_info,
                    old_destination_environment, new_destination_environment))

            task_info['volumes_info'] = volumes_info

        return task_info