Ver Fonte

Fix updating Replica values.

Nashwan Azhari há 6 anos atrás
pai
commit
9c92b287e1
3 ficheiros alterados com 115 adições e 30 exclusões
  1. 66 10
      coriolis/conductor/rpc/server.py
  2. 25 4
      coriolis/db/api.py
  3. 24 16
      coriolis/tasks/replica_tasks.py

+ 66 - 10
coriolis/conductor/rpc/server.py

@@ -532,6 +532,14 @@ class ConductorServerEndpoint(object):
         execution.action = replica
         execution.action = replica
         execution.type = constants.EXECUTION_TYPE_REPLICA_EXECUTION
         execution.type = constants.EXECUTION_TYPE_REPLICA_EXECUTION
 
 
+        # TODO(aznashwan): have these passed separately to the relevant
+        # provider methods. They're currently passed directly inside
+        # dest-env by the API service when accepting the call, but we
+        # re-overwrite them here in case of Replica updates.
+        dest_env = copy.deepcopy(replica.destination_environment)
+        dest_env['network_map'] = replica.network_map
+        dest_env['storage_mappings'] = replica.storage_mappings
+
         for instance in execution.action.instances:
         for instance in execution.action.instances:
             # NOTE: we default/convert the volumes info to an empty list
             # NOTE: we default/convert the volumes info to an empty list
             # to preserve backwards-compatibility with older versions
             # to preserve backwards-compatibility with older versions
@@ -544,7 +552,7 @@ class ConductorServerEndpoint(object):
             # execution to ensure that the latest parameters are used:
             # execution to ensure that the latest parameters are used:
             replica.info[instance].update({
             replica.info[instance].update({
                 "source_environment": replica.source_environment,
                 "source_environment": replica.source_environment,
-                "target_environment": replica.destination_environment})
+                "target_environment": dest_env})
                 # TODO(aznashwan): have these passed separately to the relevant
                 # TODO(aznashwan): have these passed separately to the relevant
                 # provider methods (they're currently passed directly inside
                 # provider methods (they're currently passed directly inside
                 # dest-env by the API service when accepting the call)
                 # dest-env by the API service when accepting the call)
@@ -849,7 +857,12 @@ class ConductorServerEndpoint(object):
         migration.id = str(uuid.uuid4())
         migration.id = str(uuid.uuid4())
         migration.origin_endpoint_id = replica.origin_endpoint_id
         migration.origin_endpoint_id = replica.origin_endpoint_id
         migration.destination_endpoint_id = replica.destination_endpoint_id
         migration.destination_endpoint_id = replica.destination_endpoint_id
-        migration.destination_environment = replica.destination_environment
+        # TODO(aznashwan): have these passed separately to the relevant
+        # provider methods instead of through the dest-env:
+        dest_env = copy.deepcopy(replica.destination_environment)
+        dest_env['network_map'] = replica.network_map
+        dest_env['storage_mappings'] = replica.storage_mappings
+        migration.destination_environment = dest_env
         migration.source_environment = replica.source_environment
         migration.source_environment = replica.source_environment
         migration.network_map = replica.network_map
         migration.network_map = replica.network_map
         migration.storage_mappings = replica.storage_mappings
         migration.storage_mappings = replica.storage_mappings
@@ -857,11 +870,6 @@ class ConductorServerEndpoint(object):
         migration.replica = replica
         migration.replica = replica
         migration.info = replica.info
         migration.info = replica.info
 
 
-        for instance in instances:
-            migration.info[instance]["clone_disks"] = clone_disks
-            scripts = self._get_instance_scripts(user_scripts, instance)
-            migration.info[instance]["user_scripts"] = scripts
-
         execution = models.TasksExecution()
         execution = models.TasksExecution()
         migration.executions = [execution]
         migration.executions = [execution]
         execution.status = constants.EXECUTION_STATUS_RUNNING
         execution.status = constants.EXECUTION_STATUS_RUNNING
@@ -869,6 +877,28 @@ class ConductorServerEndpoint(object):
         execution.type = constants.EXECUTION_TYPE_REPLICA_DEPLOY
         execution.type = constants.EXECUTION_TYPE_REPLICA_DEPLOY
 
 
         for instance in instances:
         for instance in instances:
+            migration.info[instance]["clone_disks"] = clone_disks
+            scripts = self._get_instance_scripts(user_scripts, instance)
+            migration.info[instance]["user_scripts"] = scripts
+
+            # NOTE: we default/convert the volumes info to an empty list
+            # to preserve backwards-compatibility with older versions
+            # of Coriolis dating before the scheduling overhaul (PR##114)
+            if instance not in migration.info:
+                migration.info[instance] = {'volumes_info': []}
+            # NOTE: we update all of the param values before triggering an
+            # execution to ensure that the params on the Replica are used
+            # in case there was a failed Replica update (where the new values
+            # could be in the `.info` field instead of the old ones)
+            migration.info[instance].update({
+                "source_environment": migration.source_environment,
+                "target_environment": dest_env})
+                # TODO(aznashwan): have these passed separately to the relevant
+                # provider methods (they're currently passed directly inside
+                # dest-env by the API service when accepting the call)
+                # "network_map": network_map,
+                # "storage_mappings": storage_mappings,
+
             validate_replica_deployment_inputs_task = self._create_task(
             validate_replica_deployment_inputs_task = self._create_task(
                 instance,
                 instance,
                 constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS,
                 constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS,
@@ -1805,6 +1835,14 @@ class ConductorServerEndpoint(object):
                     # it means this was the last update task in the Execution
                     # it means this was the last update task in the Execution
                     # and we may safely update the params of the Replica
                     # and we may safely update the params of the Replica
                     # as they are in the DB:
                     # as they are in the DB:
+                    LOG.info(
+                        "All tasks of the '%s' Replica update procedure have "
+                        "completed successfully.  Setting the updated parameter "
+                        "values on the parent Replica itself.",
+                        execution.action_id)
+                    # NOTE: considering all the instances of the Replica get
+                    # the same params, it doesn't matter which instance's
+                    # update task finishes last:
                     db_api.update_replica(
                     db_api.update_replica(
                         ctxt, execution.action_id, task_info)
                         ctxt, execution.action_id, task_info)
         else:
         else:
@@ -2130,24 +2168,34 @@ class ConductorServerEndpoint(object):
         execution.action = replica
         execution.action = replica
         execution.type = constants.EXECUTION_TYPE_REPLICA_UPDATE
         execution.type = constants.EXECUTION_TYPE_REPLICA_UPDATE
 
 
-        for instance in execution.action.instances:
+        for instance in replica.instances:
             LOG.debug(
             LOG.debug(
                 "Pre-replica-update task_info for instance '%s' of Replica "
                 "Pre-replica-update task_info for instance '%s' of Replica "
                 "'%s': %s", instance, replica_id,
                 "'%s': %s", instance, replica_id,
                 utils.filter_chunking_info_for_task(
                 utils.filter_chunking_info_for_task(
                     replica.info[instance]))
                     replica.info[instance]))
+
             # NOTE: "circular assignment" would lead to a `None` value
             # NOTE: "circular assignment" would lead to a `None` value
             # so we must operate on a copy:
             # so we must operate on a copy:
             inst_info_copy = copy.deepcopy(replica.info[instance])
             inst_info_copy = copy.deepcopy(replica.info[instance])
+
             # NOTE: we update the various values in the task info itself
             # NOTE: we update the various values in the task info itself
             # As a result, the values within the task_info will be the updated
             # As a result, the values within the task_info will be the updated
-            # values which will be checked. The old values will be send to the
+            # values which will be checked. The old values will be sent to the
             # tasks through the origin/destination parameters for them to be
             # tasks through the origin/destination parameters for them to be
             # compared to the new ones.
             # compared to the new ones.
             # The actual values on the Replica object itself will be set
             # The actual values on the Replica object itself will be set
             # during _handle_post_task_actions once the final destination-side
             # during _handle_post_task_actions once the final destination-side
             # update task will be completed.
             # update task will be completed.
-            inst_info_copy.update(updated_properties)
+            inst_info_copy.update({
+                key: updated_properties[key]
+                for key in updated_properties
+                if key != "destination_environment"})
+            # NOTE: the API service labels the target-env as the
+            # "destination_environment":
+            if "destination_environment" in updated_properties:
+                inst_info_copy["target_environment"] = updated_properties[
+                    "destination_environment"]
             replica.info[instance] = inst_info_copy
             replica.info[instance] = inst_info_copy
 
 
             LOG.debug(
             LOG.debug(
@@ -2173,10 +2221,18 @@ class ConductorServerEndpoint(object):
                     update_source_replica_task.id])
                     update_source_replica_task.id])
 
 
         self._check_execution_tasks_sanity(execution, replica.info)
         self._check_execution_tasks_sanity(execution, replica.info)
+
+        # update the action info for all of the instances in the Replica:
+        for instance in execution.action.instances:
+            db_api.update_transfer_action_info_for_instance(
+                ctxt, replica.id, instance, replica.info[instance])
+
         db_api.add_replica_tasks_execution(ctxt, execution)
         db_api.add_replica_tasks_execution(ctxt, execution)
         LOG.debug("Execution for Replica update tasks created: %s",
         LOG.debug("Execution for Replica update tasks created: %s",
                   execution.id)
                   execution.id)
+
         self._begin_tasks(ctxt, execution, task_info=replica.info)
         self._begin_tasks(ctxt, execution, task_info=replica.info)
+
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
         return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
 
 
     def get_diagnostics(self, ctxt):
     def get_diagnostics(self, ctxt):

+ 25 - 4
coriolis/db/api.py

@@ -588,10 +588,31 @@ def update_replica(context, replica_id, updated_values):
     replica = get_replica(context, replica_id)
     replica = get_replica(context, replica_id)
     if not replica:
     if not replica:
         raise exception.NotFound("Replica not found")
         raise exception.NotFound("Replica not found")
-    for n in ["source_environment", "destination_environment", "notes",
-              "network_map", "storage_mappings"]:
-        if n in updated_values:
-            setattr(replica, n, updated_values[n])
+
+    mapped_info_fields = {
+        'destination_environment': 'target_environment'}
+
+    updateable_fields = [
+        "source_environment", "destination_environment", "notes",
+        "network_map", "storage_mappings"]
+    for field in updateable_fields:
+        if mapped_info_fields.get(field, field) in updated_values:
+            LOG.debug(
+                "Updating the '%s' field of Replica '%s' to: '%s'",
+                field, replica_id, updated_values[
+                    mapped_info_fields.get(field, field)])
+            setattr(
+                replica, field,
+                updated_values[mapped_info_fields.get(field, field)])
+
+    non_updateable_fields = set(
+        updated_values.keys()).difference({
+            mapped_info_fields.get(field, field)
+            for field in updateable_fields})
+    if non_updateable_fields:
+        LOG.warn(
+            "The following Replica fields can NOT be updated: %s",
+            non_updateable_fields)
 
 
     # the oslo_db library uses this method for both the `created_at` and
     # the oslo_db library uses this method for both the `created_at` and
     # `updated_at` fields
     # `updated_at` fields

+ 24 - 16
coriolis/tasks/replica_tasks.py

@@ -755,17 +755,24 @@ class UpdateSourceReplicaTask(base.TaskRunner):
 
 
     @property
     @property
     def returned_task_info_properties(self):
     def returned_task_info_properties(self):
-        return ["volumes_info"]
+        return ["volumes_info", "source_environment"]
 
 
     def _run(self, ctxt, instance, origin, destination, task_info,
     def _run(self, ctxt, instance, origin, destination, task_info,
              event_handler):
              event_handler):
         event_manager = events.EventManager(event_handler)
         event_manager = events.EventManager(event_handler)
-        new_source_env = task_info.get('source_environment', {})
+
         volumes_info = task_info.get("volumes_info", [])
         volumes_info = task_info.get("volumes_info", [])
+        new_source_env = task_info.get('source_environment', {})
+        # NOTE: the `source_environment` in the `origin` is the one set
+        # in the dedicated DB column of the Replica and thus stores
+        # the previous value of it:
+        old_source_env = origin.get('source_environment')
         if not new_source_env:
         if not new_source_env:
             event_manager.progress_update(
             event_manager.progress_update(
                 "No new source environment options provided")
                 "No new source environment options provided")
-            return {'volumes_info': volumes_info}
+            return {
+                'volumes_info': volumes_info,
+                'source_environment': old_source_env}
 
 
         source_provider = providers_factory.get_provider(
         source_provider = providers_factory.get_provider(
             origin["type"], constants.PROVIDER_TYPE_SOURCE_REPLICA_UPDATE,
             origin["type"], constants.PROVIDER_TYPE_SOURCE_REPLICA_UPDATE,
@@ -778,10 +785,6 @@ class UpdateSourceReplicaTask(base.TaskRunner):
         origin_connection_info = base.get_connection_info(ctxt, origin)
         origin_connection_info = base.get_connection_info(ctxt, origin)
 
 
         LOG.info("Checking source provider environment params")
         LOG.info("Checking source provider environment params")
-        # NOTE: the `source_environment` in the `origin` is the one set
-        # in the dedicated DB column of the Replica and thus stores
-        # the previous value of it:
-        old_source_env = origin.get('source_environment')
         volumes_info = (
         volumes_info = (
             source_provider.check_update_source_environment_params(
             source_provider.check_update_source_environment_params(
                 ctxt, origin_connection_info, instance, volumes_info,
                 ctxt, origin_connection_info, instance, volumes_info,
@@ -791,7 +794,8 @@ class UpdateSourceReplicaTask(base.TaskRunner):
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
 
 
         return {
         return {
-            "volumes_info": volumes_info}
+            "volumes_info": volumes_info,
+            "source_environment": new_source_env}
 
 
 
 
 class UpdateDestinationReplicaTask(base.TaskRunner):
 class UpdateDestinationReplicaTask(base.TaskRunner):
@@ -802,17 +806,24 @@ class UpdateDestinationReplicaTask(base.TaskRunner):
 
 
     @property
     @property
     def returned_task_info_properties(self):
     def returned_task_info_properties(self):
-        return ["volumes_info"]
+        return ["volumes_info", "target_environment"]
 
 
     def _run(self, ctxt, instance, origin, destination, task_info,
     def _run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
             event_handler):
         event_manager = events.EventManager(event_handler)
         event_manager = events.EventManager(event_handler)
-        new_destination_env = task_info.get('target_environment', {})
+
         volumes_info = task_info.get("volumes_info", [])
         volumes_info = task_info.get("volumes_info", [])
+        new_destination_env = task_info.get('target_environment', {})
+        # NOTE: the `target_environment` in the `destination` is the one
+        # set in the dedicated DB column of the Replica and thus stores
+        # the previous value of it:
+        old_destination_env = destination.get('target_environment', {})
         if not new_destination_env:
         if not new_destination_env:
             event_manager.progress_update(
             event_manager.progress_update(
                 "No new destination environment options provided")
                 "No new destination environment options provided")
-            return {"volumes_info": volumes_info}
+            return {
+                "target_environment": old_destination_env,
+                "volumes_info": volumes_info}
 
 
         destination_provider = providers_factory.get_provider(
         destination_provider = providers_factory.get_provider(
             destination["type"],
             destination["type"],
@@ -828,10 +839,6 @@ class UpdateDestinationReplicaTask(base.TaskRunner):
         export_info = task_info.get("export_info", {})
         export_info = task_info.get("export_info", {})
 
 
         LOG.info("Checking destination provider environment params")
         LOG.info("Checking destination provider environment params")
-        # NOTE: the `target_environment` in the `destination` is the one
-        # set in the dedicated DB column of the Replica and thus stores
-        # the previous value of it:
-        old_destination_env = destination.get('target_environment', {})
         volumes_info = (
         volumes_info = (
             destination_provider.check_update_destination_environment_params(
             destination_provider.check_update_destination_environment_params(
                 ctxt, destination_connection_info, export_info, volumes_info,
                 ctxt, destination_connection_info, export_info, volumes_info,
@@ -844,4 +851,5 @@ class UpdateDestinationReplicaTask(base.TaskRunner):
                 export_info, volumes_info)
                 export_info, volumes_info)
 
 
         return {
         return {
-            "volumes_info": volumes_info}
+            "volumes_info": volumes_info,
+            "target_environment": new_destination_env}