Ver código fonte

Merge pull request #16 from aznashwan/replica-update-fixes

Fix Replica updating to retain all existing info.
Nashwan Azhari 7 anos atrás
pai
commit
f2dc599636

+ 38 - 29
coriolis/api/v1/replicas.py

@@ -156,20 +156,38 @@ class ReplicaController(api_wsgi.Controller):
         return storage_mappings
 
     def _get_merged_replica_values(self, replica, updated_values):
+        """ Looks for the following keys in the original replica body and
+        updated values (preferring the updated values where needed, but using
+        `.update()` on dicts):
+        "source_environment", "destination_environment", "network_map", "notes"
+        Does special merging for the "storage_mappings"
+        Returns a dict with the merged values (or at least all if the keys
+        having a default value of {})
+        """
         final_values = {}
         # NOTE: this just replaces options at the top-level and does not do
         # merging of container types (ex: lists, dicts)
         for option in [
                 "source_environment", "destination_environment",
                 "network_map"]:
-            original_values = replica.get(option, {})
-
-            original_values.update(updated_values.get(
-                option, {}))
-            final_values[option] = original_values
-
-        original_storage_mappings = replica.get('storage_mappings', {})
-        new_storage_mappings = updated_values.get('storage_mappings', {})
+            before = replica.get(option)
+            after = updated_values.get(option)
+            # NOTE: for Replicas created before the separation of these fields
+            # in the DB there is the chance that some of these may be NULL:
+            if before is None:
+                before = {}
+            if after is None:
+                after = {}
+            before.update(after)
+
+            final_values[option] = before
+
+        original_storage_mappings = replica.get('storage_mappings')
+        if original_storage_mappings is None:
+            original_storage_mappings = {}
+        new_storage_mappings = updated_values.get('storage_mappings')
+        if new_storage_mappings is None:
+            new_storage_mappings = {}
         final_values['storage_mappings'] = self._update_storage_mappings(
             original_storage_mappings, new_storage_mappings)
 
@@ -178,20 +196,16 @@ class ReplicaController(api_wsgi.Controller):
         else:
             final_values['notes'] = replica.get('notes', '')
 
-        # until the provider plugin interface is updated
+        # NOTE: until the provider plugin interface is updated
         # to have separate 'network_map' and 'storage_mappings' fields,
         # we add them as part of the destination environment:
         final_storage_mappings = final_values['storage_mappings']
         final_network_map = final_values['network_map']
-        if not final_values.get('destination_environment'):
-            final_values['destination_environment'] = {}
         if final_storage_mappings:
-            final_values[
-                'destination_environment'][
+            final_values['destination_environment'][
                 'storage_mappings'] = final_storage_mappings
         if final_network_map:
-            final_values[
-                'destination_environment'][
+            final_values['destination_environment'][
                 'network_map'] = final_network_map
 
         return final_values
@@ -206,30 +220,25 @@ class ReplicaController(api_wsgi.Controller):
             origin_endpoint_id = replica["origin_endpoint_id"]
             destination_endpoint_id = replica["destination_endpoint_id"]
 
-            source_environment = merged_body.get("source_environment", {})
             self._endpoints_api.validate_source_environment(
-                context, origin_endpoint_id, source_environment)
-
-            network_map = merged_body.get("network_map", {})
-            api_utils.validate_network_map(network_map)
+                context, origin_endpoint_id,
+                merged_body["source_environment"])
 
-            destination_environment = merged_body.get(
-                "destination_environment", {})
+            destination_environment = merged_body["destination_environment"]
             self._endpoints_api.validate_target_environment(
                 context, destination_endpoint_id, destination_environment)
 
-            storage_mappings = merged_body.get("storage_mappings", {})
-            api_utils.validate_storage_mappings(storage_mappings)
+            api_utils.validate_network_map(merged_body["network_map"])
+
+            api_utils.validate_storage_mappings(
+                merged_body["storage_mappings"])
 
             return merged_body
 
         except Exception as ex:
             LOG.exception(ex)
-            if hasattr(ex, "message"):
-                msg = ex.message
-            else:
-                msg = str(ex)
-            raise exception.InvalidInput(msg)
+            raise exception.InvalidInput(
+                getattr(ex, "message", str(ex)))
 
     def update(self, req, id, body):
         context = req.environ["coriolis.context"]

+ 38 - 13
coriolis/conductor/rpc/server.py

@@ -1,6 +1,7 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+import copy
 import functools
 import uuid
 
@@ -803,8 +804,15 @@ class ConductorServerEndpoint(object):
                             instance=task.instance,
                             task_info=task_info)
 
-    def _update_replica_volumes_info(self, ctxt, migration_id, instance,
+    def _update_replica_volumes_info(self, ctxt, replica_id, instance,
                                      updated_task_info):
+        """ WARN: the lock for the Replica must be pre-acquired. """
+        db_api.set_transfer_action_info(
+            ctxt, replica_id, instance,
+            updated_task_info)
+
+    def _update_volumes_info_for_migration_parent_replica(
+            self, ctxt, migration_id, instance, updated_task_info):
         migration = db_api.get_migration(ctxt, migration_id)
         replica_id = migration.replica_id
 
@@ -812,13 +820,11 @@ class ConductorServerEndpoint(object):
             LOG.debug(
                 "Updating volume_info in replica due to snapshot "
                 "restore during migration. replica id: %s", replica_id)
-            db_api.set_transfer_action_info(
-                ctxt, replica_id, instance,
-                updated_task_info)
+            self._update_replica_volumes_info(
+                ctxt, replica_id, instance, updated_task_info)
 
     def _handle_post_task_actions(self, ctxt, task, execution, task_info):
         task_type = task.task_type
-        updated_task_info = None
 
         if task_type == constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
             # When restoring a snapshot in some import providers (OpenStack),
@@ -826,14 +832,20 @@ class ConductorServerEndpoint(object):
             # Replica instance as well.
             volumes_info = task_info.get("volumes_info")
             if volumes_info:
-                updated_task_info = {"volumes_info": volumes_info}
+                self._update_volumes_info_for_migration_parent_replica(
+                    ctxt, execution.action_id, task.instance,
+                    {"volumes_info": volumes_info})
+
         elif task_type == constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS:
 
             if not task_info.get("clone_disks"):
                 # The migration completed. If the replica is executed again,
                 # new volumes need to be deployed in place of the migrated
                 # ones.
-                updated_task_info = {"volumes_info": None}
+                self._update_volumes_info_for_migration_parent_replica(
+                    ctxt, execution.action_id, task.instance,
+                    {"volumes_info": None})
+
         elif task_type in (
                 constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE,
                 constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT):
@@ -862,13 +874,16 @@ class ConductorServerEndpoint(object):
                     "No 'transfer_result' was returned for task type '%s' "
                     "for transfer action '%s'", task_type, execution.action_id)
         elif task_type == constants.TASK_TYPE_UPDATE_REPLICA:
-            # NOTE: perform the actual db update
+            # NOTE: perform the actual db update on the Replica's properties:
             db_api.update_replica(ctxt, execution.action_id, task_info)
-
-        if updated_task_info:
+            # NOTE: remember to update the `volumes_info`:
+            # NOTE: considering this method is only called with a lock on the
+            # `execution.action_id` (in a Replica update tasks' case that's the
+            # ID of the Replica itself) we can safely call
+            # `_update_replica_volumes_info` below:
             self._update_replica_volumes_info(
                 ctxt, execution.action_id, task.instance,
-                updated_task_info)
+                {"volumes_info": task_info.get("volumes_info")})
 
     @task_synchronized
     def task_completed(self, ctxt, task_id, task_info):
@@ -1005,7 +1020,7 @@ class ConductorServerEndpoint(object):
 
     @replica_synchronized
     def update_replica(
-        self, ctxt, replica_id, properties):
+            self, ctxt, replica_id, properties):
         replica = self._get_replica(ctxt, replica_id)
         self._check_replica_running_executions(ctxt, replica)
         execution = models.TasksExecution()
@@ -1013,11 +1028,21 @@ class ConductorServerEndpoint(object):
         execution.status = constants.EXECUTION_STATUS_RUNNING
         execution.action = replica
 
+        LOG.debug(
+            "Replica '%s' info pre-replica-update: %s",
+            replica_id, replica.info)
         for instance in execution.action.instances:
-            replica.info[instance] = properties
+            # NOTE: "circular assignment" would lead to a `None` value
+            # so we must operate on a copy:
+            inst_info_copy = copy.deepcopy(replica.info[instance])
+            inst_info_copy.update(properties)
+            replica.info[instance] = inst_info_copy
             self._create_task(
                 instance, constants.TASK_TYPE_UPDATE_REPLICA,
                 execution)
+        LOG.debug(
+            "Replica '%s' info post-replica-update: %s",
+            replica_id, replica.info)
         db_api.add_replica_tasks_execution(ctxt, execution)
         LOG.debug("Execution for Replica update tasks created: %s",
                   execution.id)

+ 7 - 1
coriolis/tasks/replica_tasks.py

@@ -467,15 +467,21 @@ class UpdateReplicaTask(base.TaskRunner):
         volumes_info = task_info.get("volumes_info", {})
 
         if source_provider:
+            LOG.info("Checking source provider environment params")
+            # NOTE: the `source_environment` in the `origin` is the one set
+            # in the dedicated DB column of the Replica and thus stores
+            # the previous value of it:
             old_source_environment = origin.get('source_environment', {})
             new_source_environment = task_info.get('source_environment', {})
-            LOG.info("Checking source provider environment params")
             source_provider.check_update_environment_params(
                 ctxt, connection_info, export_info, volumes_info,
                 old_source_environment, new_source_environment)
 
         if destination_provider:
             LOG.info("Checking destination provider environment params")
+            # NOTE: the `target_environment` in the `destination` is the one
+            # set in the dedicated DB column of the Replica and thus stores
+            # the previous value of it:
             old_destination_environment = destination.get(
                 'target_environment', {})
             new_destination_environment = task_info.get(